xref: /third_party/node/doc/api/stream.md (revision 1cb0ef41)
1# Stream
2
3<!--introduced_in=v0.10.0-->
4
5> Stability: 2 - Stable
6
7<!-- source_link=lib/stream.js -->
8
9A stream is an abstract interface for working with streaming data in Node.js.
10The `node:stream` module provides an API for implementing the stream interface.
11
12There are many stream objects provided by Node.js. For instance, a
13[request to an HTTP server][http-incoming-message] and [`process.stdout`][]
14are both stream instances.
15
16Streams can be readable, writable, or both. All streams are instances of
17[`EventEmitter`][].
18
19To access the `node:stream` module:
20
21```js
22const stream = require('node:stream');
23```
24
25The `node:stream` module is useful for creating new types of stream instances.
26It is usually not necessary to use the `node:stream` module to consume streams.
27
28## Organization of this document
29
30This document contains two primary sections and a third section for notes. The
31first section explains how to use existing streams within an application. The
32second section explains how to create new types of streams.
33
34## Types of streams
35
36There are four fundamental stream types within Node.js:
37
38* [`Writable`][]: streams to which data can be written (for example,
39  [`fs.createWriteStream()`][]).
40* [`Readable`][]: streams from which data can be read (for example,
41  [`fs.createReadStream()`][]).
42* [`Duplex`][]: streams that are both `Readable` and `Writable` (for example,
43  [`net.Socket`][]).
44* [`Transform`][]: `Duplex` streams that can modify or transform the data as it
45  is written and read (for example, [`zlib.createDeflate()`][]).
46
47Additionally, this module includes the utility functions
48[`stream.pipeline()`][], [`stream.finished()`][], [`stream.Readable.from()`][]
49and [`stream.addAbortSignal()`][].
50
51### Streams Promises API
52
53<!-- YAML
54added: v15.0.0
55-->
56
57The `stream/promises` API provides an alternative set of asynchronous utility
58functions for streams that return `Promise` objects rather than using
59callbacks. The API is accessible via `require('node:stream/promises')`
60or `require('node:stream').promises`.
61
62### `stream.pipeline(source[, ...transforms], destination[, options])`
63
64### `stream.pipeline(streams[, options])`
65
66<!-- YAML
67added: v15.0.0
68-->
69
70* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
71* `source` {Stream|Iterable|AsyncIterable|Function}
72  * Returns: {Promise|AsyncIterable}
73* `...transforms` {Stream|Function}
74  * `source` {AsyncIterable}
75  * Returns: {Promise|AsyncIterable}
76* `destination` {Stream|Function}
77  * `source` {AsyncIterable}
78  * Returns: {Promise|AsyncIterable}
79* `options` {Object}
80  * `signal` {AbortSignal}
81  * `end` {boolean}
82* Returns: {Promise} Fulfills when the pipeline is complete.
83
84```cjs
85const { pipeline } = require('node:stream/promises');
86const fs = require('node:fs');
87const zlib = require('node:zlib');
88
89async function run() {
90  await pipeline(
91    fs.createReadStream('archive.tar'),
92    zlib.createGzip(),
93    fs.createWriteStream('archive.tar.gz'),
94  );
95  console.log('Pipeline succeeded.');
96}
97
98run().catch(console.error);
99```
100
101```mjs
102import { pipeline } from 'node:stream/promises';
103import { createReadStream, createWriteStream } from 'node:fs';
104import { createGzip } from 'node:zlib';
105
106await pipeline(
107  createReadStream('archive.tar'),
108  createGzip(),
109  createWriteStream('archive.tar.gz'),
110);
111console.log('Pipeline succeeded.');
112```
113
114To use an `AbortSignal`, pass it inside an options object, as the last argument.
115When the signal is aborted, `destroy` will be called on the underlying pipeline,
116with an `AbortError`.
117
118```cjs
119const { pipeline } = require('node:stream/promises');
120const fs = require('node:fs');
121const zlib = require('node:zlib');
122
123async function run() {
124  const ac = new AbortController();
125  const signal = ac.signal;
126
127  setImmediate(() => ac.abort());
128  await pipeline(
129    fs.createReadStream('archive.tar'),
130    zlib.createGzip(),
131    fs.createWriteStream('archive.tar.gz'),
132    { signal },
133  );
134}
135
136run().catch(console.error); // AbortError
137```
138
139```mjs
140import { pipeline } from 'node:stream/promises';
141import { createReadStream, createWriteStream } from 'node:fs';
142import { createGzip } from 'node:zlib';
143
144const ac = new AbortController();
145const { signal } = ac;
146setImmediate(() => ac.abort());
147try {
148  await pipeline(
149    createReadStream('archive.tar'),
150    createGzip(),
151    createWriteStream('archive.tar.gz'),
152    { signal },
153  );
154} catch (err) {
155  console.error(err); // AbortError
156}
157```
158
159The `pipeline` API also supports async generators:
160
161```cjs
162const { pipeline } = require('node:stream/promises');
163const fs = require('node:fs');
164
165async function run() {
166  await pipeline(
167    fs.createReadStream('lowercase.txt'),
168    async function* (source, { signal }) {
169      source.setEncoding('utf8');  // Work with strings rather than `Buffer`s.
170      for await (const chunk of source) {
171        yield await processChunk(chunk, { signal });
172      }
173    },
174    fs.createWriteStream('uppercase.txt'),
175  );
176  console.log('Pipeline succeeded.');
177}
178
179run().catch(console.error);
180```
181
182```mjs
183import { pipeline } from 'node:stream/promises';
184import { createReadStream, createWriteStream } from 'node:fs';
185
186await pipeline(
187  createReadStream('lowercase.txt'),
188  async function* (source, { signal }) {
189    source.setEncoding('utf8');  // Work with strings rather than `Buffer`s.
190    for await (const chunk of source) {
191      yield await processChunk(chunk, { signal });
192    }
193  },
194  createWriteStream('uppercase.txt'),
195);
196console.log('Pipeline succeeded.');
197```
198
199Remember to handle the `signal` argument passed into the async generator,
200especially when the async generator is the source for the pipeline (i.e. the
201first argument); otherwise, the pipeline will never complete.
202
203```cjs
204const { pipeline } = require('node:stream/promises');
205const fs = require('node:fs');
206
207async function run() {
208  await pipeline(
209    async function* ({ signal }) {
210      await someLongRunningfn({ signal });
211      yield 'asd';
212    },
213    fs.createWriteStream('uppercase.txt'),
214  );
215  console.log('Pipeline succeeded.');
216}
217
218run().catch(console.error);
219```
220
221```mjs
222import { pipeline } from 'node:stream/promises';
223import fs from 'node:fs';
224await pipeline(
225  async function* ({ signal }) {
226    await someLongRunningfn({ signal });
227    yield 'asd';
228  },
229  fs.createWriteStream('uppercase.txt'),
230);
231console.log('Pipeline succeeded.');
232```
233
234The `pipeline` API also provides a [callback version][stream-pipeline].
235
236### `stream.finished(stream[, options])`
237
238<!-- YAML
239added: v15.0.0
240-->
241
242* `stream` {Stream}
243* `options` {Object}
244  * `error` {boolean|undefined}
245  * `readable` {boolean|undefined}
246  * `writable` {boolean|undefined}
247  * `signal` {AbortSignal|undefined}
248* Returns: {Promise} Fulfills when the stream is no
249  longer readable or writable.
250
251```cjs
252const { finished } = require('node:stream/promises');
253const fs = require('node:fs');
254
255const rs = fs.createReadStream('archive.tar');
256
257async function run() {
258  await finished(rs);
259  console.log('Stream is done reading.');
260}
261
262run().catch(console.error);
263rs.resume(); // Drain the stream.
264```
265
266```mjs
267import { finished } from 'node:stream/promises';
268import { createReadStream } from 'node:fs';
269
270const rs = createReadStream('archive.tar');
271
272async function run() {
273  await finished(rs);
274  console.log('Stream is done reading.');
275}
276
277run().catch(console.error);
278rs.resume(); // Drain the stream.
279```
280
281The `finished` API also provides a [callback version][stream-finished].
282
283### Object mode
284
285All streams created by Node.js APIs operate exclusively on strings and `Buffer`
286(or `Uint8Array`) objects. It is possible, however, for stream implementations
287to work with other types of JavaScript values (with the exception of `null`,
288which serves a special purpose within streams). Such streams are considered to
289operate in "object mode".
290
291Stream instances are switched into object mode using the `objectMode` option
292when the stream is created. Attempting to switch an existing stream into
293object mode is not safe.
294
295### Buffering
296
297<!--type=misc-->
298
299Both [`Writable`][] and [`Readable`][] streams will store data in an internal
300buffer.
301
302The amount of data potentially buffered depends on the `highWaterMark` option
303passed into the stream's constructor. For normal streams, the `highWaterMark`
304option specifies a [total number of bytes][hwm-gotcha]. For streams operating
305in object mode, the `highWaterMark` specifies a total number of objects.
306
307Data is buffered in `Readable` streams when the implementation calls
308[`stream.push(chunk)`][stream-push]. If the consumer of the Stream does not
309call [`stream.read()`][stream-read], the data will sit in the internal
310queue until it is consumed.
311
312Once the total size of the internal read buffer reaches the threshold specified
313by `highWaterMark`, the stream will temporarily stop reading data from the
314underlying resource until the data currently buffered can be consumed (that is,
315the stream will stop calling the internal [`readable._read()`][] method that is
316used to fill the read buffer).
317
318Data is buffered in `Writable` streams when the
319[`writable.write(chunk)`][stream-write] method is called repeatedly. While the
320total size of the internal write buffer is below the threshold set by
321`highWaterMark`, calls to `writable.write()` will return `true`. Once
322the size of the internal buffer reaches or exceeds the `highWaterMark`, `false`
323will be returned.
324
325A key goal of the `stream` API, particularly the [`stream.pipe()`][] method,
326is to limit the buffering of data to acceptable levels such that sources and
327destinations of differing speeds will not overwhelm the available memory.
328
329The `highWaterMark` option is a threshold, not a limit: it dictates the amount
330of data that a stream buffers before it stops asking for more data. It does not
331enforce a strict memory limitation in general. Specific stream implementations
332may choose to enforce stricter limits but doing so is optional.
333
334Because [`Duplex`][] and [`Transform`][] streams are both `Readable` and
335`Writable`, each maintains _two_ separate internal buffers used for reading and
336writing, allowing each side to operate independently of the other while
337maintaining an appropriate and efficient flow of data. For example,
338[`net.Socket`][] instances are [`Duplex`][] streams whose `Readable` side allows
339consumption of data received _from_ the socket and whose `Writable` side allows
340writing data _to_ the socket. Because data may be written to the socket at a
341faster or slower rate than data is received, each side should
342operate (and buffer) independently of the other.
343
344The mechanics of the internal buffering are an internal implementation detail
345and may be changed at any time. However, for certain advanced implementations,
346the internal buffers can be retrieved using `writable.writableBuffer` or
347`readable.readableBuffer`. Use of these undocumented properties is discouraged.
348
349## API for stream consumers
350
351<!--type=misc-->
352
353Almost all Node.js applications, no matter how simple, use streams in some
354manner. The following is an example of using streams in a Node.js application
355that implements an HTTP server:
356
357```js
358const http = require('node:http');
359
360const server = http.createServer((req, res) => {
361  // `req` is an http.IncomingMessage, which is a readable stream.
362  // `res` is an http.ServerResponse, which is a writable stream.
363
364  let body = '';
365  // Get the data as utf8 strings.
366  // If an encoding is not set, Buffer objects will be received.
367  req.setEncoding('utf8');
368
369  // Readable streams emit 'data' events once a listener is added.
370  req.on('data', (chunk) => {
371    body += chunk;
372  });
373
374  // The 'end' event indicates that the entire body has been received.
375  req.on('end', () => {
376    try {
377      const data = JSON.parse(body);
378      // Write back something interesting to the user:
379      res.write(typeof data);
380      res.end();
381    } catch (er) {
382      // uh oh! bad json!
383      res.statusCode = 400;
384      return res.end(`error: ${er.message}`);
385    }
386  });
387});
388
389server.listen(1337);
390
391// $ curl localhost:1337 -d "{}"
392// object
393// $ curl localhost:1337 -d "\"foo\""
394// string
395// $ curl localhost:1337 -d "not json"
396// error: Unexpected token 'o', "not json" is not valid JSON
397```
398
399[`Writable`][] streams (such as `res` in the example) expose methods such as
400`write()` and `end()` that are used to write data onto the stream.
401
402[`Readable`][] streams use the [`EventEmitter`][] API for notifying application
403code when data is available to be read off the stream. That available data can
404be read from the stream in multiple ways.
405
406Both [`Writable`][] and [`Readable`][] streams use the [`EventEmitter`][] API in
407various ways to communicate the current state of the stream.
408
409[`Duplex`][] and [`Transform`][] streams are both [`Writable`][] and
410[`Readable`][].
411
412Applications that are either writing data to or consuming data from a stream
413are not required to implement the stream interfaces directly and will generally
414have no reason to call `require('node:stream')`.
415
416Developers wishing to implement new types of streams should refer to the
417section [API for stream implementers][].
418
419### Writable streams
420
421Writable streams are an abstraction for a _destination_ to which data is
422written.
423
424Examples of [`Writable`][] streams include:
425
426* [HTTP requests, on the client][]
427* [HTTP responses, on the server][]
428* [fs write streams][]
429* [zlib streams][zlib]
430* [crypto streams][crypto]
431* [TCP sockets][]
432* [child process stdin][]
433* [`process.stdout`][], [`process.stderr`][]
434
435Some of these examples are actually [`Duplex`][] streams that implement the
436[`Writable`][] interface.
437
438All [`Writable`][] streams implement the interface defined by the
439`stream.Writable` class.
440
441While specific instances of [`Writable`][] streams may differ in various ways,
442all `Writable` streams follow the same fundamental usage pattern as illustrated
443in the example below:
444
445```js
446const myStream = getWritableStreamSomehow();
447myStream.write('some data');
448myStream.write('some more data');
449myStream.end('done writing data');
450```
451
452#### Class: `stream.Writable`
453
454<!-- YAML
455added: v0.9.4
456-->
457
458<!--type=class-->
459
460##### Event: `'close'`
461
462<!-- YAML
463added: v0.9.4
464changes:
465  - version: v10.0.0
466    pr-url: https://github.com/nodejs/node/pull/18438
467    description: Add `emitClose` option to specify if `'close'` is emitted on
468                 destroy.
469-->
470
471The `'close'` event is emitted when the stream and any of its underlying
472resources (a file descriptor, for example) have been closed. The event indicates
473that no more events will be emitted, and no further computation will occur.
474
475A [`Writable`][] stream will always emit the `'close'` event if it is
476created with the `emitClose` option.
477
478##### Event: `'drain'`
479
480<!-- YAML
481added: v0.9.4
482-->
483
484If a call to [`stream.write(chunk)`][stream-write] returns `false`, the
485`'drain'` event will be emitted when it is appropriate to resume writing data
486to the stream.
487
488```js
489// Write the data to the supplied writable stream one million times.
490// Be attentive to back-pressure.
491function writeOneMillionTimes(writer, data, encoding, callback) {
492  let i = 1000000;
493  write();
494  function write() {
495    let ok = true;
496    do {
497      i--;
498      if (i === 0) {
499        // Last time!
500        writer.write(data, encoding, callback);
501      } else {
502        // See if we should continue, or wait.
503        // Don't pass the callback, because we're not done yet.
504        ok = writer.write(data, encoding);
505      }
506    } while (i > 0 && ok);
507    if (i > 0) {
508      // Had to stop early!
509      // Write some more once it drains.
510      writer.once('drain', write);
511    }
512  }
513}
514```
515
516##### Event: `'error'`
517
518<!-- YAML
519added: v0.9.4
520-->
521
522* {Error}
523
524The `'error'` event is emitted if an error occurred while writing or piping
525data. The listener callback is passed a single `Error` argument when called.
526
527The stream is closed when the `'error'` event is emitted unless the
528[`autoDestroy`][writable-new] option was set to `false` when creating the
529stream.
530
531After `'error'`, no further events other than `'close'` _should_ be emitted
532(including `'error'` events).
533
534##### Event: `'finish'`
535
536<!-- YAML
537added: v0.9.4
538-->
539
540The `'finish'` event is emitted after the [`stream.end()`][stream-end] method
541has been called, and all data has been flushed to the underlying system.
542
543```js
544const writer = getWritableStreamSomehow();
545for (let i = 0; i < 100; i++) {
546  writer.write(`hello, #${i}!\n`);
547}
548writer.on('finish', () => {
549  console.log('All writes are now complete.');
550});
551writer.end('This is the end\n');
552```
553
554##### Event: `'pipe'`
555
556<!-- YAML
557added: v0.9.4
558-->
559
560* `src` {stream.Readable} source stream that is piping to this writable
561
562The `'pipe'` event is emitted when the [`stream.pipe()`][] method is called on
563a readable stream, adding this writable to its set of destinations.
564
565```js
566const writer = getWritableStreamSomehow();
567const reader = getReadableStreamSomehow();
568writer.on('pipe', (src) => {
569  console.log('Something is piping into the writer.');
570  assert.equal(src, reader);
571});
572reader.pipe(writer);
573```
574
575##### Event: `'unpipe'`
576
577<!-- YAML
578added: v0.9.4
579-->
580
581* `src` {stream.Readable} The source stream that
582  [unpiped][`stream.unpipe()`] this writable
583
584The `'unpipe'` event is emitted when the [`stream.unpipe()`][] method is called
585on a [`Readable`][] stream, removing this [`Writable`][] from its set of
586destinations.
587
588This is also emitted in case this [`Writable`][] stream emits an error when a
589[`Readable`][] stream pipes into it.
590
591```js
592const writer = getWritableStreamSomehow();
593const reader = getReadableStreamSomehow();
594writer.on('unpipe', (src) => {
595  console.log('Something has stopped piping into the writer.');
596  assert.equal(src, reader);
597});
598reader.pipe(writer);
599reader.unpipe(writer);
600```
601
602##### `writable.cork()`
603
604<!-- YAML
605added: v0.11.2
606-->
607
608The `writable.cork()` method forces all written data to be buffered in memory.
609The buffered data will be flushed when either the [`stream.uncork()`][] or
610[`stream.end()`][stream-end] methods are called.
611
612The primary intent of `writable.cork()` is to accommodate a situation in which
613several small chunks are written to the stream in rapid succession. Instead of
614immediately forwarding them to the underlying destination, `writable.cork()`
615buffers all the chunks until `writable.uncork()` is called, which will pass them
616all to `writable._writev()`, if present. This prevents a head-of-line blocking
617situation where data is being buffered while waiting for the first small chunk
618to be processed. However, use of `writable.cork()` without implementing
619`writable._writev()` may have an adverse effect on throughput.
620
621See also: [`writable.uncork()`][], [`writable._writev()`][stream-_writev].
622
623##### `writable.destroy([error])`
624
625<!-- YAML
626added: v8.0.0
627changes:
628  - version: v14.0.0
629    pr-url: https://github.com/nodejs/node/pull/29197
630    description: Work as a no-op on a stream that has already been destroyed.
631-->
632
633* `error` {Error} Optional, an error to emit with `'error'` event.
634* Returns: {this}
635
636Destroy the stream. Optionally emit an `'error'` event, and emit a `'close'`
637event (unless `emitClose` is set to `false`). After this call, the writable
638stream has ended and subsequent calls to `write()` or `end()` will result in
639an `ERR_STREAM_DESTROYED` error.
640This is a destructive and immediate way to destroy a stream. Previous calls to
641`write()` may not have drained, and may trigger an `ERR_STREAM_DESTROYED` error.
642Use `end()` instead of destroy if data should flush before close, or wait for
643the `'drain'` event before destroying the stream.
644
645```cjs
646const { Writable } = require('node:stream');
647
648const myStream = new Writable();
649
650const fooErr = new Error('foo error');
651myStream.destroy(fooErr);
652myStream.on('error', (fooErr) => console.error(fooErr.message)); // foo error
653```
654
655```cjs
656const { Writable } = require('node:stream');
657
658const myStream = new Writable();
659
660myStream.destroy();
661myStream.on('error', function wontHappen() {});
662```
663
664```cjs
665const { Writable } = require('node:stream');
666
667const myStream = new Writable();
668myStream.destroy();
669
670myStream.write('foo', (error) => console.error(error.code));
671// ERR_STREAM_DESTROYED
672```
673
674Once `destroy()` has been called any further calls will be a no-op and no
675further errors except from `_destroy()` may be emitted as `'error'`.
676
677Implementors should not override this method,
678but instead implement [`writable._destroy()`][writable-_destroy].
679
680##### `writable.closed`
681
682<!-- YAML
683added: v18.0.0
684-->
685
686* {boolean}
687
688Is `true` after `'close'` has been emitted.
689
690##### `writable.destroyed`
691
692<!-- YAML
693added: v8.0.0
694-->
695
696* {boolean}
697
698Is `true` after [`writable.destroy()`][writable-destroy] has been called.
699
700```cjs
701const { Writable } = require('node:stream');
702
703const myStream = new Writable();
704
705console.log(myStream.destroyed); // false
706myStream.destroy();
707console.log(myStream.destroyed); // true
708```
709
710##### `writable.end([chunk[, encoding]][, callback])`
711
712<!-- YAML
713added: v0.9.4
714changes:
715  - version: v15.0.0
716    pr-url: https://github.com/nodejs/node/pull/34101
717    description: The `callback` is invoked before 'finish' or on error.
718  - version: v14.0.0
719    pr-url: https://github.com/nodejs/node/pull/29747
720    description: The `callback` is invoked if 'finish' or 'error' is emitted.
721  - version: v10.0.0
722    pr-url: https://github.com/nodejs/node/pull/18780
723    description: This method now returns a reference to `writable`.
724  - version: v8.0.0
725    pr-url: https://github.com/nodejs/node/pull/11608
726    description: The `chunk` argument can now be a `Uint8Array` instance.
727-->
728
729* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
730  not operating in object mode, `chunk` must be a string, `Buffer` or
731  `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
732  other than `null`.
733* `encoding` {string} The encoding if `chunk` is a string
734* `callback` {Function} Callback for when the stream is finished.
735* Returns: {this}
736
737Calling the `writable.end()` method signals that no more data will be written
738to the [`Writable`][]. The optional `chunk` and `encoding` arguments allow one
739final additional chunk of data to be written immediately before closing the
740stream.
741
742Calling the [`stream.write()`][stream-write] method after calling
743[`stream.end()`][stream-end] will raise an error.
744
745```js
746// Write 'hello, ' and then end with 'world!'.
747const fs = require('node:fs');
748const file = fs.createWriteStream('example.txt');
749file.write('hello, ');
750file.end('world!');
751// Writing more now is not allowed!
752```
753
754##### `writable.setDefaultEncoding(encoding)`
755
756<!-- YAML
757added: v0.11.15
758changes:
759  - version: v6.1.0
760    pr-url: https://github.com/nodejs/node/pull/5040
761    description: This method now returns a reference to `writable`.
762-->
763
764* `encoding` {string} The new default encoding
765* Returns: {this}
766
767The `writable.setDefaultEncoding()` method sets the default `encoding` for a
768[`Writable`][] stream.
769
770##### `writable.uncork()`
771
772<!-- YAML
773added: v0.11.2
774-->
775
776The `writable.uncork()` method flushes all data buffered since
777[`stream.cork()`][] was called.
778
779When using [`writable.cork()`][] and `writable.uncork()` to manage the buffering
780of writes to a stream, defer calls to `writable.uncork()` using
781`process.nextTick()`. Doing so allows batching of all
782`writable.write()` calls that occur within a given Node.js event loop phase.
783
784```js
785stream.cork();
786stream.write('some ');
787stream.write('data ');
788process.nextTick(() => stream.uncork());
789```
790
791If the [`writable.cork()`][] method is called multiple times on a stream, the
792same number of calls to `writable.uncork()` must be called to flush the buffered
793data.
794
795```js
796stream.cork();
797stream.write('some ');
798stream.cork();
799stream.write('data ');
800process.nextTick(() => {
801  stream.uncork();
802  // The data will not be flushed until uncork() is called a second time.
803  stream.uncork();
804});
805```
806
807See also: [`writable.cork()`][].
808
809##### `writable.writable`
810
811<!-- YAML
812added: v11.4.0
813-->
814
815* {boolean}
816
817Is `true` if it is safe to call [`writable.write()`][stream-write], which means
818the stream has not been destroyed, errored, or ended.
819
820##### `writable.writableAborted`
821
822<!-- YAML
823added: v18.0.0
824-->
825
826> Stability: 1 - Experimental
827
828* {boolean}
829
830Returns whether the stream was destroyed or errored before emitting `'finish'`.
831
832##### `writable.writableEnded`
833
834<!-- YAML
835added: v12.9.0
836-->
837
838* {boolean}
839
840Is `true` after [`writable.end()`][] has been called. This property
841does not indicate whether the data has been flushed; for this, use
842[`writable.writableFinished`][] instead.
843
844##### `writable.writableCorked`
845
846<!-- YAML
847added:
848 - v13.2.0
849 - v12.16.0
850-->
851
852* {integer}
853
854Number of times [`writable.uncork()`][stream-uncork] needs to be
855called in order to fully uncork the stream.
856
857##### `writable.errored`
858
859<!-- YAML
860added:
861  v18.0.0
862-->
863
864* {Error}
865
866Returns the error if the stream has been destroyed with an error.
867
868##### `writable.writableFinished`
869
870<!-- YAML
871added: v12.6.0
872-->
873
874* {boolean}
875
876Is set to `true` immediately before the [`'finish'`][] event is emitted.
877
878##### `writable.writableHighWaterMark`
879
880<!-- YAML
881added: v9.3.0
882-->
883
884* {number}
885
886Return the value of `highWaterMark` passed when creating this `Writable`.
887
888##### `writable.writableLength`
889
890<!-- YAML
891added: v9.4.0
892-->
893
894* {number}
895
896This property contains the number of bytes (or objects) in the queue
897ready to be written. The value provides introspection data regarding
898the status of the `highWaterMark`.
899
900##### `writable.writableNeedDrain`
901
902<!-- YAML
903added:
904  - v15.2.0
905  - v14.17.0
906-->
907
908* {boolean}
909
910Is `true` if the stream's buffer has been full and the stream will emit `'drain'`.
911
912##### `writable.writableObjectMode`
913
914<!-- YAML
915added: v12.3.0
916-->
917
918* {boolean}
919
920Getter for the property `objectMode` of a given `Writable` stream.
921
922##### `writable.write(chunk[, encoding][, callback])`
923
924<!-- YAML
925added: v0.9.4
926changes:
927  - version: v8.0.0
928    pr-url: https://github.com/nodejs/node/pull/11608
929    description: The `chunk` argument can now be a `Uint8Array` instance.
930  - version: v6.0.0
931    pr-url: https://github.com/nodejs/node/pull/6170
932    description: Passing `null` as the `chunk` parameter will always be
933                 considered invalid now, even in object mode.
934-->
935
936* `chunk` {string|Buffer|Uint8Array|any} Data to write. For streams
937  not operating in object mode, `chunk` must be a string, `Buffer` or
938  `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
939  other than `null`.
940* `encoding` {string|null} The encoding, if `chunk` is a string. **Default:** `'utf8'`
941* `callback` {Function} Callback for when this chunk of data is flushed.
942* Returns: {boolean} `false` if the stream wishes for the calling code to
943  wait for the `'drain'` event to be emitted before continuing to write
944  additional data; otherwise `true`.
945
946The `writable.write()` method writes some data to the stream, and calls the
947supplied `callback` once the data has been fully handled. If an error
948occurs, the `callback` will be called with the error as its
949first argument. The `callback` is called asynchronously and before `'error'` is
950emitted.
951
952The return value is `true` if the internal buffer is less than the
953`highWaterMark` configured when the stream was created after admitting `chunk`.
954If `false` is returned, further attempts to write data to the stream should
955stop until the [`'drain'`][] event is emitted.
956
957While a stream is not draining, calls to `write()` will buffer `chunk`, and
958return false. Once all currently buffered chunks are drained (accepted for
959delivery by the operating system), the `'drain'` event will be emitted.
960Once `write()` returns false, do not write more chunks
961until the `'drain'` event is emitted. While calling `write()` on a stream that
962is not draining is allowed, Node.js will buffer all written chunks until
963maximum memory usage occurs, at which point it will abort unconditionally.
964Even before it aborts, high memory usage will cause poor garbage collector
965performance and high RSS (which is not typically released back to the system,
966even after the memory is no longer required). Since TCP sockets may never
967drain if the remote peer does not read the data, writing to a socket that is
968not draining may lead to a remotely exploitable vulnerability.
969
970Writing data while the stream is not draining is particularly
971problematic for a [`Transform`][], because the `Transform` streams are paused
972by default until they are piped or a `'data'` or `'readable'` event handler
973is added.
974
975If the data to be written can be generated or fetched on demand, it is
976recommended to encapsulate the logic into a [`Readable`][] and use
977[`stream.pipe()`][]. However, if calling `write()` is preferred, it is
978possible to respect backpressure and avoid memory issues using the
979[`'drain'`][] event:
980
981```js
982function write(data, cb) {
983  if (!stream.write(data)) {
984    stream.once('drain', cb);
985  } else {
986    process.nextTick(cb);
987  }
988}
989
990// Wait for cb to be called before doing any other write.
991write('hello', () => {
992  console.log('Write completed, do more writes now.');
993});
994```
995
996A `Writable` stream in object mode will always ignore the `encoding` argument.
997
998### Readable streams
999
1000Readable streams are an abstraction for a _source_ from which data is
1001consumed.
1002
1003Examples of `Readable` streams include:
1004
1005* [HTTP responses, on the client][http-incoming-message]
1006* [HTTP requests, on the server][http-incoming-message]
1007* [fs read streams][]
1008* [zlib streams][zlib]
1009* [crypto streams][crypto]
1010* [TCP sockets][]
1011* [child process stdout and stderr][]
1012* [`process.stdin`][]
1013
1014All [`Readable`][] streams implement the interface defined by the
1015`stream.Readable` class.
1016
1017#### Two reading modes
1018
1019`Readable` streams effectively operate in one of two modes: flowing and
1020paused. These modes are separate from [object mode][object-mode].
1021A [`Readable`][] stream can be in object mode or not, regardless of whether
1022it is in flowing mode or paused mode.
1023
1024* In flowing mode, data is read from the underlying system automatically
1025  and provided to an application as quickly as possible using events via the
1026  [`EventEmitter`][] interface.
1027
1028* In paused mode, the [`stream.read()`][stream-read] method must be called
1029  explicitly to read chunks of data from the stream.
1030
1031All [`Readable`][] streams begin in paused mode but can be switched to flowing
1032mode in one of the following ways:
1033
1034* Adding a [`'data'`][] event handler.
1035* Calling the [`stream.resume()`][stream-resume] method.
1036* Calling the [`stream.pipe()`][] method to send the data to a [`Writable`][].
1037
1038The `Readable` can switch back to paused mode using one of the following:
1039
1040* If there are no pipe destinations, by calling the
1041  [`stream.pause()`][stream-pause] method.
1042* If there are pipe destinations, by removing all pipe destinations.
1043  Multiple pipe destinations may be removed by calling the
1044  [`stream.unpipe()`][] method.
1045
1046The important concept to remember is that a `Readable` will not generate data
1047until a mechanism for either consuming or ignoring that data is provided. If
1048the consuming mechanism is disabled or taken away, the `Readable` will _attempt_
1049to stop generating the data.
1050
1051For backward compatibility reasons, removing [`'data'`][] event handlers will
1052**not** automatically pause the stream. Also, if there are piped destinations,
1053then calling [`stream.pause()`][stream-pause] will not guarantee that the
1054stream will _remain_ paused once those destinations drain and ask for more data.
1055
1056If a [`Readable`][] is switched into flowing mode and there are no consumers
1057available to handle the data, that data will be lost. This can occur, for
1058instance, when the `readable.resume()` method is called without a listener
1059attached to the `'data'` event, or when a `'data'` event handler is removed
1060from the stream.
1061
1062Adding a [`'readable'`][] event handler automatically makes the stream
1063stop flowing, and the data has to be consumed via
1064[`readable.read()`][stream-read]. If the [`'readable'`][] event handler is
1065removed, then the stream will start flowing again if there is a
1066[`'data'`][] event handler.
1067
1068#### Three states
1069
1070The "two modes" of operation for a `Readable` stream are a simplified
1071abstraction for the more complicated internal state management that is happening
1072within the `Readable` stream implementation.
1073
1074Specifically, at any given point in time, every `Readable` is in one of three
1075possible states:
1076
1077* `readable.readableFlowing === null`
1078* `readable.readableFlowing === false`
1079* `readable.readableFlowing === true`
1080
1081When `readable.readableFlowing` is `null`, no mechanism for consuming the
1082stream's data is provided. Therefore, the stream will not generate data.
1083While in this state, attaching a listener for the `'data'` event, calling the
1084`readable.pipe()` method, or calling the `readable.resume()` method will switch
1085`readable.readableFlowing` to `true`, causing the `Readable` to begin actively
1086emitting events as data is generated.
1087
1088Calling `readable.pause()`, `readable.unpipe()`, or receiving backpressure
1089will cause `readable.readableFlowing` to be set to `false`,
1090temporarily halting the flowing of events but _not_ halting the generation of
1091data. While in this state, attaching a listener for the `'data'` event
1092will not switch `readable.readableFlowing` to `true`.
1093
1094```js
1095const { PassThrough, Writable } = require('node:stream');
1096const pass = new PassThrough();
1097const writable = new Writable();
1098
1099pass.pipe(writable);
1100pass.unpipe(writable);
1101// readableFlowing is now false.
1102
1103pass.on('data', (chunk) => { console.log(chunk.toString()); });
1104// readableFlowing is still false.
1105pass.write('ok');  // Will not emit 'data'.
1106pass.resume();     // Must be called to make stream emit 'data'.
1107// readableFlowing is now true.
1108```
1109
1110While `readable.readableFlowing` is `false`, data may be accumulating
1111within the stream's internal buffer.
1112
1113#### Choose one API style
1114
1115The `Readable` stream API evolved across multiple Node.js versions and provides
1116multiple methods of consuming stream data. In general, developers should choose
1117_one_ of the methods of consuming data and _should never_ use multiple methods
1118to consume data from a single stream. Specifically, using a combination
1119of `on('data')`, `on('readable')`, `pipe()`, or async iterators could
1120lead to unintuitive behavior.
1121
1122#### Class: `stream.Readable`
1123
1124<!-- YAML
1125added: v0.9.4
1126-->
1127
1128<!--type=class-->
1129
1130##### Event: `'close'`
1131
1132<!-- YAML
1133added: v0.9.4
1134changes:
1135  - version: v10.0.0
1136    pr-url: https://github.com/nodejs/node/pull/18438
1137    description: Add `emitClose` option to specify if `'close'` is emitted on
1138                 destroy.
1139-->
1140
1141The `'close'` event is emitted when the stream and any of its underlying
1142resources (a file descriptor, for example) have been closed. The event indicates
1143that no more events will be emitted, and no further computation will occur.
1144
1145A [`Readable`][] stream will always emit the `'close'` event if it is
1146created with the `emitClose` option.
1147
1148##### Event: `'data'`
1149
1150<!-- YAML
1151added: v0.9.4
1152-->
1153
1154* `chunk` {Buffer|string|any} The chunk of data. For streams that are not
1155  operating in object mode, the chunk will be either a string or `Buffer`.
1156  For streams that are in object mode, the chunk can be any JavaScript value
1157  other than `null`.
1158
1159The `'data'` event is emitted whenever the stream is relinquishing ownership of
1160a chunk of data to a consumer. This may occur whenever the stream is switched
1161into flowing mode by calling `readable.pipe()`, `readable.resume()`, or by
1162attaching a listener callback to the `'data'` event. The `'data'` event will
1163also be emitted whenever the `readable.read()` method is called and a chunk of
1164data is available to be returned.
1165
1166Attaching a `'data'` event listener to a stream that has not been explicitly
1167paused will switch the stream into flowing mode. Data will then be passed as
1168soon as it is available.
1169
1170The listener callback will be passed the chunk of data as a string if a default
1171encoding has been specified for the stream using the
1172`readable.setEncoding()` method; otherwise the data will be passed as a
1173`Buffer`.
1174
1175```js
1176const readable = getReadableStreamSomehow();
1177readable.on('data', (chunk) => {
1178  console.log(`Received ${chunk.length} bytes of data.`);
1179});
1180```
1181
1182##### Event: `'end'`
1183
1184<!-- YAML
1185added: v0.9.4
1186-->
1187
1188The `'end'` event is emitted when there is no more data to be consumed from
1189the stream.
1190
1191The `'end'` event **will not be emitted** unless the data is completely
1192consumed. This can be accomplished by switching the stream into flowing mode,
1193or by calling [`stream.read()`][stream-read] repeatedly until all data has been
1194consumed.
1195
1196```js
1197const readable = getReadableStreamSomehow();
1198readable.on('data', (chunk) => {
1199  console.log(`Received ${chunk.length} bytes of data.`);
1200});
1201readable.on('end', () => {
1202  console.log('There will be no more data.');
1203});
1204```
1205
1206##### Event: `'error'`
1207
1208<!-- YAML
1209added: v0.9.4
1210-->
1211
1212* {Error}
1213
1214The `'error'` event may be emitted by a `Readable` implementation at any time.
1215Typically, this may occur if the underlying stream is unable to generate data
1216due to an underlying internal failure, or when a stream implementation attempts
1217to push an invalid chunk of data.
1218
1219The listener callback will be passed a single `Error` object.
1220
1221##### Event: `'pause'`
1222
1223<!-- YAML
1224added: v0.9.4
1225-->
1226
1227The `'pause'` event is emitted when [`stream.pause()`][stream-pause] is called
1228and `readableFlowing` is not `false`.
1229
1230##### Event: `'readable'`
1231
1232<!-- YAML
1233added: v0.9.4
1234changes:
1235  - version: v10.0.0
1236    pr-url: https://github.com/nodejs/node/pull/17979
1237    description: The `'readable'` is always emitted in the next tick after
1238                 `.push()` is called.
1239  - version: v10.0.0
1240    pr-url: https://github.com/nodejs/node/pull/18994
1241    description: Using `'readable'` requires calling `.read()`.
1242-->
1243
1244The `'readable'` event is emitted when there is data available to be read from
1245the stream or when the end of the stream has been reached. Effectively, the
1246`'readable'` event indicates that the stream has new information. If data is
1247available, [`stream.read()`][stream-read] will return that data.
1248
1249```js
1250const readable = getReadableStreamSomehow();
1251readable.on('readable', function() {
1252  // There is some data to read now.
1253  let data;
1254
1255  while ((data = this.read()) !== null) {
1256    console.log(data);
1257  }
1258});
1259```
1260
1261If the end of the stream has been reached, calling
1262[`stream.read()`][stream-read] will return `null` and trigger the `'end'`
1263event. This is also true if there never was any data to be read. For instance,
1264in the following example, `foo.txt` is an empty file:
1265
1266```js
1267const fs = require('node:fs');
1268const rr = fs.createReadStream('foo.txt');
1269rr.on('readable', () => {
1270  console.log(`readable: ${rr.read()}`);
1271});
1272rr.on('end', () => {
1273  console.log('end');
1274});
1275```
1276
1277The output of running this script is:
1278
1279```console
1280$ node test.js
1281readable: null
1282end
1283```
1284
1285In some cases, attaching a listener for the `'readable'` event will cause some
1286amount of data to be read into an internal buffer.
1287
1288In general, the `readable.pipe()` and `'data'` event mechanisms are easier to
1289understand than the `'readable'` event. However, handling `'readable'` might
1290result in increased throughput.
1291
1292If both `'readable'` and [`'data'`][] are used at the same time, `'readable'`
1293takes precedence in controlling the flow, i.e. `'data'` will be emitted
1294only when [`stream.read()`][stream-read] is called. The
1295`readableFlowing` property would become `false`.
1296If there are `'data'` listeners when `'readable'` is removed, the stream
1297will start flowing, i.e. `'data'` events will be emitted without calling
1298`.resume()`.
1299
1300##### Event: `'resume'`
1301
1302<!-- YAML
1303added: v0.9.4
1304-->
1305
1306The `'resume'` event is emitted when [`stream.resume()`][stream-resume] is
1307called and `readableFlowing` is not `true`.
1308
1309##### `readable.destroy([error])`
1310
1311<!-- YAML
1312added: v8.0.0
1313changes:
1314  - version: v14.0.0
1315    pr-url: https://github.com/nodejs/node/pull/29197
1316    description: Work as a no-op on a stream that has already been destroyed.
1317-->
1318
1319* `error` {Error} Error which will be passed as payload in `'error'` event
1320* Returns: {this}
1321
1322Destroy the stream. Optionally emit an `'error'` event, and emit a `'close'`
1323event (unless `emitClose` is set to `false`). After this call, the readable
1324stream will release any internal resources and subsequent calls to `push()`
1325will be ignored.
1326
1327Once `destroy()` has been called any further calls will be a no-op and no
1328further errors except from `_destroy()` may be emitted as `'error'`.
1329
1330Implementors should not override this method, but instead implement
1331[`readable._destroy()`][readable-_destroy].
1332
1333##### `readable.closed`
1334
1335<!-- YAML
1336added: v18.0.0
1337-->
1338
1339* {boolean}
1340
1341Is `true` after `'close'` has been emitted.
1342
1343##### `readable.destroyed`
1344
1345<!-- YAML
1346added: v8.0.0
1347-->
1348
1349* {boolean}
1350
1351Is `true` after [`readable.destroy()`][readable-destroy] has been called.
1352
1353##### `readable.isPaused()`
1354
1355<!-- YAML
1356added: v0.11.14
1357-->
1358
1359* Returns: {boolean}
1360
1361The `readable.isPaused()` method returns the current operating state of the
1362`Readable`. This is used primarily by the mechanism that underlies the
1363`readable.pipe()` method. In most typical cases, there will be no reason to
1364use this method directly.
1365
1366```js
1367const readable = new stream.Readable();
1368
1369readable.isPaused(); // === false
1370readable.pause();
1371readable.isPaused(); // === true
1372readable.resume();
1373readable.isPaused(); // === false
1374```
1375
1376##### `readable.pause()`
1377
1378<!-- YAML
1379added: v0.9.4
1380-->
1381
1382* Returns: {this}
1383
1384The `readable.pause()` method will cause a stream in flowing mode to stop
1385emitting [`'data'`][] events, switching out of flowing mode. Any data that
1386becomes available will remain in the internal buffer.
1387
1388```js
1389const readable = getReadableStreamSomehow();
1390readable.on('data', (chunk) => {
1391  console.log(`Received ${chunk.length} bytes of data.`);
1392  readable.pause();
1393  console.log('There will be no additional data for 1 second.');
1394  setTimeout(() => {
1395    console.log('Now data will start flowing again.');
1396    readable.resume();
1397  }, 1000);
1398});
1399```
1400
1401The `readable.pause()` method has no effect if there is a `'readable'`
1402event listener.
1403
1404##### `readable.pipe(destination[, options])`
1405
1406<!-- YAML
1407added: v0.9.4
1408-->
1409
1410* `destination` {stream.Writable} The destination for writing data
1411* `options` {Object} Pipe options
1412  * `end` {boolean} End the writer when the reader ends. **Default:** `true`.
1413* Returns: {stream.Writable} The _destination_, allowing for a chain of pipes if
1414  it is a [`Duplex`][] or a [`Transform`][] stream
1415
1416The `readable.pipe()` method attaches a [`Writable`][] stream to the `readable`,
1417causing it to switch automatically into flowing mode and push all of its data
1418to the attached [`Writable`][]. The flow of data will be automatically managed
1419so that the destination `Writable` stream is not overwhelmed by a faster
1420`Readable` stream.
1421
1422The following example pipes all of the data from the `readable` into a file
1423named `file.txt`:
1424
1425```js
1426const fs = require('node:fs');
1427const readable = getReadableStreamSomehow();
1428const writable = fs.createWriteStream('file.txt');
1429// All the data from readable goes into 'file.txt'.
1430readable.pipe(writable);
1431```
1432
1433It is possible to attach multiple `Writable` streams to a single `Readable`
1434stream.
1435
1436The `readable.pipe()` method returns a reference to the _destination_ stream
1437making it possible to set up chains of piped streams:
1438
1439```js
1440const fs = require('node:fs');
1441const zlib = require('node:zlib');
1442const r = fs.createReadStream('file.txt');
1443const z = zlib.createGzip();
1444const w = fs.createWriteStream('file.txt.gz');
1445r.pipe(z).pipe(w);
1446```
1447
1448By default, [`stream.end()`][stream-end] is called on the destination `Writable`
1449stream when the source `Readable` stream emits [`'end'`][], so that the
1450destination is no longer writable. To disable this default behavior, the `end`
1451option can be passed as `false`, causing the destination stream to remain open:
1452
1453```js
1454reader.pipe(writer, { end: false });
1455reader.on('end', () => {
1456  writer.end('Goodbye\n');
1457});
1458```
1459
1460One important caveat is that if the `Readable` stream emits an error during
1461processing, the `Writable` destination _is not closed_ automatically. If an
1462error occurs, it will be necessary to _manually_ close each stream in order
1463to prevent memory leaks.
1464
1465The [`process.stderr`][] and [`process.stdout`][] `Writable` streams are never
1466closed until the Node.js process exits, regardless of the specified options.
1467
1468##### `readable.read([size])`
1469
1470<!-- YAML
1471added: v0.9.4
1472-->
1473
1474* `size` {number} Optional argument to specify how much data to read.
1475* Returns: {string|Buffer|null|any}
1476
1477The `readable.read()` method reads data out of the internal buffer and
1478returns it. If no data is available to be read, `null` is returned. By default,
1479the data is returned as a `Buffer` object unless an encoding has been
1480specified using the `readable.setEncoding()` method or the stream is operating
1481in object mode.
1482
1483The optional `size` argument specifies a specific number of bytes to read. If
1484`size` bytes are not available to be read, `null` will be returned _unless_
1485the stream has ended, in which case all of the data remaining in the internal
1486buffer will be returned.
1487
1488If the `size` argument is not specified, all of the data contained in the
1489internal buffer will be returned.
1490
1491The `size` argument must be less than or equal to 1 GiB.
1492
1493The `readable.read()` method should only be called on `Readable` streams
1494operating in paused mode. In flowing mode, `readable.read()` is called
1495automatically until the internal buffer is fully drained.
1496
1497```js
1498const readable = getReadableStreamSomehow();
1499
1500// 'readable' may be triggered multiple times as data is buffered in
1501readable.on('readable', () => {
1502  let chunk;
1503  console.log('Stream is readable (new data received in buffer)');
1504  // Use a loop to make sure we read all currently available data
1505  while (null !== (chunk = readable.read())) {
1506    console.log(`Read ${chunk.length} bytes of data...`);
1507  }
1508});
1509
1510// 'end' will be triggered once when there is no more data available
1511readable.on('end', () => {
1512  console.log('Reached end of stream.');
1513});
1514```
1515
1516Each call to `readable.read()` returns a chunk of data, or `null`. The chunks
1517are not concatenated. A `while` loop is necessary to consume all data
1518currently in the buffer. When reading a large file `.read()` may return `null`,
1519having consumed all buffered content so far, but there is still more data to
1520come not yet buffered. In this case a new `'readable'` event will be emitted
1521when there is more data in the buffer. Finally the `'end'` event will be
1522emitted when there is no more data to come.
1523
1524Therefore to read a file's whole contents from a `readable`, it is necessary
1525to collect chunks across multiple `'readable'` events:
1526
1527```js
1528const chunks = [];
1529
1530readable.on('readable', () => {
1531  let chunk;
1532  while (null !== (chunk = readable.read())) {
1533    chunks.push(chunk);
1534  }
1535});
1536
1537readable.on('end', () => {
1538  const content = chunks.join('');
1539});
1540```
1541
1542A `Readable` stream in object mode will always return a single item from
1543a call to [`readable.read(size)`][stream-read], regardless of the value of the
1544`size` argument.
1545
1546If the `readable.read()` method returns a chunk of data, a `'data'` event will
1547also be emitted.
1548
1549Calling [`stream.read([size])`][stream-read] after the [`'end'`][] event has
1550been emitted will return `null`. No runtime error will be raised.
1551
1552##### `readable.readable`
1553
1554<!-- YAML
1555added: v11.4.0
1556-->
1557
1558* {boolean}
1559
1560Is `true` if it is safe to call [`readable.read()`][stream-read], which means
1561the stream has not been destroyed or emitted `'error'` or `'end'`.
1562
1563##### `readable.readableAborted`
1564
1565<!-- YAML
1566added: v16.8.0
1567-->
1568
1569> Stability: 1 - Experimental
1570
1571* {boolean}
1572
1573Returns whether the stream was destroyed or errored before emitting `'end'`.
1574
1575##### `readable.readableDidRead`
1576
1577<!-- YAML
1578added:
1579  - v16.7.0
1580  - v14.18.0
1581-->
1582
1583> Stability: 1 - Experimental
1584
1585* {boolean}
1586
1587Returns whether `'data'` has been emitted.
1588
1589##### `readable.readableEncoding`
1590
1591<!-- YAML
1592added: v12.7.0
1593-->
1594
1595* {null|string}
1596
1597Getter for the property `encoding` of a given `Readable` stream. The `encoding`
1598property can be set using the [`readable.setEncoding()`][] method.
1599
1600##### `readable.readableEnded`
1601
1602<!-- YAML
1603added: v12.9.0
1604-->
1605
1606* {boolean}
1607
1608Becomes `true` when [`'end'`][] event is emitted.
1609
1610##### `readable.errored`
1611
1612<!-- YAML
1613added:
1614  v18.0.0
1615-->
1616
1617* {Error}
1618
1619Returns error if the stream has been destroyed with an error.
1620
1621##### `readable.readableFlowing`
1622
1623<!-- YAML
1624added: v9.4.0
1625-->
1626
1627* {boolean}
1628
1629This property reflects the current state of a `Readable` stream as described
1630in the [Three states][] section.
1631
1632##### `readable.readableHighWaterMark`
1633
1634<!-- YAML
1635added: v9.3.0
1636-->
1637
1638* {number}
1639
1640Returns the value of `highWaterMark` passed when creating this `Readable`.
1641
1642##### `readable.readableLength`
1643
1644<!-- YAML
1645added: v9.4.0
1646-->
1647
1648* {number}
1649
1650This property contains the number of bytes (or objects) in the queue
1651ready to be read. The value provides introspection data regarding
1652the status of the `highWaterMark`.
1653
1654##### `readable.readableObjectMode`
1655
1656<!-- YAML
1657added: v12.3.0
1658-->
1659
1660* {boolean}
1661
1662Getter for the property `objectMode` of a given `Readable` stream.
1663
1664##### `readable.resume()`
1665
1666<!-- YAML
1667added: v0.9.4
1668changes:
1669  - version: v10.0.0
1670    pr-url: https://github.com/nodejs/node/pull/18994
1671    description: The `resume()` has no effect if there is a `'readable'` event
1672                 listening.
1673-->
1674
1675* Returns: {this}
1676
1677The `readable.resume()` method causes an explicitly paused `Readable` stream to
1678resume emitting [`'data'`][] events, switching the stream into flowing mode.
1679
1680The `readable.resume()` method can be used to fully consume the data from a
1681stream without actually processing any of that data:
1682
1683```js
1684getReadableStreamSomehow()
1685  .resume()
1686  .on('end', () => {
1687    console.log('Reached the end, but did not read anything.');
1688  });
1689```
1690
1691The `readable.resume()` method has no effect if there is a `'readable'`
1692event listener.
1693
1694##### `readable.setEncoding(encoding)`
1695
1696<!-- YAML
1697added: v0.9.4
1698-->
1699
1700* `encoding` {string} The encoding to use.
1701* Returns: {this}
1702
1703The `readable.setEncoding()` method sets the character encoding for
1704data read from the `Readable` stream.
1705
1706By default, no encoding is assigned and stream data will be returned as
1707`Buffer` objects. Setting an encoding causes the stream data
1708to be returned as strings of the specified encoding rather than as `Buffer`
1709objects. For instance, calling `readable.setEncoding('utf8')` will cause the
1710output data to be interpreted as UTF-8 data, and passed as strings. Calling
1711`readable.setEncoding('hex')` will cause the data to be encoded in hexadecimal
1712string format.
1713
1714The `Readable` stream will properly handle multi-byte characters delivered
1715through the stream that would otherwise become improperly decoded if simply
1716pulled from the stream as `Buffer` objects.
1717
1718```js
1719const readable = getReadableStreamSomehow();
1720readable.setEncoding('utf8');
1721readable.on('data', (chunk) => {
1722  assert.equal(typeof chunk, 'string');
1723  console.log('Got %d characters of string data:', chunk.length);
1724});
1725```
1726
1727##### `readable.unpipe([destination])`
1728
1729<!-- YAML
1730added: v0.9.4
1731-->
1732
1733* `destination` {stream.Writable} Optional specific stream to unpipe
1734* Returns: {this}
1735
1736The `readable.unpipe()` method detaches a `Writable` stream previously attached
1737using the [`stream.pipe()`][] method.
1738
1739If the `destination` is not specified, then _all_ pipes are detached.
1740
1741If the `destination` is specified, but no pipe is set up for it, then
1742the method does nothing.
1743
1744```js
1745const fs = require('node:fs');
1746const readable = getReadableStreamSomehow();
1747const writable = fs.createWriteStream('file.txt');
1748// All the data from readable goes into 'file.txt',
1749// but only for the first second.
1750readable.pipe(writable);
1751setTimeout(() => {
1752  console.log('Stop writing to file.txt.');
1753  readable.unpipe(writable);
1754  console.log('Manually close the file stream.');
1755  writable.end();
1756}, 1000);
1757```
1758
1759##### `readable.unshift(chunk[, encoding])`
1760
1761<!-- YAML
1762added: v0.9.11
1763changes:
1764  - version: v8.0.0
1765    pr-url: https://github.com/nodejs/node/pull/11608
1766    description: The `chunk` argument can now be a `Uint8Array` instance.
1767-->
1768
1769* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to unshift onto the
1770  read queue. For streams not operating in object mode, `chunk` must be a
1771  string, `Buffer`, `Uint8Array`, or `null`. For object mode streams, `chunk`
1772  may be any JavaScript value.
1773* `encoding` {string} Encoding of string chunks. Must be a valid
1774  `Buffer` encoding, such as `'utf8'` or `'ascii'`.
1775
1776Passing `chunk` as `null` signals the end of the stream (EOF) and behaves the
1777same as `readable.push(null)`, after which no more data can be written. The EOF
1778signal is put at the end of the buffer and any buffered data will still be
1779flushed.
1780
1781The `readable.unshift()` method pushes a chunk of data back into the internal
1782buffer. This is useful in certain situations where a stream is being consumed by
1783code that needs to "un-consume" some amount of data that it has optimistically
1784pulled out of the source, so that the data can be passed on to some other party.
1785
1786The `stream.unshift(chunk)` method cannot be called after the [`'end'`][] event
1787has been emitted or a runtime error will be thrown.
1788
1789Developers using `stream.unshift()` often should consider switching to
1790use of a [`Transform`][] stream instead. See the [API for stream implementers][]
1791section for more information.
1792
1793```js
1794// Pull off a header delimited by \n\n.
1795// Use unshift() if we get too much.
1796// Call the callback with (error, header, stream).
1797const { StringDecoder } = require('node:string_decoder');
1798function parseHeader(stream, callback) {
1799  stream.on('error', callback);
1800  stream.on('readable', onReadable);
1801  const decoder = new StringDecoder('utf8');
1802  let header = '';
1803  function onReadable() {
1804    let chunk;
1805    while (null !== (chunk = stream.read())) {
1806      const str = decoder.write(chunk);
1807      if (str.includes('\n\n')) {
1808        // Found the header boundary.
1809        const split = str.split(/\n\n/);
1810        header += split.shift();
1811        const remaining = split.join('\n\n');
1812        const buf = Buffer.from(remaining, 'utf8');
1813        stream.removeListener('error', callback);
1814        // Remove the 'readable' listener before unshifting.
1815        stream.removeListener('readable', onReadable);
1816        if (buf.length)
1817          stream.unshift(buf);
1818        // Now the body of the message can be read from the stream.
1819        callback(null, header, stream);
1820        return;
1821      }
1822      // Still reading the header.
1823      header += str;
1824    }
1825  }
1826}
1827```
1828
1829Unlike [`stream.push(chunk)`][stream-push], `stream.unshift(chunk)` will not
1830end the reading process by resetting the internal reading state of the stream.
1831This can cause unexpected results if `readable.unshift()` is called during a
1832read (i.e. from within a [`stream._read()`][stream-_read] implementation on a
1833custom stream). Following the call to `readable.unshift()` with an immediate
1834[`stream.push('')`][stream-push] will reset the reading state appropriately,
1835however it is best to simply avoid calling `readable.unshift()` while in the
1836process of performing a read.
1837
1838##### `readable.wrap(stream)`
1839
1840<!-- YAML
1841added: v0.9.4
1842-->
1843
1844* `stream` {Stream} An "old style" readable stream
1845* Returns: {this}
1846
1847Prior to Node.js 0.10, streams did not implement the entire `node:stream`
1848module API as it is currently defined. (See [Compatibility][] for more
1849information.)
1850
1851When using an older Node.js library that emits [`'data'`][] events and has a
1852[`stream.pause()`][stream-pause] method that is advisory only, the
1853`readable.wrap()` method can be used to create a [`Readable`][] stream that uses
1854the old stream as its data source.
1855
1856It will rarely be necessary to use `readable.wrap()` but the method has been
1857provided as a convenience for interacting with older Node.js applications and
1858libraries.
1859
1860```js
1861const { OldReader } = require('./old-api-module.js');
1862const { Readable } = require('node:stream');
1863const oreader = new OldReader();
1864const myReader = new Readable().wrap(oreader);
1865
1866myReader.on('readable', () => {
1867  myReader.read(); // etc.
1868});
1869```
1870
1871##### `readable[Symbol.asyncIterator]()`
1872
1873<!-- YAML
1874added: v10.0.0
1875changes:
1876  - version: v11.14.0
1877    pr-url: https://github.com/nodejs/node/pull/26989
1878    description: Symbol.asyncIterator support is no longer experimental.
1879-->
1880
1881* Returns: {AsyncIterator} to fully consume the stream.
1882
1883```js
1884const fs = require('node:fs');
1885
1886async function print(readable) {
1887  readable.setEncoding('utf8');
1888  let data = '';
1889  for await (const chunk of readable) {
1890    data += chunk;
1891  }
1892  console.log(data);
1893}
1894
1895print(fs.createReadStream('file')).catch(console.error);
1896```
1897
1898If the loop terminates with a `break`, `return`, or a `throw`, the stream will
1899be destroyed. In other words, iterating over a stream will consume the stream
1900fully. The stream will be read in chunks of size equal to the `highWaterMark`
1901option. In the code example above, data will be in a single chunk if the file
1902has less than 64 KiB of data because no `highWaterMark` option is provided to
1903[`fs.createReadStream()`][].
1904
1905##### `readable[Symbol.asyncDispose]()`
1906
1907<!-- YAML
1908added: v18.18.0
1909-->
1910
1911> Stability: 1 - Experimental
1912
1913Calls [`readable.destroy()`][readable-destroy] with an `AbortError` and returns
1914a promise that fulfills when the stream is finished.
1915
1916##### `readable.compose(stream[, options])`
1917
1918<!-- YAML
1919added: v18.13.0
1920-->
1921
1922> Stability: 1 - Experimental
1923
1924* `stream` {Stream|Iterable|AsyncIterable|Function}
1925* `options` {Object}
1926  * `signal` {AbortSignal} allows destroying the stream if the signal is
1927    aborted.
1928* Returns: {Duplex} a stream composed with the stream `stream`.
1929
1930```mjs
1931import { Readable } from 'node:stream';
1932
1933async function* splitToWords(source) {
1934  for await (const chunk of source) {
1935    const words = String(chunk).split(' ');
1936
1937    for (const word of words) {
1938      yield word;
1939    }
1940  }
1941}
1942
1943const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
1944const words = await wordsStream.toArray();
1945
1946console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator']
1947```
1948
1949See [`stream.compose`][] for more information.
1950
1951##### `readable.iterator([options])`
1952
1953<!-- YAML
1954added: v16.3.0
1955-->
1956
1957> Stability: 1 - Experimental
1958
1959* `options` {Object}
1960  * `destroyOnReturn` {boolean} When set to `false`, calling `return` on the
1961    async iterator, or exiting a `for await...of` iteration using a `break`,
1962    `return`, or `throw` will not destroy the stream. **Default:** `true`.
1963* Returns: {AsyncIterator} to consume the stream.
1964
1965The iterator created by this method gives users the option to cancel the
1966destruction of the stream if the `for await...of` loop is exited by `return`,
1967`break`, or `throw`, or if the iterator should destroy the stream if the stream
1968emitted an error during iteration.
1969
1970```js
1971const { Readable } = require('node:stream');
1972
1973async function printIterator(readable) {
1974  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
1975    console.log(chunk); // 1
1976    break;
1977  }
1978
1979  console.log(readable.destroyed); // false
1980
1981  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
1982    console.log(chunk); // Will print 2 and then 3
1983  }
1984
1985  console.log(readable.destroyed); // True, stream was totally consumed
1986}
1987
1988async function printSymbolAsyncIterator(readable) {
1989  for await (const chunk of readable) {
1990    console.log(chunk); // 1
1991    break;
1992  }
1993
1994  console.log(readable.destroyed); // true
1995}
1996
1997async function showBoth() {
1998  await printIterator(Readable.from([1, 2, 3]));
1999  await printSymbolAsyncIterator(Readable.from([1, 2, 3]));
2000}
2001
2002showBoth();
2003```
2004
2005##### `readable.map(fn[, options])`
2006
2007<!-- YAML
2008added:
2009  - v17.4.0
2010  - v16.14.0
2011changes:
2012  - version: v18.19.0
2013    pr-url: https://github.com/nodejs/node/pull/49249
2014    description: added `highWaterMark` in options.
2015-->
2016
2017> Stability: 1 - Experimental
2018
2019* `fn` {Function|AsyncFunction} a function to map over every chunk in the
2020  stream.
2021  * `data` {any} a chunk of data from the stream.
2022  * `options` {Object}
2023    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
2024      abort the `fn` call early.
2025* `options` {Object}
2026  * `concurrency` {number} the maximum concurrent invocation of `fn` to call
2027    on the stream at once. **Default:** `1`.
2028  * `highWaterMark` {number} how many items to buffer while waiting for user
2029    consumption of the mapped items. **Default:** `concurrency * 2 - 1`.
2030  * `signal` {AbortSignal} allows destroying the stream if the signal is
2031    aborted.
2032* Returns: {Readable} a stream mapped with the function `fn`.
2033
2034This method allows mapping over the stream. The `fn` function will be called
2035for every chunk in the stream. If the `fn` function returns a promise - that
2036promise will be `await`ed before being passed to the result stream.
2037
2038```mjs
2039import { Readable } from 'node:stream';
2040import { Resolver } from 'node:dns/promises';
2041
2042// With a synchronous mapper.
2043for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
2044  console.log(chunk); // 2, 4, 6, 8
2045}
2046// With an asynchronous mapper, making at most 2 queries at a time.
2047const resolver = new Resolver();
2048const dnsResults = Readable.from([
2049  'nodejs.org',
2050  'openjsf.org',
2051  'www.linuxfoundation.org',
2052]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
2053for await (const result of dnsResults) {
2054  console.log(result); // Logs the DNS result of resolver.resolve4.
2055}
2056```
2057
2058##### `readable.filter(fn[, options])`
2059
2060<!-- YAML
2061added:
2062  - v17.4.0
2063  - v16.14.0
2064changes:
2065  - version: v18.19.0
2066    pr-url: https://github.com/nodejs/node/pull/49249
2067    description: added `highWaterMark` in options.
2068-->
2069
2070> Stability: 1 - Experimental
2071
2072* `fn` {Function|AsyncFunction} a function to filter chunks from the stream.
2073  * `data` {any} a chunk of data from the stream.
2074  * `options` {Object}
2075    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
2076      abort the `fn` call early.
2077* `options` {Object}
2078  * `concurrency` {number} the maximum concurrent invocation of `fn` to call
2079    on the stream at once. **Default:** `1`.
2080  * `highWaterMark` {number} how many items to buffer while waiting for user
2081    consumption of the filtered items. **Default:** `concurrency * 2 - 1`.
2082  * `signal` {AbortSignal} allows destroying the stream if the signal is
2083    aborted.
2084* Returns: {Readable} a stream filtered with the predicate `fn`.
2085
2086This method allows filtering the stream. For each chunk in the stream the `fn`
2087function will be called and if it returns a truthy value, the chunk will be
2088passed to the result stream. If the `fn` function returns a promise - that
2089promise will be `await`ed.
2090
2091```mjs
2092import { Readable } from 'node:stream';
2093import { Resolver } from 'node:dns/promises';
2094
2095// With a synchronous predicate.
2096for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
2097  console.log(chunk); // 3, 4
2098}
2099// With an asynchronous predicate, making at most 2 queries at a time.
2100const resolver = new Resolver();
2101const dnsResults = Readable.from([
2102  'nodejs.org',
2103  'openjsf.org',
2104  'www.linuxfoundation.org',
2105]).filter(async (domain) => {
2106  const { address } = await resolver.resolve4(domain, { ttl: true });
2107  return address.ttl > 60;
2108}, { concurrency: 2 });
2109for await (const result of dnsResults) {
2110  // Logs domains with more than 60 seconds on the resolved dns record.
2111  console.log(result);
2112}
2113```
2114
2115##### `readable.forEach(fn[, options])`
2116
2117<!-- YAML
2118added: v17.5.0
2119-->
2120
2121> Stability: 1 - Experimental
2122
2123* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
2124  * `data` {any} a chunk of data from the stream.
2125  * `options` {Object}
2126    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
2127      abort the `fn` call early.
2128* `options` {Object}
2129  * `concurrency` {number} the maximum concurrent invocation of `fn` to call
2130    on the stream at once. **Default:** `1`.
2131  * `signal` {AbortSignal} allows destroying the stream if the signal is
2132    aborted.
2133* Returns: {Promise} a promise for when the stream has finished.
2134
2135This method allows iterating a stream. For each chunk in the stream the
2136`fn` function will be called. If the `fn` function returns a promise - that
2137promise will be `await`ed.
2138
2139This method is different from `for await...of` loops in that it can optionally
2140process chunks concurrently. In addition, a `forEach` iteration can only be
2141stopped by having passed a `signal` option and aborting the related
2142`AbortController` while `for await...of` can be stopped with `break` or
2143`return`. In either case the stream will be destroyed.
2144
2145This method is different from listening to the [`'data'`][] event in that it
2146uses the [`readable`][] event in the underlying machinery and can limit the
2147number of concurrent `fn` calls.
2148
2149```mjs
2150import { Readable } from 'node:stream';
2151import { Resolver } from 'node:dns/promises';
2152
2153// With a synchronous predicate.
2154for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
2155  console.log(chunk); // 3, 4
2156}
2157// With an asynchronous predicate, making at most 2 queries at a time.
2158const resolver = new Resolver();
2159const dnsResults = Readable.from([
2160  'nodejs.org',
2161  'openjsf.org',
2162  'www.linuxfoundation.org',
2163]).map(async (domain) => {
2164  const { address } = await resolver.resolve4(domain, { ttl: true });
2165  return address;
2166}, { concurrency: 2 });
2167await dnsResults.forEach((result) => {
2168  // Logs result, similar to `for await (const result of dnsResults)`
2169  console.log(result);
2170});
2171console.log('done'); // Stream has finished
2172```
2173
2174##### `readable.toArray([options])`
2175
2176<!-- YAML
2177added: v17.5.0
2178-->
2179
2180> Stability: 1 - Experimental
2181
2182* `options` {Object}
2183  * `signal` {AbortSignal} allows cancelling the toArray operation if the
2184    signal is aborted.
2185* Returns: {Promise} a promise containing an array with the contents of the
2186  stream.
2187
2188This method allows easily obtaining the contents of a stream.
2189
2190As this method reads the entire stream into memory, it negates the benefits of
2191streams. It's intended for interoperability and convenience, not as the primary
2192way to consume streams.
2193
2194```mjs
2195import { Readable } from 'node:stream';
2196import { Resolver } from 'node:dns/promises';
2197
2198await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]
2199
2200// Make dns queries concurrently using .map and collect
2201// the results into an array using toArray
2202const dnsResults = await Readable.from([
2203  'nodejs.org',
2204  'openjsf.org',
2205  'www.linuxfoundation.org',
2206]).map(async (domain) => {
2207  const { address } = await resolver.resolve4(domain, { ttl: true });
2208  return address;
2209}, { concurrency: 2 }).toArray();
2210```
2211
2212##### `readable.some(fn[, options])`
2213
2214<!-- YAML
2215added: v17.5.0
2216-->
2217
2218> Stability: 1 - Experimental
2219
2220* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
2221  * `data` {any} a chunk of data from the stream.
2222  * `options` {Object}
2223    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
2224      abort the `fn` call early.
2225* `options` {Object}
2226  * `concurrency` {number} the maximum concurrent invocation of `fn` to call
2227    on the stream at once. **Default:** `1`.
2228  * `signal` {AbortSignal} allows destroying the stream if the signal is
2229    aborted.
2230* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy
2231  value for at least one of the chunks.
2232
2233This method is similar to `Array.prototype.some` and calls `fn` on each chunk
2234in the stream until the awaited return value is `true` (or any truthy value).
2235Once the awaited return value of an `fn` call on a chunk is truthy, the stream is
2236destroyed and the promise is fulfilled with `true`. If none of the `fn`
2237calls on the chunks return a truthy value, the promise is fulfilled with
2238`false`.
2239
2240```mjs
2241import { Readable } from 'node:stream';
2242import { stat } from 'node:fs/promises';
2243
2244// With a synchronous predicate.
2245await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
2246await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false
2247
2248// With an asynchronous predicate, making at most 2 file checks at a time.
2249const anyBigFile = await Readable.from([
2250  'file1',
2251  'file2',
2252  'file3',
2253]).some(async (fileName) => {
2254  const stats = await stat(fileName);
2255  return stats.size > 1024 * 1024;
2256}, { concurrency: 2 });
2257console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
2258console.log('done'); // Stream has finished
2259```
2260
2261##### `readable.find(fn[, options])`
2262
2263<!-- YAML
2264added: v17.5.0
2265-->
2266
2267> Stability: 1 - Experimental
2268
2269* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
2270  * `data` {any} a chunk of data from the stream.
2271  * `options` {Object}
2272    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
2273      abort the `fn` call early.
2274* `options` {Object}
2275  * `concurrency` {number} the maximum concurrent invocation of `fn` to call
2276    on the stream at once. **Default:** `1`.
2277  * `signal` {AbortSignal} allows destroying the stream if the signal is
2278    aborted.
2279* Returns: {Promise} a promise evaluating to the first chunk for which `fn`
2280  evaluated with a truthy value, or `undefined` if no element was found.
2281
2282This method is similar to `Array.prototype.find` and calls `fn` on each chunk
2283in the stream to find a chunk with a truthy value for `fn`. Once an `fn` call's
2284awaited return value is truthy, the stream is destroyed and the promise is
2285fulfilled with the value for which `fn` returned a truthy value. If all of the
2286`fn` calls on the chunks return a falsy value, the promise is fulfilled with
2287`undefined`.
2288
2289```mjs
2290import { Readable } from 'node:stream';
2291import { stat } from 'node:fs/promises';
2292
2293// With a synchronous predicate.
2294await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
2295await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
2296await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined
2297
2298// With an asynchronous predicate, making at most 2 file checks at a time.
2299const foundBigFile = await Readable.from([
2300  'file1',
2301  'file2',
2302  'file3',
2303]).find(async (fileName) => {
2304  const stats = await stat(fileName);
2305  return stats.size > 1024 * 1024;
2306}, { concurrency: 2 });
2307console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
2308console.log('done'); // Stream has finished
2309```
2310
2311##### `readable.every(fn[, options])`
2312
2313<!-- YAML
2314added: v17.5.0
2315-->
2316
2317> Stability: 1 - Experimental
2318
2319* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
2320  * `data` {any} a chunk of data from the stream.
2321  * `options` {Object}
2322    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
2323      abort the `fn` call early.
2324* `options` {Object}
2325  * `concurrency` {number} the maximum concurrent invocation of `fn` to call
2326    on the stream at once. **Default:** `1`.
2327  * `signal` {AbortSignal} allows destroying the stream if the signal is
2328    aborted.
2329* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy
2330  value for all of the chunks.
2331
2332This method is similar to `Array.prototype.every` and calls `fn` on each chunk
2333in the stream to check if all awaited return values are truthy.
2334Once the awaited return value of an `fn` call on a chunk is falsy, the stream is
2335destroyed and the promise is fulfilled with `false`. If all of the `fn` calls
2336on the chunks return a truthy value, the promise is fulfilled with `true`.
2337
2338```mjs
2339import { Readable } from 'node:stream';
2340import { stat } from 'node:fs/promises';
2341
2342// With a synchronous predicate.
2343await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
2344await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true
2345
2346// With an asynchronous predicate, making at most 2 file checks at a time.
2347const allBigFiles = await Readable.from([
2348  'file1',
2349  'file2',
2350  'file3',
2351]).every(async (fileName) => {
2352  const stats = await stat(fileName);
2353  return stats.size > 1024 * 1024;
2354}, { concurrency: 2 });
2355// `true` if all files in the list are bigger than 1MiB
2356console.log(allBigFiles);
2357console.log('done'); // Stream has finished
2358```
2359
2360##### `readable.flatMap(fn[, options])`
2361
2362<!-- YAML
2363added: v17.5.0
2364-->
2365
2366> Stability: 1 - Experimental
2367
2368* `fn` {Function|AsyncGeneratorFunction|AsyncFunction} a function to map over
2369  every chunk in the stream.
2370  * `data` {any} a chunk of data from the stream.
2371  * `options` {Object}
2372    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
2373      abort the `fn` call early.
2374* `options` {Object}
2375  * `concurrency` {number} the maximum concurrent invocation of `fn` to call
2376    on the stream at once. **Default:** `1`.
2377  * `signal` {AbortSignal} allows destroying the stream if the signal is
2378    aborted.
2379* Returns: {Readable} a stream flat-mapped with the function `fn`.
2380
2381This method returns a new stream by applying the given callback to each
2382chunk of the stream and then flattening the result.
2383
2384It is possible to return a stream or another iterable or async iterable from
2385`fn` and the result streams will be merged (flattened) into the returned
2386stream.
2387
2388```mjs
2389import { Readable } from 'node:stream';
2390import { createReadStream } from 'node:fs';
2391
2392// With a synchronous mapper.
2393for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
2394  console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4
2395}
2396// With an asynchronous mapper, combine the contents of 4 files
2397const concatResult = Readable.from([
2398  './1.mjs',
2399  './2.mjs',
2400  './3.mjs',
2401  './4.mjs',
2402]).flatMap((fileName) => createReadStream(fileName));
2403for await (const result of concatResult) {
2404  // This will contain the contents (all chunks) of all 4 files
2405  console.log(result);
2406}
2407```
2408
2409##### `readable.drop(limit[, options])`
2410
2411<!-- YAML
2412added: v17.5.0
2413-->
2414
2415> Stability: 1 - Experimental
2416
2417* `limit` {number} the number of chunks to drop from the readable.
2418* `options` {Object}
2419  * `signal` {AbortSignal} allows destroying the stream if the signal is
2420    aborted.
2421* Returns: {Readable} a stream with `limit` chunks dropped.
2422
2423This method returns a new stream with the first `limit` chunks dropped.
2424
2425```mjs
2426import { Readable } from 'node:stream';
2427
2428await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4]
2429```
2430
2431##### `readable.take(limit[, options])`
2432
2433<!-- YAML
2434added: v17.5.0
2435-->
2436
2437> Stability: 1 - Experimental
2438
2439* `limit` {number} the number of chunks to take from the readable.
2440* `options` {Object}
2441  * `signal` {AbortSignal} allows destroying the stream if the signal is
2442    aborted.
2443* Returns: {Readable} a stream with `limit` chunks taken.
2444
2445This method returns a new stream with the first `limit` chunks.
2446
2447```mjs
2448import { Readable } from 'node:stream';
2449
2450await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
2451```
2452
2453##### `readable.asIndexedPairs([options])`
2454
2455<!-- YAML
2456added: v17.5.0
2457changes:
2458  - version: v18.17.0
2459    pr-url: https://github.com/nodejs/node/pull/48102
2460    description: Using the `asIndexedPairs` method emits a runtime warning that
2461                  it will be removed in a future version.
2462-->
2463
2464> Stability: 1 - Experimental
2465
2466* `options` {Object}
2467  * `signal` {AbortSignal} allows destroying the stream if the signal is
2468    aborted.
2469* Returns: {Readable} a stream of indexed pairs.
2470
2471This method returns a new stream with chunks of the underlying stream paired
2472with a counter in the form `[index, chunk]`. The first index value is 0 and it
2473increases by 1 for each chunk produced.
2474
2475```mjs
2476import { Readable } from 'node:stream';
2477
2478const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray();
2479console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']]
2480```
2481
2482##### `readable.reduce(fn[, initial[, options]])`
2483
2484<!-- YAML
2485added: v17.5.0
2486-->
2487
2488> Stability: 1 - Experimental
2489
2490* `fn` {Function|AsyncFunction} a reducer function to call over every chunk
2491  in the stream.
2492  * `previous` {any} the value obtained from the last call to `fn` or the
2493    `initial` value if specified or the first chunk of the stream otherwise.
2494  * `data` {any} a chunk of data from the stream.
2495  * `options` {Object}
2496    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
2497      abort the `fn` call early.
2498* `initial` {any} the initial value to use in the reduction.
2499* `options` {Object}
2500  * `signal` {AbortSignal} allows destroying the stream if the signal is
2501    aborted.
2502* Returns: {Promise} a promise for the final value of the reduction.
2503
2504This method calls `fn` on each chunk of the stream in order, passing it the
2505result from the calculation on the previous element. It returns a promise for
2506the final value of the reduction.
2507
2508If no `initial` value is supplied the first chunk of the stream is used as the
2509initial value. If the stream is empty, the promise is rejected with a
2510`TypeError` with the `ERR_INVALID_ARGS` code property.
2511
2512```mjs
2513import { Readable } from 'node:stream';
2514import { readdir, stat } from 'node:fs/promises';
2515import { join } from 'node:path';
2516
2517const directoryPath = './src';
2518const filesInDir = await readdir(directoryPath);
2519
2520const folderSize = await Readable.from(filesInDir)
2521  .reduce(async (totalSize, file) => {
2522    const { size } = await stat(join(directoryPath, file));
2523    return totalSize + size;
2524  }, 0);
2525
2526console.log(folderSize);
2527```
2528
2529The reducer function iterates the stream element-by-element which means that
2530there is no `concurrency` parameter or parallelism. To perform a `reduce`
2531concurrently, you can extract the async function to the [`readable.map`][] method.
2532
2533```mjs
2534import { Readable } from 'node:stream';
2535import { readdir, stat } from 'node:fs/promises';
2536import { join } from 'node:path';
2537
2538const directoryPath = './src';
2539const filesInDir = await readdir(directoryPath);
2540
2541const folderSize = await Readable.from(filesInDir)
2542  .map((file) => stat(join(directoryPath, file)), { concurrency: 2 })
2543  .reduce((totalSize, { size }) => totalSize + size, 0);
2544
2545console.log(folderSize);
2546```
2547
2548### Duplex and transform streams
2549
2550#### Class: `stream.Duplex`
2551
2552<!-- YAML
2553added: v0.9.4
2554changes:
2555  - version: v6.8.0
2556    pr-url: https://github.com/nodejs/node/pull/8834
2557    description: Instances of `Duplex` now return `true` when
2558                 checking `instanceof stream.Writable`.
2559-->
2560
2561<!--type=class-->
2562
2563Duplex streams are streams that implement both the [`Readable`][] and
2564[`Writable`][] interfaces.
2565
2566Examples of `Duplex` streams include:
2567
2568* [TCP sockets][]
2569* [zlib streams][zlib]
2570* [crypto streams][crypto]
2571
2572##### `duplex.allowHalfOpen`
2573
2574<!-- YAML
2575added: v0.9.4
2576-->
2577
2578* {boolean}
2579
2580If `false` then the stream will automatically end the writable side when the
2581readable side ends. Set initially by the `allowHalfOpen` constructor option,
2582which defaults to `true`.
2583
2584This can be changed manually to change the half-open behavior of an existing
2585`Duplex` stream instance, but must be changed before the `'end'` event is
2586emitted.
2587
2588#### Class: `stream.Transform`
2589
2590<!-- YAML
2591added: v0.9.4
2592-->
2593
2594<!--type=class-->
2595
2596Transform streams are [`Duplex`][] streams where the output is in some way
2597related to the input. Like all [`Duplex`][] streams, `Transform` streams
2598implement both the [`Readable`][] and [`Writable`][] interfaces.
2599
2600Examples of `Transform` streams include:
2601
2602* [zlib streams][zlib]
2603* [crypto streams][crypto]
2604
2605##### `transform.destroy([error])`
2606
2607<!-- YAML
2608added: v8.0.0
2609changes:
2610  - version: v14.0.0
2611    pr-url: https://github.com/nodejs/node/pull/29197
2612    description: Work as a no-op on a stream that has already been destroyed.
2613-->
2614
2615* `error` {Error}
2616* Returns: {this}
2617
2618Destroy the stream, and optionally emit an `'error'` event. After this call, the
2619transform stream would release any internal resources.
2620Implementors should not override this method, but instead implement
2621[`readable._destroy()`][readable-_destroy].
2622The default implementation of `_destroy()` for `Transform` also emits `'close'`
2623unless `emitClose` is set to `false`.
2624
2625Once `destroy()` has been called, any further calls will be a no-op and no
2626further errors except from `_destroy()` may be emitted as `'error'`.
2627
2628### `stream.finished(stream[, options], callback)`
2629
2630<!-- YAML
2631added: v10.0.0
2632changes:
2633  - version: v18.14.0
2634    pr-url: https://github.com/nodejs/node/pull/46205
2635    description: Added support for `ReadableStream` and `WritableStream`.
2636  - version: v15.11.0
2637    pr-url: https://github.com/nodejs/node/pull/37354
2638    description: The `signal` option was added.
2639  - version: v14.0.0
2640    pr-url: https://github.com/nodejs/node/pull/32158
2641    description: The `finished(stream, cb)` will wait for the `'close'` event
2642                 before invoking the callback. The implementation tries to
2643                 detect legacy streams and only apply this behavior to streams
2644                 which are expected to emit `'close'`.
2645  - version: v14.0.0
2646    pr-url: https://github.com/nodejs/node/pull/31545
2647    description: Emitting `'close'` before `'end'` on a `Readable` stream
2648                 will cause an `ERR_STREAM_PREMATURE_CLOSE` error.
2649  - version: v14.0.0
2650    pr-url: https://github.com/nodejs/node/pull/31509
2651    description: Callback will be invoked on streams which have already
2652                 finished before the call to `finished(stream, cb)`.
2653-->
2654
2655* `stream` {Stream|ReadableStream|WritableStream}
2656
2657A readable and/or writable stream/webstream.
2658
2659* `options` {Object}
2660  * `error` {boolean} If set to `false`, then a call to `emit('error', err)` is
2661    not treated as finished. **Default:** `true`.
2662  * `readable` {boolean} When set to `false`, the callback will be called when
2663    the stream ends even though the stream might still be readable.
2664    **Default:** `true`.
2665  * `writable` {boolean} When set to `false`, the callback will be called when
2666    the stream ends even though the stream might still be writable.
2667    **Default:** `true`.
2668  * `signal` {AbortSignal} allows aborting the wait for the stream finish. The
2669    underlying stream will _not_ be aborted if the signal is aborted. The
2670    callback will get called with an `AbortError`. All registered
2671    listeners added by this function will also be removed.
2672  * `cleanup` {boolean} remove all registered stream listeners.
2673    **Default:** `false`.
2674
2675* `callback` {Function} A callback function that takes an optional error
2676  argument.
2677
2678* Returns: {Function} A cleanup function which removes all registered
2679  listeners.
2680
2681A function to get notified when a stream is no longer readable, writable
2682or has experienced an error or a premature close event.
2683
2684```js
2685const { finished } = require('node:stream');
2686const fs = require('node:fs');
2687
2688const rs = fs.createReadStream('archive.tar');
2689
2690finished(rs, (err) => {
2691  if (err) {
2692    console.error('Stream failed.', err);
2693  } else {
2694    console.log('Stream is done reading.');
2695  }
2696});
2697
2698rs.resume(); // Drain the stream.
2699```
2700
2701Especially useful in error handling scenarios where a stream is destroyed
2702prematurely (like an aborted HTTP request), and will not emit `'end'`
2703or `'finish'`.
2704
2705The `finished` API provides a [promise version][stream-finished-promise].
2706
2707`stream.finished()` leaves dangling event listeners (in particular
2708`'error'`, `'end'`, `'finish'` and `'close'`) after `callback` has been
2709invoked. The reason for this is so that unexpected `'error'` events (due to
2710incorrect stream implementations) do not cause unexpected crashes.
2711If this is unwanted behavior then the returned cleanup function needs to be
2712invoked in the callback:
2713
2714```js
2715const cleanup = finished(rs, (err) => {
2716  cleanup();
2717  // ...
2718});
2719```
2720
2721### `stream.pipeline(source[, ...transforms], destination, callback)`
2722
2723### `stream.pipeline(streams, callback)`
2724
2725<!-- YAML
2726added: v10.0.0
2727changes:
2728  - version: v18.16.0
2729    pr-url: https://github.com/nodejs/node/pull/46307
2730    description: Added support for webstreams.
2731  - version: v18.0.0
2732    pr-url: https://github.com/nodejs/node/pull/41678
2733    description: Passing an invalid callback to the `callback` argument
2734                 now throws `ERR_INVALID_ARG_TYPE` instead of
2735                 `ERR_INVALID_CALLBACK`.
2736  - version: v14.0.0
2737    pr-url: https://github.com/nodejs/node/pull/32158
2738    description: The `pipeline(..., cb)` will wait for the `'close'` event
2739                 before invoking the callback. The implementation tries to
2740                 detect legacy streams and only apply this behavior to streams
2741                 which are expected to emit `'close'`.
2742  - version: v13.10.0
2743    pr-url: https://github.com/nodejs/node/pull/31223
2744    description: Add support for async generators.
2745-->
2746
2747* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]|
2748  ReadableStream\[]|WritableStream\[]|TransformStream\[]}
2749* `source` {Stream|Iterable|AsyncIterable|Function|ReadableStream}
2750  * Returns: {Iterable|AsyncIterable}
2751* `...transforms` {Stream|Function|TransformStream}
2752  * `source` {AsyncIterable}
2753  * Returns: {AsyncIterable}
2754* `destination` {Stream|Function|WritableStream}
2755  * `source` {AsyncIterable}
2756  * Returns: {AsyncIterable|Promise}
2757* `callback` {Function} Called when the pipeline is fully done.
2758  * `err` {Error}
2759  * `val` Resolved value of `Promise` returned by `destination`.
2760* Returns: {Stream}
2761
2762A module method to pipe between streams and generators, forwarding errors and
2763properly cleaning up, and providing a callback when the pipeline is complete.
2764
2765```js
2766const { pipeline } = require('node:stream');
2767const fs = require('node:fs');
2768const zlib = require('node:zlib');
2769
2770// Use the pipeline API to easily pipe a series of streams
2771// together and get notified when the pipeline is fully done.
2772
2773// A pipeline to gzip a potentially huge tar file efficiently:
2774
2775pipeline(
2776  fs.createReadStream('archive.tar'),
2777  zlib.createGzip(),
2778  fs.createWriteStream('archive.tar.gz'),
2779  (err) => {
2780    if (err) {
2781      console.error('Pipeline failed.', err);
2782    } else {
2783      console.log('Pipeline succeeded.');
2784    }
2785  },
2786);
2787```
2788
2789The `pipeline` API provides a [promise version][stream-pipeline-promise].
2790
2791`stream.pipeline()` will call `stream.destroy(err)` on all streams except:
2792
2793* `Readable` streams which have emitted `'end'` or `'close'`.
2794* `Writable` streams which have emitted `'finish'` or `'close'`.
2795
2796`stream.pipeline()` leaves dangling event listeners on the streams
2797after the `callback` has been invoked. In the case of reuse of streams after
2798failure, this can cause event listener leaks and swallowed errors. If the last
2799stream is readable, dangling event listeners will be removed so that the last
2800stream can be consumed later.
2801
2802`stream.pipeline()` closes all the streams when an error is raised.
2803Using `IncomingRequest` with `pipeline` could lead to unexpected behavior,
2804since it would destroy the socket without sending the expected response.
2805See the example below:
2806
2807```js
2808const fs = require('node:fs');
2809const http = require('node:http');
2810const { pipeline } = require('node:stream');
2811
2812const server = http.createServer((req, res) => {
2813  const fileStream = fs.createReadStream('./fileNotExist.txt');
2814  pipeline(fileStream, res, (err) => {
2815    if (err) {
2816      console.log(err); // No such file
2817      // this message can't be sent once `pipeline` already destroyed the socket
2818      return res.end('error!!!');
2819    }
2820  });
2821});
2822```
2823
2824### `stream.compose(...streams)`
2825
2826<!-- YAML
2827added: v16.9.0
2828changes:
2829  - version: v18.16.0
2830    pr-url: https://github.com/nodejs/node/pull/46675
2831    description: Added support for webstreams.
2832-->
2833
2834> Stability: 1 - `stream.compose` is experimental.
2835
2836* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]|
2837  ReadableStream\[]|WritableStream\[]|TransformStream\[]}
2838* Returns: {stream.Duplex}
2839
2840Combines two or more streams into a `Duplex` stream that writes to the
2841first stream and reads from the last. Each provided stream is piped into
2842the next, using `stream.pipeline`. If any of the streams error then all
2843are destroyed, including the outer `Duplex` stream.
2844
2845Because `stream.compose` returns a new stream that in turn can (and
2846should) be piped into other streams, it enables composition. In contrast,
2847when passing streams to `stream.pipeline`, typically the first stream is
2848a readable stream and the last a writable stream, forming a closed
2849circuit.
2850
2851If passed a `Function` it must be a factory method taking a `source`
2852`Iterable`.
2853
2854```mjs
2855import { compose, Transform } from 'node:stream';
2856
2857const removeSpaces = new Transform({
2858  transform(chunk, encoding, callback) {
2859    callback(null, String(chunk).replace(' ', ''));
2860  },
2861});
2862
2863async function* toUpper(source) {
2864  for await (const chunk of source) {
2865    yield String(chunk).toUpperCase();
2866  }
2867}
2868
2869let res = '';
2870for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
2871  res += buf;
2872}
2873
2874console.log(res); // prints 'HELLOWORLD'
2875```
2876
2877`stream.compose` can be used to convert async iterables, generators and
2878functions into streams.
2879
2880* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
2881  `null`.
2882* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`.
2883  Must take a source `AsyncIterable` as first parameter. Cannot yield
2884  `null`.
2885* `AsyncFunction` converts into a writable `Duplex`. Must return
2886  either `null` or `undefined`.
2887
2888```mjs
2889import { compose } from 'node:stream';
2890import { finished } from 'node:stream/promises';
2891
2892// Convert AsyncIterable into readable Duplex.
2893const s1 = compose(async function*() {
2894  yield 'Hello';
2895  yield 'World';
2896}());
2897
2898// Convert AsyncGenerator into transform Duplex.
2899const s2 = compose(async function*(source) {
2900  for await (const chunk of source) {
2901    yield String(chunk).toUpperCase();
2902  }
2903});
2904
2905let res = '';
2906
2907// Convert AsyncFunction into writable Duplex.
2908const s3 = compose(async function(source) {
2909  for await (const chunk of source) {
2910    res += chunk;
2911  }
2912});
2913
2914await finished(compose(s1, s2, s3));
2915
2916console.log(res); // prints 'HELLOWORLD'
2917```
2918
2919See [`readable.compose(stream)`][] for `stream.compose` as an operator.
2920
2921### `stream.Readable.from(iterable[, options])`
2922
2923<!-- YAML
2924added:
2925  - v12.3.0
2926  - v10.17.0
2927-->
2928
2929* `iterable` {Iterable} Object implementing the `Symbol.asyncIterator` or
2930  `Symbol.iterator` iterable protocol. Emits an 'error' event if a null
2931  value is passed.
2932* `options` {Object} Options provided to `new stream.Readable([options])`.
2933  By default, `Readable.from()` will set `options.objectMode` to `true`, unless
2934  this is explicitly opted out by setting `options.objectMode` to `false`.
2935* Returns: {stream.Readable}
2936
2937A utility method for creating readable streams out of iterators.
2938
2939```js
2940const { Readable } = require('node:stream');
2941
2942async function * generate() {
2943  yield 'hello';
2944  yield 'streams';
2945}
2946
2947const readable = Readable.from(generate());
2948
2949readable.on('data', (chunk) => {
2950  console.log(chunk);
2951});
2952```
2953
2954Calling `Readable.from(string)` or `Readable.from(buffer)` will not iterate
2955over the string or buffer; for performance reasons, and to match the semantics
2956of other streams, it is emitted as a single chunk.
2957
2958If an `Iterable` object containing promises is passed as an argument,
2959it might result in unhandled rejection.
2960
2961```js
2962const { Readable } = require('node:stream');
2963
2964Readable.from([
2965  new Promise((resolve) => setTimeout(() => resolve('1'), 1500)),
2966  new Promise((_, reject) => setTimeout(() => reject(new Error('2')), 1000)), // Unhandled rejection
2967]);
2968```
2969
2970### `stream.Readable.fromWeb(readableStream[, options])`
2971
2972<!-- YAML
2973added: v17.0.0
2974-->
2975
2976> Stability: 1 - Experimental
2977
2978* `readableStream` {ReadableStream}
2979* `options` {Object}
2980  * `encoding` {string}
2981  * `highWaterMark` {number}
2982  * `objectMode` {boolean}
2983  * `signal` {AbortSignal}
2984* Returns: {stream.Readable}
2985
2986### `stream.Readable.isDisturbed(stream)`
2987
2988<!-- YAML
2989added: v16.8.0
2990-->
2991
2992> Stability: 1 - Experimental
2993
2994* `stream` {stream.Readable|ReadableStream}
2995* Returns: {boolean}
2996
2997Returns whether the stream has been read from or cancelled.
2998
2999### `stream.isErrored(stream)`
3000
3001<!-- YAML
3002added:
3003  - v17.3.0
3004  - v16.14.0
3005-->
3006
3007> Stability: 1 - Experimental
3008
3009* `stream` {Readable|Writable|Duplex|WritableStream|ReadableStream}
3010* Returns: {boolean}
3011
3012Returns whether the stream has encountered an error.
3013
3014### `stream.isReadable(stream)`
3015
3016<!-- YAML
3017added:
3018  - v17.4.0
3019  - v16.14.0
3020-->
3021
3022> Stability: 1 - Experimental
3023
3024* `stream` {Readable|Duplex|ReadableStream}
3025* Returns: {boolean}
3026
3027Returns whether the stream is readable.
3028
3029### `stream.Readable.toWeb(streamReadable[, options])`
3030
3031<!-- YAML
3032added: v17.0.0
3033-->
3034
3035> Stability: 1 - Experimental
3036
3037* `streamReadable` {stream.Readable}
3038* `options` {Object}
3039  * `strategy` {Object}
3040    * `highWaterMark` {number} The maximum internal queue size (of the created
3041      `ReadableStream`) before backpressure is applied in reading from the given
3042      `stream.Readable`. If no value is provided, it will be taken from the
3043      given `stream.Readable`.
3044    * `size` {Function} A function that returns the size of the given chunk of
3045      data. If no value is provided, the size will be `1` for all the chunks.
3046      * `chunk` {any}
3047      * Returns: {number}
3048* Returns: {ReadableStream}
3049
3050### `stream.Writable.fromWeb(writableStream[, options])`
3051
3052<!-- YAML
3053added: v17.0.0
3054-->
3055
3056> Stability: 1 - Experimental
3057
3058* `writableStream` {WritableStream}
3059* `options` {Object}
3060  * `decodeStrings` {boolean}
3061  * `highWaterMark` {number}
3062  * `objectMode` {boolean}
3063  * `signal` {AbortSignal}
3064* Returns: {stream.Writable}
3065
3066### `stream.Writable.toWeb(streamWritable)`
3067
3068<!-- YAML
3069added: v17.0.0
3070-->
3071
3072> Stability: 1 - Experimental
3073
3074* `streamWritable` {stream.Writable}
3075* Returns: {WritableStream}
3076
3077### `stream.Duplex.from(src)`
3078
3079<!-- YAML
3080added: v16.8.0
3081changes:
3082  - version: v18.17.0
3083    pr-url: https://github.com/nodejs/node/pull/46190
3084    description: The `src` argument can now be a `ReadableStream` or
3085                 `WritableStream`.
3086-->
3087
3088* `src` {Stream|Blob|ArrayBuffer|string|Iterable|AsyncIterable|
3089  AsyncGeneratorFunction|AsyncFunction|Promise|Object|
3090  ReadableStream|WritableStream}
3091
3092A utility method for creating duplex streams.
3093
3094* `Stream` converts writable stream into writable `Duplex` and readable stream
3095  into readable `Duplex`.
3096* `Blob` converts into readable `Duplex`.
3097* `string` converts into readable `Duplex`.
3098* `ArrayBuffer` converts into readable `Duplex`.
3099* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
3100  `null`.
3101* `AsyncGeneratorFunction` converts into a readable/writable transform
3102  `Duplex`. Must take a source `AsyncIterable` as first parameter. Cannot yield
3103  `null`.
3104* `AsyncFunction` converts into a writable `Duplex`. Must return
3105  either `null` or `undefined`.
3106* `Object ({ writable, readable })` converts `readable` and
3107  `writable` into `Stream` and then combines them into `Duplex` where the
3108  `Duplex` will write to the `writable` and read from the `readable`.
3109* `Promise` converts into readable `Duplex`. Value `null` is ignored.
3110* `ReadableStream` converts into readable `Duplex`.
3111* `WritableStream` converts into writable `Duplex`.
3112* Returns: {stream.Duplex}
3113
3114If an `Iterable` object containing promises is passed as an argument,
3115it might result in unhandled rejection.
3116
3117```js
3118const { Duplex } = require('node:stream');
3119
3120Duplex.from([
3121  new Promise((resolve) => setTimeout(() => resolve('1'), 1500)),
3122  new Promise((_, reject) => setTimeout(() => reject(new Error('2')), 1000)), // Unhandled rejection
3123]);
3124```
3125
3126### `stream.Duplex.fromWeb(pair[, options])`
3127
3128<!-- YAML
3129added: v17.0.0
3130-->
3131
3132> Stability: 1 - Experimental
3133
3134* `pair` {Object}
3135  * `readable` {ReadableStream}
3136  * `writable` {WritableStream}
3137* `options` {Object}
3138  * `allowHalfOpen` {boolean}
3139  * `decodeStrings` {boolean}
3140  * `encoding` {string}
3141  * `highWaterMark` {number}
3142  * `objectMode` {boolean}
3143  * `signal` {AbortSignal}
3144* Returns: {stream.Duplex}
3145
3146```mjs
3147import { Duplex } from 'node:stream';
3148import {
3149  ReadableStream,
3150  WritableStream,
3151} from 'node:stream/web';
3152
3153const readable = new ReadableStream({
3154  start(controller) {
3155    controller.enqueue('world');
3156  },
3157});
3158
3159const writable = new WritableStream({
3160  write(chunk) {
3161    console.log('writable', chunk);
3162  },
3163});
3164
3165const pair = {
3166  readable,
3167  writable,
3168};
3169const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
3170
3171duplex.write('hello');
3172
3173for await (const chunk of duplex) {
3174  console.log('readable', chunk);
3175}
3176```
3177
3178```cjs
3179const { Duplex } = require('node:stream');
3180const {
3181  ReadableStream,
3182  WritableStream,
3183} = require('node:stream/web');
3184
3185const readable = new ReadableStream({
3186  start(controller) {
3187    controller.enqueue('world');
3188  },
3189});
3190
3191const writable = new WritableStream({
3192  write(chunk) {
3193    console.log('writable', chunk);
3194  },
3195});
3196
3197const pair = {
3198  readable,
3199  writable,
3200};
3201const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });
3202
3203duplex.write('hello');
3204duplex.once('readable', () => console.log('readable', duplex.read()));
3205```
3206
3207### `stream.Duplex.toWeb(streamDuplex)`
3208
3209<!-- YAML
3210added: v17.0.0
3211-->
3212
3213> Stability: 1 - Experimental
3214
3215* `streamDuplex` {stream.Duplex}
3216* Returns: {Object}
3217  * `readable` {ReadableStream}
3218  * `writable` {WritableStream}
3219
3220```mjs
3221import { Duplex } from 'node:stream';
3222
3223const duplex = Duplex({
3224  objectMode: true,
3225  read() {
3226    this.push('world');
3227    this.push(null);
3228  },
3229  write(chunk, encoding, callback) {
3230    console.log('writable', chunk);
3231    callback();
3232  },
3233});
3234
3235const { readable, writable } = Duplex.toWeb(duplex);
3236writable.getWriter().write('hello');
3237
3238const { value } = await readable.getReader().read();
3239console.log('readable', value);
3240```
3241
3242```cjs
3243const { Duplex } = require('node:stream');
3244
3245const duplex = Duplex({
3246  objectMode: true,
3247  read() {
3248    this.push('world');
3249    this.push(null);
3250  },
3251  write(chunk, encoding, callback) {
3252    console.log('writable', chunk);
3253    callback();
3254  },
3255});
3256
3257const { readable, writable } = Duplex.toWeb(duplex);
3258writable.getWriter().write('hello');
3259
3260readable.getReader().read().then((result) => {
3261  console.log('readable', result.value);
3262});
3263```
3264
3265### `stream.addAbortSignal(signal, stream)`
3266
3267<!-- YAML
3268added: v15.4.0
3269changes:
3270  - version: v18.16.0
3271    pr-url: https://github.com/nodejs/node/pull/46273
3272    description: Added support for `ReadableStream` and
3273                 `WritableStream`.
3274-->
3275
3276* `signal` {AbortSignal} A signal representing possible cancellation
3277* `stream` {Stream|ReadableStream|WritableStream} A stream to attach a signal
3278  to.
3279
3280
3281Attaches an AbortSignal to a readable or writable stream. This lets code
3282control stream destruction using an `AbortController`.
3283
3284Calling `abort` on the `AbortController` corresponding to the passed
3285`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
3286on the stream, and `controller.error(new AbortError())` for webstreams.
3287
3288```js
3289const fs = require('node:fs');
3290
3291const controller = new AbortController();
3292const read = addAbortSignal(
3293  controller.signal,
3294  fs.createReadStream(('object.json')),
3295);
3296// Later, abort the operation closing the stream
3297controller.abort();
3298```
3299
3300Or using an `AbortSignal` with a readable stream as an async iterable:
3301
3302```js
3303const controller = new AbortController();
3304setTimeout(() => controller.abort(), 10_000); // set a timeout
3305const stream = addAbortSignal(
3306  controller.signal,
3307  fs.createReadStream(('object.json')),
3308);
3309(async () => {
3310  try {
3311    for await (const chunk of stream) {
3312      await process(chunk);
3313    }
3314  } catch (e) {
3315    if (e.name === 'AbortError') {
3316      // The operation was cancelled
3317    } else {
3318      throw e;
3319    }
3320  }
3321})();
3322```
3323
3324Or using an `AbortSignal` with a ReadableStream:
3325
3326```js
3327const controller = new AbortController();
3328const rs = new ReadableStream({
3329  start(controller) {
3330    controller.enqueue('hello');
3331    controller.enqueue('world');
3332    controller.close();
3333  },
3334});
3335
3336addAbortSignal(controller.signal, rs);
3337
3338finished(rs, (err) => {
3339  if (err) {
3340    if (err.name === 'AbortError') {
3341      // The operation was cancelled
3342    }
3343  }
3344});
3345
3346const reader = rs.getReader();
3347
3348reader.read().then(({ value, done }) => {
3349  console.log(value); // hello
3350  console.log(done); // false
3351  controller.abort();
3352});
3353```
3354
3355### `stream.getDefaultHighWaterMark(objectMode)`
3356
3357<!-- YAML
3358added: v18.17.0
3359-->
3360
3361* `objectMode` {boolean}
3362* Returns: {integer}
3363
3364Returns the default highWaterMark used by streams.
3365Defaults to `16384` (16 KiB), or `16` for `objectMode`.
3366
3367### `stream.setDefaultHighWaterMark(objectMode, value)`
3368
3369<!-- YAML
3370added: v18.17.0
3371-->
3372
3373* `objectMode` {boolean}
3374* `value` {integer} highWaterMark value
3375
3376Sets the default highWaterMark used by streams.
3377
3378## API for stream implementers
3379
3380<!--type=misc-->
3381
3382The `node:stream` module API has been designed to make it possible to easily
3383implement streams using JavaScript's prototypal inheritance model.
3384
3385First, a stream developer would declare a new JavaScript class that extends one
3386of the four basic stream classes (`stream.Writable`, `stream.Readable`,
3387`stream.Duplex`, or `stream.Transform`), making sure they call the appropriate
3388parent class constructor:
3389
3390<!-- eslint-disable no-useless-constructor -->
3391
3392```js
3393const { Writable } = require('node:stream');
3394
3395class MyWritable extends Writable {
3396  constructor({ highWaterMark, ...options }) {
3397    super({ highWaterMark });
3398    // ...
3399  }
3400}
3401```
3402
3403When extending streams, keep in mind what options the user
3404can and should provide before forwarding these to the base constructor. For
3405example, if the implementation makes assumptions in regard to the
3406`autoDestroy` and `emitClose` options, do not allow the
3407user to override these. Be explicit about what
3408options are forwarded instead of implicitly forwarding all options.
3409
3410The new stream class must then implement one or more specific methods, depending
3411on the type of stream being created, as detailed in the chart below:
3412
3413| Use-case                                      | Class           | Method(s) to implement                                                                                             |
3414| --------------------------------------------- | --------------- | ------------------------------------------------------------------------------------------------------------------ |
3415| Reading only                                  | [`Readable`][]  | [`_read()`][stream-_read]                                                                                          |
3416| Writing only                                  | [`Writable`][]  | [`_write()`][stream-_write], [`_writev()`][stream-_writev], [`_final()`][stream-_final]                            |
3417| Reading and writing                           | [`Duplex`][]    | [`_read()`][stream-_read], [`_write()`][stream-_write], [`_writev()`][stream-_writev], [`_final()`][stream-_final] |
3418| Operate on written data, then read the result | [`Transform`][] | [`_transform()`][stream-_transform], [`_flush()`][stream-_flush], [`_final()`][stream-_final]                      |
3419
3420The implementation code for a stream should _never_ call the "public" methods
3421of a stream that are intended for use by consumers (as described in the
3422[API for stream consumers][] section). Doing so may lead to adverse side effects
3423in application code consuming the stream.
3424
3425Avoid overriding public methods such as `write()`, `end()`, `cork()`,
3426`uncork()`, `read()` and `destroy()`, or emitting internal events such
3427as `'error'`, `'data'`, `'end'`, `'finish'` and `'close'` through `.emit()`.
3428Doing so can break current and future stream invariants leading to behavior
3429and/or compatibility issues with other streams, stream utilities, and user
3430expectations.
3431
3432### Simplified construction
3433
3434<!-- YAML
3435added: v1.2.0
3436-->
3437
3438For many simple cases, it is possible to create a stream without relying on
3439inheritance. This can be accomplished by directly creating instances of the
3440`stream.Writable`, `stream.Readable`, `stream.Duplex`, or `stream.Transform`
3441objects and passing appropriate methods as constructor options.
3442
3443```js
3444const { Writable } = require('node:stream');
3445
3446const myWritable = new Writable({
3447  construct(callback) {
3448    // Initialize state and load resources...
3449  },
3450  write(chunk, encoding, callback) {
3451    // ...
3452  },
3453  destroy() {
3454    // Free resources...
3455  },
3456});
3457```
3458
3459### Implementing a writable stream
3460
3461The `stream.Writable` class is extended to implement a [`Writable`][] stream.
3462
3463Custom `Writable` streams _must_ call the `new stream.Writable([options])`
3464constructor and implement the `writable._write()` and/or `writable._writev()`
3465method.
3466
3467#### `new stream.Writable([options])`
3468
3469<!-- YAML
3470changes:
3471  - version: v15.5.0
3472    pr-url: https://github.com/nodejs/node/pull/36431
3473    description: support passing in an AbortSignal.
3474  - version: v14.0.0
3475    pr-url: https://github.com/nodejs/node/pull/30623
3476    description: Change `autoDestroy` option default to `true`.
3477  - version:
3478     - v11.2.0
3479     - v10.16.0
3480    pr-url: https://github.com/nodejs/node/pull/22795
3481    description: Add `autoDestroy` option to automatically `destroy()` the
3482                 stream when it emits `'finish'` or errors.
3483  - version: v10.0.0
3484    pr-url: https://github.com/nodejs/node/pull/18438
3485    description: Add `emitClose` option to specify if `'close'` is emitted on
3486                 destroy.
3487-->
3488
3489* `options` {Object}
3490  * `highWaterMark` {number} Buffer level when
3491    [`stream.write()`][stream-write] starts returning `false`. **Default:**
3492    `16384` (16 KiB), or `16` for `objectMode` streams.
3493  * `decodeStrings` {boolean} Whether to encode `string`s passed to
3494    [`stream.write()`][stream-write] to `Buffer`s (with the encoding
3495    specified in the [`stream.write()`][stream-write] call) before passing
3496    them to [`stream._write()`][stream-_write]. Other types of data are not
3497    converted (i.e. `Buffer`s are not decoded into `string`s). Setting to
3498    false will prevent `string`s from being converted. **Default:** `true`.
3499  * `defaultEncoding` {string} The default encoding that is used when no
3500    encoding is specified as an argument to [`stream.write()`][stream-write].
3501    **Default:** `'utf8'`.
3502  * `objectMode` {boolean} Whether or not the
3503    [`stream.write(anyObj)`][stream-write] is a valid operation. When set,
3504    it becomes possible to write JavaScript values other than string,
3505    `Buffer` or `Uint8Array` if supported by the stream implementation.
3506    **Default:** `false`.
3507  * `emitClose` {boolean} Whether or not the stream should emit `'close'`
3508    after it has been destroyed. **Default:** `true`.
3509  * `write` {Function} Implementation for the
3510    [`stream._write()`][stream-_write] method.
3511  * `writev` {Function} Implementation for the
3512    [`stream._writev()`][stream-_writev] method.
3513  * `destroy` {Function} Implementation for the
3514    [`stream._destroy()`][writable-_destroy] method.
3515  * `final` {Function} Implementation for the
3516    [`stream._final()`][stream-_final] method.
3517  * `construct` {Function} Implementation for the
3518    [`stream._construct()`][writable-_construct] method.
3519  * `autoDestroy` {boolean} Whether this stream should automatically call
3520    `.destroy()` on itself after ending. **Default:** `true`.
3521  * `signal` {AbortSignal} A signal representing possible cancellation.
3522
3523<!-- eslint-disable no-useless-constructor -->
3524
3525```js
3526const { Writable } = require('node:stream');
3527
3528class MyWritable extends Writable {
3529  constructor(options) {
3530    // Calls the stream.Writable() constructor.
3531    super(options);
3532    // ...
3533  }
3534}
3535```
3536
3537Or, when using pre-ES6 style constructors:
3538
3539```js
3540const { Writable } = require('node:stream');
3541const util = require('node:util');
3542
3543function MyWritable(options) {
3544  if (!(this instanceof MyWritable))
3545    return new MyWritable(options);
3546  Writable.call(this, options);
3547}
3548util.inherits(MyWritable, Writable);
3549```
3550
3551Or, using the simplified constructor approach:
3552
3553```js
3554const { Writable } = require('node:stream');
3555
3556const myWritable = new Writable({
3557  write(chunk, encoding, callback) {
3558    // ...
3559  },
3560  writev(chunks, callback) {
3561    // ...
3562  },
3563});
3564```
3565
3566Calling `abort` on the `AbortController` corresponding to the passed
3567`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
3568on the writable stream.
3569
3570```js
3571const { Writable } = require('node:stream');
3572
3573const controller = new AbortController();
3574const myWritable = new Writable({
3575  write(chunk, encoding, callback) {
3576    // ...
3577  },
3578  writev(chunks, callback) {
3579    // ...
3580  },
3581  signal: controller.signal,
3582});
3583// Later, abort the operation closing the stream
3584controller.abort();
3585```
3586
3587#### `writable._construct(callback)`
3588
3589<!-- YAML
3590added: v15.0.0
3591-->
3592
3593* `callback` {Function} Call this function (optionally with an error
3594  argument) when the stream has finished initializing.
3595
3596The `_construct()` method MUST NOT be called directly. It may be implemented
3597by child classes, and if so, will be called by the internal `Writable`
3598class methods only.
3599
3600This optional function will be called in a tick after the stream constructor
3601has returned, delaying any `_write()`, `_final()` and `_destroy()` calls until
3602`callback` is called. This is useful to initialize state or asynchronously
3603initialize resources before the stream can be used.
3604
3605```js
3606const { Writable } = require('node:stream');
3607const fs = require('node:fs');
3608
3609class WriteStream extends Writable {
3610  constructor(filename) {
3611    super();
3612    this.filename = filename;
3613    this.fd = null;
3614  }
3615  _construct(callback) {
3616    fs.open(this.filename, 'w', (err, fd) => {
3617      if (err) {
3618        callback(err);
3619      } else {
3620        this.fd = fd;
3621        callback();
3622      }
3623    });
3624  }
3625  _write(chunk, encoding, callback) {
3626    fs.write(this.fd, chunk, callback);
3627  }
3628  _destroy(err, callback) {
3629    if (this.fd) {
3630      fs.close(this.fd, (er) => callback(er || err));
3631    } else {
3632      callback(err);
3633    }
3634  }
3635}
3636```
3637
3638#### `writable._write(chunk, encoding, callback)`
3639
3640<!-- YAML
3641changes:
3642  - version: v12.11.0
3643    pr-url: https://github.com/nodejs/node/pull/29639
3644    description: _write() is optional when providing _writev().
3645-->
3646
3647* `chunk` {Buffer|string|any} The `Buffer` to be written, converted from the
3648  `string` passed to [`stream.write()`][stream-write]. If the stream's
3649  `decodeStrings` option is `false` or the stream is operating in object mode,
3650  the chunk will not be converted and will be whatever was passed to
3651  [`stream.write()`][stream-write].
3652* `encoding` {string} If the chunk is a string, then `encoding` is the
3653  character encoding of that string. If chunk is a `Buffer`, or if the
3654  stream is operating in object mode, `encoding` may be ignored.
3655* `callback` {Function} Call this function (optionally with an error
3656  argument) when processing is complete for the supplied chunk.
3657
3658All `Writable` stream implementations must provide a
3659[`writable._write()`][stream-_write] and/or
3660[`writable._writev()`][stream-_writev] method to send data to the underlying
3661resource.
3662
3663[`Transform`][] streams provide their own implementation of the
3664[`writable._write()`][stream-_write].
3665
3666This function MUST NOT be called by application code directly. It should be
3667implemented by child classes, and called by the internal `Writable` class
3668methods only.
3669
3670The `callback` function must be called synchronously inside of
3671`writable._write()` or asynchronously (i.e. different tick) to signal either
3672that the write completed successfully or failed with an error.
3673The first argument passed to the `callback` must be the `Error` object if the
3674call failed or `null` if the write succeeded.
3675
3676All calls to `writable.write()` that occur between the time `writable._write()`
3677is called and the `callback` is called will cause the written data to be
3678buffered. When the `callback` is invoked, the stream might emit a [`'drain'`][]
3679event. If a stream implementation is capable of processing multiple chunks of
3680data at once, the `writable._writev()` method should be implemented.
3681
3682If the `decodeStrings` property is explicitly set to `false` in the constructor
3683options, then `chunk` will remain the same object that is passed to `.write()`,
3684and may be a string rather than a `Buffer`. This is to support implementations
3685that have an optimized handling for certain string data encodings. In that case,
3686the `encoding` argument will indicate the character encoding of the string.
3687Otherwise, the `encoding` argument can be safely ignored.
3688
3689The `writable._write()` method is prefixed with an underscore because it is
3690internal to the class that defines it, and should never be called directly by
3691user programs.
3692
3693#### `writable._writev(chunks, callback)`
3694
3695* `chunks` {Object\[]} The data to be written. The value is an array of {Object}
3696  that each represent a discrete chunk of data to write. The properties of
3697  these objects are:
3698  * `chunk` {Buffer|string} A buffer instance or string containing the data to
3699    be written. The `chunk` will be a string if the `Writable` was created with
3700    the `decodeStrings` option set to `false` and a string was passed to `write()`.
3701  * `encoding` {string} The character encoding of the `chunk`. If `chunk` is
3702    a `Buffer`, the `encoding` will be `'buffer'`.
3703* `callback` {Function} A callback function (optionally with an error
3704  argument) to be invoked when processing is complete for the supplied chunks.
3705
3706This function MUST NOT be called by application code directly. It should be
3707implemented by child classes, and called by the internal `Writable` class
3708methods only.
3709
3710The `writable._writev()` method may be implemented in addition or alternatively
3711to `writable._write()` in stream implementations that are capable of processing
3712multiple chunks of data at once. If implemented and if there is buffered data
3713from previous writes, `_writev()` will be called instead of `_write()`.
3714
3715The `writable._writev()` method is prefixed with an underscore because it is
3716internal to the class that defines it, and should never be called directly by
3717user programs.
3718
3719#### `writable._destroy(err, callback)`
3720
3721<!-- YAML
3722added: v8.0.0
3723-->
3724
3725* `err` {Error} A possible error.
3726* `callback` {Function} A callback function that takes an optional error
3727  argument.
3728
3729The `_destroy()` method is called by [`writable.destroy()`][writable-destroy].
3730It can be overridden by child classes but it **must not** be called directly.
3731Furthermore, the `callback` should not be mixed with async/await,
3732since it is executed when a promise is resolved.
3733
3734#### `writable._final(callback)`
3735
3736<!-- YAML
3737added: v8.0.0
3738-->
3739
3740* `callback` {Function} Call this function (optionally with an error
3741  argument) when finished writing any remaining data.
3742
3743The `_final()` method **must not** be called directly. It may be implemented
3744by child classes, and if so, will be called by the internal `Writable`
3745class methods only.
3746
3747This optional function will be called before the stream closes, delaying the
3748`'finish'` event until `callback` is called. This is useful to close resources
3749or write buffered data before a stream ends.
3750
3751#### Errors while writing
3752
3753Errors occurring during the processing of the [`writable._write()`][],
3754[`writable._writev()`][] and [`writable._final()`][] methods must be propagated
3755by invoking the callback and passing the error as the first argument.
3756Throwing an `Error` from within these methods or manually emitting an `'error'`
3757event results in undefined behavior.
3758
3759If a `Readable` stream pipes into a `Writable` stream when `Writable` emits an
3760error, the `Readable` stream will be unpiped.
3761
3762```js
3763const { Writable } = require('node:stream');
3764
3765const myWritable = new Writable({
3766  write(chunk, encoding, callback) {
3767    if (chunk.toString().indexOf('a') >= 0) {
3768      callback(new Error('chunk is invalid'));
3769    } else {
3770      callback();
3771    }
3772  },
3773});
3774```
3775
3776#### An example writable stream
3777
3778The following illustrates a rather simplistic (and somewhat pointless) custom
3779`Writable` stream implementation. While this specific `Writable` stream instance
3780is not of any real particular usefulness, the example illustrates each of the
3781required elements of a custom [`Writable`][] stream instance:
3782
3783```js
3784const { Writable } = require('node:stream');
3785
3786class MyWritable extends Writable {
3787  _write(chunk, encoding, callback) {
3788    if (chunk.toString().indexOf('a') >= 0) {
3789      callback(new Error('chunk is invalid'));
3790    } else {
3791      callback();
3792    }
3793  }
3794}
3795```
3796
3797#### Decoding buffers in a writable stream
3798
3799Decoding buffers is a common task, for instance, when using transformers whose
3800input is a string. This is not a trivial process when using a multi-byte
3801character encoding, such as UTF-8. The following example shows how to decode
3802multi-byte strings using `StringDecoder` and [`Writable`][].
3803
3804```js
3805const { Writable } = require('node:stream');
3806const { StringDecoder } = require('node:string_decoder');
3807
3808class StringWritable extends Writable {
3809  constructor(options) {
3810    super(options);
3811    this._decoder = new StringDecoder(options && options.defaultEncoding);
3812    this.data = '';
3813  }
3814  _write(chunk, encoding, callback) {
3815    if (encoding === 'buffer') {
3816      chunk = this._decoder.write(chunk);
3817    }
3818    this.data += chunk;
3819    callback();
3820  }
3821  _final(callback) {
3822    this.data += this._decoder.end();
3823    callback();
3824  }
3825}
3826
3827const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
3828const w = new StringWritable();
3829
3830w.write('currency: ');
3831w.write(euro[0]);
3832w.end(euro[1]);
3833
3834console.log(w.data); // currency: €
3835```
3836
3837### Implementing a readable stream
3838
3839The `stream.Readable` class is extended to implement a [`Readable`][] stream.
3840
3841Custom `Readable` streams _must_ call the `new stream.Readable([options])`
3842constructor and implement the [`readable._read()`][] method.
3843
3844#### `new stream.Readable([options])`
3845
3846<!-- YAML
3847changes:
3848  - version: v15.5.0
3849    pr-url: https://github.com/nodejs/node/pull/36431
3850    description: support passing in an AbortSignal.
3851  - version: v14.0.0
3852    pr-url: https://github.com/nodejs/node/pull/30623
3853    description: Change `autoDestroy` option default to `true`.
3854  - version:
3855     - v11.2.0
3856     - v10.16.0
3857    pr-url: https://github.com/nodejs/node/pull/22795
3858    description: Add `autoDestroy` option to automatically `destroy()` the
3859                 stream when it emits `'end'` or errors.
3860-->
3861
3862* `options` {Object}
3863  * `highWaterMark` {number} The maximum [number of bytes][hwm-gotcha] to store
3864    in the internal buffer before ceasing to read from the underlying resource.
3865    **Default:** `16384` (16 KiB), or `16` for `objectMode` streams.
3866  * `encoding` {string} If specified, then buffers will be decoded to
3867    strings using the specified encoding. **Default:** `null`.
3868  * `objectMode` {boolean} Whether this stream should behave
3869    as a stream of objects. Meaning that [`stream.read(n)`][stream-read] returns
3870    a single value instead of a `Buffer` of size `n`. **Default:** `false`.
3871  * `emitClose` {boolean} Whether or not the stream should emit `'close'`
3872    after it has been destroyed. **Default:** `true`.
3873  * `read` {Function} Implementation for the [`stream._read()`][stream-_read]
3874    method.
3875  * `destroy` {Function} Implementation for the
3876    [`stream._destroy()`][readable-_destroy] method.
3877  * `construct` {Function} Implementation for the
3878    [`stream._construct()`][readable-_construct] method.
3879  * `autoDestroy` {boolean} Whether this stream should automatically call
3880    `.destroy()` on itself after ending. **Default:** `true`.
3881  * `signal` {AbortSignal} A signal representing possible cancellation.
3882
3883<!-- eslint-disable no-useless-constructor -->
3884
3885```js
3886const { Readable } = require('node:stream');
3887
3888class MyReadable extends Readable {
3889  constructor(options) {
3890    // Calls the stream.Readable(options) constructor.
3891    super(options);
3892    // ...
3893  }
3894}
3895```
3896
3897Or, when using pre-ES6 style constructors:
3898
3899```js
3900const { Readable } = require('node:stream');
3901const util = require('node:util');
3902
3903function MyReadable(options) {
3904  if (!(this instanceof MyReadable))
3905    return new MyReadable(options);
3906  Readable.call(this, options);
3907}
3908util.inherits(MyReadable, Readable);
3909```
3910
3911Or, using the simplified constructor approach:
3912
3913```js
3914const { Readable } = require('node:stream');
3915
3916const myReadable = new Readable({
3917  read(size) {
3918    // ...
3919  },
3920});
3921```
3922
3923Calling `abort` on the `AbortController` corresponding to the passed
3924`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
3925on the readable created.
3926
3927```js
3928const { Readable } = require('node:stream');
3929const controller = new AbortController();
3930const read = new Readable({
3931  read(size) {
3932    // ...
3933  },
3934  signal: controller.signal,
3935});
3936// Later, abort the operation closing the stream
3937controller.abort();
3938```
3939
3940#### `readable._construct(callback)`
3941
3942<!-- YAML
3943added: v15.0.0
3944-->
3945
3946* `callback` {Function} Call this function (optionally with an error
3947  argument) when the stream has finished initializing.
3948
3949The `_construct()` method MUST NOT be called directly. It may be implemented
3950by child classes, and if so, will be called by the internal `Readable`
3951class methods only.
3952
3953This optional function will be scheduled in the next tick by the stream
3954constructor, delaying any `_read()` and `_destroy()` calls until `callback` is
3955called. This is useful to initialize state or asynchronously initialize
3956resources before the stream can be used.
3957
3958```js
3959const { Readable } = require('node:stream');
3960const fs = require('node:fs');
3961
3962class ReadStream extends Readable {
3963  constructor(filename) {
3964    super();
3965    this.filename = filename;
3966    this.fd = null;
3967  }
3968  _construct(callback) {
3969    fs.open(this.filename, (err, fd) => {
3970      if (err) {
3971        callback(err);
3972      } else {
3973        this.fd = fd;
3974        callback();
3975      }
3976    });
3977  }
3978  _read(n) {
3979    const buf = Buffer.alloc(n);
3980    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
3981      if (err) {
3982        this.destroy(err);
3983      } else {
3984        this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
3985      }
3986    });
3987  }
3988  _destroy(err, callback) {
3989    if (this.fd) {
3990      fs.close(this.fd, (er) => callback(er || err));
3991    } else {
3992      callback(err);
3993    }
3994  }
3995}
3996```
3997
3998#### `readable._read(size)`
3999
4000<!-- YAML
4001added: v0.9.4
4002-->
4003
4004* `size` {number} Number of bytes to read asynchronously
4005
4006This function MUST NOT be called by application code directly. It should be
4007implemented by child classes, and called by the internal `Readable` class
4008methods only.
4009
4010All `Readable` stream implementations must provide an implementation of the
4011[`readable._read()`][] method to fetch data from the underlying resource.
4012
4013When [`readable._read()`][] is called, if data is available from the resource,
4014the implementation should begin pushing that data into the read queue using the
4015[`this.push(dataChunk)`][stream-push] method. `_read()` will be called again
4016after each call to [`this.push(dataChunk)`][stream-push] once the stream is
4017ready to accept more data. `_read()` may continue reading from the resource and
4018pushing data until `readable.push()` returns `false`. Only when `_read()` is
4019called again after it has stopped should it resume pushing additional data into
4020the queue.
4021
4022Once the [`readable._read()`][] method has been called, it will not be called
4023again until more data is pushed through the [`readable.push()`][stream-push]
4024method. Empty data such as empty buffers and strings will not cause
4025[`readable._read()`][] to be called.
4026
4027The `size` argument is advisory. Implementations where a "read" is a
4028single operation that returns data can use the `size` argument to determine how
4029much data to fetch. Other implementations may ignore this argument and simply
4030provide data whenever it becomes available. There is no need to "wait" until
4031`size` bytes are available before calling [`stream.push(chunk)`][stream-push].
4032
4033The [`readable._read()`][] method is prefixed with an underscore because it is
4034internal to the class that defines it, and should never be called directly by
4035user programs.
4036
4037#### `readable._destroy(err, callback)`
4038
4039<!-- YAML
4040added: v8.0.0
4041-->
4042
4043* `err` {Error} A possible error.
4044* `callback` {Function} A callback function that takes an optional error
4045  argument.
4046
4047The `_destroy()` method is called by [`readable.destroy()`][readable-destroy].
4048It can be overridden by child classes but it **must not** be called directly.
4049
4050#### `readable.push(chunk[, encoding])`
4051
4052<!-- YAML
4053changes:
4054  - version: v8.0.0
4055    pr-url: https://github.com/nodejs/node/pull/11608
4056    description: The `chunk` argument can now be a `Uint8Array` instance.
4057-->
4058
4059* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to push into the
4060  read queue. For streams not operating in object mode, `chunk` must be a
4061  string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
4062  any JavaScript value.
4063* `encoding` {string} Encoding of string chunks. Must be a valid
4064  `Buffer` encoding, such as `'utf8'` or `'ascii'`.
4065* Returns: {boolean} `true` if additional chunks of data may continue to be
4066  pushed; `false` otherwise.
4067
4068When `chunk` is a `Buffer`, `Uint8Array`, or `string`, the `chunk` of data will
4069be added to the internal queue for users of the stream to consume.
4070Passing `chunk` as `null` signals the end of the stream (EOF), after which no
4071more data can be written.
4072
4073When the `Readable` is operating in paused mode, the data added with
4074`readable.push()` can be read out by calling the
4075[`readable.read()`][stream-read] method when the [`'readable'`][] event is
4076emitted.
4077
4078When the `Readable` is operating in flowing mode, the data added with
4079`readable.push()` will be delivered by emitting a `'data'` event.
4080
4081The `readable.push()` method is designed to be as flexible as possible. For
4082example, when wrapping a lower-level source that provides some form of
4083pause/resume mechanism, and a data callback, the low-level source can be wrapped
4084by the custom `Readable` instance:
4085
4086```js
4087// `_source` is an object with readStop() and readStart() methods,
4088// and an `ondata` member that gets called when it has data, and
4089// an `onend` member that gets called when the data is over.
4090
4091class SourceWrapper extends Readable {
4092  constructor(options) {
4093    super(options);
4094
4095    this._source = getLowLevelSourceObject();
4096
4097    // Every time there's data, push it into the internal buffer.
4098    this._source.ondata = (chunk) => {
4099      // If push() returns false, then stop reading from source.
4100      if (!this.push(chunk))
4101        this._source.readStop();
4102    };
4103
4104    // When the source ends, push the EOF-signaling `null` chunk.
4105    this._source.onend = () => {
4106      this.push(null);
4107    };
4108  }
4109  // _read() will be called when the stream wants to pull more data in.
4110  // The advisory size argument is ignored in this case.
4111  _read(size) {
4112    this._source.readStart();
4113  }
4114}
4115```
4116
4117The `readable.push()` method is used to push the content
4118into the internal buffer. It can be driven by the [`readable._read()`][] method.
4119
4120For streams not operating in object mode, if the `chunk` parameter of
4121`readable.push()` is `undefined`, it will be treated as an empty string or
4122buffer. See [`readable.push('')`][] for more information.
4123
4124#### Errors while reading
4125
4126Errors occurring during processing of the [`readable._read()`][] must be
4127propagated through the [`readable.destroy(err)`][readable-destroy] method.
4128Throwing an `Error` from within [`readable._read()`][] or manually emitting an
4129`'error'` event results in undefined behavior.
4130
4131```js
4132const { Readable } = require('node:stream');
4133
4134const myReadable = new Readable({
4135  read(size) {
4136    const err = checkSomeErrorCondition();
4137    if (err) {
4138      this.destroy(err);
4139    } else {
4140      // Do some work.
4141    }
4142  },
4143});
4144```
4145
4146#### An example counting stream
4147
4148<!--type=example-->
4149
4150The following is a basic example of a `Readable` stream that emits the numerals
4151from 1 to 1,000,000 in ascending order, and then ends.
4152
4153```js
4154const { Readable } = require('node:stream');
4155
4156class Counter extends Readable {
4157  constructor(opt) {
4158    super(opt);
4159    this._max = 1000000;
4160    this._index = 1;
4161  }
4162
4163  _read() {
4164    const i = this._index++;
4165    if (i > this._max)
4166      this.push(null);
4167    else {
4168      const str = String(i);
4169      const buf = Buffer.from(str, 'ascii');
4170      this.push(buf);
4171    }
4172  }
4173}
4174```
4175
4176### Implementing a duplex stream
4177
4178A [`Duplex`][] stream is one that implements both [`Readable`][] and
4179[`Writable`][], such as a TCP socket connection.
4180
4181Because JavaScript does not have support for multiple inheritance, the
4182`stream.Duplex` class is extended to implement a [`Duplex`][] stream (as opposed
4183to extending the `stream.Readable` _and_ `stream.Writable` classes).
4184
4185The `stream.Duplex` class prototypically inherits from `stream.Readable` and
4186parasitically from `stream.Writable`, but `instanceof` will work properly for
4187both base classes due to overriding [`Symbol.hasInstance`][] on
4188`stream.Writable`.
4189
4190Custom `Duplex` streams _must_ call the `new stream.Duplex([options])`
4191constructor and implement _both_ the [`readable._read()`][] and
4192`writable._write()` methods.
4193
4194#### `new stream.Duplex(options)`
4195
4196<!-- YAML
4197changes:
4198  - version: v8.4.0
4199    pr-url: https://github.com/nodejs/node/pull/14636
4200    description: The `readableHighWaterMark` and `writableHighWaterMark` options
4201                 are supported now.
4202-->
4203
4204* `options` {Object} Passed to both `Writable` and `Readable`
4205  constructors. Also has the following fields:
4206  * `allowHalfOpen` {boolean} If set to `false`, then the stream will
4207    automatically end the writable side when the readable side ends.
4208    **Default:** `true`.
4209  * `readable` {boolean} Sets whether the `Duplex` should be readable.
4210    **Default:** `true`.
4211  * `writable` {boolean} Sets whether the `Duplex` should be writable.
4212    **Default:** `true`.
4213  * `readableObjectMode` {boolean} Sets `objectMode` for readable side of the
4214    stream. Has no effect if `objectMode` is `true`. **Default:** `false`.
4215  * `writableObjectMode` {boolean} Sets `objectMode` for writable side of the
4216    stream. Has no effect if `objectMode` is `true`. **Default:** `false`.
4217  * `readableHighWaterMark` {number} Sets `highWaterMark` for the readable side
4218    of the stream. Has no effect if `highWaterMark` is provided.
4219  * `writableHighWaterMark` {number} Sets `highWaterMark` for the writable side
4220    of the stream. Has no effect if `highWaterMark` is provided.
4221
4222<!-- eslint-disable no-useless-constructor -->
4223
4224```js
4225const { Duplex } = require('node:stream');
4226
4227class MyDuplex extends Duplex {
4228  constructor(options) {
4229    super(options);
4230    // ...
4231  }
4232}
4233```
4234
4235Or, when using pre-ES6 style constructors:
4236
4237```js
4238const { Duplex } = require('node:stream');
4239const util = require('node:util');
4240
4241function MyDuplex(options) {
4242  if (!(this instanceof MyDuplex))
4243    return new MyDuplex(options);
4244  Duplex.call(this, options);
4245}
4246util.inherits(MyDuplex, Duplex);
4247```
4248
4249Or, using the simplified constructor approach:
4250
4251```js
4252const { Duplex } = require('node:stream');
4253
4254const myDuplex = new Duplex({
4255  read(size) {
4256    // ...
4257  },
4258  write(chunk, encoding, callback) {
4259    // ...
4260  },
4261});
4262```
4263
4264When using pipeline:
4265
4266```js
4267const { Transform, pipeline } = require('node:stream');
4268const fs = require('node:fs');
4269
4270pipeline(
4271  fs.createReadStream('object.json')
4272    .setEncoding('utf8'),
4273  new Transform({
4274    decodeStrings: false, // Accept string input rather than Buffers
4275    construct(callback) {
4276      this.data = '';
4277      callback();
4278    },
4279    transform(chunk, encoding, callback) {
4280      this.data += chunk;
4281      callback();
4282    },
4283    flush(callback) {
4284      try {
4285        // Make sure is valid json.
4286        JSON.parse(this.data);
4287        this.push(this.data);
4288        callback();
4289      } catch (err) {
4290        callback(err);
4291      }
4292    },
4293  }),
4294  fs.createWriteStream('valid-object.json'),
4295  (err) => {
4296    if (err) {
4297      console.error('failed', err);
4298    } else {
4299      console.log('completed');
4300    }
4301  },
4302);
4303```
4304
4305#### An example duplex stream
4306
4307The following illustrates a simple example of a `Duplex` stream that wraps a
4308hypothetical lower-level source object to which data can be written, and
4309from which data can be read, albeit using an API that is not compatible with
4310Node.js streams.
4311In this example, the `Duplex` stream buffers incoming written data via the
4312[`Writable`][] interface, and that data is read back out via the
4313[`Readable`][] interface.
4314
4315```js
4316const { Duplex } = require('node:stream');
4317const kSource = Symbol('source');
4318
4319class MyDuplex extends Duplex {
4320  constructor(source, options) {
4321    super(options);
4322    this[kSource] = source;
4323  }
4324
4325  _write(chunk, encoding, callback) {
4326    // The underlying source only deals with strings.
4327    if (Buffer.isBuffer(chunk))
4328      chunk = chunk.toString();
4329    this[kSource].writeSomeData(chunk);
4330    callback();
4331  }
4332
4333  _read(size) {
4334    this[kSource].fetchSomeData(size, (data, encoding) => {
4335      this.push(Buffer.from(data, encoding));
4336    });
4337  }
4338}
4339```
4340
4341The most important aspect of a `Duplex` stream is that the `Readable` and
4342`Writable` sides operate independently of one another despite co-existing within
4343a single object instance.
4344
4345#### Object mode duplex streams
4346
4347For `Duplex` streams, `objectMode` can be set exclusively for either the
4348`Readable` or `Writable` side using the `readableObjectMode` and
4349`writableObjectMode` options respectively.
4350
4351In the following example, for instance, a new `Transform` stream (which is a
4352type of [`Duplex`][] stream) is created that has an object mode `Writable` side
4353that accepts JavaScript numbers that are converted to hexadecimal strings on
4354the `Readable` side.
4355
4356```js
4357const { Transform } = require('node:stream');
4358
4359// All Transform streams are also Duplex Streams.
4360const myTransform = new Transform({
4361  writableObjectMode: true,
4362
4363  transform(chunk, encoding, callback) {
4364    // Coerce the chunk to a number if necessary.
4365    chunk |= 0;
4366
4367    // Transform the chunk into something else.
4368    const data = chunk.toString(16);
4369
4370    // Push the data onto the readable queue.
4371    callback(null, '0'.repeat(data.length % 2) + data);
4372  },
4373});
4374
4375myTransform.setEncoding('ascii');
4376myTransform.on('data', (chunk) => console.log(chunk));
4377
4378myTransform.write(1);
4379// Prints: 01
4380myTransform.write(10);
4381// Prints: 0a
4382myTransform.write(100);
4383// Prints: 64
4384```
4385
4386### Implementing a transform stream
4387
4388A [`Transform`][] stream is a [`Duplex`][] stream where the output is computed
4389in some way from the input. Examples include [zlib][] streams or [crypto][]
4390streams that compress, encrypt, or decrypt data.
4391
4392There is no requirement that the output be the same size as the input, the same
4393number of chunks, or arrive at the same time. For example, a `Hash` stream will
4394only ever have a single chunk of output which is provided when the input is
4395ended. A `zlib` stream will produce output that is either much smaller or much
4396larger than its input.
4397
4398The `stream.Transform` class is extended to implement a [`Transform`][] stream.
4399
4400The `stream.Transform` class prototypically inherits from `stream.Duplex` and
4401implements its own versions of the `writable._write()` and
4402[`readable._read()`][] methods. Custom `Transform` implementations _must_
4403implement the [`transform._transform()`][stream-_transform] method and _may_
4404also implement the [`transform._flush()`][stream-_flush] method.
4405
4406Care must be taken when using `Transform` streams in that data written to the
4407stream can cause the `Writable` side of the stream to become paused if the
4408output on the `Readable` side is not consumed.
4409
4410#### `new stream.Transform([options])`
4411
4412* `options` {Object} Passed to both `Writable` and `Readable`
4413  constructors. Also has the following fields:
4414  * `transform` {Function} Implementation for the
4415    [`stream._transform()`][stream-_transform] method.
4416  * `flush` {Function} Implementation for the [`stream._flush()`][stream-_flush]
4417    method.
4418
4419<!-- eslint-disable no-useless-constructor -->
4420
4421```js
4422const { Transform } = require('node:stream');
4423
4424class MyTransform extends Transform {
4425  constructor(options) {
4426    super(options);
4427    // ...
4428  }
4429}
4430```
4431
4432Or, when using pre-ES6 style constructors:
4433
4434```js
4435const { Transform } = require('node:stream');
4436const util = require('node:util');
4437
4438function MyTransform(options) {
4439  if (!(this instanceof MyTransform))
4440    return new MyTransform(options);
4441  Transform.call(this, options);
4442}
4443util.inherits(MyTransform, Transform);
4444```
4445
4446Or, using the simplified constructor approach:
4447
4448```js
4449const { Transform } = require('node:stream');
4450
4451const myTransform = new Transform({
4452  transform(chunk, encoding, callback) {
4453    // ...
4454  },
4455});
4456```
4457
4458#### Event: `'end'`
4459
4460The [`'end'`][] event is from the `stream.Readable` class. The `'end'` event is
4461emitted after all data has been output, which occurs after the callback in
4462[`transform._flush()`][stream-_flush] has been called. In the case of an error,
4463`'end'` should not be emitted.
4464
4465#### Event: `'finish'`
4466
4467The [`'finish'`][] event is from the `stream.Writable` class. The `'finish'`
4468event is emitted after [`stream.end()`][stream-end] is called and all chunks
4469have been processed by [`stream._transform()`][stream-_transform]. In the case
4470of an error, `'finish'` should not be emitted.
4471
4472#### `transform._flush(callback)`
4473
4474* `callback` {Function} A callback function (optionally with an error
4475  argument and data) to be called when remaining data has been flushed.
4476
4477This function MUST NOT be called by application code directly. It may be
4478implemented by child classes, and if so, will be called by the internal
4479`Transform` class methods only.
4480
4481In some cases, a transform operation may need to emit an additional bit of
4482data at the end of the stream. For example, a `zlib` compression stream will
4483store an amount of internal state used to optimally compress the output. When
4484the stream ends, however, that additional data needs to be flushed so that the
4485compressed data will be complete.
4486
4487Custom [`Transform`][] implementations _may_ implement the `transform._flush()`
4488method. This will be called when there is no more written data to be consumed,
4489but before the [`'end'`][] event is emitted signaling the end of the
4490[`Readable`][] stream.
4491
4492Within the `transform._flush()` implementation, the `transform.push()` method
4493may be called zero or more times, as appropriate. The `callback` function must
4494be called when the flush operation is complete.
4495
4496The `transform._flush()` method is prefixed with an underscore because it is
4497internal to the class that defines it, and should never be called directly by
4498user programs.
4499
4500#### `transform._transform(chunk, encoding, callback)`
4501
4502* `chunk` {Buffer|string|any} The `Buffer` to be transformed, converted from
4503  the `string` passed to [`stream.write()`][stream-write]. If the stream's
4504  `decodeStrings` option is `false` or the stream is operating in object mode,
4505  the chunk will not be converted and will be whatever was passed to
4506  [`stream.write()`][stream-write].
4507* `encoding` {string} If the chunk is a string, then this is the
4508  encoding type. If chunk is a buffer, then this is the special
4509  value `'buffer'`. Ignore it in that case.
4510* `callback` {Function} A callback function (optionally with an error
4511  argument and data) to be called after the supplied `chunk` has been
4512  processed.
4513
4514This function MUST NOT be called by application code directly. It should be
4515implemented by child classes, and called by the internal `Transform` class
4516methods only.
4517
4518All `Transform` stream implementations must provide a `_transform()`
4519method to accept input and produce output. The `transform._transform()`
4520implementation handles the bytes being written, computes an output, then passes
4521that output off to the readable portion using the `transform.push()` method.
4522
4523The `transform.push()` method may be called zero or more times to generate
4524output from a single input chunk, depending on how much is to be output
4525as a result of the chunk.
4526
4527It is possible that no output is generated from any given chunk of input data.
4528
4529The `callback` function must be called only when the current chunk is completely
4530consumed. The first argument passed to the `callback` must be an `Error` object
4531if an error occurred while processing the input or `null` otherwise. If a second
4532argument is passed to the `callback`, it will be forwarded on to the
4533`transform.push()` method, but only if the first argument is falsy. In other
4534words, the following are equivalent:
4535
4536```js
4537transform.prototype._transform = function(data, encoding, callback) {
4538  this.push(data);
4539  callback();
4540};
4541
4542transform.prototype._transform = function(data, encoding, callback) {
4543  callback(null, data);
4544};
4545```
4546
4547The `transform._transform()` method is prefixed with an underscore because it
4548is internal to the class that defines it, and should never be called directly by
4549user programs.
4550
4551`transform._transform()` is never called in parallel; streams implement a
4552queue mechanism, and to receive the next chunk, `callback` must be
4553called, either synchronously or asynchronously.
4554
4555#### Class: `stream.PassThrough`
4556
4557The `stream.PassThrough` class is a trivial implementation of a [`Transform`][]
4558stream that simply passes the input bytes across to the output. Its purpose is
4559primarily for examples and testing, but there are some use cases where
4560`stream.PassThrough` is useful as a building block for novel sorts of streams.
4561
4562## Additional notes
4563
4564<!--type=misc-->
4565
4566### Streams compatibility with async generators and async iterators
4567
4568With the support of async generators and iterators in JavaScript, async
4569generators are effectively a first-class language-level stream construct at
4570this point.
4571
4572Some common interop cases of using Node.js streams with async generators
4573and async iterators are provided below.
4574
4575#### Consuming readable streams with async iterators
4576
4577```js
4578(async function() {
4579  for await (const chunk of readable) {
4580    console.log(chunk);
4581  }
4582})();
4583```
4584
4585Async iterators register a permanent error handler on the stream to prevent any
4586unhandled post-destroy errors.
4587
4588#### Creating readable streams with async generators
4589
4590A Node.js readable stream can be created from an asynchronous generator using
4591the `Readable.from()` utility method:
4592
4593```js
4594const { Readable } = require('node:stream');
4595
4596const ac = new AbortController();
4597const signal = ac.signal;
4598
4599async function * generate() {
4600  yield 'a';
4601  await someLongRunningFn({ signal });
4602  yield 'b';
4603  yield 'c';
4604}
4605
4606const readable = Readable.from(generate());
4607readable.on('close', () => {
4608  ac.abort();
4609});
4610
4611readable.on('data', (chunk) => {
4612  console.log(chunk);
4613});
4614```
4615
4616#### Piping to writable streams from async iterators
4617
4618When writing to a writable stream from an async iterator, ensure correct
4619handling of backpressure and errors. [`stream.pipeline()`][] abstracts away
4620the handling of backpressure and backpressure-related errors:
4621
4622```js
4623const fs = require('node:fs');
4624const { pipeline } = require('node:stream');
4625const { pipeline: pipelinePromise } = require('node:stream/promises');
4626
4627const writable = fs.createWriteStream('./file');
4628
4629const ac = new AbortController();
4630const signal = ac.signal;
4631
4632const iterator = createIterator({ signal });
4633
4634// Callback Pattern
4635pipeline(iterator, writable, (err, value) => {
4636  if (err) {
4637    console.error(err);
4638  } else {
4639    console.log(value, 'value returned');
4640  }
4641}).on('close', () => {
4642  ac.abort();
4643});
4644
4645// Promise Pattern
4646pipelinePromise(iterator, writable)
4647  .then((value) => {
4648    console.log(value, 'value returned');
4649  })
4650  .catch((err) => {
4651    console.error(err);
4652    ac.abort();
4653  });
4654```
4655
4656<!--type=misc-->
4657
4658### Compatibility with older Node.js versions
4659
4660<!--type=misc-->
4661
4662Prior to Node.js 0.10, the `Readable` stream interface was simpler, but also
4663less powerful and less useful.
4664
4665* Rather than waiting for calls to the [`stream.read()`][stream-read] method,
4666  [`'data'`][] events would begin emitting immediately. Applications that
4667  needed to perform some amount of work to decide how to handle data were
4668  required to buffer the data they read so that it would not be lost.
4669* The [`stream.pause()`][stream-pause] method was advisory, rather than
4670  guaranteed. This meant that it was still necessary to be prepared to receive
4671  [`'data'`][] events _even when the stream was in a paused state_.
4672
4673In Node.js 0.10, the [`Readable`][] class was added. For backward
4674compatibility with older Node.js programs, `Readable` streams switch into
4675"flowing mode" when a [`'data'`][] event handler is added, or when the
4676[`stream.resume()`][stream-resume] method is called. The effect is that, even
4677when not using the new [`stream.read()`][stream-read] method and
4678[`'readable'`][] event, it is no longer necessary to worry about losing
4679[`'data'`][] chunks.
4680
4681While most applications will continue to function normally, this introduces an
4682edge case in the following conditions:
4683
4684* No [`'data'`][] event listener is added.
4685* The [`stream.resume()`][stream-resume] method is never called.
4686* The stream is not piped to any writable destination.
4687
4688For example, consider the following code:
4689
4690```js
4691// WARNING!  BROKEN!
4692net.createServer((socket) => {
4693
4694  // We add an 'end' listener, but never consume the data.
4695  socket.on('end', () => {
4696    // It will never get here.
4697    socket.end('The message was received but was not processed.\n');
4698  });
4699
4700}).listen(1337);
4701```
4702
4703Prior to Node.js 0.10, the incoming message data would simply be discarded.
4704However, in Node.js 0.10 and beyond, the socket remains paused forever.
4705
4706The workaround in this situation is to call the
4707[`stream.resume()`][stream-resume] method to begin the flow of data:
4708
4709```js
4710// Workaround.
4711net.createServer((socket) => {
4712  socket.on('end', () => {
4713    socket.end('The message was received but was not processed.\n');
4714  });
4715
4716  // Start the flow of data, discarding it.
4717  socket.resume();
4718}).listen(1337);
4719```
4720
4721In addition to new `Readable` streams switching into flowing mode,
4722pre-0.10 style streams can be wrapped in a `Readable` class using the
4723[`readable.wrap()`][`stream.wrap()`] method.
4724
4725### `readable.read(0)`
4726
4727There are some cases where it is necessary to trigger a refresh of the
4728underlying readable stream mechanisms, without actually consuming any
4729data. In such cases, it is possible to call `readable.read(0)`, which will
4730always return `null`.
4731
4732If the internal read buffer is below the `highWaterMark`, and the
4733stream is not currently reading, then calling `readable.read(0)` will trigger
4734a low-level [`stream._read()`][stream-_read] call.
4735
4736While most applications will never need to do this, there are
4737situations within Node.js where this is done, particularly in the
4738`Readable` stream class internals.
4739
4740### `readable.push('')`
4741
4742Use of `readable.push('')` is not recommended.
4743
4744Pushing a zero-byte string, `Buffer`, or `Uint8Array` to a stream that is not in
4745object mode has an interesting side effect. Because it _is_ a call to
4746[`readable.push()`][stream-push], the call will end the reading process.
4747However, because the argument is empty, no data is added to the
4748readable buffer, so there is nothing for a user to consume.
4749
4750### `highWaterMark` discrepancy after calling `readable.setEncoding()`
4751
4752Calling `readable.setEncoding()` changes how the `highWaterMark` operates
4753in non-object mode.
4754
4755Typically, the size of the current buffer is measured against the
4756`highWaterMark` in _bytes_. However, after `setEncoding()` is called, the
4757comparison function will begin to measure the buffer's size in _characters_.
4758
4759This is not a problem in common cases with `latin1` or `ascii`, where each
4760character occupies a single byte. However, be mindful of this behavior when
4761working with strings that could contain multi-byte characters.
4762
4763[API for stream consumers]: #api-for-stream-consumers
4764[API for stream implementers]: #api-for-stream-implementers
4765[Compatibility]: #compatibility-with-older-nodejs-versions
4766[HTTP requests, on the client]: http.md#class-httpclientrequest
4767[HTTP responses, on the server]: http.md#class-httpserverresponse
4768[TCP sockets]: net.md#class-netsocket
4769[Three states]: #three-states
4770[`'data'`]: #event-data
4771[`'drain'`]: #event-drain
4772[`'end'`]: #event-end
4773[`'finish'`]: #event-finish
4774[`'readable'`]: #event-readable
4775[`Duplex`]: #class-streamduplex
4776[`EventEmitter`]: events.md#class-eventemitter
4777[`Readable`]: #class-streamreadable
4778[`Symbol.hasInstance`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/hasInstance
4779[`Transform`]: #class-streamtransform
4780[`Writable`]: #class-streamwritable
4781[`fs.createReadStream()`]: fs.md#fscreatereadstreampath-options
4782[`fs.createWriteStream()`]: fs.md#fscreatewritestreampath-options
4783[`net.Socket`]: net.md#class-netsocket
4784[`process.stderr`]: process.md#processstderr
4785[`process.stdin`]: process.md#processstdin
4786[`process.stdout`]: process.md#processstdout
4787[`readable._read()`]: #readable_readsize
4788[`readable.compose(stream)`]: #readablecomposestream-options
4789[`readable.map`]: #readablemapfn-options
4790[`readable.push('')`]: #readablepush
4791[`readable.setEncoding()`]: #readablesetencodingencoding
4792[`stream.Readable.from()`]: #streamreadablefromiterable-options
4793[`stream.addAbortSignal()`]: #streamaddabortsignalsignal-stream
4794[`stream.compose`]: #streamcomposestreams
4795[`stream.cork()`]: #writablecork
4796[`stream.finished()`]: #streamfinishedstream-options-callback
4797[`stream.pipe()`]: #readablepipedestination-options
4798[`stream.pipeline()`]: #streampipelinesource-transforms-destination-callback
4799[`stream.uncork()`]: #writableuncork
4800[`stream.unpipe()`]: #readableunpipedestination
4801[`stream.wrap()`]: #readablewrapstream
4802[`writable._final()`]: #writable_finalcallback
4803[`writable._write()`]: #writable_writechunk-encoding-callback
4804[`writable._writev()`]: #writable_writevchunks-callback
4805[`writable.cork()`]: #writablecork
4806[`writable.end()`]: #writableendchunk-encoding-callback
4807[`writable.uncork()`]: #writableuncork
4808[`writable.writableFinished`]: #writablewritablefinished
4809[`zlib.createDeflate()`]: zlib.md#zlibcreatedeflateoptions
4810[child process stdin]: child_process.md#subprocessstdin
4811[child process stdout and stderr]: child_process.md#subprocessstdout
4812[crypto]: crypto.md
4813[fs read streams]: fs.md#class-fsreadstream
4814[fs write streams]: fs.md#class-fswritestream
4815[http-incoming-message]: http.md#class-httpincomingmessage
4816[hwm-gotcha]: #highwatermark-discrepancy-after-calling-readablesetencoding
4817[object-mode]: #object-mode
4818[readable-_construct]: #readable_constructcallback
4819[readable-_destroy]: #readable_destroyerr-callback
4820[readable-destroy]: #readabledestroyerror
4821[stream-_final]: #writable_finalcallback
4822[stream-_flush]: #transform_flushcallback
4823[stream-_read]: #readable_readsize
4824[stream-_transform]: #transform_transformchunk-encoding-callback
4825[stream-_write]: #writable_writechunk-encoding-callback
4826[stream-_writev]: #writable_writevchunks-callback
4827[stream-end]: #writableendchunk-encoding-callback
4828[stream-finished]: #streamfinishedstream-options-callback
4829[stream-finished-promise]: #streamfinishedstream-options
4830[stream-pause]: #readablepause
4831[stream-pipeline]: #streampipelinesource-transforms-destination-callback
4832[stream-pipeline-promise]: #streampipelinesource-transforms-destination-options
4833[stream-push]: #readablepushchunk-encoding
4834[stream-read]: #readablereadsize
4835[stream-resume]: #readableresume
4836[stream-uncork]: #writableuncork
4837[stream-write]: #writablewritechunk-encoding-callback
4838[writable-_construct]: #writable_constructcallback
4839[writable-_destroy]: #writable_destroyerr-callback
4840[writable-destroy]: #writabledestroyerror
4841[writable-new]: #new-streamwritableoptions
4842[zlib]: zlib.md
4843