'use strict'

const { finished, PassThrough } = require('stream')
const {
  InvalidArgumentError,
  InvalidReturnValueError,
  RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')

/**
 * Handler object passed to `dispatch` by `stream()` below. It obtains a
 * destination from the user-supplied `factory` once final response headers
 * arrive, writes each body chunk into it, ends it on completion and reports
 * the outcome through `callback`.
 *
 * Extends AsyncResource so that `factory` and `callback` are invoked through
 * `runInAsyncScope`.
 */
class StreamHandler extends AsyncResource {
  /**
   * @param {object} opts Request options. The fields read here are `signal`,
   *   `method`, `opaque`, `body`, `onInfo`, `responseHeaders` and
   *   `throwOnError`.
   * @param {Function} factory Called with `{ statusCode, headers, opaque,
   *   context }`; must return an object exposing `write`, `end` and `on`.
   * @param {Function} callback Called as `callback(err, { opaque, trailers })`
   *   when the destination finishes, or `callback(err, { opaque })` when an
   *   error arrives before a destination exists.
   * @throws {InvalidArgumentError} When `opts`, `callback`, `factory`,
   *   `signal`, `method` or `onInfo` fail validation.
   */
  constructor (opts, factory, callback) {
    if (!opts || typeof opts !== 'object') {
      throw new InvalidArgumentError('invalid opts')
    }

    const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError } = opts

    // Validation (and super()) run inside the try so that a request body
    // stream is handed to util.destroy if construction fails.
    try {
      if (typeof callback !== 'function') {
        throw new InvalidArgumentError('invalid callback')
      }

      if (typeof factory !== 'function') {
        throw new InvalidArgumentError('invalid factory')
      }

      if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
        throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
      }

      if (method === 'CONNECT') {
        throw new InvalidArgumentError('invalid method')
      }

      if (onInfo && typeof onInfo !== 'function') {
        throw new InvalidArgumentError('invalid onInfo callback')
      }

      super('UNDICI_STREAM')
    } catch (err) {
      if (util.isStream(body)) {
        // A no-op 'error' listener is attached before destroying the body.
        util.destroy(body.on('error', util.nop), err)
      }
      throw err
    }

    this.responseHeaders = responseHeaders || null
    this.opaque = opaque || null
    this.factory = factory
    this.callback = callback
    this.res = null
    this.abort = null
    this.context = null
    this.trailers = null
    this.body = body
    this.onInfo = onInfo || null
    this.throwOnError = throwOnError || false

    // Errors emitted by a request body stream are routed through onError.
    if (util.isStream(body)) {
      body.on('error', (err) => {
        this.onError(err)
      })
    }

    addSignal(this, signal)
  }

  /**
   * Stores the abort function and context for later use.
   *
   * @param {Function} abort Invoked later if the destination finishes with an
   *   error.
   * @param {*} context Forwarded unchanged to `factory`.
   * @throws {RequestAbortedError} When `callback` has already been cleared.
   */
  onConnect (abort, context) {
    if (!this.callback) {
      throw new RequestAbortedError()
    }

    this.abort = abort
    this.context = context
  }

  /**
   * Handles response headers and sets up the destination stream.
   *
   * @param {number} statusCode
   * @param {*} rawHeaders Passed to `util.parseRawHeaders` when
   *   `responseHeaders === 'raw'`, otherwise to `util.parseHeaders`.
   * @param {Function} resume Registered as the destination's 'drain' listener.
   * @param {string} statusMessage Forwarded to the error-body resolver.
   * @returns {boolean|undefined} `false` when the destination reports that it
   *   needs a drain, `true` otherwise; `undefined` for informational (< 200)
   *   responses or when `factory` was already cleared.
   * @throws {InvalidReturnValueError} When `factory` does not return an object
   *   exposing `write`, `end` and `on`.
   */
  onHeaders (statusCode, rawHeaders, resume, statusMessage) {
    const { factory, opaque, context, callback, responseHeaders } = this

    const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)

    // Informational responses only notify onInfo; the final response follows.
    if (statusCode < 200) {
      if (this.onInfo) {
        this.onInfo({ statusCode, headers })
      }
      return
    }

    this.factory = null

    let res

    if (this.throwOnError && statusCode >= 400) {
      // The content-type lookup needs the util.parseHeaders form even when
      // the caller asked for raw headers.
      const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
      const contentType = parsedHeaders['content-type']
      res = new PassThrough()

      // The callback is handed to getResolveErrorBodyCallback together with
      // the PassThrough that will receive the body.
      this.callback = null
      this.runInAsyncScope(getResolveErrorBodyCallback, null,
        { callback, body: res, contentType, statusCode, statusMessage, headers }
      )
    } else {
      // `factory` is the value captured before this.factory was cleared above;
      // it is null when onError (or an earlier onHeaders) already cleared it.
      if (factory === null) {
        return
      }

      res = this.runInAsyncScope(factory, null, {
        statusCode,
        headers,
        opaque,
        context
      })

      if (
        !res ||
        typeof res.write !== 'function' ||
        typeof res.end !== 'function' ||
        typeof res.on !== 'function'
      ) {
        throw new InvalidReturnValueError('expected Writable')
      }

      // TODO: Avoid finished. It registers an unnecessary amount of listeners.
      finished(res, { readable: false }, (err) => {
        const { callback, res, opaque, trailers, abort } = this

        this.res = null
        if (err || !res.readable) {
          util.destroy(res, err)
        }

        this.callback = null
        this.runInAsyncScope(callback, null, err || null, { opaque, trailers })

        if (err) {
          abort()
        }
      })
    }

    res.on('drain', resume)

    this.res = res

    // Fall back to the internal writable state when the public
    // writableNeedDrain property is not defined on the destination.
    const needDrain = res.writableNeedDrain !== undefined
      ? res.writableNeedDrain
      : res._writableState && res._writableState.needDrain

    return needDrain !== true
  }

  /**
   * Writes a body chunk to the destination.
   *
   * @param {*} chunk
   * @returns {*} The result of `res.write(chunk)`, or `true` when there is no
   *   destination.
   */
  onData (chunk) {
    const { res } = this

    return res ? res.write(chunk) : true
  }

  /**
   * Records the parsed trailers and ends the destination, if there is one.
   *
   * @param {*} trailers Passed to `util.parseHeaders`.
   */
  onComplete (trailers) {
    const { res } = this

    removeSignal(this)

    if (!res) {
      return
    }

    this.trailers = util.parseHeaders(trailers)

    res.end()
  }

  /**
   * Tears down the destination or reports the error, then passes the request
   * body to util.destroy.
   *
   * @param {Error} err
   */
  onError (err) {
    const { res, callback, opaque, body } = this

    removeSignal(this)

    this.factory = null

    if (res) {
      // The error is passed on to the destination via util.destroy.
      this.res = null
      util.destroy(res, err)
    } else if (callback) {
      // No destination yet: report directly, deferred to a microtask.
      this.callback = null
      queueMicrotask(() => {
        this.runInAsyncScope(callback, null, err, { opaque })
      })
    }

    if (body) {
      this.body = null
      util.destroy(body, err)
    }
  }
}

/**
 * Dispatches a request whose response body is written into the stream
 * returned by `factory`. Must be invoked with `this` bound to an object that
 * provides `dispatch(opts, handler)`.
 *
 * @param {object} opts Request options, forwarded to `dispatch` and to the
 *   StreamHandler constructor.
 * @param {Function} factory See StreamHandler.
 * @param {Function} [callback] Node-style callback. When omitted, a Promise
 *   is returned instead.
 * @returns {Promise|undefined} A Promise settled with the callback's
 *   `(err, data)` when `callback` is `undefined`; otherwise nothing.
 * @throws Rethrows a synchronous failure when `callback` is not a function.
 */
function stream (opts, factory, callback) {
  if (callback === undefined) {
    return new Promise((resolve, reject) => {
      stream.call(this, opts, factory, (err, data) => {
        return err ? reject(err) : resolve(data)
      })
    })
  }

  try {
    this.dispatch(opts, new StreamHandler(opts, factory, callback))
  } catch (err) {
    // Without a usable callback there is nowhere to deliver the error.
    if (typeof callback !== 'function') {
      throw err
    }
    const opaque = opts && opts.opaque
    queueMicrotask(() => callback(err, { opaque }))
  }
}

module.exports = stream