1// Ported from https://github.com/nodejs/undici/pull/907 2 3'use strict' 4 5const assert = require('assert') 6const { Readable } = require('stream') 7const { RequestAbortedError, NotSupportedError, InvalidArgumentError } = require('../core/errors') 8const util = require('../core/util') 9const { ReadableStreamFrom, toUSVString } = require('../core/util') 10 11let Blob 12 13const kConsume = Symbol('kConsume') 14const kReading = Symbol('kReading') 15const kBody = Symbol('kBody') 16const kAbort = Symbol('abort') 17const kContentType = Symbol('kContentType') 18 19const noop = () => {} 20 21module.exports = class BodyReadable extends Readable { 22 constructor ({ 23 resume, 24 abort, 25 contentType = '', 26 highWaterMark = 64 * 1024 // Same as nodejs fs streams. 27 }) { 28 super({ 29 autoDestroy: true, 30 read: resume, 31 highWaterMark 32 }) 33 34 this._readableState.dataEmitted = false 35 36 this[kAbort] = abort 37 this[kConsume] = null 38 this[kBody] = null 39 this[kContentType] = contentType 40 41 // Is stream being consumed through Readable API? 42 // This is an optimization so that we avoid checking 43 // for 'data' and 'readable' listeners in the hot path 44 // inside push(). 45 this[kReading] = false 46 } 47 48 destroy (err) { 49 if (this.destroyed) { 50 // Node < 16 51 return this 52 } 53 54 if (!err && !this._readableState.endEmitted) { 55 err = new RequestAbortedError() 56 } 57 58 if (err) { 59 this[kAbort]() 60 } 61 62 return super.destroy(err) 63 } 64 65 emit (ev, ...args) { 66 if (ev === 'data') { 67 // Node < 16.7 68 this._readableState.dataEmitted = true 69 } else if (ev === 'error') { 70 // Node < 16 71 this._readableState.errorEmitted = true 72 } 73 return super.emit(ev, ...args) 74 } 75 76 on (ev, ...args) { 77 if (ev === 'data' || ev === 'readable') { 78 this[kReading] = true 79 } 80 return super.on(ev, ...args) 81 } 82 83 addListener (ev, ...args) { 84 return this.on(ev, ...args) 85 } 86 87 off (ev, ...args) { 88 const ret = super.off(ev, ...args) 89 if (ev === 'data' || ev === 'readable') { 90 this[kReading] = ( 91 this.listenerCount('data') > 0 || 92 this.listenerCount('readable') > 0 93 ) 94 } 95 return ret 96 } 97 98 removeListener (ev, ...args) { 99 return this.off(ev, ...args) 100 } 101 102 push (chunk) { 103 if (this[kConsume] && chunk !== null && this.readableLength === 0) { 104 consumePush(this[kConsume], chunk) 105 return this[kReading] ? super.push(chunk) : true 106 } 107 return super.push(chunk) 108 } 109 110 // https://fetch.spec.whatwg.org/#dom-body-text 111 async text () { 112 return consume(this, 'text') 113 } 114 115 // https://fetch.spec.whatwg.org/#dom-body-json 116 async json () { 117 return consume(this, 'json') 118 } 119 120 // https://fetch.spec.whatwg.org/#dom-body-blob 121 async blob () { 122 return consume(this, 'blob') 123 } 124 125 // https://fetch.spec.whatwg.org/#dom-body-arraybuffer 126 async arrayBuffer () { 127 return consume(this, 'arrayBuffer') 128 } 129 130 // https://fetch.spec.whatwg.org/#dom-body-formdata 131 async formData () { 132 // TODO: Implement. 133 throw new NotSupportedError() 134 } 135 136 // https://fetch.spec.whatwg.org/#dom-body-bodyused 137 get bodyUsed () { 138 return util.isDisturbed(this) 139 } 140 141 // https://fetch.spec.whatwg.org/#dom-body-body 142 get body () { 143 if (!this[kBody]) { 144 this[kBody] = ReadableStreamFrom(this) 145 if (this[kConsume]) { 146 // TODO: Is this the best way to force a lock? 147 this[kBody].getReader() // Ensure stream is locked. 148 assert(this[kBody].locked) 149 } 150 } 151 return this[kBody] 152 } 153 154 dump (opts) { 155 let limit = opts && Number.isFinite(opts.limit) ? opts.limit : 262144 156 const signal = opts && opts.signal 157 158 if (signal) { 159 try { 160 if (typeof signal !== 'object' || !('aborted' in signal)) { 161 throw new InvalidArgumentError('signal must be an AbortSignal') 162 } 163 util.throwIfAborted(signal) 164 } catch (err) { 165 return Promise.reject(err) 166 } 167 } 168 169 if (this.closed) { 170 return Promise.resolve(null) 171 } 172 173 return new Promise((resolve, reject) => { 174 const signalListenerCleanup = signal 175 ? util.addAbortListener(signal, () => { 176 this.destroy() 177 }) 178 : noop 179 180 this 181 .on('close', function () { 182 signalListenerCleanup() 183 if (signal && signal.aborted) { 184 reject(signal.reason || Object.assign(new Error('The operation was aborted'), { name: 'AbortError' })) 185 } else { 186 resolve(null) 187 } 188 }) 189 .on('error', noop) 190 .on('data', function (chunk) { 191 limit -= chunk.length 192 if (limit <= 0) { 193 this.destroy() 194 } 195 }) 196 .resume() 197 }) 198 } 199} 200 201// https://streams.spec.whatwg.org/#readablestream-locked 202function isLocked (self) { 203 // Consume is an implicit lock. 204 return (self[kBody] && self[kBody].locked === true) || self[kConsume] 205} 206 207// https://fetch.spec.whatwg.org/#body-unusable 208function isUnusable (self) { 209 return util.isDisturbed(self) || isLocked(self) 210} 211 212async function consume (stream, type) { 213 if (isUnusable(stream)) { 214 throw new TypeError('unusable') 215 } 216 217 assert(!stream[kConsume]) 218 219 return new Promise((resolve, reject) => { 220 stream[kConsume] = { 221 type, 222 stream, 223 resolve, 224 reject, 225 length: 0, 226 body: [] 227 } 228 229 stream 230 .on('error', function (err) { 231 consumeFinish(this[kConsume], err) 232 }) 233 .on('close', function () { 234 if (this[kConsume].body !== null) { 235 consumeFinish(this[kConsume], new RequestAbortedError()) 236 } 237 }) 238 239 process.nextTick(consumeStart, stream[kConsume]) 240 }) 241} 242 243function consumeStart (consume) { 244 if (consume.body === null) { 245 return 246 } 247 248 const { _readableState: state } = consume.stream 249 250 for (const chunk of state.buffer) { 251 consumePush(consume, chunk) 252 } 253 254 if (state.endEmitted) { 255 consumeEnd(this[kConsume]) 256 } else { 257 consume.stream.on('end', function () { 258 consumeEnd(this[kConsume]) 259 }) 260 } 261 262 consume.stream.resume() 263 264 while (consume.stream.read() != null) { 265 // Loop 266 } 267} 268 269function consumeEnd (consume) { 270 const { type, body, resolve, stream, length } = consume 271 272 try { 273 if (type === 'text') { 274 resolve(toUSVString(Buffer.concat(body))) 275 } else if (type === 'json') { 276 resolve(JSON.parse(Buffer.concat(body))) 277 } else if (type === 'arrayBuffer') { 278 const dst = new Uint8Array(length) 279 280 let pos = 0 281 for (const buf of body) { 282 dst.set(buf, pos) 283 pos += buf.byteLength 284 } 285 286 resolve(dst.buffer) 287 } else if (type === 'blob') { 288 if (!Blob) { 289 Blob = require('buffer').Blob 290 } 291 resolve(new Blob(body, { type: stream[kContentType] })) 292 } 293 294 consumeFinish(consume) 295 } catch (err) { 296 stream.destroy(err) 297 } 298} 299 300function consumePush (consume, chunk) { 301 consume.length += chunk.length 302 consume.body.push(chunk) 303} 304 305function consumeFinish (consume, err) { 306 if (consume.body === null) { 307 return 308 } 309 310 if (err) { 311 consume.reject(err) 312 } else { 313 consume.resolve() 314 } 315 316 consume.type = null 317 consume.stream = null 318 consume.resolve = null 319 consume.reject = null 320 consume.length = 0 321 consume.body = null 322} 323