1'use strict' 2 3const Readable = require('./readable') 4const { 5 InvalidArgumentError, 6 RequestAbortedError 7} = require('../core/errors') 8const util = require('../core/util') 9const { getResolveErrorBodyCallback } = require('./util') 10const { AsyncResource } = require('async_hooks') 11const { addSignal, removeSignal } = require('./abort-signal') 12 13class RequestHandler extends AsyncResource { 14 constructor (opts, callback) { 15 if (!opts || typeof opts !== 'object') { 16 throw new InvalidArgumentError('invalid opts') 17 } 18 19 const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError, highWaterMark } = opts 20 21 try { 22 if (typeof callback !== 'function') { 23 throw new InvalidArgumentError('invalid callback') 24 } 25 26 if (highWaterMark && (typeof highWaterMark !== 'number' || highWaterMark < 0)) { 27 throw new InvalidArgumentError('invalid highWaterMark') 28 } 29 30 if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') { 31 throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget') 32 } 33 34 if (method === 'CONNECT') { 35 throw new InvalidArgumentError('invalid method') 36 } 37 38 if (onInfo && typeof onInfo !== 'function') { 39 throw new InvalidArgumentError('invalid onInfo callback') 40 } 41 42 super('UNDICI_REQUEST') 43 } catch (err) { 44 if (util.isStream(body)) { 45 util.destroy(body.on('error', util.nop), err) 46 } 47 throw err 48 } 49 50 this.responseHeaders = responseHeaders || null 51 this.opaque = opaque || null 52 this.callback = callback 53 this.res = null 54 this.abort = null 55 this.body = body 56 this.trailers = {} 57 this.context = null 58 this.onInfo = onInfo || null 59 this.throwOnError = throwOnError 60 this.highWaterMark = highWaterMark 61 62 if (util.isStream(body)) { 63 body.on('error', (err) => { 64 this.onError(err) 65 }) 66 } 67 68 addSignal(this, signal) 69 } 70 71 onConnect (abort, context) { 72 if (!this.callback) { 73 throw new RequestAbortedError() 74 } 75 76 this.abort = abort 77 this.context = context 78 } 79 80 onHeaders (statusCode, rawHeaders, resume, statusMessage) { 81 const { callback, opaque, abort, context, responseHeaders, highWaterMark } = this 82 83 const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders) 84 85 if (statusCode < 200) { 86 if (this.onInfo) { 87 this.onInfo({ statusCode, headers }) 88 } 89 return 90 } 91 92 const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers 93 const contentType = parsedHeaders['content-type'] 94 const body = new Readable({ resume, abort, contentType, highWaterMark }) 95 96 this.callback = null 97 this.res = body 98 if (callback !== null) { 99 if (this.throwOnError && statusCode >= 400) { 100 this.runInAsyncScope(getResolveErrorBodyCallback, null, 101 { callback, body, contentType, statusCode, statusMessage, headers } 102 ) 103 } else { 104 this.runInAsyncScope(callback, null, null, { 105 statusCode, 106 headers, 107 trailers: this.trailers, 108 opaque, 109 body, 110 context 111 }) 112 } 113 } 114 } 115 116 onData (chunk) { 117 const { res } = this 118 return res.push(chunk) 119 } 120 121 onComplete (trailers) { 122 const { res } = this 123 124 removeSignal(this) 125 126 util.parseHeaders(trailers, this.trailers) 127 128 res.push(null) 129 } 130 131 onError (err) { 132 const { res, callback, body, opaque } = this 133 134 removeSignal(this) 135 136 if (callback) { 137 // TODO: Does this need queueMicrotask? 138 this.callback = null 139 queueMicrotask(() => { 140 this.runInAsyncScope(callback, null, err, { opaque }) 141 }) 142 } 143 144 if (res) { 145 this.res = null 146 // Ensure all queued handlers are invoked before destroying res. 147 queueMicrotask(() => { 148 util.destroy(res, err) 149 }) 150 } 151 152 if (body) { 153 this.body = null 154 util.destroy(body, err) 155 } 156 } 157} 158 159function request (opts, callback) { 160 if (callback === undefined) { 161 return new Promise((resolve, reject) => { 162 request.call(this, opts, (err, data) => { 163 return err ? reject(err) : resolve(data) 164 }) 165 }) 166 } 167 168 try { 169 this.dispatch(opts, new RequestHandler(opts, callback)) 170 } catch (err) { 171 if (typeof callback !== 'function') { 172 throw err 173 } 174 const opaque = opts && opts.opaque 175 queueMicrotask(() => callback(err, { opaque })) 176 } 177} 178 179module.exports = request 180module.exports.RequestHandler = RequestHandler 181