xref: /third_party/node/lib/stream/consumers.js (revision 1cb0ef41)
1'use strict';
2
3const {
4  JSONParse,
5} = primordials;
6
7const {
8  TextDecoder,
9} = require('internal/encoding');
10
11const {
12  Blob,
13} = require('internal/blob');
14
15const {
16  Buffer,
17} = require('buffer');
18
19/**
20 * @typedef {import('../internal/webstreams/readablestream').ReadableStream
21 * } ReadableStream
22 * @typedef {import('../internal/streams/readable')} Readable
23 */
24
/**
 * Drains `stream` and packs everything it yielded into a single Blob.
 * Chunks are handed to the Blob constructor untouched, in arrival order.
 * @param {AsyncIterable|ReadableStream|Readable} stream Source to consume.
 * @returns {Promise<Blob>} Resolves once the source has been fully iterated.
 */
async function blob(stream) {
  const parts = [];
  for await (const part of stream) {
    parts.push(part);
  }
  const collected = new Blob(parts);
  return collected;
}
35
/**
 * Consumes `stream` via `blob()` and exposes the collected bytes as an
 * ArrayBuffer.
 * @param {AsyncIterable|ReadableStream|Readable} stream Source to consume.
 * @returns {Promise<ArrayBuffer>} The result of the Blob's `arrayBuffer()`.
 */
async function arrayBuffer(stream) {
  const collected = await blob(stream);
  const bytes = collected.arrayBuffer();
  // `bytes` is itself a promise; returning it from an async function
  // settles the outer promise with its outcome.
  return bytes;
}
44
/**
 * Consumes `stream` via `arrayBuffer()` and returns the bytes as a Node.js
 * Buffer created with `Buffer.from()`.
 * @param {AsyncIterable|ReadableStream|Readable} stream Source to consume.
 * @returns {Promise<Buffer>} Buffer built from the collected ArrayBuffer.
 */
async function buffer(stream) {
  const bytes = await arrayBuffer(stream);
  return Buffer.from(bytes);
}
52
/**
 * Consumes `stream` and concatenates its contents into one string.
 * String chunks are appended as-is; every other chunk is passed through a
 * default-constructed TextDecoder running in streaming mode.
 * @param {AsyncIterable|ReadableStream|Readable} stream Source to consume.
 * @returns {Promise<string>} The accumulated text.
 */
async function text(stream) {
  const decoder = new TextDecoder();
  let result = '';
  for await (const piece of stream) {
    if (typeof piece !== 'string') {
      // Streaming mode lets the decoder hold on to a trailing partial
      // multibyte sequence until the next chunk supplies the rest.
      result += decoder.decode(piece, { stream: true });
      continue;
    }
    result += piece;
  }
  // The source is exhausted: a final non-streaming decode flushes whatever
  // incomplete multibyte data the decoder is still holding.
  const tail = decoder.decode(undefined, { stream: false });
  return result + tail;
}
71
/**
 * Consumes `stream` as text via `text()` and parses the result as JSON.
 * @param {AsyncIterable|ReadableStream|Readable} stream Source to consume.
 * @returns {Promise<any>} The value produced by `JSONParse` for the
 *   collected text.
 */
async function json(stream) {
  return JSONParse(await text(stream));
}
80
81module.exports = {
82  arrayBuffer,
83  blob,
84  buffer,
85  text,
86  json,
87};
88