1// Ported from https://github.com/nodejs/undici/pull/907
2
3'use strict'
4
5const assert = require('assert')
6const { Readable } = require('stream')
7const { RequestAbortedError, NotSupportedError, InvalidArgumentError } = require('../core/errors')
8const util = require('../core/util')
9const { ReadableStreamFrom, toUSVString } = require('../core/util')
10
11let Blob
12
13const kConsume = Symbol('kConsume')
14const kReading = Symbol('kReading')
15const kBody = Symbol('kBody')
16const kAbort = Symbol('abort')
17const kContentType = Symbol('kContentType')
18
19const noop = () => {}
20
21module.exports = class BodyReadable extends Readable {
22  constructor ({
23    resume,
24    abort,
25    contentType = '',
26    highWaterMark = 64 * 1024 // Same as nodejs fs streams.
27  }) {
28    super({
29      autoDestroy: true,
30      read: resume,
31      highWaterMark
32    })
33
34    this._readableState.dataEmitted = false
35
36    this[kAbort] = abort
37    this[kConsume] = null
38    this[kBody] = null
39    this[kContentType] = contentType
40
41    // Is stream being consumed through Readable API?
42    // This is an optimization so that we avoid checking
43    // for 'data' and 'readable' listeners in the hot path
44    // inside push().
45    this[kReading] = false
46  }
47
48  destroy (err) {
49    if (this.destroyed) {
50      // Node < 16
51      return this
52    }
53
54    if (!err && !this._readableState.endEmitted) {
55      err = new RequestAbortedError()
56    }
57
58    if (err) {
59      this[kAbort]()
60    }
61
62    return super.destroy(err)
63  }
64
65  emit (ev, ...args) {
66    if (ev === 'data') {
67      // Node < 16.7
68      this._readableState.dataEmitted = true
69    } else if (ev === 'error') {
70      // Node < 16
71      this._readableState.errorEmitted = true
72    }
73    return super.emit(ev, ...args)
74  }
75
76  on (ev, ...args) {
77    if (ev === 'data' || ev === 'readable') {
78      this[kReading] = true
79    }
80    return super.on(ev, ...args)
81  }
82
83  addListener (ev, ...args) {
84    return this.on(ev, ...args)
85  }
86
87  off (ev, ...args) {
88    const ret = super.off(ev, ...args)
89    if (ev === 'data' || ev === 'readable') {
90      this[kReading] = (
91        this.listenerCount('data') > 0 ||
92        this.listenerCount('readable') > 0
93      )
94    }
95    return ret
96  }
97
98  removeListener (ev, ...args) {
99    return this.off(ev, ...args)
100  }
101
102  push (chunk) {
103    if (this[kConsume] && chunk !== null && this.readableLength === 0) {
104      consumePush(this[kConsume], chunk)
105      return this[kReading] ? super.push(chunk) : true
106    }
107    return super.push(chunk)
108  }
109
110  // https://fetch.spec.whatwg.org/#dom-body-text
111  async text () {
112    return consume(this, 'text')
113  }
114
115  // https://fetch.spec.whatwg.org/#dom-body-json
116  async json () {
117    return consume(this, 'json')
118  }
119
120  // https://fetch.spec.whatwg.org/#dom-body-blob
121  async blob () {
122    return consume(this, 'blob')
123  }
124
125  // https://fetch.spec.whatwg.org/#dom-body-arraybuffer
126  async arrayBuffer () {
127    return consume(this, 'arrayBuffer')
128  }
129
130  // https://fetch.spec.whatwg.org/#dom-body-formdata
131  async formData () {
132    // TODO: Implement.
133    throw new NotSupportedError()
134  }
135
136  // https://fetch.spec.whatwg.org/#dom-body-bodyused
137  get bodyUsed () {
138    return util.isDisturbed(this)
139  }
140
141  // https://fetch.spec.whatwg.org/#dom-body-body
142  get body () {
143    if (!this[kBody]) {
144      this[kBody] = ReadableStreamFrom(this)
145      if (this[kConsume]) {
146        // TODO: Is this the best way to force a lock?
147        this[kBody].getReader() // Ensure stream is locked.
148        assert(this[kBody].locked)
149      }
150    }
151    return this[kBody]
152  }
153
154  dump (opts) {
155    let limit = opts && Number.isFinite(opts.limit) ? opts.limit : 262144
156    const signal = opts && opts.signal
157
158    if (signal) {
159      try {
160        if (typeof signal !== 'object' || !('aborted' in signal)) {
161          throw new InvalidArgumentError('signal must be an AbortSignal')
162        }
163        util.throwIfAborted(signal)
164      } catch (err) {
165        return Promise.reject(err)
166      }
167    }
168
169    if (this.closed) {
170      return Promise.resolve(null)
171    }
172
173    return new Promise((resolve, reject) => {
174      const signalListenerCleanup = signal
175        ? util.addAbortListener(signal, () => {
176          this.destroy()
177        })
178        : noop
179
180      this
181        .on('close', function () {
182          signalListenerCleanup()
183          if (signal && signal.aborted) {
184            reject(signal.reason || Object.assign(new Error('The operation was aborted'), { name: 'AbortError' }))
185          } else {
186            resolve(null)
187          }
188        })
189        .on('error', noop)
190        .on('data', function (chunk) {
191          limit -= chunk.length
192          if (limit <= 0) {
193            this.destroy()
194          }
195        })
196        .resume()
197    })
198  }
199}
200
201// https://streams.spec.whatwg.org/#readablestream-locked
202function isLocked (self) {
203  // Consume is an implicit lock.
204  return (self[kBody] && self[kBody].locked === true) || self[kConsume]
205}
206
207// https://fetch.spec.whatwg.org/#body-unusable
208function isUnusable (self) {
209  return util.isDisturbed(self) || isLocked(self)
210}
211
212async function consume (stream, type) {
213  if (isUnusable(stream)) {
214    throw new TypeError('unusable')
215  }
216
217  assert(!stream[kConsume])
218
219  return new Promise((resolve, reject) => {
220    stream[kConsume] = {
221      type,
222      stream,
223      resolve,
224      reject,
225      length: 0,
226      body: []
227    }
228
229    stream
230      .on('error', function (err) {
231        consumeFinish(this[kConsume], err)
232      })
233      .on('close', function () {
234        if (this[kConsume].body !== null) {
235          consumeFinish(this[kConsume], new RequestAbortedError())
236        }
237      })
238
239    process.nextTick(consumeStart, stream[kConsume])
240  })
241}
242
243function consumeStart (consume) {
244  if (consume.body === null) {
245    return
246  }
247
248  const { _readableState: state } = consume.stream
249
250  for (const chunk of state.buffer) {
251    consumePush(consume, chunk)
252  }
253
254  if (state.endEmitted) {
255    consumeEnd(this[kConsume])
256  } else {
257    consume.stream.on('end', function () {
258      consumeEnd(this[kConsume])
259    })
260  }
261
262  consume.stream.resume()
263
264  while (consume.stream.read() != null) {
265    // Loop
266  }
267}
268
269function consumeEnd (consume) {
270  const { type, body, resolve, stream, length } = consume
271
272  try {
273    if (type === 'text') {
274      resolve(toUSVString(Buffer.concat(body)))
275    } else if (type === 'json') {
276      resolve(JSON.parse(Buffer.concat(body)))
277    } else if (type === 'arrayBuffer') {
278      const dst = new Uint8Array(length)
279
280      let pos = 0
281      for (const buf of body) {
282        dst.set(buf, pos)
283        pos += buf.byteLength
284      }
285
286      resolve(dst.buffer)
287    } else if (type === 'blob') {
288      if (!Blob) {
289        Blob = require('buffer').Blob
290      }
291      resolve(new Blob(body, { type: stream[kContentType] }))
292    }
293
294    consumeFinish(consume)
295  } catch (err) {
296    stream.destroy(err)
297  }
298}
299
300function consumePush (consume, chunk) {
301  consume.length += chunk.length
302  consume.body.push(chunk)
303}
304
305function consumeFinish (consume, err) {
306  if (consume.body === null) {
307    return
308  }
309
310  if (err) {
311    consume.reject(err)
312  } else {
313    consume.resolve()
314  }
315
316  consume.type = null
317  consume.stream = null
318  consume.resolve = null
319  consume.reject = null
320  consume.length = 0
321  consume.body = null
322}
323