# Stream

<!--introduced_in=v0.10.0-->

> Stability: 2 - Stable

<!-- source_link=lib/stream.js -->

A stream is an abstract interface for working with streaming data in Node.js.
The `node:stream` module provides an API for implementing the stream interface.

There are many stream objects provided by Node.js. For instance, a
[request to an HTTP server][http-incoming-message] and [`process.stdout`][]
are both stream instances.

Streams can be readable, writable, or both. All streams are instances of
[`EventEmitter`][].

To access the `node:stream` module:

```js
const stream = require('node:stream');
```

The `node:stream` module is useful for creating new types of stream instances.
It is usually not necessary to use the `node:stream` module to consume streams.

## Organization of this document

This document contains two primary sections and a third section for notes. The
first section explains how to use existing streams within an application. The
second section explains how to create new types of streams.

## Types of streams

There are four fundamental stream types within Node.js:

* [`Writable`][]: streams to which data can be written (for example,
  [`fs.createWriteStream()`][]).
* [`Readable`][]: streams from which data can be read (for example,
  [`fs.createReadStream()`][]).
* [`Duplex`][]: streams that are both `Readable` and `Writable` (for example,
  [`net.Socket`][]).
* [`Transform`][]: `Duplex` streams that can modify or transform the data as it
  is written and read (for example, [`zlib.createDeflate()`][]).

Additionally, this module includes the utility functions
[`stream.pipeline()`][], [`stream.finished()`][], [`stream.Readable.from()`][],
and [`stream.addAbortSignal()`][].

### Streams Promises API

<!-- YAML
added: v15.0.0
-->

The `stream/promises` API provides an alternative set of asynchronous utility
functions for streams that return `Promise` objects rather than using
callbacks. The API is accessible via `require('node:stream/promises')`
or `require('node:stream').promises`.

### `stream.pipeline(source[, ...transforms], destination[, options])`

### `stream.pipeline(streams[, options])`

<!-- YAML
added: v15.0.0
-->

* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]}
* `source` {Stream|Iterable|AsyncIterable|Function}
  * Returns: {Promise|AsyncIterable}
* `...transforms` {Stream|Function}
  * `source` {AsyncIterable}
  * Returns: {Promise|AsyncIterable}
* `destination` {Stream|Function}
  * `source` {AsyncIterable}
  * Returns: {Promise|AsyncIterable}
* `options` {Object} Pipeline options
  * `signal` {AbortSignal}
  * `end` {boolean} End the destination stream when the source stream ends.
    **Default:** `true`.
* Returns: {Promise} Fulfills when the pipeline is complete.

```cjs
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz'),
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);
```

```mjs
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

await pipeline(
  createReadStream('archive.tar'),
  createGzip(),
  createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
```

To use an `AbortSignal`, pass it inside an options object as the last argument.
When the signal is aborted, `destroy` will be called on the underlying pipeline
with an `AbortError`.

```cjs
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
  const ac = new AbortController();
  const signal = ac.signal;

  setImmediate(() => ac.abort());
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz'),
    { signal },
  );
}

run().catch(console.error); // AbortError
```

```mjs
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

const ac = new AbortController();
const { signal } = ac;
setImmediate(() => ac.abort());
try {
  await pipeline(
    createReadStream('archive.tar'),
    createGzip(),
    createWriteStream('archive.tar.gz'),
    { signal },
  );
} catch (err) {
  console.error(err); // AbortError
}
```

The `pipeline` API also supports async generators:

```cjs
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, { signal }) {
      source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal });
      }
    },
    fs.createWriteStream('uppercase.txt'),
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);
```

```mjs
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';

await pipeline(
  createReadStream('lowercase.txt'),
  async function* (source, { signal }) {
    source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
    for await (const chunk of source) {
      yield await processChunk(chunk, { signal });
    }
  },
  createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
```

Remember to handle the `signal` argument passed into the async generator,
especially in the case where the async generator is the source for the
pipeline (i.e. the first argument); otherwise the pipeline will never complete.

```cjs
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
  await pipeline(
    async function* ({ signal }) {
      await someLongRunningfn({ signal });
      yield 'asd';
    },
    fs.createWriteStream('uppercase.txt'),
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);
```

```mjs
import { pipeline } from 'node:stream/promises';
import fs from 'node:fs';

await pipeline(
  async function* ({ signal }) {
    await someLongRunningfn({ signal });
    yield 'asd';
  },
  fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');
```

The `pipeline` API also provides a [callback version][stream-pipeline].

### `stream.finished(stream[, options])`

<!-- YAML
added: v15.0.0
-->

* `stream` {Stream}
* `options` {Object}
  * `error` {boolean|undefined}
  * `readable` {boolean|undefined}
  * `writable` {boolean|undefined}
  * `signal` {AbortSignal|undefined}
* Returns: {Promise} Fulfills when the stream is no
  longer readable or writable.

```cjs
const { finished } = require('node:stream/promises');
const fs = require('node:fs');

const rs = fs.createReadStream('archive.tar');

async function run() {
  await finished(rs);
  console.log('Stream is done reading.');
}

run().catch(console.error);
rs.resume(); // Drain the stream.
```

```mjs
import { finished } from 'node:stream/promises';
import { createReadStream } from 'node:fs';

const rs = createReadStream('archive.tar');

async function run() {
  await finished(rs);
  console.log('Stream is done reading.');
}

run().catch(console.error);
rs.resume(); // Drain the stream.
```

The `finished` API also provides a [callback version][stream-finished].

### Object mode

All streams created by Node.js APIs operate exclusively on strings and `Buffer`
(or `Uint8Array`) objects. It is possible, however, for stream implementations
to work with other types of JavaScript values (with the exception of `null`,
which serves a special purpose within streams). Such streams are considered to
operate in "object mode".

Stream instances are switched into object mode using the `objectMode` option
when the stream is created. Attempting to switch an existing stream into
object mode is not safe.

### Buffering

<!--type=misc-->

Both [`Writable`][] and [`Readable`][] streams will store data in an internal
buffer.

The amount of data potentially buffered depends on the `highWaterMark` option
passed into the stream's constructor. For normal streams, the `highWaterMark`
option specifies a [total number of bytes][hwm-gotcha]. For streams operating
in object mode, the `highWaterMark` specifies a total number of objects.

Data is buffered in `Readable` streams when the implementation calls
[`stream.push(chunk)`][stream-push]. If the consumer of the stream does not
call [`stream.read()`][stream-read], the data will sit in the internal
queue until it is consumed.
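
For example, a minimal sketch of this buffering behavior (the stream and the
pushed values here are illustrative):

```js
const { Readable } = require('node:stream');

// A Readable with a no-op _read(); data is supplied via push().
const readable = new Readable({ read() {} });

readable.push('hello ');
readable.push('world');

// Nothing has consumed the stream yet, so the pushed data
// sits in the internal queue.
console.log(readable.readableLength); // 11 (bytes buffered)

// Reading drains the internal buffer.
console.log(readable.read().toString()); // 'hello world'
console.log(readable.readableLength); // 0
```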

Once the total size of the internal read buffer reaches the threshold specified
by `highWaterMark`, the stream will temporarily stop reading data from the
underlying resource until the data currently buffered can be consumed (that is,
the stream will stop calling the internal [`readable._read()`][] method that is
used to fill the read buffer).

Data is buffered in `Writable` streams when the
[`writable.write(chunk)`][stream-write] method is called repeatedly. While the
total size of the internal write buffer is below the threshold set by
`highWaterMark`, calls to `writable.write()` will return `true`. Once
the size of the internal buffer reaches or exceeds the `highWaterMark`, `false`
will be returned.

A key goal of the `stream` API, particularly the [`stream.pipe()`][] method,
is to limit the buffering of data to acceptable levels such that sources and
destinations of differing speeds will not overwhelm the available memory.

The `highWaterMark` option is a threshold, not a limit: it dictates the amount
of data that a stream buffers before it stops asking for more data. It does not
enforce a strict memory limitation in general. Specific stream implementations
may choose to enforce stricter limits but doing so is optional.

Because [`Duplex`][] and [`Transform`][] streams are both `Readable` and
`Writable`, each maintains _two_ separate internal buffers used for reading and
writing, allowing each side to operate independently of the other while
maintaining an appropriate and efficient flow of data. For example,
[`net.Socket`][] instances are [`Duplex`][] streams whose `Readable` side allows
consumption of data received _from_ the socket and whose `Writable` side allows
writing data _to_ the socket. Because data may be written to the socket at a
faster or slower rate than data is received, each side should
operate (and buffer) independently of the other.

The mechanics of the internal buffering are an internal implementation detail
and may be changed at any time. However, for certain advanced implementations,
the internal buffers can be retrieved using `writable.writableBuffer` or
`readable.readableBuffer`. Use of these undocumented properties is discouraged.

## API for stream consumers

<!--type=misc-->

Almost all Node.js applications, no matter how simple, use streams in some
manner. The following is an example of using streams in a Node.js application
that implements an HTTP server:

```js
const http = require('node:http');

const server = http.createServer((req, res) => {
  // `req` is an http.IncomingMessage, which is a readable stream.
  // `res` is an http.ServerResponse, which is a writable stream.

  let body = '';
  // Get the data as utf8 strings.
  // If an encoding is not set, Buffer objects will be received.
  req.setEncoding('utf8');

  // Readable streams emit 'data' events once a listener is added.
  req.on('data', (chunk) => {
    body += chunk;
  });

  // The 'end' event indicates that the entire body has been received.
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // Write back something interesting to the user:
      res.write(typeof data);
      res.end();
    } catch (er) {
      // uh oh! bad json!
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});

server.listen(1337);

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON
```

[`Writable`][] streams (such as `res` in the example) expose methods such as
`write()` and `end()` that are used to write data onto the stream.

[`Readable`][] streams use the [`EventEmitter`][] API for notifying application
code when data is available to be read off the stream. That available data can
be read from the stream in multiple ways.

Both [`Writable`][] and [`Readable`][] streams use the [`EventEmitter`][] API in
various ways to communicate the current state of the stream.

[`Duplex`][] and [`Transform`][] streams are both [`Writable`][] and
[`Readable`][].

Applications that are either writing data to or consuming data from a stream
are not required to implement the stream interfaces directly and will generally
have no reason to call `require('node:stream')`.

Developers wishing to implement new types of streams should refer to the
section [API for stream implementers][].

### Writable streams

Writable streams are an abstraction for a _destination_ to which data is
written.

Examples of [`Writable`][] streams include:

* [HTTP requests, on the client][]
* [HTTP responses, on the server][]
* [fs write streams][]
* [zlib streams][zlib]
* [crypto streams][crypto]
* [TCP sockets][]
* [child process stdin][]
* [`process.stdout`][], [`process.stderr`][]

Some of these examples are actually [`Duplex`][] streams that implement the
[`Writable`][] interface.

All [`Writable`][] streams implement the interface defined by the
`stream.Writable` class.

While specific instances of [`Writable`][] streams may differ in various ways,
all `Writable` streams follow the same fundamental usage pattern as illustrated
in the example below:

```js
const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');
```

#### Class: `stream.Writable`

<!-- YAML
added: v0.9.4
-->

<!--type=class-->

##### Event: `'close'`

<!-- YAML
added: v0.9.4
changes:
  - version: v10.0.0
    pr-url: https://github.com/nodejs/node/pull/18438
    description: Add `emitClose` option to specify if `'close'` is emitted on
                 destroy.
-->

The `'close'` event is emitted when the stream and any of its underlying
resources (a file descriptor, for example) have been closed. The event
indicates that no more events will be emitted, and no further computation will
occur.

A [`Writable`][] stream will always emit the `'close'` event if it is
created with the `emitClose` option.

##### Event: `'drain'`

<!-- YAML
added: v0.9.4
-->

If a call to [`stream.write(chunk)`][stream-write] returns `false`, the
`'drain'` event will be emitted when it is appropriate to resume writing data
to the stream.

```js
// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // Last time!
        writer.write(data, encoding, callback);
      } else {
        // See if we should continue, or wait.
        // Don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // Had to stop early!
      // Write some more once it drains.
      writer.once('drain', write);
    }
  }
}
```

##### Event: `'error'`

<!-- YAML
added: v0.9.4
-->

* {Error}

The `'error'` event is emitted if an error occurred while writing or piping
data. The listener callback is passed a single `Error` argument when called.

The stream is closed when the `'error'` event is emitted unless the
[`autoDestroy`][writable-new] option was set to `false` when creating the
stream.

After `'error'`, no further events other than `'close'` _should_ be emitted
(including `'error'` events).

##### Event: `'finish'`

<!-- YAML
added: v0.9.4
-->

The `'finish'` event is emitted after the [`stream.end()`][stream-end] method
has been called, and all data has been flushed to the underlying system.

```js
const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.on('finish', () => {
  console.log('All writes are now complete.');
});
writer.end('This is the end\n');
```

##### Event: `'pipe'`

<!-- YAML
added: v0.9.4
-->

* `src` {stream.Readable} Source stream that is piping to this writable

The `'pipe'` event is emitted when the [`stream.pipe()`][] method is called on
a readable stream, adding this writable to its set of destinations.

```js
const assert = require('node:assert');
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.log('Something is piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer);
```

##### Event: `'unpipe'`

<!-- YAML
added: v0.9.4
-->

* `src` {stream.Readable} The source stream that
  [unpiped][`stream.unpipe()`] this writable

The `'unpipe'` event is emitted when the [`stream.unpipe()`][] method is called
on a [`Readable`][] stream, removing this [`Writable`][] from its set of
destinations.

This is also emitted in case this [`Writable`][] stream emits an error when a
[`Readable`][] stream pipes into it.

```js
const assert = require('node:assert');
const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.log('Something has stopped piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);
```

##### `writable.cork()`

<!-- YAML
added: v0.11.2
-->

The `writable.cork()` method forces all written data to be buffered in memory.
The buffered data will be flushed when either the [`stream.uncork()`][] or
[`stream.end()`][stream-end] methods are called.

The primary intent of `writable.cork()` is to accommodate a situation in which
several small chunks are written to the stream in rapid succession.
Instead of
immediately forwarding them to the underlying destination, `writable.cork()`
buffers all the chunks until `writable.uncork()` is called, which will pass them
all to `writable._writev()`, if present. This prevents a head-of-line blocking
situation where data is being buffered while waiting for the first small chunk
to be processed. However, use of `writable.cork()` without implementing
`writable._writev()` may have an adverse effect on throughput.

See also: [`writable.uncork()`][], [`writable._writev()`][stream-_writev].

##### `writable.destroy([error])`

<!-- YAML
added: v8.0.0
changes:
  - version: v14.0.0
    pr-url: https://github.com/nodejs/node/pull/29197
    description: Work as a no-op on a stream that has already been destroyed.
-->

* `error` {Error} Optional, an error to emit with the `'error'` event.
* Returns: {this}

Destroy the stream. Optionally emit an `'error'` event, and emit a `'close'`
event (unless `emitClose` is set to `false`). After this call, the writable
stream has ended and subsequent calls to `write()` or `end()` will result in
an `ERR_STREAM_DESTROYED` error.
This is a destructive and immediate way to destroy a stream. Previous calls to
`write()` may not have drained, and may trigger an `ERR_STREAM_DESTROYED` error.
Use `end()` instead of destroy if data should flush before close, or wait for
the `'drain'` event before destroying the stream.

```cjs
const { Writable } = require('node:stream');

const myStream = new Writable();

const fooErr = new Error('foo error');
myStream.destroy(fooErr);
myStream.on('error', (fooErr) => console.error(fooErr.message)); // foo error
```

```cjs
const { Writable } = require('node:stream');

const myStream = new Writable();

myStream.destroy();
myStream.on('error', function wontHappen() {});
```

```cjs
const { Writable } = require('node:stream');

const myStream = new Writable();
myStream.destroy();

myStream.write('foo', (error) => console.error(error.code));
// ERR_STREAM_DESTROYED
```

Once `destroy()` has been called, any further calls will be a no-op and no
further errors except from `_destroy()` may be emitted as `'error'`.

Implementors should not override this method,
but instead implement [`writable._destroy()`][writable-_destroy].

##### `writable.closed`

<!-- YAML
added: v18.0.0
-->

* {boolean}

Is `true` after `'close'` has been emitted.

##### `writable.destroyed`

<!-- YAML
added: v8.0.0
-->

* {boolean}

Is `true` after [`writable.destroy()`][writable-destroy] has been called.

```cjs
const { Writable } = require('node:stream');

const myStream = new Writable();

console.log(myStream.destroyed); // false
myStream.destroy();
console.log(myStream.destroyed); // true
```

##### `writable.end([chunk[, encoding]][, callback])`

<!-- YAML
added: v0.9.4
changes:
  - version: v15.0.0
    pr-url: https://github.com/nodejs/node/pull/34101
    description: The `callback` is invoked before 'finish' or on error.
  - version: v14.0.0
    pr-url: https://github.com/nodejs/node/pull/29747
    description: The `callback` is invoked if 'finish' or 'error' is emitted.
  - version: v10.0.0
    pr-url: https://github.com/nodejs/node/pull/18780
    description: This method now returns a reference to `writable`.
  - version: v8.0.0
    pr-url: https://github.com/nodejs/node/pull/11608
    description: The `chunk` argument can now be a `Uint8Array` instance.
-->

* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
  not operating in object mode, `chunk` must be a string, `Buffer` or
  `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
  other than `null`.
* `encoding` {string} The encoding if `chunk` is a string
* `callback` {Function} Callback for when the stream is finished.
* Returns: {this}

Calling the `writable.end()` method signals that no more data will be written
to the [`Writable`][]. The optional `chunk` and `encoding` arguments allow one
final additional chunk of data to be written immediately before closing the
stream.

Calling the [`stream.write()`][stream-write] method after calling
[`stream.end()`][stream-end] will raise an error.

```js
// Write 'hello, ' and then end with 'world!'.
const fs = require('node:fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Writing more now is not allowed!
```

##### `writable.setDefaultEncoding(encoding)`

<!-- YAML
added: v0.11.15
changes:
  - version: v6.1.0
    pr-url: https://github.com/nodejs/node/pull/5040
    description: This method now returns a reference to `writable`.
-->

* `encoding` {string} The new default encoding
* Returns: {this}

The `writable.setDefaultEncoding()` method sets the default `encoding` for a
[`Writable`][] stream.

##### `writable.uncork()`

<!-- YAML
added: v0.11.2
-->

The `writable.uncork()` method flushes all data buffered since
[`stream.cork()`][] was called.

When using [`writable.cork()`][] and `writable.uncork()` to manage the buffering
of writes to a stream, defer calls to `writable.uncork()` using
`process.nextTick()`. Doing so allows batching of all
`writable.write()` calls that occur within a given Node.js event loop phase.

```js
stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork());
```

If the [`writable.cork()`][] method is called multiple times on a stream, the
same number of calls to `writable.uncork()` must be made to flush the buffered
data.

```js
stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
  stream.uncork();
  // The data will not be flushed until uncork() is called a second time.
  stream.uncork();
});
```

See also: [`writable.cork()`][].

##### `writable.writable`

<!-- YAML
added: v11.4.0
-->

* {boolean}

Is `true` if it is safe to call [`writable.write()`][stream-write], which means
the stream has not been destroyed, errored, or ended.

##### `writable.writableAborted`

<!-- YAML
added: v18.0.0
-->

> Stability: 1 - Experimental

* {boolean}

Returns whether the stream was destroyed or errored before emitting `'finish'`.

##### `writable.writableEnded`

<!-- YAML
added: v12.9.0
-->

* {boolean}

Is `true` after [`writable.end()`][] has been called. This property
does not indicate whether the data has been flushed; for this, use
[`writable.writableFinished`][] instead.
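
As a minimal sketch of the difference, assuming a stand-in writable whose
`write()` implementation flushes asynchronously:

```js
const { Writable } = require('node:stream');

const writable = new Writable({
  write(chunk, encoding, callback) {
    // Simulate an asynchronous flush to the underlying system.
    setImmediate(callback);
  },
});

writable.end('some data');

console.log(writable.writableEnded); // true: end() has been called.
console.log(writable.writableFinished); // false: data not yet flushed.

writable.on('finish', () => {
  console.log(writable.writableFinished); // true
});
```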

##### `writable.writableCorked`

<!-- YAML
added:
  - v13.2.0
  - v12.16.0
-->

* {integer}

Number of times [`writable.uncork()`][stream-uncork] needs to be
called in order to fully uncork the stream.

##### `writable.errored`

<!-- YAML
added: v18.0.0
-->

* {Error}

Returns the error if the stream has been destroyed with an error.

##### `writable.writableFinished`

<!-- YAML
added: v12.6.0
-->

* {boolean}

Is set to `true` immediately before the [`'finish'`][] event is emitted.

##### `writable.writableHighWaterMark`

<!-- YAML
added: v9.3.0
-->

* {number}

Returns the value of `highWaterMark` passed when creating this `Writable`.

##### `writable.writableLength`

<!-- YAML
added: v9.4.0
-->

* {number}

This property contains the number of bytes (or objects) in the queue
ready to be written. The value provides introspection data regarding
the status of the `highWaterMark`.

##### `writable.writableNeedDrain`

<!-- YAML
added:
  - v15.2.0
  - v14.17.0
-->

* {boolean}

Is `true` if the stream's buffer has been full and the stream will emit
`'drain'`.

##### `writable.writableObjectMode`

<!-- YAML
added: v12.3.0
-->

* {boolean}

Getter for the property `objectMode` of a given `Writable` stream.

##### `writable.write(chunk[, encoding][, callback])`

<!-- YAML
added: v0.9.4
changes:
  - version: v8.0.0
    pr-url: https://github.com/nodejs/node/pull/11608
    description: The `chunk` argument can now be a `Uint8Array` instance.
  - version: v6.0.0
    pr-url: https://github.com/nodejs/node/pull/6170
    description: Passing `null` as the `chunk` parameter will always be
                 considered invalid now, even in object mode.
-->

* `chunk` {string|Buffer|Uint8Array|any} Optional data to write. For streams
  not operating in object mode, `chunk` must be a string, `Buffer` or
  `Uint8Array`. For object mode streams, `chunk` may be any JavaScript value
  other than `null`.
* `encoding` {string|null} The encoding, if `chunk` is a string. **Default:** `'utf8'`
* `callback` {Function} Callback for when this chunk of data is flushed.
* Returns: {boolean} `false` if the stream wishes for the calling code to
  wait for the `'drain'` event to be emitted before continuing to write
  additional data; otherwise `true`.

The `writable.write()` method writes some data to the stream, and calls the
supplied `callback` once the data has been fully handled. If an error
occurs, the `callback` will be called with the error as its
first argument. The `callback` is called asynchronously and before `'error'` is
emitted.

The return value is `true` if the internal buffer is less than the
`highWaterMark` configured when the stream was created after admitting `chunk`.
If `false` is returned, further attempts to write data to the stream should
stop until the [`'drain'`][] event is emitted.

While a stream is not draining, calls to `write()` will buffer `chunk`, and
return false. Once all currently buffered chunks are drained (accepted for
delivery by the operating system), the `'drain'` event will be emitted.
Once `write()` returns false, do not write more chunks
until the `'drain'` event is emitted.
While calling `write()` on a stream that
is not draining is allowed, Node.js will buffer all written chunks until
maximum memory usage occurs, at which point it will abort unconditionally.
Even before it aborts, high memory usage will cause poor garbage collector
performance and high RSS (which is not typically released back to the system,
even after the memory is no longer required). Since TCP sockets may never
drain if the remote peer does not read the data, writing to a socket that is
not draining may lead to a remotely exploitable vulnerability.

Writing data while the stream is not draining is particularly
problematic for a [`Transform`][], because `Transform` streams are paused
by default until they are piped or a `'data'` or `'readable'` event handler
is added.

If the data to be written can be generated or fetched on demand, it is
recommended to encapsulate the logic into a [`Readable`][] and use
[`stream.pipe()`][]. However, if calling `write()` is preferred, it is
possible to respect backpressure and avoid memory issues using the
[`'drain'`][] event:

```js
function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb);
  } else {
    process.nextTick(cb);
  }
}

// Wait for cb to be called before doing any other write.
write('hello', () => {
  console.log('Write completed, do more writes now.');
});
```

A `Writable` stream in object mode will always ignore the `encoding` argument.

### Readable streams

Readable streams are an abstraction for a _source_ from which data is
consumed.

Examples of `Readable` streams include:

* [HTTP responses, on the client][http-incoming-message]
* [HTTP requests, on the server][http-incoming-message]
* [fs read streams][]
* [zlib streams][zlib]
* [crypto streams][crypto]
* [TCP sockets][]
* [child process stdout and stderr][]
* [`process.stdin`][]

All [`Readable`][] streams implement the interface defined by the
`stream.Readable` class.

#### Two reading modes

`Readable` streams effectively operate in one of two modes: flowing and
paused. These modes are separate from [object mode][object-mode].
A [`Readable`][] stream can be in object mode or not, regardless of whether
it is in flowing mode or paused mode.

* In flowing mode, data is read from the underlying system automatically
  and provided to an application as quickly as possible using events via the
  [`EventEmitter`][] interface.

* In paused mode, the [`stream.read()`][stream-read] method must be called
  explicitly to read chunks of data from the stream.

All [`Readable`][] streams begin in paused mode but can be switched to flowing
mode in one of the following ways:

* Adding a [`'data'`][] event handler.
* Calling the [`stream.resume()`][stream-resume] method.
* Calling the [`stream.pipe()`][] method to send the data to a [`Writable`][].

The `Readable` can switch back to paused mode using one of the following:

* If there are no pipe destinations, by calling the
  [`stream.pause()`][stream-pause] method.
* If there are pipe destinations, by removing all pipe destinations.
  Multiple pipe destinations may be removed by calling the
  [`stream.unpipe()`][] method.
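
For example, a minimal sketch of these mode switches, using an object-mode
stream created with `Readable.from()` for illustration:

```js
const { Readable } = require('node:stream');

const readable = Readable.from(['a', 'b', 'c']);

// The stream starts in paused mode. Adding a 'data' handler
// switches it into flowing mode.
readable.on('data', (chunk) => console.log(chunk));

// With no pipe destinations, pause() switches back to paused mode.
readable.pause();

// resume() switches the stream into flowing mode again.
setTimeout(() => readable.resume(), 100);
```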

The important concept to remember is that a `Readable` will not generate data
until a mechanism for either consuming or ignoring that data is provided. If
the consuming mechanism is disabled or taken away, the `Readable` will _attempt_
to stop generating the data.

For backward compatibility reasons, removing [`'data'`][] event handlers will
**not** automatically pause the stream. Also, if there are piped destinations,
then calling [`stream.pause()`][stream-pause] will not guarantee that the
stream will _remain_ paused once those destinations drain and ask for more data.

If a [`Readable`][] is switched into flowing mode and there are no consumers
available to handle the data, that data will be lost. This can occur, for
instance, when the `readable.resume()` method is called without a listener
attached to the `'data'` event, or when a `'data'` event handler is removed
from the stream.

Adding a [`'readable'`][] event handler automatically makes the stream
stop flowing, and the data has to be consumed via
[`readable.read()`][stream-read]. If the [`'readable'`][] event handler is
removed, then the stream will start flowing again if there is a
[`'data'`][] event handler.

#### Three states

The "two modes" of operation for a `Readable` stream are a simplified
abstraction for the more complicated internal state management that is
happening within the `Readable` stream implementation.

Specifically, at any given point in time, every `Readable` is in one of three
possible states:

* `readable.readableFlowing === null`
* `readable.readableFlowing === false`
* `readable.readableFlowing === true`

When `readable.readableFlowing` is `null`, no mechanism for consuming the
stream's data is provided. Therefore, the stream will not generate data.
While in this state, attaching a listener for the `'data'` event, calling the
`readable.pipe()` method, or calling the `readable.resume()` method will switch
`readable.readableFlowing` to `true`, causing the `Readable` to begin actively
emitting events as data is generated.

Calling `readable.pause()`, `readable.unpipe()`, or receiving backpressure
will cause `readable.readableFlowing` to be set to `false`,
temporarily halting the flowing of events but _not_ halting the generation of
data. While in this state, attaching a listener for the `'data'` event
will not switch `readable.readableFlowing` to `true`.

```js
const { PassThrough, Writable } = require('node:stream');
const pass = new PassThrough();
const writable = new Writable();

pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing is now false.

pass.on('data', (chunk) => { console.log(chunk.toString()); });
// readableFlowing is still false.
pass.write('ok'); // Will not emit 'data'.
pass.resume(); // Must be called to make stream emit 'data'.
// readableFlowing is now true.
```

While `readable.readableFlowing` is `false`, data may be accumulating
within the stream's internal buffer.

#### Choose one API style

The `Readable` stream API evolved across multiple Node.js versions and provides
multiple methods of consuming stream data. In general, developers should choose
_one_ of the methods of consuming data and _should never_ use multiple methods
to consume data from a single stream.
Specifically, using a combination
of `on('data')`, `on('readable')`, `pipe()`, or async iterators could
lead to unintuitive behavior.

#### Class: `stream.Readable`

<!-- YAML
added: v0.9.4
-->

<!--type=class-->

##### Event: `'close'`

<!-- YAML
added: v0.9.4
changes:
  - version: v10.0.0
    pr-url: https://github.com/nodejs/node/pull/18438
    description: Add `emitClose` option to specify if `'close'` is emitted on
                 destroy.
-->

The `'close'` event is emitted when the stream and any of its underlying
resources (a file descriptor, for example) have been closed. The event
indicates that no more events will be emitted, and no further computation will
occur.

A [`Readable`][] stream will always emit the `'close'` event if it is
created with the `emitClose` option.

##### Event: `'data'`

<!-- YAML
added: v0.9.4
-->

* `chunk` {Buffer|string|any} The chunk of data. For streams that are not
  operating in object mode, the chunk will be either a string or `Buffer`.
  For streams that are in object mode, the chunk can be any JavaScript value
  other than `null`.

The `'data'` event is emitted whenever the stream is relinquishing ownership of
a chunk of data to a consumer. This may occur whenever the stream is switched
into flowing mode by calling `readable.pipe()` or `readable.resume()`, or by
attaching a listener callback to the `'data'` event. The `'data'` event will
also be emitted whenever the `readable.read()` method is called and a chunk of
data is available to be returned.

Attaching a `'data'` event listener to a stream that has not been explicitly
paused will switch the stream into flowing mode. Data will then be passed as
soon as it is available.

The listener callback will be passed the chunk of data as a string if a default
encoding has been specified for the stream using the
`readable.setEncoding()` method; otherwise the data will be passed as a
`Buffer`.

```js
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
```

##### Event: `'end'`

<!-- YAML
added: v0.9.4
-->

The `'end'` event is emitted when there is no more data to be consumed from
the stream.

The `'end'` event **will not be emitted** unless the data is completely
consumed. This can be accomplished by switching the stream into flowing mode,
or by calling [`stream.read()`][stream-read] repeatedly until all data has been
consumed.

```js
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
  console.log('There will be no more data.');
});
```

##### Event: `'error'`

<!-- YAML
added: v0.9.4
-->

* {Error}

The `'error'` event may be emitted by a `Readable` implementation at any time.
Typically, this may occur if the underlying stream is unable to generate data
due to an underlying internal failure, or when a stream implementation attempts
to push an invalid chunk of data.

The listener callback will be passed a single `Error` object.
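
For example, pushing a chunk of an invalid type into a non-object-mode stream
is one way an implementation can trigger `'error'`. A minimal sketch (the
specific error code shown is what current Node.js versions report and is an
assumption):

```js
const { Readable } = require('node:stream');

const readable = new Readable({ read() {} });

readable.on('error', (err) => {
  console.error(err.code); // ERR_INVALID_ARG_TYPE
});

// Numbers are not valid chunks outside of object mode, so this
// causes the stream to emit 'error'.
readable.push(42);
```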

##### Event: `'pause'`

<!-- YAML
added: v0.9.4
-->

The `'pause'` event is emitted when [`stream.pause()`][stream-pause] is called
and `readableFlowing` is not `false`.

##### Event: `'readable'`

<!-- YAML
added: v0.9.4
changes:
  - version: v10.0.0
    pr-url: https://github.com/nodejs/node/pull/17979
    description: The `'readable'` is always emitted in the next tick after
                 `.push()` is called.
  - version: v10.0.0
    pr-url: https://github.com/nodejs/node/pull/18994
    description: Using `'readable'` requires calling `.read()`.
-->

The `'readable'` event is emitted when there is data available to be read from
the stream or when the end of the stream has been reached. Effectively, the
`'readable'` event indicates that the stream has new information. If data is
available, [`stream.read()`][stream-read] will return that data.

```js
const readable = getReadableStreamSomehow();
readable.on('readable', function() {
  // There is some data to read now.
  let data;

  while ((data = this.read()) !== null) {
    console.log(data);
  }
});
```

If the end of the stream has been reached, calling
[`stream.read()`][stream-read] will return `null` and trigger the `'end'`
event. This is also true if there never was any data to be read. For instance,
in the following example, `foo.txt` is an empty file:

```js
const fs = require('node:fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
  console.log('end');
});
```

The output of running this script is:

```console
$ node test.js
readable: null
end
```

In some cases, attaching a listener for the `'readable'` event will cause some
amount of data to be read into an internal buffer.

In general, the `readable.pipe()` and `'data'` event mechanisms are easier to
understand than the `'readable'` event. However, handling `'readable'` might
result in increased throughput.

If both `'readable'` and [`'data'`][] are used at the same time, `'readable'`
takes precedence in controlling the flow, i.e. `'data'` will be emitted
only when [`stream.read()`][stream-read] is called. The
`readableFlowing` property would become `false`.
If there are `'data'` listeners when `'readable'` is removed, the stream
will start flowing, i.e. `'data'` events will be emitted without calling
`.resume()`.

##### Event: `'resume'`

<!-- YAML
added: v0.9.4
-->

The `'resume'` event is emitted when [`stream.resume()`][stream-resume] is
called and `readableFlowing` is not `true`.

##### `readable.destroy([error])`

<!-- YAML
added: v8.0.0
changes:
  - version: v14.0.0
    pr-url: https://github.com/nodejs/node/pull/29197
    description: Work as a no-op on a stream that has already been destroyed.
-->

* `error` {Error} Error which will be passed as payload in the `'error'` event
* Returns: {this}

Destroy the stream. Optionally emit an `'error'` event, and emit a `'close'`
event (unless `emitClose` is set to `false`). After this call, the readable
stream will release any internal resources and subsequent calls to `push()`
will be ignored.
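
A minimal sketch, following the same pattern as the `writable.destroy()`
examples above:

```js
const { Readable } = require('node:stream');

const readable = Readable.from(['some data']);

readable.destroy(new Error('boom'));
// The 'error' event is emitted asynchronously, so a listener
// attached after the destroy() call still receives it.
readable.on('error', (err) => console.error(err.message)); // boom
```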

Once `destroy()` has been called, any further calls will be a no-op and no
further errors except from `_destroy()` may be emitted as `'error'`.

Implementors should not override this method, but instead implement
[`readable._destroy()`][readable-_destroy].

##### `readable.closed`

<!-- YAML
added: v18.0.0
-->

* {boolean}

Is `true` after `'close'` has been emitted.

##### `readable.destroyed`

<!-- YAML
added: v8.0.0
-->

* {boolean}

Is `true` after [`readable.destroy()`][readable-destroy] has been called.

##### `readable.isPaused()`

<!-- YAML
added: v0.11.14
-->

* Returns: {boolean}

The `readable.isPaused()` method returns the current operating state of the
`Readable`. This is used primarily by the mechanism that underlies the
`readable.pipe()` method. In most typical cases, there will be no reason to
use this method directly.

```js
const readable = new stream.Readable();

readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false
```

##### `readable.pause()`

<!-- YAML
added: v0.9.4
-->

* Returns: {this}

The `readable.pause()` method will cause a stream in flowing mode to stop
emitting [`'data'`][] events, switching out of flowing mode. Any data that
becomes available will remain in the internal buffer.

```js
const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
});
```

The `readable.pause()` method has no effect if there is a `'readable'`
event listener.

##### `readable.pipe(destination[, options])`

<!-- YAML
added: v0.9.4
-->

* `destination` {stream.Writable} The destination for writing data
* `options` {Object} Pipe options
  * `end` {boolean} End the writer when the reader ends. **Default:** `true`.
* Returns: {stream.Writable} The _destination_, allowing for a chain of pipes if
  it is a [`Duplex`][] or a [`Transform`][] stream

The `readable.pipe()` method attaches a [`Writable`][] stream to the `readable`,
causing it to switch automatically into flowing mode and push all of its data
to the attached [`Writable`][]. The flow of data will be automatically managed
so that the destination `Writable` stream is not overwhelmed by a faster
`Readable` stream.

The following example pipes all of the data from the `readable` into a file
named `file.txt`:

```js
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'.
readable.pipe(writable);
```

It is possible to attach multiple `Writable` streams to a single `Readable`
stream.
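
For example, a sketch that copies the same readable into two destinations (the
file names are illustrative):

```js
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
// Each destination receives the full data stream.
readable.pipe(fs.createWriteStream('copy-one.txt'));
readable.pipe(fs.createWriteStream('copy-two.txt'));
```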

The `readable.pipe()` method returns a reference to the _destination_ stream,
making it possible to set up chains of piped streams:

```js
const fs = require('node:fs');
const zlib = require('node:zlib');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w);
```

By default, [`stream.end()`][stream-end] is called on the destination `Writable`
stream when the source `Readable` stream emits [`'end'`][], so that the
destination is no longer writable. To disable this default behavior, the `end`
option can be passed as `false`, causing the destination stream to remain open:

```js
reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
});
```

One important caveat is that if the `Readable` stream emits an error during
processing, the `Writable` destination _is not closed_ automatically. If an
error occurs, it will be necessary to _manually_ close each stream in order
to prevent memory leaks.

The [`process.stderr`][] and [`process.stdout`][] `Writable` streams are never
closed until the Node.js process exits, regardless of the specified options.

##### `readable.read([size])`

<!-- YAML
added: v0.9.4
-->

* `size` {number} Optional argument to specify how much data to read.
* Returns: {string|Buffer|null|any}

The `readable.read()` method reads data out of the internal buffer and
returns it. If no data is available to be read, `null` is returned. By default,
the data is returned as a `Buffer` object unless an encoding has been
specified using the `readable.setEncoding()` method or the stream is operating
in object mode.

The optional `size` argument specifies a specific number of bytes to read. If
`size` bytes are not available to be read, `null` will be returned _unless_
the stream has ended, in which case all of the data remaining in the internal
buffer will be returned.

If the `size` argument is not specified, all of the data contained in the
internal buffer will be returned.

The `size` argument must be less than or equal to 1 GiB.

The `readable.read()` method should only be called on `Readable` streams
operating in paused mode. In flowing mode, `readable.read()` is called
automatically until the internal buffer is fully drained.

```js
const readable = getReadableStreamSomehow();

// 'readable' may be triggered multiple times as data arrives.
readable.on('readable', () => {
  let chunk;
  console.log('Stream is readable (new data received in buffer)');
  // Use a loop to make sure we read all currently available data.
  while (null !== (chunk = readable.read())) {
    console.log(`Read ${chunk.length} bytes of data...`);
  }
});

// 'end' will be triggered once when there is no more data available.
readable.on('end', () => {
  console.log('Reached end of stream.');
});
```

Each call to `readable.read()` returns a chunk of data, or `null`. The chunks
are not concatenated. A `while` loop is necessary to consume all data
currently in the buffer. When reading a large file, `.read()` may return
`null`, having consumed all buffered content so far, but there is still more
data to come not yet buffered.
In this case, a new `'readable'` event will be emitted
when there is more data in the buffer. Finally, the `'end'` event will be
emitted when there is no more data to come.

Therefore, to read a file's whole contents from a `readable`, it is necessary
to collect chunks across multiple `'readable'` events:

```js
const chunks = [];

readable.on('readable', () => {
  let chunk;
  while (null !== (chunk = readable.read())) {
    chunks.push(chunk);
  }
});

readable.on('end', () => {
  const content = chunks.join('');
});
```

A `Readable` stream in object mode will always return a single item from
a call to [`readable.read(size)`][stream-read], regardless of the value of the
`size` argument.

If the `readable.read()` method returns a chunk of data, a `'data'` event will
also be emitted.

Calling [`stream.read([size])`][stream-read] after the [`'end'`][] event has
been emitted will return `null`. No runtime error will be raised.

##### `readable.readable`

<!-- YAML
added: v11.4.0
-->

* {boolean}

Is `true` if it is safe to call [`readable.read()`][stream-read], which means
the stream has not been destroyed or emitted `'error'` or `'end'`.

##### `readable.readableAborted`

<!-- YAML
added: v16.8.0
-->

> Stability: 1 - Experimental

* {boolean}

Returns whether the stream was destroyed or errored before emitting `'end'`.

##### `readable.readableDidRead`

<!-- YAML
added:
  - v16.7.0
  - v14.18.0
-->

> Stability: 1 - Experimental

* {boolean}

Returns whether `'data'` has been emitted.

##### `readable.readableEncoding`

<!-- YAML
added: v12.7.0
-->

* {null|string}

Getter for the property `encoding` of a given `Readable` stream. The `encoding`
property can be set using the [`readable.setEncoding()`][] method.

##### `readable.readableEnded`

<!-- YAML
added: v12.9.0
-->

* {boolean}

Becomes `true` when the [`'end'`][] event is emitted.

##### `readable.errored`

<!-- YAML
added: v18.0.0
-->

* {Error}

Returns the error if the stream has been destroyed with an error.

##### `readable.readableFlowing`

<!-- YAML
added: v9.4.0
-->

* {boolean}

This property reflects the current state of a `Readable` stream as described
in the [Three states][] section.

##### `readable.readableHighWaterMark`

<!-- YAML
added: v9.3.0
-->

* {number}

Returns the value of `highWaterMark` passed when creating this `Readable`.

##### `readable.readableLength`

<!-- YAML
added: v9.4.0
-->

* {number}

This property contains the number of bytes (or objects) in the queue
ready to be read. The value provides introspection data regarding
the status of the `highWaterMark`.

##### `readable.readableObjectMode`

<!-- YAML
added: v12.3.0
-->

* {boolean}

Getter for the property `objectMode` of a given `Readable` stream.

##### `readable.resume()`

<!-- YAML
added: v0.9.4
changes:
  - version: v10.0.0
    pr-url: https://github.com/nodejs/node/pull/18994
    description: The `resume()` has no effect if there is a `'readable'` event
                 listening.
-->

* Returns: {this}

The `readable.resume()` method causes an explicitly paused `Readable` stream to
resume emitting [`'data'`][] events, switching the stream into flowing mode.

The `readable.resume()` method can be used to fully consume the data from a
stream without actually processing any of that data:

```js
getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.');
  });
```

The `readable.resume()` method has no effect if there is a `'readable'`
event listener.

##### `readable.setEncoding(encoding)`

<!-- YAML
added: v0.9.4
-->

* `encoding` {string} The encoding to use.
* Returns: {this}

The `readable.setEncoding()` method sets the character encoding for
data read from the `Readable` stream.

By default, no encoding is assigned and stream data will be returned as
`Buffer` objects. Setting an encoding causes the stream data
to be returned as strings of the specified encoding rather than as `Buffer`
objects. For instance, calling `readable.setEncoding('utf8')` will cause the
output data to be interpreted as UTF-8 data, and passed as strings. Calling
`readable.setEncoding('hex')` will cause the data to be encoded in hexadecimal
string format.

The `Readable` stream will properly handle multi-byte characters delivered
through the stream that would otherwise become improperly decoded if simply
pulled from the stream as `Buffer` objects.

```js
const assert = require('node:assert');
const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('Got %d characters of string data:', chunk.length);
});
```

##### `readable.unpipe([destination])`

<!-- YAML
added: v0.9.4
-->

* `destination` {stream.Writable} Optional specific stream to unpipe
* Returns: {this}

The `readable.unpipe()` method detaches a `Writable` stream previously attached
using the [`stream.pipe()`][] method.

If the `destination` is not specified, then _all_ pipes are detached.

If the `destination` is specified, but no pipe is set up for it, then
the method does nothing.

```js
const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
  console.log('Stop writing to file.txt.');
  readable.unpipe(writable);
  console.log('Manually close the file stream.');
  writable.end();
}, 1000);
```

##### `readable.unshift(chunk[, encoding])`

<!-- YAML
added: v0.9.11
changes:
  - version: v8.0.0
    pr-url: https://github.com/nodejs/node/pull/11608
    description: The `chunk` argument can now be a `Uint8Array` instance.
-->

* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to unshift onto the
  read queue. For streams not operating in object mode, `chunk` must be a
  string, `Buffer`, `Uint8Array`, or `null`. For object mode streams, `chunk`
  may be any JavaScript value.
* `encoding` {string} Encoding of string chunks. Must be a valid
  `Buffer` encoding, such as `'utf8'` or `'ascii'`.
1775 1776Passing `chunk` as `null` signals the end of the stream (EOF) and behaves the 1777same as `readable.push(null)`, after which no more data can be written. The EOF 1778signal is put at the end of the buffer and any buffered data will still be 1779flushed. 1780 1781The `readable.unshift()` method pushes a chunk of data back into the internal 1782buffer. This is useful in certain situations where a stream is being consumed by 1783code that needs to "un-consume" some amount of data that it has optimistically 1784pulled out of the source, so that the data can be passed on to some other party. 1785 1786The `stream.unshift(chunk)` method cannot be called after the [`'end'`][] event 1787has been emitted or a runtime error will be thrown. 1788 1789Developers using `stream.unshift()` often should consider switching to 1790use of a [`Transform`][] stream instead. See the [API for stream implementers][] 1791section for more information. 1792 1793```js 1794// Pull off a header delimited by \n\n. 1795// Use unshift() if we get too much. 1796// Call the callback with (error, header, stream). 1797const { StringDecoder } = require('node:string_decoder'); 1798function parseHeader(stream, callback) { 1799 stream.on('error', callback); 1800 stream.on('readable', onReadable); 1801 const decoder = new StringDecoder('utf8'); 1802 let header = ''; 1803 function onReadable() { 1804 let chunk; 1805 while (null !== (chunk = stream.read())) { 1806 const str = decoder.write(chunk); 1807 if (str.includes('\n\n')) { 1808 // Found the header boundary. 1809 const split = str.split(/\n\n/); 1810 header += split.shift(); 1811 const remaining = split.join('\n\n'); 1812 const buf = Buffer.from(remaining, 'utf8'); 1813 stream.removeListener('error', callback); 1814 // Remove the 'readable' listener before unshifting. 1815 stream.removeListener('readable', onReadable); 1816 if (buf.length) 1817 stream.unshift(buf); 1818 // Now the body of the message can be read from the stream. 1819 callback(null, header, stream); 1820 return; 1821 } 1822 // Still reading the header. 1823 header += str; 1824 } 1825 } 1826} 1827``` 1828 1829Unlike [`stream.push(chunk)`][stream-push], `stream.unshift(chunk)` will not 1830end the reading process by resetting the internal reading state of the stream. 1831This can cause unexpected results if `readable.unshift()` is called during a 1832read (i.e. from within a [`stream._read()`][stream-_read] implementation on a 1833custom stream). Following the call to `readable.unshift()` with an immediate 1834[`stream.push('')`][stream-push] will reset the reading state appropriately, 1835however it is best to simply avoid calling `readable.unshift()` while in the 1836process of performing a read. 1837 1838##### `readable.wrap(stream)` 1839 1840<!-- YAML 1841added: v0.9.4 1842--> 1843 1844* `stream` {Stream} An "old style" readable stream 1845* Returns: {this} 1846 1847Prior to Node.js 0.10, streams did not implement the entire `node:stream` 1848module API as it is currently defined. (See [Compatibility][] for more 1849information.) 1850 1851When using an older Node.js library that emits [`'data'`][] events and has a 1852[`stream.pause()`][stream-pause] method that is advisory only, the 1853`readable.wrap()` method can be used to create a [`Readable`][] stream that uses 1854the old stream as its data source. 1855 1856It will rarely be necessary to use `readable.wrap()` but the method has been 1857provided as a convenience for interacting with older Node.js applications and 1858libraries. 

```js
const { OldReader } = require('./old-api-module.js');
const { Readable } = require('node:stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
});
```

##### `readable[Symbol.asyncIterator]()`

<!-- YAML
added: v10.0.0
changes:
  - version: v11.14.0
    pr-url: https://github.com/nodejs/node/pull/26989
    description: Symbol.asyncIterator support is no longer experimental.
-->

* Returns: {AsyncIterator} to fully consume the stream.

```js
const fs = require('node:fs');

async function print(readable) {
  readable.setEncoding('utf8');
  let data = '';
  for await (const chunk of readable) {
    data += chunk;
  }
  console.log(data);
}

print(fs.createReadStream('file')).catch(console.error);
```

If the loop terminates with a `break`, `return`, or a `throw`, the stream will
be destroyed. In other words, iterating over a stream will consume the stream
fully. The stream will be read in chunks of size equal to the `highWaterMark`
option. In the code example above, data will be in a single chunk if the file
has less than 64 KiB of data because no `highWaterMark` option is provided to
[`fs.createReadStream()`][].

##### `readable[Symbol.asyncDispose]()`

<!-- YAML
added: v18.18.0
-->

> Stability: 1 - Experimental

Calls [`readable.destroy()`][readable-destroy] with an `AbortError` and returns
a promise that fulfills when the stream is finished.

##### `readable.compose(stream[, options])`

<!-- YAML
added: v18.13.0
-->

> Stability: 1 - Experimental

* `stream` {Stream|Iterable|AsyncIterable|Function}
* `options` {Object}
  * `signal` {AbortSignal} allows destroying the stream if the signal is
    aborted.
* Returns: {Duplex} a stream composed with the stream `stream`.

```mjs
import { Readable } from 'node:stream';

async function* splitToWords(source) {
  for await (const chunk of source) {
    const words = String(chunk).split(' ');

    for (const word of words) {
      yield word;
    }
  }
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
const words = await wordsStream.toArray();

console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator']
```

See [`stream.compose`][] for more information.

##### `readable.iterator([options])`

<!-- YAML
added: v16.3.0
-->

> Stability: 1 - Experimental

* `options` {Object}
  * `destroyOnReturn` {boolean} When set to `false`, calling `return` on the
    async iterator, or exiting a `for await...of` iteration using a `break`,
    `return`, or `throw` will not destroy the stream. **Default:** `true`.
* Returns: {AsyncIterator} to consume the stream.

The iterator created by this method gives users the option to cancel the
destruction of the stream if the `for await...of` loop is exited by `return`,
`break`, or `throw`. The stream is still destroyed if it emits an error during
iteration.
1969 1970```js 1971const { Readable } = require('node:stream'); 1972 1973async function printIterator(readable) { 1974 for await (const chunk of readable.iterator({ destroyOnReturn: false })) { 1975 console.log(chunk); // 1 1976 break; 1977 } 1978 1979 console.log(readable.destroyed); // false 1980 1981 for await (const chunk of readable.iterator({ destroyOnReturn: false })) { 1982 console.log(chunk); // Will print 2 and then 3 1983 } 1984 1985 console.log(readable.destroyed); // True, stream was totally consumed 1986} 1987 1988async function printSymbolAsyncIterator(readable) { 1989 for await (const chunk of readable) { 1990 console.log(chunk); // 1 1991 break; 1992 } 1993 1994 console.log(readable.destroyed); // true 1995} 1996 1997async function showBoth() { 1998 await printIterator(Readable.from([1, 2, 3])); 1999 await printSymbolAsyncIterator(Readable.from([1, 2, 3])); 2000} 2001 2002showBoth(); 2003``` 2004 2005##### `readable.map(fn[, options])` 2006 2007<!-- YAML 2008added: 2009 - v17.4.0 2010 - v16.14.0 2011changes: 2012 - version: v18.19.0 2013 pr-url: https://github.com/nodejs/node/pull/49249 2014 description: added `highWaterMark` in options. 2015--> 2016 2017> Stability: 1 - Experimental 2018 2019* `fn` {Function|AsyncFunction} a function to map over every chunk in the 2020 stream. 2021 * `data` {any} a chunk of data from the stream. 2022 * `options` {Object} 2023 * `signal` {AbortSignal} aborted if the stream is destroyed allowing to 2024 abort the `fn` call early. 2025* `options` {Object} 2026 * `concurrency` {number} the maximum concurrent invocation of `fn` to call 2027 on the stream at once. **Default:** `1`. 2028 * `highWaterMark` {number} how many items to buffer while waiting for user 2029 consumption of the mapped items. **Default:** `concurrency * 2 - 1`. 2030 * `signal` {AbortSignal} allows destroying the stream if the signal is 2031 aborted. 2032* Returns: {Readable} a stream mapped with the function `fn`. 2033 2034This method allows mapping over the stream. The `fn` function will be called 2035for every chunk in the stream. If the `fn` function returns a promise - that 2036promise will be `await`ed before being passed to the result stream. 2037 2038```mjs 2039import { Readable } from 'node:stream'; 2040import { Resolver } from 'node:dns/promises'; 2041 2042// With a synchronous mapper. 2043for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) { 2044 console.log(chunk); // 2, 4, 6, 8 2045} 2046// With an asynchronous mapper, making at most 2 queries at a time. 2047const resolver = new Resolver(); 2048const dnsResults = Readable.from([ 2049 'nodejs.org', 2050 'openjsf.org', 2051 'www.linuxfoundation.org', 2052]).map((domain) => resolver.resolve4(domain), { concurrency: 2 }); 2053for await (const result of dnsResults) { 2054 console.log(result); // Logs the DNS result of resolver.resolve4. 2055} 2056``` 2057 2058##### `readable.filter(fn[, options])` 2059 2060<!-- YAML 2061added: 2062 - v17.4.0 2063 - v16.14.0 2064changes: 2065 - version: v18.19.0 2066 pr-url: https://github.com/nodejs/node/pull/49249 2067 description: added `highWaterMark` in options. 2068--> 2069 2070> Stability: 1 - Experimental 2071 2072* `fn` {Function|AsyncFunction} a function to filter chunks from the stream. 2073 * `data` {any} a chunk of data from the stream. 2074 * `options` {Object} 2075 * `signal` {AbortSignal} aborted if the stream is destroyed allowing to 2076 abort the `fn` call early. 
* `options` {Object}
  * `concurrency` {number} the maximum concurrent invocation of `fn` to call
    on the stream at once. **Default:** `1`.
  * `highWaterMark` {number} how many items to buffer while waiting for user
    consumption of the filtered items. **Default:** `concurrency * 2 - 1`.
  * `signal` {AbortSignal} allows destroying the stream if the signal is
    aborted.
* Returns: {Readable} a stream filtered with the predicate `fn`.

This method allows filtering the stream. For each chunk in the stream the `fn`
function will be called and if it returns a truthy value, the chunk will be
passed to the result stream. If the `fn` function returns a promise - that
promise will be `await`ed.

```mjs
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).filter(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address.ttl > 60;
}, { concurrency: 2 });
for await (const result of dnsResults) {
  // Logs domains with more than 60 seconds on the resolved dns record.
  console.log(result);
}
```

##### `readable.forEach(fn[, options])`

<!-- YAML
added: v17.5.0
-->

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
  * `data` {any} a chunk of data from the stream.
  * `options` {Object}
    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
      abort the `fn` call early.
* `options` {Object}
  * `concurrency` {number} the maximum concurrent invocation of `fn` to call
    on the stream at once. **Default:** `1`.
  * `signal` {AbortSignal} allows destroying the stream if the signal is
    aborted.
* Returns: {Promise} a promise for when the stream has finished.

This method allows iterating a stream. For each chunk in the stream the
`fn` function will be called. If the `fn` function returns a promise - that
promise will be `await`ed.

This method is different from `for await...of` loops in that it can optionally
process chunks concurrently. In addition, a `forEach` iteration can only be
stopped by having passed a `signal` option and aborting the related
`AbortController`, while `for await...of` can be stopped with `break` or
`return`. In either case the stream will be destroyed.

This method is different from listening to the [`'data'`][] event in that it
uses the [`readable`][] event in the underlying machinery and can limit the
number of concurrent `fn` calls.

```mjs
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
  // Logs result, similar to `for await (const result of dnsResults)`
  console.log(result);
});
console.log('done'); // Stream has finished
```
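Since aborting the associated `AbortController` is the only way to stop a
`forEach` call early, the following sketch (with an arbitrary stop condition)
shows how:

```mjs
import { Readable } from 'node:stream';

const ac = new AbortController();

try {
  await Readable.from([1, 2, 3, 4]).forEach((chunk) => {
    console.log(chunk);
    if (chunk === 2) {
      // Aborting stops the iteration and destroys the stream.
      ac.abort();
    }
  }, { signal: ac.signal });
} catch (err) {
  console.log(err.name); // AbortError
}
```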
##### `readable.toArray([options])`

<!-- YAML
added: v17.5.0
-->

> Stability: 1 - Experimental

* `options` {Object}
  * `signal` {AbortSignal} allows cancelling the toArray operation if the
    signal is aborted.
* Returns: {Promise} a promise containing an array with the contents of the
  stream.

This method allows easily obtaining the contents of a stream.

As this method reads the entire stream into memory, it negates the benefits of
streams. It's intended for interoperability and convenience, not as the primary
way to consume streams.

```mjs
import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]

// Make dns queries concurrently using .map and collect
// the results into an array using toArray
const resolver = new Resolver();
const dnsResults = await Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address;
}, { concurrency: 2 }).toArray();
```

##### `readable.some(fn[, options])`

<!-- YAML
added: v17.5.0
-->

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
  * `data` {any} a chunk of data from the stream.
  * `options` {Object}
    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
      abort the `fn` call early.
* `options` {Object}
  * `concurrency` {number} the maximum concurrent invocation of `fn` to call
    on the stream at once. **Default:** `1`.
  * `signal` {AbortSignal} allows destroying the stream if the signal is
    aborted.
* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy
  value for at least one of the chunks.

This method is similar to `Array.prototype.some` and calls `fn` on each chunk
in the stream until the awaited return value is `true` (or any truthy value).
Once the awaited return value of an `fn` call on a chunk is truthy, the stream
is destroyed and the promise is fulfilled with `true`. If none of the `fn`
calls on the chunks return a truthy value, the promise is fulfilled with
`false`.

```mjs
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false

// With an asynchronous predicate, making at most 2 file checks at a time.
const anyBigFile = await Readable.from([
  'file1',
  'file2',
  'file3',
]).some(async (fileName) => {
  const stats = await stat(fileName);
  return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(anyBigFile); // `true` if any file in the list is bigger than 1 MiB
console.log('done'); // Stream has finished
```

##### `readable.find(fn[, options])`

<!-- YAML
added: v17.5.0
-->

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
  * `data` {any} a chunk of data from the stream.
  * `options` {Object}
    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
      abort the `fn` call early.
* `options` {Object}
  * `concurrency` {number} the maximum concurrent invocation of `fn` to call
    on the stream at once. **Default:** `1`.
  * `signal` {AbortSignal} allows destroying the stream if the signal is
    aborted.
* Returns: {Promise} a promise evaluating to the first chunk for which `fn`
  evaluated with a truthy value, or `undefined` if no element was found.

This method is similar to `Array.prototype.find` and calls `fn` on each chunk
in the stream to find a chunk with a truthy value for `fn`. Once an `fn` call's
awaited return value is truthy, the stream is destroyed and the promise is
fulfilled with the value for which `fn` returned a truthy value. If all of the
`fn` calls on the chunks return a falsy value, the promise is fulfilled with
`undefined`.

```mjs
import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined

// With an asynchronous predicate, making at most 2 file checks at a time.
const foundBigFile = await Readable.from([
  'file1',
  'file2',
  'file3',
]).find(async (fileName) => {
  const stats = await stat(fileName);
  return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1 MiB
console.log('done'); // Stream has finished
```

##### `readable.every(fn[, options])`

<!-- YAML
added: v17.5.0
-->

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a function to call on each chunk of the stream.
  * `data` {any} a chunk of data from the stream.
  * `options` {Object}
    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
      abort the `fn` call early.
* `options` {Object}
  * `concurrency` {number} the maximum concurrent invocation of `fn` to call
    on the stream at once. **Default:** `1`.
  * `signal` {AbortSignal} allows destroying the stream if the signal is
    aborted.
* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy
  value for all of the chunks.

This method is similar to `Array.prototype.every` and calls `fn` on each chunk
in the stream to check if all awaited return values are truthy for `fn`.
Once the awaited return value of an `fn` call on a chunk is falsy, the stream
is destroyed and the promise is fulfilled with `false`.
If all of the `fn` calls 2336on the chunks return a truthy value, the promise is fulfilled with `true`. 2337 2338```mjs 2339import { Readable } from 'node:stream'; 2340import { stat } from 'node:fs/promises'; 2341 2342// With a synchronous predicate. 2343await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false 2344await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true 2345 2346// With an asynchronous predicate, making at most 2 file checks at a time. 2347const allBigFiles = await Readable.from([ 2348 'file1', 2349 'file2', 2350 'file3', 2351]).every(async (fileName) => { 2352 const stats = await stat(fileName); 2353 return stats.size > 1024 * 1024; 2354}, { concurrency: 2 }); 2355// `true` if all files in the list are bigger than 1MiB 2356console.log(allBigFiles); 2357console.log('done'); // Stream has finished 2358``` 2359 2360##### `readable.flatMap(fn[, options])` 2361 2362<!-- YAML 2363added: v17.5.0 2364--> 2365 2366> Stability: 1 - Experimental 2367 2368* `fn` {Function|AsyncGeneratorFunction|AsyncFunction} a function to map over 2369 every chunk in the stream. 2370 * `data` {any} a chunk of data from the stream. 2371 * `options` {Object} 2372 * `signal` {AbortSignal} aborted if the stream is destroyed allowing to 2373 abort the `fn` call early. 2374* `options` {Object} 2375 * `concurrency` {number} the maximum concurrent invocation of `fn` to call 2376 on the stream at once. **Default:** `1`. 2377 * `signal` {AbortSignal} allows destroying the stream if the signal is 2378 aborted. 2379* Returns: {Readable} a stream flat-mapped with the function `fn`. 2380 2381This method returns a new stream by applying the given callback to each 2382chunk of the stream and then flattening the result. 2383 2384It is possible to return a stream or another iterable or async iterable from 2385`fn` and the result streams will be merged (flattened) into the returned 2386stream. 2387 2388```mjs 2389import { Readable } from 'node:stream'; 2390import { createReadStream } from 'node:fs'; 2391 2392// With a synchronous mapper. 2393for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) { 2394 console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4 2395} 2396// With an asynchronous mapper, combine the contents of 4 files 2397const concatResult = Readable.from([ 2398 './1.mjs', 2399 './2.mjs', 2400 './3.mjs', 2401 './4.mjs', 2402]).flatMap((fileName) => createReadStream(fileName)); 2403for await (const result of concatResult) { 2404 // This will contain the contents (all chunks) of all 4 files 2405 console.log(result); 2406} 2407``` 2408 2409##### `readable.drop(limit[, options])` 2410 2411<!-- YAML 2412added: v17.5.0 2413--> 2414 2415> Stability: 1 - Experimental 2416 2417* `limit` {number} the number of chunks to drop from the readable. 2418* `options` {Object} 2419 * `signal` {AbortSignal} allows destroying the stream if the signal is 2420 aborted. 2421* Returns: {Readable} a stream with `limit` chunks dropped. 2422 2423This method returns a new stream with the first `limit` chunks dropped. 2424 2425```mjs 2426import { Readable } from 'node:stream'; 2427 2428await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4] 2429``` 2430 2431##### `readable.take(limit[, options])` 2432 2433<!-- YAML 2434added: v17.5.0 2435--> 2436 2437> Stability: 1 - Experimental 2438 2439* `limit` {number} the number of chunks to take from the readable. 2440* `options` {Object} 2441 * `signal` {AbortSignal} allows destroying the stream if the signal is 2442 aborted. 
* Returns: {Readable} a stream with `limit` chunks taken.

This method returns a new stream with the first `limit` chunks.

```mjs
import { Readable } from 'node:stream';

await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
```

##### `readable.asIndexedPairs([options])`

<!-- YAML
added: v17.5.0
changes:
  - version: v18.17.0
    pr-url: https://github.com/nodejs/node/pull/48102
    description: Using the `asIndexedPairs` method emits a runtime warning
                 that it will be removed in a future version.
-->

> Stability: 1 - Experimental

* `options` {Object}
  * `signal` {AbortSignal} allows destroying the stream if the signal is
    aborted.
* Returns: {Readable} a stream of indexed pairs.

This method returns a new stream with chunks of the underlying stream paired
with a counter in the form `[index, chunk]`. The first index value is 0 and it
increases by 1 for each chunk produced.

```mjs
import { Readable } from 'node:stream';

const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray();
console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']]
```

##### `readable.reduce(fn[, initial[, options]])`

<!-- YAML
added: v17.5.0
-->

> Stability: 1 - Experimental

* `fn` {Function|AsyncFunction} a reducer function to call over every chunk
  in the stream.
  * `previous` {any} the value obtained from the last call to `fn`, or the
    `initial` value if specified, or the first chunk of the stream otherwise.
  * `data` {any} a chunk of data from the stream.
  * `options` {Object}
    * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
      abort the `fn` call early.
* `initial` {any} the initial value to use in the reduction.
* `options` {Object}
  * `signal` {AbortSignal} allows destroying the stream if the signal is
    aborted.
* Returns: {Promise} a promise for the final value of the reduction.

This method calls `fn` on each chunk of the stream in order, passing it the
result from the calculation on the previous element. It returns a promise for
the final value of the reduction.

If no `initial` value is supplied, the first chunk of the stream is used as the
initial value. If the stream is empty, the promise is rejected with a
`TypeError` with the `ERR_INVALID_ARGS` code property.

```mjs
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';

const directoryPath = './src';
const filesInDir = await readdir(directoryPath);

const folderSize = await Readable.from(filesInDir)
  .reduce(async (totalSize, file) => {
    const { size } = await stat(join(directoryPath, file));
    return totalSize + size;
  }, 0);

console.log(folderSize);
```

The reducer function iterates the stream element-by-element, which means that
there is no `concurrency` parameter or parallelism. To perform a `reduce`
concurrently, move the async work into a [`readable.map`][] call:

```mjs
import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';

const directoryPath = './src';
const filesInDir = await readdir(directoryPath);

const folderSize = await Readable.from(filesInDir)
  .map((file) => stat(join(directoryPath, file)), { concurrency: 2 })
  .reduce((totalSize, { size }) => totalSize + size, 0);

console.log(folderSize);
```

### Duplex and transform streams

#### Class: `stream.Duplex`

<!-- YAML
added: v0.9.4
changes:
  - version: v6.8.0
    pr-url: https://github.com/nodejs/node/pull/8834
    description: Instances of `Duplex` now return `true` when
                 checking `instanceof stream.Writable`.
-->

<!--type=class-->

Duplex streams are streams that implement both the [`Readable`][] and
[`Writable`][] interfaces.

Examples of `Duplex` streams include:

* [TCP sockets][]
* [zlib streams][zlib]
* [crypto streams][crypto]

##### `duplex.allowHalfOpen`

<!-- YAML
added: v0.9.4
-->

* {boolean}

If `false` then the stream will automatically end the writable side when the
readable side ends. Set initially by the `allowHalfOpen` constructor option,
which defaults to `true`.

This can be changed manually to change the half-open behavior of an existing
`Duplex` stream instance, but must be changed before the `'end'` event is
emitted.
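A minimal sketch of the effect (the `Duplex` implementation here is a
stand-in):

```js
const { Duplex } = require('node:stream');

const duplex = new Duplex({
  allowHalfOpen: false,
  read() {
    this.push('some data');
    this.push(null); // End the readable side.
  },
  write(chunk, encoding, callback) {
    callback();
  },
});

duplex.on('finish', () => {
  // With `allowHalfOpen: false`, ending the readable side also ends the
  // writable side, so 'finish' fires without an explicit `duplex.end()`.
  console.log('writable side ended automatically');
});
duplex.resume(); // Drain the readable side so 'end' can be emitted.
```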
#### Class: `stream.Transform`

<!-- YAML
added: v0.9.4
-->

<!--type=class-->

Transform streams are [`Duplex`][] streams where the output is in some way
related to the input. Like all [`Duplex`][] streams, `Transform` streams
implement both the [`Readable`][] and [`Writable`][] interfaces.

Examples of `Transform` streams include:

* [zlib streams][zlib]
* [crypto streams][crypto]

##### `transform.destroy([error])`

<!-- YAML
added: v8.0.0
changes:
  - version: v14.0.0
    pr-url: https://github.com/nodejs/node/pull/29197
    description: Work as a no-op on a stream that has already been destroyed.
-->

* `error` {Error}
* Returns: {this}

Destroy the stream, and optionally emit an `'error'` event. After this call,
the transform stream releases any internal resources.
Implementors should not override this method, but instead implement
[`readable._destroy()`][readable-_destroy].
The default implementation of `_destroy()` for `Transform` also emits
`'close'` unless `emitClose` is set to `false`.

Once `destroy()` has been called, any further calls will be a no-op and no
further errors except from `_destroy()` may be emitted as `'error'`.

### `stream.finished(stream[, options], callback)`

<!-- YAML
added: v10.0.0
changes:
  - version: v18.14.0
    pr-url: https://github.com/nodejs/node/pull/46205
    description: Added support for `ReadableStream` and `WritableStream`.
  - version: v15.11.0
    pr-url: https://github.com/nodejs/node/pull/37354
    description: The `signal` option was added.
  - version: v14.0.0
    pr-url: https://github.com/nodejs/node/pull/32158
    description: The `finished(stream, cb)` will wait for the `'close'` event
                 before invoking the callback. The implementation tries to
                 detect legacy streams and only apply this behavior to streams
                 which are expected to emit `'close'`.
  - version: v14.0.0
    pr-url: https://github.com/nodejs/node/pull/31545
    description: Emitting `'close'` before `'end'` on a `Readable` stream
                 will cause an `ERR_STREAM_PREMATURE_CLOSE` error.
  - version: v14.0.0
    pr-url: https://github.com/nodejs/node/pull/31509
    description: Callback will be invoked on streams which have already
                 finished before the call to `finished(stream, cb)`.
-->

* `stream` {Stream|ReadableStream|WritableStream} A readable and/or writable
  stream/webstream.
* `options` {Object}
  * `error` {boolean} If set to `false`, then a call to `emit('error', err)` is
    not treated as finished. **Default:** `true`.
  * `readable` {boolean} When set to `false`, the callback will be called when
    the stream ends even though the stream might still be readable.
    **Default:** `true`.
  * `writable` {boolean} When set to `false`, the callback will be called when
    the stream ends even though the stream might still be writable.
    **Default:** `true`.
  * `signal` {AbortSignal} allows aborting the wait for the stream finish. The
    underlying stream will _not_ be aborted if the signal is aborted. The
    callback will get called with an `AbortError`. All registered
    listeners added by this function will also be removed.
  * `cleanup` {boolean} remove all registered stream listeners.
    **Default:** `false`.
* `callback` {Function} A callback function that takes an optional error
  argument.
* Returns: {Function} A cleanup function which removes all registered
  listeners.

A function to get notified when a stream is no longer readable, writable,
or has experienced an error or a premature close event.

```js
const { finished } = require('node:stream');
const fs = require('node:fs');

const rs = fs.createReadStream('archive.tar');

finished(rs, (err) => {
  if (err) {
    console.error('Stream failed.', err);
  } else {
    console.log('Stream is done reading.');
  }
});

rs.resume(); // Drain the stream.
```

This is especially useful in error handling scenarios where a stream is
destroyed prematurely (like an aborted HTTP request), and will not emit
`'end'` or `'finish'`.

The `finished` API provides a [promise version][stream-finished-promise].

`stream.finished()` leaves dangling event listeners (in particular
`'error'`, `'end'`, `'finish'` and `'close'`) after `callback` has been
invoked. The reason for this is so that unexpected `'error'` events (due to
incorrect stream implementations) do not cause unexpected crashes.
If this is unwanted behavior then the returned cleanup function needs to be
invoked in the callback:

```js
const cleanup = finished(rs, (err) => {
  cleanup();
  // ...
});
```
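A minimal sketch of the promise version from `node:stream/promises`, which
resolves instead of taking a callback:

```js
const { finished } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
  const rs = fs.createReadStream('archive.tar');
  rs.resume(); // Drain the stream.
  await finished(rs);
  console.log('Stream is done reading.');
}

run().catch(console.error);
```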
### `stream.pipeline(source[, ...transforms], destination, callback)`

### `stream.pipeline(streams, callback)`

<!-- YAML
added: v10.0.0
changes:
  - version: v18.16.0
    pr-url: https://github.com/nodejs/node/pull/46307
    description: Added support for webstreams.
  - version: v18.0.0
    pr-url: https://github.com/nodejs/node/pull/41678
    description: Passing an invalid callback to the `callback` argument
                 now throws `ERR_INVALID_ARG_TYPE` instead of
                 `ERR_INVALID_CALLBACK`.
  - version: v14.0.0
    pr-url: https://github.com/nodejs/node/pull/32158
    description: The `pipeline(..., cb)` will wait for the `'close'` event
                 before invoking the callback. The implementation tries to
                 detect legacy streams and only apply this behavior to streams
                 which are expected to emit `'close'`.
  - version: v13.10.0
    pr-url: https://github.com/nodejs/node/pull/31223
    description: Add support for async generators.
-->

* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]|
  ReadableStream\[]|WritableStream\[]|TransformStream\[]}
* `source` {Stream|Iterable|AsyncIterable|Function|ReadableStream}
  * Returns: {Iterable|AsyncIterable}
* `...transforms` {Stream|Function|TransformStream}
  * `source` {AsyncIterable}
  * Returns: {AsyncIterable}
* `destination` {Stream|Function|WritableStream}
  * `source` {AsyncIterable}
  * Returns: {AsyncIterable|Promise}
* `callback` {Function} Called when the pipeline is fully done.
  * `err` {Error}
  * `val` Resolved value of `Promise` returned by `destination`.
* Returns: {Stream}

A module method to pipe between streams and generators, forwarding errors,
properly cleaning up, and providing a callback when the pipeline is complete.

```js
const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file efficiently:

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  },
);
```

The `pipeline` API provides a [promise version][stream-pipeline-promise].

`stream.pipeline()` will call `stream.destroy(err)` on all streams except:

* `Readable` streams which have emitted `'end'` or `'close'`.
* `Writable` streams which have emitted `'finish'` or `'close'`.

`stream.pipeline()` leaves dangling event listeners on the streams
after the `callback` has been invoked. In the case of reuse of streams after
failure, this can cause event listener leaks and swallowed errors. If the last
stream is readable, dangling event listeners will be removed so that the last
stream can be consumed later.

`stream.pipeline()` closes all the streams when an error is raised.
Using `pipeline` with an `IncomingRequest` can lead to unexpected behavior,
because it destroys the socket without sending the expected response.
See the example below:

```js
const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt');
  pipeline(fileStream, res, (err) => {
    if (err) {
      console.log(err); // No such file
      // This message can't be sent once `pipeline` has already destroyed the socket.
      return res.end('error!!!');
    }
  });
});
```

### `stream.compose(...streams)`

<!-- YAML
added: v16.9.0
changes:
  - version: v18.16.0
    pr-url: https://github.com/nodejs/node/pull/46675
    description: Added support for webstreams.
-->

> Stability: 1 - `stream.compose` is experimental.

* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]|
  ReadableStream\[]|WritableStream\[]|TransformStream\[]}
* Returns: {stream.Duplex}

Combines two or more streams into a `Duplex` stream that writes to the
first stream and reads from the last. Each provided stream is piped into
the next, using `stream.pipeline`. If any of the streams error then all
are destroyed, including the outer `Duplex` stream.

Because `stream.compose` returns a new stream that in turn can (and
should) be piped into other streams, it enables composition. In contrast,
when passing streams to `stream.pipeline`, typically the first stream is
a readable stream and the last a writable stream, forming a closed
circuit.

If passed a `Function`, it must be a factory method taking a `source`
`Iterable`.

```mjs
import { compose, Transform } from 'node:stream';

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''));
  },
});

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}

let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf;
}

console.log(res); // prints 'HELLOWORLD'
```

`stream.compose` can be used to convert async iterables, generators and
functions into streams.

* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
  `null`.
* `AsyncGeneratorFunction` converts into a readable/writable transform `Duplex`.
  Must take a source `AsyncIterable` as first parameter. Cannot yield
  `null`.
* `AsyncFunction` converts into a writable `Duplex`. Must return
  either `null` or `undefined`.

```mjs
import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';

// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
  yield 'Hello';
  yield 'World';
}());

// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
});

let res = '';

// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
  for await (const chunk of source) {
    res += chunk;
  }
});

await finished(compose(s1, s2, s3));

console.log(res); // prints 'HELLOWORLD'
```

See [`readable.compose(stream)`][] for `stream.compose` as operator.

### `stream.Readable.from(iterable[, options])`

<!-- YAML
added:
  - v12.3.0
  - v10.17.0
-->

* `iterable` {Iterable} Object implementing the `Symbol.asyncIterator` or
  `Symbol.iterator` iterable protocol. Emits an `'error'` event if a null
  value is passed.
* `options` {Object} Options provided to `new stream.Readable([options])`.
  By default, `Readable.from()` will set `options.objectMode` to `true`, unless
  this is explicitly opted out by setting `options.objectMode` to `false`.
* Returns: {stream.Readable}

A utility method for creating readable streams out of iterators.

```js
const { Readable } = require('node:stream');

async function * generate() {
  yield 'hello';
  yield 'streams';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
  console.log(chunk);
});
```

For performance reasons, calling `Readable.from(string)` or
`Readable.from(buffer)` will not iterate the string or buffer to match the
chunking semantics of other streams.

If an `Iterable` object containing promises is passed as an argument,
it might result in unhandled rejection.

```js
const { Readable } = require('node:stream');

Readable.from([
  new Promise((resolve) => setTimeout(() => resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(() => reject(new Error('2')), 1000)), // Unhandled rejection
]);
```

### `stream.Readable.fromWeb(readableStream[, options])`

<!-- YAML
added: v17.0.0
-->

> Stability: 1 - Experimental

* `readableStream` {ReadableStream}
* `options` {Object}
  * `encoding` {string}
  * `highWaterMark` {number}
  * `objectMode` {boolean}
  * `signal` {AbortSignal}
* Returns: {stream.Readable}
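A minimal sketch of converting a web `ReadableStream` (constructed here from
`node:stream/web`) into a Node.js `Readable`:

```mjs
import { Readable } from 'node:stream';
import { ReadableStream } from 'node:stream/web';

const webStream = new ReadableStream({
  start(controller) {
    controller.enqueue('hello');
    controller.enqueue('world');
    controller.close();
  },
});

const nodeReadable = Readable.fromWeb(webStream);
nodeReadable.setEncoding('utf8');

for await (const chunk of nodeReadable) {
  console.log(chunk); // Prints the enqueued text (chunk boundaries may vary).
}
```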
### `stream.Readable.isDisturbed(stream)`

<!-- YAML
added: v16.8.0
-->

> Stability: 1 - Experimental

* `stream` {stream.Readable|ReadableStream}
* Returns: {boolean}

Returns whether the stream has been read from or cancelled.

### `stream.isErrored(stream)`

<!-- YAML
added:
  - v17.3.0
  - v16.14.0
-->

> Stability: 1 - Experimental

* `stream` {Readable|Writable|Duplex|WritableStream|ReadableStream}
* Returns: {boolean}

Returns whether the stream has encountered an error.

### `stream.isReadable(stream)`

<!-- YAML
added:
  - v17.4.0
  - v16.14.0
-->

> Stability: 1 - Experimental

* `stream` {Readable|Duplex|ReadableStream}
* Returns: {boolean}

Returns whether the stream is readable.

### `stream.Readable.toWeb(streamReadable[, options])`

<!-- YAML
added: v17.0.0
-->

> Stability: 1 - Experimental

* `streamReadable` {stream.Readable}
* `options` {Object}
  * `strategy` {Object}
    * `highWaterMark` {number} The maximum internal queue size (of the created
      `ReadableStream`) before backpressure is applied in reading from the
      given `stream.Readable`. If no value is provided, it will be taken from
      the given `stream.Readable`.
    * `size` {Function} A function that computes the size of the given chunk
      of data. If no value is provided, the size will be `1` for all the
      chunks.
      * `chunk` {any}
      * Returns: {number}
* Returns: {ReadableStream}

### `stream.Writable.fromWeb(writableStream[, options])`

<!-- YAML
added: v17.0.0
-->

> Stability: 1 - Experimental

* `writableStream` {WritableStream}
* `options` {Object}
  * `decodeStrings` {boolean}
  * `highWaterMark` {number}
  * `objectMode` {boolean}
  * `signal` {AbortSignal}
* Returns: {stream.Writable}

### `stream.Writable.toWeb(streamWritable)`

<!-- YAML
added: v17.0.0
-->

> Stability: 1 - Experimental

* `streamWritable` {stream.Writable}
* Returns: {WritableStream}

### `stream.Duplex.from(src)`

<!-- YAML
added: v16.8.0
changes:
  - version: v18.17.0
    pr-url: https://github.com/nodejs/node/pull/46190
    description: The `src` argument can now be a `ReadableStream` or
                 `WritableStream`.
-->

* `src` {Stream|Blob|ArrayBuffer|string|Iterable|AsyncIterable|
  AsyncGeneratorFunction|AsyncFunction|Promise|Object|
  ReadableStream|WritableStream}
* Returns: {stream.Duplex}

A utility method for creating duplex streams.

* `Stream` converts a writable stream into a writable `Duplex` and a readable
  stream into a readable `Duplex`.
* `Blob` converts into a readable `Duplex`.
* `string` converts into a readable `Duplex`.
* `ArrayBuffer` converts into a readable `Duplex`.
* `AsyncIterable` converts into a readable `Duplex`. Cannot yield
  `null`.
* `AsyncGeneratorFunction` converts into a readable/writable transform
  `Duplex`. Must take a source `AsyncIterable` as first parameter. Cannot yield
  `null`.
* `AsyncFunction` converts into a writable `Duplex`. Must return
  either `null` or `undefined`.
* `Object ({ writable, readable })` converts `readable` and
  `writable` into `Stream` and then combines them into `Duplex` where the
  `Duplex` will write to the `writable` and read from the `readable`.
* `Promise` converts into a readable `Duplex`. Value `null` is ignored.
* `ReadableStream` converts into a readable `Duplex`.
* `WritableStream` converts into a writable `Duplex`.
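For example, a minimal sketch that builds a transform `Duplex` from an
`AsyncGeneratorFunction`, one of the conversions listed above:

```mjs
import { Duplex } from 'node:stream';

// An AsyncGeneratorFunction becomes a readable/writable transform Duplex.
const duplex = Duplex.from(async function* (source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
});

duplex.end('hello');

for await (const chunk of duplex) {
  console.log(chunk); // 'HELLO'
}
```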
If an `Iterable` object containing promises is passed as an argument,
it might result in unhandled rejection.

```js
const { Duplex } = require('node:stream');

Duplex.from([
  new Promise((resolve) => setTimeout(() => resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(() => reject(new Error('2')), 1000)), // Unhandled rejection
]);
```

### `stream.Duplex.fromWeb(pair[, options])`

<!-- YAML
added: v17.0.0
-->

> Stability: 1 - Experimental

* `pair` {Object}
  * `readable` {ReadableStream}
  * `writable` {WritableStream}
* `options` {Object}
  * `allowHalfOpen` {boolean}
  * `decodeStrings` {boolean}
  * `encoding` {string}
  * `highWaterMark` {number}
  * `objectMode` {boolean}
  * `signal` {AbortSignal}
* Returns: {stream.Duplex}

```mjs
import { Duplex } from 'node:stream';
import {
  ReadableStream,
  WritableStream,
} from 'node:stream/web';

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world');
  },
});

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk);
  },
});

const pair = {
  readable,
  writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });

duplex.write('hello');

for await (const chunk of duplex) {
  console.log('readable', chunk);
}
```

```cjs
const { Duplex } = require('node:stream');
const {
  ReadableStream,
  WritableStream,
} = require('node:stream/web');

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world');
  },
});

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk);
  },
});

const pair = {
  readable,
  writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });

duplex.write('hello');
duplex.once('readable', () => console.log('readable', duplex.read()));
```

### `stream.Duplex.toWeb(streamDuplex)`

<!-- YAML
added: v17.0.0
-->

> Stability: 1 - Experimental

* `streamDuplex` {stream.Duplex}
* Returns: {Object}
  * `readable` {ReadableStream}
  * `writable` {WritableStream}

```mjs
import { Duplex } from 'node:stream';

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk);
    callback();
  },
});

const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');

const { value } = await readable.getReader().read();
console.log('readable', value);
```

```cjs
const { Duplex } = require('node:stream');

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk);
    callback();
  },
});

const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');

readable.getReader().read().then((result) => {
  console.log('readable', result.value);
});
```
### `stream.addAbortSignal(signal, stream)`

<!-- YAML
added: v15.4.0
changes:
  - version: v18.16.0
    pr-url: https://github.com/nodejs/node/pull/46273
    description: Added support for `ReadableStream` and
                 `WritableStream`.
-->

* `signal` {AbortSignal} A signal representing possible cancellation
* `stream` {Stream|ReadableStream|WritableStream} A stream to attach a signal
  to.

Attaches an AbortSignal to a readable or writable stream. This lets code
control stream destruction using an `AbortController`.

Calling `abort` on the `AbortController` corresponding to the passed
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
on the stream, and `controller.error(new AbortError())` for webstreams.

```js
const fs = require('node:fs');
const { addAbortSignal } = require('node:stream');

const controller = new AbortController();
const read = addAbortSignal(
  controller.signal,
  fs.createReadStream('object.json'),
);
// Later, abort the operation closing the stream
controller.abort();
```

Or using an `AbortSignal` with a readable stream as an async iterable:

```js
const controller = new AbortController();
setTimeout(() => controller.abort(), 10_000); // set a timeout
const stream = addAbortSignal(
  controller.signal,
  fs.createReadStream('object.json'),
);
(async () => {
  try {
    for await (const chunk of stream) {
      await process(chunk);
    }
  } catch (e) {
    if (e.name === 'AbortError') {
      // The operation was cancelled
    } else {
      throw e;
    }
  }
})();
```

Or using an `AbortSignal` with a ReadableStream:

```js
const { finished } = require('node:stream');

const controller = new AbortController();
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello');
    controller.enqueue('world');
    controller.close();
  },
});

addAbortSignal(controller.signal, rs);

finished(rs, (err) => {
  if (err) {
    if (err.name === 'AbortError') {
      // The operation was cancelled
    }
  }
});

const reader = rs.getReader();

reader.read().then(({ value, done }) => {
  console.log(value); // hello
  console.log(done); // false
  controller.abort();
});
```

### `stream.getDefaultHighWaterMark(objectMode)`

<!-- YAML
added: v18.17.0
-->

* `objectMode` {boolean}
* Returns: {integer}

Returns the default highWaterMark used by streams.
Defaults to `16384` (16 KiB), or `16` for `objectMode`.

### `stream.setDefaultHighWaterMark(objectMode, value)`

<!-- YAML
added: v18.17.0
-->

* `objectMode` {boolean}
* `value` {integer} highWaterMark value

Sets the default highWaterMark used by streams.

## API for stream implementers

<!--type=misc-->

The `node:stream` module API has been designed to make it possible to easily
implement streams using JavaScript's prototypal inheritance model.

First, a stream developer would declare a new JavaScript class that extends one
of the four basic stream classes (`stream.Writable`, `stream.Readable`,
`stream.Duplex`, or `stream.Transform`), making sure they call the appropriate
parent class constructor:

<!-- eslint-disable no-useless-constructor -->

```js
const { Writable } = require('node:stream');

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({ highWaterMark });
    // ...
3399 } 3400} 3401``` 3402 3403When extending streams, keep in mind what options the user 3404can and should provide before forwarding these to the base constructor. For 3405example, if the implementation makes assumptions in regard to the 3406`autoDestroy` and `emitClose` options, do not allow the 3407user to override these. Be explicit about what 3408options are forwarded instead of implicitly forwarding all options. 3409 3410The new stream class must then implement one or more specific methods, depending 3411on the type of stream being created, as detailed in the chart below: 3412 3413| Use-case | Class | Method(s) to implement | 3414| --------------------------------------------- | --------------- | ------------------------------------------------------------------------------------------------------------------ | 3415| Reading only | [`Readable`][] | [`_read()`][stream-_read] | 3416| Writing only | [`Writable`][] | [`_write()`][stream-_write], [`_writev()`][stream-_writev], [`_final()`][stream-_final] | 3417| Reading and writing | [`Duplex`][] | [`_read()`][stream-_read], [`_write()`][stream-_write], [`_writev()`][stream-_writev], [`_final()`][stream-_final] | 3418| Operate on written data, then read the result | [`Transform`][] | [`_transform()`][stream-_transform], [`_flush()`][stream-_flush], [`_final()`][stream-_final] | 3419 3420The implementation code for a stream should _never_ call the "public" methods 3421of a stream that are intended for use by consumers (as described in the 3422[API for stream consumers][] section). Doing so may lead to adverse side effects 3423in application code consuming the stream. 3424 3425Avoid overriding public methods such as `write()`, `end()`, `cork()`, 3426`uncork()`, `read()` and `destroy()`, or emitting internal events such 3427as `'error'`, `'data'`, `'end'`, `'finish'` and `'close'` through `.emit()`. 3428Doing so can break current and future stream invariants leading to behavior 3429and/or compatibility issues with other streams, stream utilities, and user 3430expectations. 3431 3432### Simplified construction 3433 3434<!-- YAML 3435added: v1.2.0 3436--> 3437 3438For many simple cases, it is possible to create a stream without relying on 3439inheritance. This can be accomplished by directly creating instances of the 3440`stream.Writable`, `stream.Readable`, `stream.Duplex`, or `stream.Transform` 3441objects and passing appropriate methods as constructor options. 3442 3443```js 3444const { Writable } = require('node:stream'); 3445 3446const myWritable = new Writable({ 3447 construct(callback) { 3448 // Initialize state and load resources... 3449 }, 3450 write(chunk, encoding, callback) { 3451 // ... 3452 }, 3453 destroy() { 3454 // Free resources... 3455 }, 3456}); 3457``` 3458 3459### Implementing a writable stream 3460 3461The `stream.Writable` class is extended to implement a [`Writable`][] stream. 3462 3463Custom `Writable` streams _must_ call the `new stream.Writable([options])` 3464constructor and implement the `writable._write()` and/or `writable._writev()` 3465method. 3466 3467#### `new stream.Writable([options])` 3468 3469<!-- YAML 3470changes: 3471 - version: v15.5.0 3472 pr-url: https://github.com/nodejs/node/pull/36431 3473 description: support passing in an AbortSignal. 3474 - version: v14.0.0 3475 pr-url: https://github.com/nodejs/node/pull/30623 3476 description: Change `autoDestroy` option default to `true`. 
  - version:
      - v11.2.0
      - v10.16.0
    pr-url: https://github.com/nodejs/node/pull/22795
    description: Add `autoDestroy` option to automatically `destroy()` the
                 stream when it emits `'finish'` or errors.
  - version: v10.0.0
    pr-url: https://github.com/nodejs/node/pull/18438
    description: Add `emitClose` option to specify if `'close'` is emitted on
                 destroy.
-->

* `options` {Object}
  * `highWaterMark` {number} Buffer level when
    [`stream.write()`][stream-write] starts returning `false`. **Default:**
    `16384` (16 KiB), or `16` for `objectMode` streams.
  * `decodeStrings` {boolean} Whether to encode `string`s passed to
    [`stream.write()`][stream-write] to `Buffer`s (with the encoding
    specified in the [`stream.write()`][stream-write] call) before passing
    them to [`stream._write()`][stream-_write]. Other types of data are not
    converted (i.e. `Buffer`s are not decoded into `string`s). Setting to
    `false` will prevent `string`s from being converted. **Default:** `true`.
  * `defaultEncoding` {string} The default encoding that is used when no
    encoding is specified as an argument to [`stream.write()`][stream-write].
    **Default:** `'utf8'`.
  * `objectMode` {boolean} Whether or not
    [`stream.write(anyObj)`][stream-write] is a valid operation. When set,
    it becomes possible to write JavaScript values other than string,
    `Buffer` or `Uint8Array` if supported by the stream implementation.
    **Default:** `false`.
  * `emitClose` {boolean} Whether or not the stream should emit `'close'`
    after it has been destroyed. **Default:** `true`.
  * `write` {Function} Implementation for the
    [`stream._write()`][stream-_write] method.
  * `writev` {Function} Implementation for the
    [`stream._writev()`][stream-_writev] method.
  * `destroy` {Function} Implementation for the
    [`stream._destroy()`][writable-_destroy] method.
  * `final` {Function} Implementation for the
    [`stream._final()`][stream-_final] method.
  * `construct` {Function} Implementation for the
    [`stream._construct()`][writable-_construct] method.
  * `autoDestroy` {boolean} Whether this stream should automatically call
    `.destroy()` on itself after ending. **Default:** `true`.
  * `signal` {AbortSignal} A signal representing possible cancellation.

<!-- eslint-disable no-useless-constructor -->

```js
const { Writable } = require('node:stream');

class MyWritable extends Writable {
  constructor(options) {
    // Calls the stream.Writable() constructor.
    super(options);
    // ...
  }
}
```

Or, when using pre-ES6 style constructors:

```js
const { Writable } = require('node:stream');
const util = require('node:util');

function MyWritable(options) {
  if (!(this instanceof MyWritable))
    return new MyWritable(options);
  Writable.call(this, options);
}
util.inherits(MyWritable, Writable);
```

Or, using the simplified constructor approach:

```js
const { Writable } = require('node:stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
});
```

Calling `abort` on the `AbortController` corresponding to the passed
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
on the writable stream.
```js
const { Writable } = require('node:stream');

const controller = new AbortController();
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
  signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort();
```

#### `writable._construct(callback)`

<!-- YAML
added: v15.0.0
-->

* `callback` {Function} Call this function (optionally with an error
  argument) when the stream has finished initializing.

The `_construct()` method MUST NOT be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Writable`
class methods only.

This optional function will be called in a tick after the stream constructor
has returned, delaying any `_write()`, `_final()` and `_destroy()` calls until
`callback` is called. This is useful to initialize state or asynchronously
initialize resources before the stream can be used.

```js
const { Writable } = require('node:stream');
const fs = require('node:fs');

class WriteStream extends Writable {
  constructor(filename) {
    super();
    this.filename = filename;
    this.fd = null;
  }
  _construct(callback) {
    // Open for writing; without the 'w' flag the descriptor would be
    // read-only and the fs.write() calls below would fail.
    fs.open(this.filename, 'w', (err, fd) => {
      if (err) {
        callback(err);
      } else {
        this.fd = fd;
        callback();
      }
    });
  }
  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback);
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, (er) => callback(er || err));
    } else {
      callback(err);
    }
  }
}
```

#### `writable._write(chunk, encoding, callback)`

<!-- YAML
changes:
  - version: v12.11.0
    pr-url: https://github.com/nodejs/node/pull/29639
    description: _write() is optional when providing _writev().
-->

* `chunk` {Buffer|string|any} The `Buffer` to be written, converted from the
  `string` passed to [`stream.write()`][stream-write]. If the stream's
  `decodeStrings` option is `false` or the stream is operating in object mode,
  the chunk will not be converted & will be whatever was passed to
  [`stream.write()`][stream-write].
* `encoding` {string} If the chunk is a string, then `encoding` is the
  character encoding of that string. If chunk is a `Buffer`, or if the
  stream is operating in object mode, `encoding` may be ignored.
* `callback` {Function} Call this function (optionally with an error
  argument) when processing is complete for the supplied chunk.

All `Writable` stream implementations must provide a
[`writable._write()`][stream-_write] and/or
[`writable._writev()`][stream-_writev] method to send data to the underlying
resource.

[`Transform`][] streams provide their own implementation of the
[`writable._write()`][stream-_write].

This function MUST NOT be called by application code directly. It should be
implemented by child classes, and called by the internal `Writable` class
methods only.

The `callback` function must be called synchronously inside of
`writable._write()` or asynchronously (i.e. on a different tick) to signal
either that the write completed successfully or failed with an error.
The first argument passed to the `callback` must be the `Error` object if the
call failed or `null` if the write succeeded.

All calls to `writable.write()` that occur between the time `writable._write()`
is called and the `callback` is called will cause the written data to be
buffered. When the `callback` is invoked, the stream might emit a [`'drain'`][]
event. If a stream implementation is capable of processing multiple chunks of
data at once, the `writable._writev()` method should be implemented.

If the `decodeStrings` property is explicitly set to `false` in the constructor
options, then `chunk` will remain the same object that is passed to `.write()`,
and may be a string rather than a `Buffer`. This is to support implementations
that have an optimized handling for certain string data encodings. In that case,
the `encoding` argument will indicate the character encoding of the string.
Otherwise, the `encoding` argument can be safely ignored.

The `writable._write()` method is prefixed with an underscore because it is
internal to the class that defines it, and should never be called directly by
user programs.

#### `writable._writev(chunks, callback)`

* `chunks` {Object\[]} The data to be written. The value is an array of {Object}
  that each represent a discrete chunk of data to write. The properties of
  these objects are:
  * `chunk` {Buffer|string} A buffer instance or string containing the data to
    be written. The `chunk` will be a string if the `Writable` was created with
    the `decodeStrings` option set to `false` and a string was passed to `write()`.
  * `encoding` {string} The character encoding of the `chunk`. If `chunk` is
    a `Buffer`, the `encoding` will be `'buffer'`.
* `callback` {Function} A callback function (optionally with an error
  argument) to be invoked when processing is complete for the supplied chunks.

This function MUST NOT be called by application code directly. It should be
implemented by child classes, and called by the internal `Writable` class
methods only.

The `writable._writev()` method may be implemented in addition to, or as an
alternative to, `writable._write()` in stream implementations that are capable
of processing multiple chunks of data at once. If implemented and if there is
buffered data from previous writes, `_writev()` will be called instead of
`_write()`.

The `writable._writev()` method is prefixed with an underscore because it is
internal to the class that defines it, and should never be called directly by
user programs.

#### `writable._destroy(err, callback)`

<!-- YAML
added: v8.0.0
-->

* `err` {Error} A possible error.
* `callback` {Function} A callback function that takes an optional error
  argument.

The `_destroy()` method is called by [`writable.destroy()`][writable-destroy].
It can be overridden by child classes but it **must not** be called directly.
Furthermore, the `callback` should not be mixed with async/await, because the
callback is executed when a promise is resolved.

#### `writable._final(callback)`

<!-- YAML
added: v8.0.0
-->

* `callback` {Function} Call this function (optionally with an error
  argument) when finished writing any remaining data.

The `_final()` method **must not** be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Writable`
class methods only.

This optional function will be called before the stream closes, delaying the
`'finish'` event until `callback` is called. This is useful to close resources
or write buffered data before a stream ends.
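For example (a sketch only; the `BatchWritable` class and its single-shot
`fs.writeFile()` strategy are hypothetical), `_final()` can be used to persist
data that was accumulated across many writes:

```js
const { Writable } = require('node:stream');
const fs = require('node:fs');

class BatchWritable extends Writable {
  constructor(filename) {
    super();
    this.filename = filename;
    this.chunks = [];
  }
  _write(chunk, encoding, callback) {
    // Only accumulate; the actual I/O happens in _final().
    this.chunks.push(chunk);
    callback();
  }
  _final(callback) {
    // 'finish' is delayed until this write completes.
    fs.writeFile(this.filename, Buffer.concat(this.chunks), callback);
  }
}
```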
#### Errors while writing

Errors occurring during the processing of the [`writable._write()`][],
[`writable._writev()`][] and [`writable._final()`][] methods must be propagated
by invoking the callback and passing the error as the first argument.
Throwing an `Error` from within these methods or manually emitting an `'error'`
event results in undefined behavior.

If a `Readable` stream is piped into a `Writable` stream when the `Writable`
emits an error, the `Readable` stream will be unpiped.

```js
const { Writable } = require('node:stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  },
});
```

#### An example writable stream

The following illustrates a rather simplistic (and somewhat pointless) custom
`Writable` stream implementation. While this specific `Writable` stream
instance has no real practical use, the example illustrates each of the
required elements of a custom [`Writable`][] stream instance:

```js
const { Writable } = require('node:stream');

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
}
```

#### Decoding buffers in a writable stream

Decoding buffers is a common task, for instance when using transformers whose
input is a string. This is not a trivial process when using multi-byte
character encodings such as UTF-8. The following example shows how to decode
multi-byte strings using `StringDecoder` and [`Writable`][].

```js
const { Writable } = require('node:stream');
const { StringDecoder } = require('node:string_decoder');

class StringWritable extends Writable {
  constructor(options) {
    super(options);
    this._decoder = new StringDecoder(options && options.defaultEncoding);
    this.data = '';
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk);
    }
    this.data += chunk;
    callback();
  }
  _final(callback) {
    this.data += this._decoder.end();
    callback();
  }
}

const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();

w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);

console.log(w.data); // currency: €
```

### Implementing a readable stream

The `stream.Readable` class is extended to implement a [`Readable`][] stream.

Custom `Readable` streams _must_ call the `new stream.Readable([options])`
constructor and implement the [`readable._read()`][] method.

#### `new stream.Readable([options])`

<!-- YAML
changes:
  - version: v15.5.0
    pr-url: https://github.com/nodejs/node/pull/36431
    description: support passing in an AbortSignal.
  - version: v14.0.0
    pr-url: https://github.com/nodejs/node/pull/30623
    description: Change `autoDestroy` option default to `true`.
  - version:
      - v11.2.0
      - v10.16.0
    pr-url: https://github.com/nodejs/node/pull/22795
    description: Add `autoDestroy` option to automatically `destroy()` the
                 stream when it emits `'end'` or errors.
-->

* `options` {Object}
  * `highWaterMark` {number} The maximum [number of bytes][hwm-gotcha] to store
    in the internal buffer before ceasing to read from the underlying resource.
    **Default:** `16384` (16 KiB), or `16` for `objectMode` streams.
  * `encoding` {string} If specified, then buffers will be decoded to
    strings using the specified encoding. **Default:** `null`.
  * `objectMode` {boolean} Whether this stream should behave as a stream of
    objects, meaning that [`stream.read(n)`][stream-read] returns a single
    value instead of a `Buffer` of size `n`. **Default:** `false`.
  * `emitClose` {boolean} Whether or not the stream should emit `'close'`
    after it has been destroyed. **Default:** `true`.
  * `read` {Function} Implementation for the [`stream._read()`][stream-_read]
    method.
  * `destroy` {Function} Implementation for the
    [`stream._destroy()`][readable-_destroy] method.
  * `construct` {Function} Implementation for the
    [`stream._construct()`][readable-_construct] method.
  * `autoDestroy` {boolean} Whether this stream should automatically call
    `.destroy()` on itself after ending. **Default:** `true`.
  * `signal` {AbortSignal} A signal representing possible cancellation.

<!-- eslint-disable no-useless-constructor -->

```js
const { Readable } = require('node:stream');

class MyReadable extends Readable {
  constructor(options) {
    // Calls the stream.Readable(options) constructor.
    super(options);
    // ...
  }
}
```

Or, when using pre-ES6 style constructors:

```js
const { Readable } = require('node:stream');
const util = require('node:util');

function MyReadable(options) {
  if (!(this instanceof MyReadable))
    return new MyReadable(options);
  Readable.call(this, options);
}
util.inherits(MyReadable, Readable);
```

Or, using the simplified constructor approach:

```js
const { Readable } = require('node:stream');

const myReadable = new Readable({
  read(size) {
    // ...
  },
});
```

Calling `abort` on the `AbortController` corresponding to the passed
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
on the created readable.

```js
const { Readable } = require('node:stream');
const controller = new AbortController();
const read = new Readable({
  read(size) {
    // ...
  },
  signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort();
```

#### `readable._construct(callback)`

<!-- YAML
added: v15.0.0
-->

* `callback` {Function} Call this function (optionally with an error
  argument) when the stream has finished initializing.

The `_construct()` method MUST NOT be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Readable`
class methods only.
This optional function will be scheduled in the next tick by the stream
constructor, delaying any `_read()` and `_destroy()` calls until `callback` is
called. This is useful to initialize state or asynchronously initialize
resources before the stream can be used.

```js
const { Readable } = require('node:stream');
const fs = require('node:fs');

class ReadStream extends Readable {
  constructor(filename) {
    super();
    this.filename = filename;
    this.fd = null;
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err);
      } else {
        this.fd = fd;
        callback();
      }
    });
  }
  _read(n) {
    const buf = Buffer.alloc(n);
    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err);
      } else {
        // Push the bytes actually read, or null (EOF) when the file ends.
        this.push(bytesRead > 0 ? buf.subarray(0, bytesRead) : null);
      }
    });
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, (er) => callback(er || err));
    } else {
      callback(err);
    }
  }
}
```

#### `readable._read(size)`

<!-- YAML
added: v0.9.4
-->

* `size` {number} Number of bytes to read asynchronously

This function MUST NOT be called by application code directly. It should be
implemented by child classes, and called by the internal `Readable` class
methods only.

All `Readable` stream implementations must provide an implementation of the
[`readable._read()`][] method to fetch data from the underlying resource.

When [`readable._read()`][] is called, if data is available from the resource,
the implementation should begin pushing that data into the read queue using the
[`this.push(dataChunk)`][stream-push] method. `_read()` will be called again
after each call to [`this.push(dataChunk)`][stream-push] once the stream is
ready to accept more data. `_read()` may continue reading from the resource and
pushing data until `readable.push()` returns `false`. Only when `_read()` is
called again after it has stopped should it resume pushing additional data into
the queue.

Once the [`readable._read()`][] method has been called, it will not be called
again until more data is pushed through the [`readable.push()`][stream-push]
method. Empty data such as empty buffers and strings will not cause
[`readable._read()`][] to be called.

The `size` argument is advisory. Implementations for which a "read" is a
single operation that returns data can use the `size` argument to determine
how much data to fetch. Other implementations may ignore this argument and
simply provide data whenever it becomes available. There is no need to "wait"
until `size` bytes are available before calling
[`stream.push(chunk)`][stream-push].

The [`readable._read()`][] method is prefixed with an underscore because it is
internal to the class that defines it, and should never be called directly by
user programs.

#### `readable._destroy(err, callback)`

<!-- YAML
added: v8.0.0
-->

* `err` {Error} A possible error.
* `callback` {Function} A callback function that takes an optional error
  argument.

The `_destroy()` method is called by [`readable.destroy()`][readable-destroy].
It can be overridden by child classes but it **must not** be called directly.
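For instance (a sketch using the simplified constructor; the timer-backed
source is contrived), `_destroy()` is the natural place to release resources
held by the stream:

```js
const { Readable } = require('node:stream');

// A readable backed by a timer. The destroy() implementation must clear
// the timer, or it would keep firing after the stream is gone.
const readable = new Readable({
  read() {},
  destroy(err, callback) {
    clearInterval(timer);
    callback(err);
  },
});

const timer = setInterval(() => readable.push(`${Date.now()}\n`), 1_000);

readable.on('data', (chunk) => process.stdout.write(chunk));
setTimeout(() => readable.destroy(), 5_000); // Tear down after five seconds.
```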
#### `readable.push(chunk[, encoding])`

<!-- YAML
changes:
  - version: v8.0.0
    pr-url: https://github.com/nodejs/node/pull/11608
    description: The `chunk` argument can now be a `Uint8Array` instance.
-->

* `chunk` {Buffer|Uint8Array|string|null|any} Chunk of data to push into the
  read queue. For streams not operating in object mode, `chunk` must be a
  string, `Buffer` or `Uint8Array`. For object mode streams, `chunk` may be
  any JavaScript value.
* `encoding` {string} Encoding of string chunks. Must be a valid
  `Buffer` encoding, such as `'utf8'` or `'ascii'`.
* Returns: {boolean} `true` if additional chunks of data may continue to be
  pushed; `false` otherwise.

When `chunk` is a `Buffer`, `Uint8Array`, or `string`, the `chunk` of data will
be added to the internal queue for users of the stream to consume.
Passing `chunk` as `null` signals the end of the stream (EOF), after which no
more data can be written.

When the `Readable` is operating in paused mode, the data added with
`readable.push()` can be read out by calling the
[`readable.read()`][stream-read] method when the [`'readable'`][] event is
emitted.

When the `Readable` is operating in flowing mode, the data added with
`readable.push()` will be delivered by emitting a `'data'` event.

The `readable.push()` method is designed to be as flexible as possible. For
example, when wrapping a lower-level source that provides some form of
pause/resume mechanism, and a data callback, the low-level source can be wrapped
by the custom `Readable` instance:

```js
const { Readable } = require('node:stream');

// `_source` is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.

class SourceWrapper extends Readable {
  constructor(options) {
    super(options);

    this._source = getLowLevelSourceObject();

    // Every time there's data, push it into the internal buffer.
    this._source.ondata = (chunk) => {
      // If push() returns false, then stop reading from source.
      if (!this.push(chunk))
        this._source.readStop();
    };

    // When the source ends, push the EOF-signaling `null` chunk.
    this._source.onend = () => {
      this.push(null);
    };
  }
  // _read() will be called when the stream wants to pull more data in.
  // The advisory size argument is ignored in this case.
  _read(size) {
    this._source.readStart();
  }
}
```

The `readable.push()` method is used to push content into the internal
buffer. It is typically driven by the [`readable._read()`][] method.

For streams not operating in object mode, if the `chunk` parameter of
`readable.push()` is `undefined`, it will be treated as an empty string or
buffer. See [`readable.push('')`][] for more information.

#### Errors while reading

Errors occurring during processing of the [`readable._read()`][] must be
propagated through the [`readable.destroy(err)`][readable-_destroy] method.
Throwing an `Error` from within [`readable._read()`][] or manually emitting an
`'error'` event results in undefined behavior.
4130 4131```js 4132const { Readable } = require('node:stream'); 4133 4134const myReadable = new Readable({ 4135 read(size) { 4136 const err = checkSomeErrorCondition(); 4137 if (err) { 4138 this.destroy(err); 4139 } else { 4140 // Do some work. 4141 } 4142 }, 4143}); 4144``` 4145 4146#### An example counting stream 4147 4148<!--type=example--> 4149 4150The following is a basic example of a `Readable` stream that emits the numerals 4151from 1 to 1,000,000 in ascending order, and then ends. 4152 4153```js 4154const { Readable } = require('node:stream'); 4155 4156class Counter extends Readable { 4157 constructor(opt) { 4158 super(opt); 4159 this._max = 1000000; 4160 this._index = 1; 4161 } 4162 4163 _read() { 4164 const i = this._index++; 4165 if (i > this._max) 4166 this.push(null); 4167 else { 4168 const str = String(i); 4169 const buf = Buffer.from(str, 'ascii'); 4170 this.push(buf); 4171 } 4172 } 4173} 4174``` 4175 4176### Implementing a duplex stream 4177 4178A [`Duplex`][] stream is one that implements both [`Readable`][] and 4179[`Writable`][], such as a TCP socket connection. 4180 4181Because JavaScript does not have support for multiple inheritance, the 4182`stream.Duplex` class is extended to implement a [`Duplex`][] stream (as opposed 4183to extending the `stream.Readable` _and_ `stream.Writable` classes). 4184 4185The `stream.Duplex` class prototypically inherits from `stream.Readable` and 4186parasitically from `stream.Writable`, but `instanceof` will work properly for 4187both base classes due to overriding [`Symbol.hasInstance`][] on 4188`stream.Writable`. 4189 4190Custom `Duplex` streams _must_ call the `new stream.Duplex([options])` 4191constructor and implement _both_ the [`readable._read()`][] and 4192`writable._write()` methods. 4193 4194#### `new stream.Duplex(options)` 4195 4196<!-- YAML 4197changes: 4198 - version: v8.4.0 4199 pr-url: https://github.com/nodejs/node/pull/14636 4200 description: The `readableHighWaterMark` and `writableHighWaterMark` options 4201 are supported now. 4202--> 4203 4204* `options` {Object} Passed to both `Writable` and `Readable` 4205 constructors. Also has the following fields: 4206 * `allowHalfOpen` {boolean} If set to `false`, then the stream will 4207 automatically end the writable side when the readable side ends. 4208 **Default:** `true`. 4209 * `readable` {boolean} Sets whether the `Duplex` should be readable. 4210 **Default:** `true`. 4211 * `writable` {boolean} Sets whether the `Duplex` should be writable. 4212 **Default:** `true`. 4213 * `readableObjectMode` {boolean} Sets `objectMode` for readable side of the 4214 stream. Has no effect if `objectMode` is `true`. **Default:** `false`. 4215 * `writableObjectMode` {boolean} Sets `objectMode` for writable side of the 4216 stream. Has no effect if `objectMode` is `true`. **Default:** `false`. 4217 * `readableHighWaterMark` {number} Sets `highWaterMark` for the readable side 4218 of the stream. Has no effect if `highWaterMark` is provided. 4219 * `writableHighWaterMark` {number} Sets `highWaterMark` for the writable side 4220 of the stream. Has no effect if `highWaterMark` is provided. 4221 4222<!-- eslint-disable no-useless-constructor --> 4223 4224```js 4225const { Duplex } = require('node:stream'); 4226 4227class MyDuplex extends Duplex { 4228 constructor(options) { 4229 super(options); 4230 // ... 
  }
}
```

Or, when using pre-ES6 style constructors:

```js
const { Duplex } = require('node:stream');
const util = require('node:util');

function MyDuplex(options) {
  if (!(this instanceof MyDuplex))
    return new MyDuplex(options);
  Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex);
```

Or, using the simplified constructor approach:

```js
const { Duplex } = require('node:stream');

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
});
```

When using pipeline:

```js
const { Transform, pipeline } = require('node:stream');
const fs = require('node:fs');

pipeline(
  fs.createReadStream('object.json')
    .setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // Accept string input rather than Buffers
    construct(callback) {
      this.data = '';
      callback();
    },
    transform(chunk, encoding, callback) {
      this.data += chunk;
      callback();
    },
    flush(callback) {
      try {
        // Make sure it is valid JSON.
        JSON.parse(this.data);
        this.push(this.data);
        callback();
      } catch (err) {
        callback(err);
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  (err) => {
    if (err) {
      console.error('failed', err);
    } else {
      console.log('completed');
    }
  },
);
```

#### An example duplex stream

The following illustrates a simple example of a `Duplex` stream that wraps a
hypothetical lower-level source object to which data can be written, and
from which data can be read, albeit using an API that is not compatible with
Node.js streams.

```js
const { Duplex } = require('node:stream');
const kSource = Symbol('source');

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options);
    this[kSource] = source;
  }

  _write(chunk, encoding, callback) {
    // The underlying source only deals with strings.
    if (Buffer.isBuffer(chunk))
      chunk = chunk.toString();
    this[kSource].writeSomeData(chunk);
    callback();
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding));
    });
  }
}
```

The most important aspect of a `Duplex` stream is that the `Readable` and
`Writable` sides operate independently of one another despite co-existing within
a single object instance.

#### Object mode duplex streams

For `Duplex` streams, `objectMode` can be set exclusively for either the
`Readable` or `Writable` side using the `readableObjectMode` and
`writableObjectMode` options respectively.

In the following example, for instance, a new `Transform` stream (which is a
type of [`Duplex`][] stream) is created that has an object mode `Writable` side
that accepts JavaScript numbers that are converted to hexadecimal strings on
the `Readable` side.

```js
const { Transform } = require('node:stream');

// All Transform streams are also Duplex Streams.
4360const myTransform = new Transform({ 4361 writableObjectMode: true, 4362 4363 transform(chunk, encoding, callback) { 4364 // Coerce the chunk to a number if necessary. 4365 chunk |= 0; 4366 4367 // Transform the chunk into something else. 4368 const data = chunk.toString(16); 4369 4370 // Push the data onto the readable queue. 4371 callback(null, '0'.repeat(data.length % 2) + data); 4372 }, 4373}); 4374 4375myTransform.setEncoding('ascii'); 4376myTransform.on('data', (chunk) => console.log(chunk)); 4377 4378myTransform.write(1); 4379// Prints: 01 4380myTransform.write(10); 4381// Prints: 0a 4382myTransform.write(100); 4383// Prints: 64 4384``` 4385 4386### Implementing a transform stream 4387 4388A [`Transform`][] stream is a [`Duplex`][] stream where the output is computed 4389in some way from the input. Examples include [zlib][] streams or [crypto][] 4390streams that compress, encrypt, or decrypt data. 4391 4392There is no requirement that the output be the same size as the input, the same 4393number of chunks, or arrive at the same time. For example, a `Hash` stream will 4394only ever have a single chunk of output which is provided when the input is 4395ended. A `zlib` stream will produce output that is either much smaller or much 4396larger than its input. 4397 4398The `stream.Transform` class is extended to implement a [`Transform`][] stream. 4399 4400The `stream.Transform` class prototypically inherits from `stream.Duplex` and 4401implements its own versions of the `writable._write()` and 4402[`readable._read()`][] methods. Custom `Transform` implementations _must_ 4403implement the [`transform._transform()`][stream-_transform] method and _may_ 4404also implement the [`transform._flush()`][stream-_flush] method. 4405 4406Care must be taken when using `Transform` streams in that data written to the 4407stream can cause the `Writable` side of the stream to become paused if the 4408output on the `Readable` side is not consumed. 4409 4410#### `new stream.Transform([options])` 4411 4412* `options` {Object} Passed to both `Writable` and `Readable` 4413 constructors. Also has the following fields: 4414 * `transform` {Function} Implementation for the 4415 [`stream._transform()`][stream-_transform] method. 4416 * `flush` {Function} Implementation for the [`stream._flush()`][stream-_flush] 4417 method. 4418 4419<!-- eslint-disable no-useless-constructor --> 4420 4421```js 4422const { Transform } = require('node:stream'); 4423 4424class MyTransform extends Transform { 4425 constructor(options) { 4426 super(options); 4427 // ... 4428 } 4429} 4430``` 4431 4432Or, when using pre-ES6 style constructors: 4433 4434```js 4435const { Transform } = require('node:stream'); 4436const util = require('node:util'); 4437 4438function MyTransform(options) { 4439 if (!(this instanceof MyTransform)) 4440 return new MyTransform(options); 4441 Transform.call(this, options); 4442} 4443util.inherits(MyTransform, Transform); 4444``` 4445 4446Or, using the simplified constructor approach: 4447 4448```js 4449const { Transform } = require('node:stream'); 4450 4451const myTransform = new Transform({ 4452 transform(chunk, encoding, callback) { 4453 // ... 4454 }, 4455}); 4456``` 4457 4458#### Event: `'end'` 4459 4460The [`'end'`][] event is from the `stream.Readable` class. The `'end'` event is 4461emitted after all data has been output, which occurs after the callback in 4462[`transform._flush()`][stream-_flush] has been called. In the case of an error, 4463`'end'` should not be emitted. 
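For example (a small sketch; the delayed `_flush()` is contrived to make the
ordering visible), `'end'` fires only after the flush callback has run and the
output has been consumed:

```js
const { Transform } = require('node:stream');

const t = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk);
  },
  flush(callback) {
    // 'end' cannot be emitted until this callback is invoked.
    setTimeout(() => {
      this.push('flushed\n');
      callback();
    }, 100);
  },
});

t.on('end', () => console.log("'end' emitted after the flush completed"));
t.resume(); // Consume the output so 'end' can be emitted.
t.end('some data');
```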
#### Event: `'finish'`

The [`'finish'`][] event is from the `stream.Writable` class. The `'finish'`
event is emitted after [`stream.end()`][stream-end] is called and all chunks
have been processed by [`stream._transform()`][stream-_transform]. In the case
of an error, `'finish'` should not be emitted.

#### `transform._flush(callback)`

* `callback` {Function} A callback function (optionally with an error
  argument and data) to be called when remaining data has been flushed.

This function MUST NOT be called by application code directly. It should be
implemented by child classes, and called by the internal `Readable` class
methods only.

In some cases, a transform operation may need to emit an additional bit of
data at the end of the stream. For example, a `zlib` compression stream will
store an amount of internal state used to optimally compress the output. When
the stream ends, however, that additional data needs to be flushed so that the
compressed data will be complete.

Custom [`Transform`][] implementations _may_ implement the `transform._flush()`
method. This will be called when there is no more written data to be consumed,
but before the [`'end'`][] event is emitted signaling the end of the
[`Readable`][] stream.

Within the `transform._flush()` implementation, the `transform.push()` method
may be called zero or more times, as appropriate. The `callback` function must
be called when the flush operation is complete.

The `transform._flush()` method is prefixed with an underscore because it is
internal to the class that defines it, and should never be called directly by
user programs.
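For example (a sketch; the pass-through byte counter is hypothetical),
`_flush()` can append one final summary chunk before `'end'`:

```js
const { Transform } = require('node:stream');

const counter = new Transform({
  construct(callback) {
    this.bytes = 0;
    callback();
  },
  transform(chunk, encoding, callback) {
    this.bytes += chunk.length;
    callback(null, chunk); // Pass the data through unchanged.
  },
  flush(callback) {
    // Emit one trailing chunk before the 'end' event.
    callback(null, `\n(${this.bytes} bytes seen)\n`);
  },
});

process.stdin.pipe(counter).pipe(process.stdout);
```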
#### `transform._transform(chunk, encoding, callback)`

* `chunk` {Buffer|string|any} The `Buffer` to be transformed, converted from
  the `string` passed to [`stream.write()`][stream-write]. If the stream's
  `decodeStrings` option is `false` or the stream is operating in object mode,
  the chunk will not be converted & will be whatever was passed to
  [`stream.write()`][stream-write].
* `encoding` {string} If the chunk is a string, then this is the
  encoding type. If chunk is a buffer, then this is the special
  value `'buffer'`. Ignore it in that case.
* `callback` {Function} A callback function (optionally with an error
  argument and data) to be called after the supplied `chunk` has been
  processed.

This function MUST NOT be called by application code directly. It should be
implemented by child classes, and called by the internal `Readable` class
methods only.

All `Transform` stream implementations must provide a `_transform()`
method to accept input and produce output. The `transform._transform()`
implementation handles the bytes being written, computes an output, then passes
that output off to the readable portion using the `transform.push()` method.

The `transform.push()` method may be called zero or more times to generate
output from a single input chunk, depending on how much is to be output
as a result of the chunk.

It is possible that no output is generated from any given chunk of input data.

The `callback` function must be called only when the current chunk is completely
consumed. The first argument passed to the `callback` must be an `Error` object
if an error occurred while processing the input or `null` otherwise. If a second
argument is passed to the `callback`, it will be forwarded on to the
`transform.push()` method, but only if the first argument is falsy. In other
words, the following are equivalent:

```js
transform.prototype._transform = function(data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function(data, encoding, callback) {
  callback(null, data);
};
```

The `transform._transform()` method is prefixed with an underscore because it
is internal to the class that defines it, and should never be called directly by
user programs.

`transform._transform()` is never called in parallel; streams implement a
queue mechanism, and to receive the next chunk, `callback` must be
called, either synchronously or asynchronously.

#### Class: `stream.PassThrough`

The `stream.PassThrough` class is a trivial implementation of a [`Transform`][]
stream that simply passes the input bytes across to the output. Its purpose is
primarily for examples and testing, but there are some use cases where
`stream.PassThrough` is useful as a building block for novel sorts of streams.

## Additional notes

<!--type=misc-->

### Streams compatibility with async generators and async iterators

With the support of async generators and iterators in JavaScript, async
generators are effectively a first-class language-level stream construct.

Some common interop cases of using Node.js streams with async generators
and async iterators are provided below.

#### Consuming readable streams with async iterators

```js
(async function() {
  for await (const chunk of readable) {
    console.log(chunk);
  }
})();
```

Async iterators register a permanent error handler on the stream to prevent any
unhandled post-destroy errors.

#### Creating readable streams with async generators

A Node.js readable stream can be created from an asynchronous generator using
the `Readable.from()` utility method:

```js
const { Readable } = require('node:stream');

const ac = new AbortController();
const signal = ac.signal;

async function * generate() {
  yield 'a';
  await someLongRunningFn({ signal });
  yield 'b';
  yield 'c';
}

const readable = Readable.from(generate());
readable.on('close', () => {
  ac.abort();
});

readable.on('data', (chunk) => {
  console.log(chunk);
});
```

#### Piping to writable streams from async iterators

When writing to a writable stream from an async iterator, ensure correct
handling of backpressure and errors.
[`stream.pipeline()`][] abstracts away 4620the handling of backpressure and backpressure-related errors: 4621 4622```js 4623const fs = require('node:fs'); 4624const { pipeline } = require('node:stream'); 4625const { pipeline: pipelinePromise } = require('node:stream/promises'); 4626 4627const writable = fs.createWriteStream('./file'); 4628 4629const ac = new AbortController(); 4630const signal = ac.signal; 4631 4632const iterator = createIterator({ signal }); 4633 4634// Callback Pattern 4635pipeline(iterator, writable, (err, value) => { 4636 if (err) { 4637 console.error(err); 4638 } else { 4639 console.log(value, 'value returned'); 4640 } 4641}).on('close', () => { 4642 ac.abort(); 4643}); 4644 4645// Promise Pattern 4646pipelinePromise(iterator, writable) 4647 .then((value) => { 4648 console.log(value, 'value returned'); 4649 }) 4650 .catch((err) => { 4651 console.error(err); 4652 ac.abort(); 4653 }); 4654``` 4655 4656<!--type=misc--> 4657 4658### Compatibility with older Node.js versions 4659 4660<!--type=misc--> 4661 4662Prior to Node.js 0.10, the `Readable` stream interface was simpler, but also 4663less powerful and less useful. 4664 4665* Rather than waiting for calls to the [`stream.read()`][stream-read] method, 4666 [`'data'`][] events would begin emitting immediately. Applications that 4667 would need to perform some amount of work to decide how to handle data 4668 were required to store read data into buffers so the data would not be lost. 4669* The [`stream.pause()`][stream-pause] method was advisory, rather than 4670 guaranteed. This meant that it was still necessary to be prepared to receive 4671 [`'data'`][] events _even when the stream was in a paused state_. 4672 4673In Node.js 0.10, the [`Readable`][] class was added. For backward 4674compatibility with older Node.js programs, `Readable` streams switch into 4675"flowing mode" when a [`'data'`][] event handler is added, or when the 4676[`stream.resume()`][stream-resume] method is called. The effect is that, even 4677when not using the new [`stream.read()`][stream-read] method and 4678[`'readable'`][] event, it is no longer necessary to worry about losing 4679[`'data'`][] chunks. 4680 4681While most applications will continue to function normally, this introduces an 4682edge case in the following conditions: 4683 4684* No [`'data'`][] event listener is added. 4685* The [`stream.resume()`][stream-resume] method is never called. 4686* The stream is not piped to any writable destination. 4687 4688For example, consider the following code: 4689 4690```js 4691// WARNING! BROKEN! 4692net.createServer((socket) => { 4693 4694 // We add an 'end' listener, but never consume the data. 4695 socket.on('end', () => { 4696 // It will never get here. 4697 socket.end('The message was received but was not processed.\n'); 4698 }); 4699 4700}).listen(1337); 4701``` 4702 4703Prior to Node.js 0.10, the incoming message data would be simply discarded. 4704However, in Node.js 0.10 and beyond, the socket remains paused forever. 4705 4706The workaround in this situation is to call the 4707[`stream.resume()`][stream-resume] method to begin the flow of data: 4708 4709```js 4710// Workaround. 4711net.createServer((socket) => { 4712 socket.on('end', () => { 4713 socket.end('The message was received but was not processed.\n'); 4714 }); 4715 4716 // Start the flow of data, discarding it. 
4717 socket.resume(); 4718}).listen(1337); 4719``` 4720 4721In addition to new `Readable` streams switching into flowing mode, 4722pre-0.10 style streams can be wrapped in a `Readable` class using the 4723[`readable.wrap()`][`stream.wrap()`] method. 4724 4725### `readable.read(0)` 4726 4727There are some cases where it is necessary to trigger a refresh of the 4728underlying readable stream mechanisms, without actually consuming any 4729data. In such cases, it is possible to call `readable.read(0)`, which will 4730always return `null`. 4731 4732If the internal read buffer is below the `highWaterMark`, and the 4733stream is not currently reading, then calling `stream.read(0)` will trigger 4734a low-level [`stream._read()`][stream-_read] call. 4735 4736While most applications will almost never need to do this, there are 4737situations within Node.js where this is done, particularly in the 4738`Readable` stream class internals. 4739 4740### `readable.push('')` 4741 4742Use of `readable.push('')` is not recommended. 4743 4744Pushing a zero-byte string, `Buffer`, or `Uint8Array` to a stream that is not in 4745object mode has an interesting side effect. Because it _is_ a call to 4746[`readable.push()`][stream-push], the call will end the reading process. 4747However, because the argument is an empty string, no data is added to the 4748readable buffer so there is nothing for a user to consume. 4749 4750### `highWaterMark` discrepancy after calling `readable.setEncoding()` 4751 4752The use of `readable.setEncoding()` will change the behavior of how the 4753`highWaterMark` operates in non-object mode. 4754 4755Typically, the size of the current buffer is measured against the 4756`highWaterMark` in _bytes_. However, after `setEncoding()` is called, the 4757comparison function will begin to measure the buffer's size in _characters_. 4758 4759This is not a problem in common cases with `latin1` or `ascii`. But it is 4760advised to be mindful about this behavior when working with strings that could 4761contain multi-byte characters. 
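For example (a sketch; the tiny `highWaterMark` of 2 is contrived to make the
effect visible), the same three-byte character counts differently before and
after `setEncoding()`:

```js
const { Readable } = require('node:stream');

// Measured in bytes: a 3-byte '€' immediately exceeds the highWaterMark.
const bytes = new Readable({ read() {}, highWaterMark: 2 });
console.log(bytes.push(Buffer.from('€'))); // false (3 bytes buffered)

// Measured in characters after setEncoding(): '€' counts as 1.
const chars = new Readable({ read() {}, highWaterMark: 2 });
chars.setEncoding('utf8');
console.log(chars.push('€')); // true (1 character buffered)
```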
4762 4763[API for stream consumers]: #api-for-stream-consumers 4764[API for stream implementers]: #api-for-stream-implementers 4765[Compatibility]: #compatibility-with-older-nodejs-versions 4766[HTTP requests, on the client]: http.md#class-httpclientrequest 4767[HTTP responses, on the server]: http.md#class-httpserverresponse 4768[TCP sockets]: net.md#class-netsocket 4769[Three states]: #three-states 4770[`'data'`]: #event-data 4771[`'drain'`]: #event-drain 4772[`'end'`]: #event-end 4773[`'finish'`]: #event-finish 4774[`'readable'`]: #event-readable 4775[`Duplex`]: #class-streamduplex 4776[`EventEmitter`]: events.md#class-eventemitter 4777[`Readable`]: #class-streamreadable 4778[`Symbol.hasInstance`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Symbol/hasInstance 4779[`Transform`]: #class-streamtransform 4780[`Writable`]: #class-streamwritable 4781[`fs.createReadStream()`]: fs.md#fscreatereadstreampath-options 4782[`fs.createWriteStream()`]: fs.md#fscreatewritestreampath-options 4783[`net.Socket`]: net.md#class-netsocket 4784[`process.stderr`]: process.md#processstderr 4785[`process.stdin`]: process.md#processstdin 4786[`process.stdout`]: process.md#processstdout 4787[`readable._read()`]: #readable_readsize 4788[`readable.compose(stream)`]: #readablecomposestream-options 4789[`readable.map`]: #readablemapfn-options 4790[`readable.push('')`]: #readablepush 4791[`readable.setEncoding()`]: #readablesetencodingencoding 4792[`stream.Readable.from()`]: #streamreadablefromiterable-options 4793[`stream.addAbortSignal()`]: #streamaddabortsignalsignal-stream 4794[`stream.compose`]: #streamcomposestreams 4795[`stream.cork()`]: #writablecork 4796[`stream.finished()`]: #streamfinishedstream-options-callback 4797[`stream.pipe()`]: #readablepipedestination-options 4798[`stream.pipeline()`]: #streampipelinesource-transforms-destination-callback 4799[`stream.uncork()`]: #writableuncork 4800[`stream.unpipe()`]: #readableunpipedestination 4801[`stream.wrap()`]: #readablewrapstream 4802[`writable._final()`]: #writable_finalcallback 4803[`writable._write()`]: #writable_writechunk-encoding-callback 4804[`writable._writev()`]: #writable_writevchunks-callback 4805[`writable.cork()`]: #writablecork 4806[`writable.end()`]: #writableendchunk-encoding-callback 4807[`writable.uncork()`]: #writableuncork 4808[`writable.writableFinished`]: #writablewritablefinished 4809[`zlib.createDeflate()`]: zlib.md#zlibcreatedeflateoptions 4810[child process stdin]: child_process.md#subprocessstdin 4811[child process stdout and stderr]: child_process.md#subprocessstdout 4812[crypto]: crypto.md 4813[fs read streams]: fs.md#class-fsreadstream 4814[fs write streams]: fs.md#class-fswritestream 4815[http-incoming-message]: http.md#class-httpincomingmessage 4816[hwm-gotcha]: #highwatermark-discrepancy-after-calling-readablesetencoding 4817[object-mode]: #object-mode 4818[readable-_construct]: #readable_constructcallback 4819[readable-_destroy]: #readable_destroyerr-callback 4820[readable-destroy]: #readabledestroyerror 4821[stream-_final]: #writable_finalcallback 4822[stream-_flush]: #transform_flushcallback 4823[stream-_read]: #readable_readsize 4824[stream-_transform]: #transform_transformchunk-encoding-callback 4825[stream-_write]: #writable_writechunk-encoding-callback 4826[stream-_writev]: #writable_writevchunks-callback 4827[stream-end]: #writableendchunk-encoding-callback 4828[stream-finished]: #streamfinishedstream-options-callback 4829[stream-finished-promise]: #streamfinishedstream-options 
4830[stream-pause]: #readablepause 4831[stream-pipeline]: #streampipelinesource-transforms-destination-callback 4832[stream-pipeline-promise]: #streampipelinesource-transforms-destination-options 4833[stream-push]: #readablepushchunk-encoding 4834[stream-read]: #readablereadsize 4835[stream-resume]: #readableresume 4836[stream-uncork]: #writableuncork 4837[stream-write]: #writablewritechunk-encoding-callback 4838[writable-_construct]: #writable_constructcallback 4839[writable-_destroy]: #writable_destroyerr-callback 4840[writable-destroy]: #writabledestroyerror 4841[writable-new]: #new-streamwritableoptions 4842[zlib]: zlib.md 4843