1'use strict'; 2 3const common = require('../common'); 4const { 5 Stream, 6 Writable, 7 Readable, 8 Transform, 9 pipeline, 10 PassThrough, 11 Duplex, 12 addAbortSignal, 13} = require('stream'); 14const pipelinep = require('stream/promises').pipeline; 15const assert = require('assert'); 16const http = require('http'); 17const { promisify } = require('util'); 18const net = require('net'); 19const tsp = require('timers/promises'); 20 21{ 22 let finished = false; 23 const processed = []; 24 const expected = [ 25 Buffer.from('a'), 26 Buffer.from('b'), 27 Buffer.from('c'), 28 ]; 29 30 const read = new Readable({ 31 read() {} 32 }); 33 34 const write = new Writable({ 35 write(data, enc, cb) { 36 processed.push(data); 37 cb(); 38 } 39 }); 40 41 write.on('finish', () => { 42 finished = true; 43 }); 44 45 for (let i = 0; i < expected.length; i++) { 46 read.push(expected[i]); 47 } 48 read.push(null); 49 50 pipeline(read, write, common.mustSucceed(() => { 51 assert.ok(finished); 52 assert.deepStrictEqual(processed, expected); 53 })); 54} 55 56{ 57 const read = new Readable({ 58 read() {} 59 }); 60 61 assert.throws(() => { 62 pipeline(read, () => {}); 63 }, /ERR_MISSING_ARGS/); 64 assert.throws(() => { 65 pipeline(() => {}); 66 }, /ERR_MISSING_ARGS/); 67 assert.throws(() => { 68 pipeline(); 69 }, /ERR_INVALID_ARG_TYPE/); 70} 71 72{ 73 const read = new Readable({ 74 read() {} 75 }); 76 77 const write = new Writable({ 78 write(data, enc, cb) { 79 cb(); 80 } 81 }); 82 83 read.push('data'); 84 setImmediate(() => read.destroy()); 85 86 pipeline(read, write, common.mustCall((err) => { 87 assert.ok(err, 'should have an error'); 88 })); 89} 90 91{ 92 const read = new Readable({ 93 read() {} 94 }); 95 96 const write = new Writable({ 97 write(data, enc, cb) { 98 cb(); 99 } 100 }); 101 102 read.push('data'); 103 setImmediate(() => read.destroy(new Error('kaboom'))); 104 105 const dst = pipeline(read, write, common.mustCall((err) => { 106 assert.deepStrictEqual(err, new Error('kaboom')); 107 })); 108 109 assert.strictEqual(dst, write); 110} 111 112{ 113 const read = new Readable({ 114 read() {} 115 }); 116 117 const transform = new Transform({ 118 transform(data, enc, cb) { 119 cb(new Error('kaboom')); 120 } 121 }); 122 123 const write = new Writable({ 124 write(data, enc, cb) { 125 cb(); 126 } 127 }); 128 129 read.on('close', common.mustCall()); 130 transform.on('close', common.mustCall()); 131 write.on('close', common.mustCall()); 132 133 [read, transform, write].forEach((stream) => { 134 stream.on('error', common.mustCall((err) => { 135 assert.deepStrictEqual(err, new Error('kaboom')); 136 })); 137 }); 138 139 const dst = pipeline(read, transform, write, common.mustCall((err) => { 140 assert.deepStrictEqual(err, new Error('kaboom')); 141 })); 142 143 assert.strictEqual(dst, write); 144 145 read.push('hello'); 146} 147 148{ 149 const server = http.createServer((req, res) => { 150 const rs = new Readable({ 151 read() { 152 rs.push('hello'); 153 rs.push(null); 154 } 155 }); 156 157 pipeline(rs, res, () => {}); 158 }); 159 160 server.listen(0, () => { 161 const req = http.request({ 162 port: server.address().port 163 }); 164 165 req.end(); 166 req.on('response', (res) => { 167 const buf = []; 168 res.on('data', (data) => buf.push(data)); 169 res.on('end', common.mustCall(() => { 170 assert.deepStrictEqual( 171 Buffer.concat(buf), 172 Buffer.from('hello') 173 ); 174 server.close(); 175 })); 176 }); 177 }); 178} 179 180{ 181 const server = http.createServer((req, res) => { 182 let sent = false; 183 const rs = new Readable({ 184 read() { 185 if (sent) { 186 return; 187 } 188 sent = true; 189 rs.push('hello'); 190 }, 191 destroy: common.mustCall((err, cb) => { 192 // Prevents fd leaks by destroying http pipelines 193 cb(); 194 }) 195 }); 196 197 pipeline(rs, res, () => {}); 198 }); 199 200 server.listen(0, () => { 201 const req = http.request({ 202 port: server.address().port 203 }); 204 205 req.end(); 206 req.on('response', (res) => { 207 setImmediate(() => { 208 res.destroy(); 209 server.close(); 210 }); 211 }); 212 }); 213} 214 215{ 216 const server = http.createServer((req, res) => { 217 let sent = 0; 218 const rs = new Readable({ 219 read() { 220 if (sent++ > 10) { 221 return; 222 } 223 rs.push('hello'); 224 }, 225 destroy: common.mustCall((err, cb) => { 226 cb(); 227 }) 228 }); 229 230 pipeline(rs, res, () => {}); 231 }); 232 233 let cnt = 10; 234 235 const badSink = new Writable({ 236 write(data, enc, cb) { 237 cnt--; 238 if (cnt === 0) cb(new Error('kaboom')); 239 else cb(); 240 } 241 }); 242 243 server.listen(0, () => { 244 const req = http.request({ 245 port: server.address().port 246 }); 247 248 req.end(); 249 req.on('response', (res) => { 250 pipeline(res, badSink, common.mustCall((err) => { 251 assert.deepStrictEqual(err, new Error('kaboom')); 252 server.close(); 253 })); 254 }); 255 }); 256} 257 258{ 259 const server = http.createServer((req, res) => { 260 pipeline(req, res, common.mustSucceed()); 261 }); 262 263 server.listen(0, () => { 264 const req = http.request({ 265 port: server.address().port 266 }); 267 268 let sent = 0; 269 const rs = new Readable({ 270 read() { 271 if (sent++ > 10) { 272 return; 273 } 274 rs.push('hello'); 275 } 276 }); 277 278 pipeline(rs, req, common.mustCall(() => { 279 server.close(); 280 })); 281 282 req.on('response', (res) => { 283 let cnt = 10; 284 res.on('data', () => { 285 cnt--; 286 if (cnt === 0) rs.destroy(); 287 }); 288 }); 289 }); 290} 291 292{ 293 const makeTransform = () => { 294 const tr = new Transform({ 295 transform(data, enc, cb) { 296 cb(null, data); 297 } 298 }); 299 300 tr.on('close', common.mustCall()); 301 return tr; 302 }; 303 304 const rs = new Readable({ 305 read() { 306 rs.push('hello'); 307 } 308 }); 309 310 let cnt = 10; 311 312 const ws = new Writable({ 313 write(data, enc, cb) { 314 cnt--; 315 if (cnt === 0) return cb(new Error('kaboom')); 316 cb(); 317 } 318 }); 319 320 rs.on('close', common.mustCall()); 321 ws.on('close', common.mustCall()); 322 323 pipeline( 324 rs, 325 makeTransform(), 326 makeTransform(), 327 makeTransform(), 328 makeTransform(), 329 makeTransform(), 330 makeTransform(), 331 ws, 332 common.mustCall((err) => { 333 assert.deepStrictEqual(err, new Error('kaboom')); 334 }) 335 ); 336} 337 338{ 339 const oldStream = new Stream(); 340 341 oldStream.pause = oldStream.resume = () => {}; 342 oldStream.write = (data) => { 343 oldStream.emit('data', data); 344 return true; 345 }; 346 oldStream.end = () => { 347 oldStream.emit('end'); 348 }; 349 350 const expected = [ 351 Buffer.from('hello'), 352 Buffer.from('world'), 353 ]; 354 355 const rs = new Readable({ 356 read() { 357 for (let i = 0; i < expected.length; i++) { 358 rs.push(expected[i]); 359 } 360 rs.push(null); 361 } 362 }); 363 364 const ws = new Writable({ 365 write(data, enc, cb) { 366 assert.deepStrictEqual(data, expected.shift()); 367 cb(); 368 } 369 }); 370 371 let finished = false; 372 373 ws.on('finish', () => { 374 finished = true; 375 }); 376 377 pipeline( 378 rs, 379 oldStream, 380 ws, 381 common.mustSucceed(() => { 382 assert(finished, 'last stream finished'); 383 }) 384 ); 385} 386 387{ 388 const oldStream = new Stream(); 389 390 oldStream.pause = oldStream.resume = () => {}; 391 oldStream.write = (data) => { 392 oldStream.emit('data', data); 393 return true; 394 }; 395 oldStream.end = () => { 396 oldStream.emit('end'); 397 }; 398 399 const destroyableOldStream = new Stream(); 400 401 destroyableOldStream.pause = destroyableOldStream.resume = () => {}; 402 destroyableOldStream.destroy = common.mustCall(() => { 403 destroyableOldStream.emit('close'); 404 }); 405 destroyableOldStream.write = (data) => { 406 destroyableOldStream.emit('data', data); 407 return true; 408 }; 409 destroyableOldStream.end = () => { 410 destroyableOldStream.emit('end'); 411 }; 412 413 const rs = new Readable({ 414 read() { 415 rs.destroy(new Error('stop')); 416 } 417 }); 418 419 const ws = new Writable({ 420 write(data, enc, cb) { 421 cb(); 422 } 423 }); 424 425 let finished = false; 426 427 ws.on('finish', () => { 428 finished = true; 429 }); 430 431 pipeline( 432 rs, 433 oldStream, 434 destroyableOldStream, 435 ws, 436 common.mustCall((err) => { 437 assert.deepStrictEqual(err, new Error('stop')); 438 assert(!finished, 'should not finish'); 439 }) 440 ); 441} 442 443{ 444 const pipelinePromise = promisify(pipeline); 445 446 async function run() { 447 const read = new Readable({ 448 read() {} 449 }); 450 451 const write = new Writable({ 452 write(data, enc, cb) { 453 cb(); 454 } 455 }); 456 457 read.push('data'); 458 read.push(null); 459 460 let finished = false; 461 462 write.on('finish', () => { 463 finished = true; 464 }); 465 466 await pipelinePromise(read, write); 467 468 assert(finished); 469 } 470 471 run(); 472} 473 474{ 475 // Check aborted signal without values 476 const pipelinePromise = promisify(pipeline); 477 async function run() { 478 const ac = new AbortController(); 479 const { signal } = ac; 480 async function* producer() { 481 ac.abort(); 482 await Promise.resolve(); 483 yield '8'; 484 } 485 486 const w = new Writable({ 487 write(chunk, encoding, callback) { 488 callback(); 489 } 490 }); 491 await pipelinePromise(producer, w, { signal }); 492 } 493 494 assert.rejects(run, { name: 'AbortError' }).then(common.mustCall()); 495} 496 497{ 498 // Check aborted signal after init. 499 const pipelinePromise = promisify(pipeline); 500 async function run() { 501 const ac = new AbortController(); 502 const { signal } = ac; 503 async function* producer() { 504 yield '5'; 505 await Promise.resolve(); 506 ac.abort(); 507 await Promise.resolve(); 508 yield '8'; 509 } 510 511 const w = new Writable({ 512 write(chunk, encoding, callback) { 513 callback(); 514 } 515 }); 516 await pipelinePromise(producer, w, { signal }); 517 } 518 519 assert.rejects(run, { name: 'AbortError' }).then(common.mustCall()); 520} 521 522{ 523 // Check pre-aborted signal 524 const pipelinePromise = promisify(pipeline); 525 async function run() { 526 const signal = AbortSignal.abort(); 527 async function* producer() { 528 yield '5'; 529 await Promise.resolve(); 530 yield '8'; 531 } 532 533 const w = new Writable({ 534 write(chunk, encoding, callback) { 535 callback(); 536 } 537 }); 538 await pipelinePromise(producer, w, { signal }); 539 } 540 541 assert.rejects(run, { name: 'AbortError' }).then(common.mustCall()); 542} 543 544{ 545 const read = new Readable({ 546 read() {} 547 }); 548 549 const transform = new Transform({ 550 transform(data, enc, cb) { 551 cb(new Error('kaboom')); 552 } 553 }); 554 555 const write = new Writable({ 556 write(data, enc, cb) { 557 cb(); 558 } 559 }); 560 561 assert.throws( 562 () => pipeline(read, transform, write), 563 { code: 'ERR_INVALID_ARG_TYPE' } 564 ); 565} 566 567{ 568 const server = http.Server(function(req, res) { 569 res.write('asd'); 570 }); 571 server.listen(0, function() { 572 http.get({ port: this.address().port }, (res) => { 573 const stream = new PassThrough(); 574 575 stream.on('error', common.mustCall()); 576 577 pipeline( 578 res, 579 stream, 580 common.mustCall((err) => { 581 assert.strictEqual(err.message, 'oh no'); 582 server.close(); 583 }) 584 ); 585 586 stream.destroy(new Error('oh no')); 587 }).on('error', common.mustNotCall()); 588 }); 589} 590 591{ 592 let res = ''; 593 const w = new Writable({ 594 write(chunk, encoding, callback) { 595 res += chunk; 596 callback(); 597 } 598 }); 599 pipeline(function*() { 600 yield 'hello'; 601 yield 'world'; 602 }(), w, common.mustSucceed(() => { 603 assert.strictEqual(res, 'helloworld'); 604 })); 605} 606 607{ 608 let res = ''; 609 const w = new Writable({ 610 write(chunk, encoding, callback) { 611 res += chunk; 612 callback(); 613 } 614 }); 615 pipeline(async function*() { 616 await Promise.resolve(); 617 yield 'hello'; 618 yield 'world'; 619 }(), w, common.mustSucceed(() => { 620 assert.strictEqual(res, 'helloworld'); 621 })); 622} 623 624{ 625 let res = ''; 626 const w = new Writable({ 627 write(chunk, encoding, callback) { 628 res += chunk; 629 callback(); 630 } 631 }); 632 pipeline(function*() { 633 yield 'hello'; 634 yield 'world'; 635 }, w, common.mustSucceed(() => { 636 assert.strictEqual(res, 'helloworld'); 637 })); 638} 639 640{ 641 let res = ''; 642 const w = new Writable({ 643 write(chunk, encoding, callback) { 644 res += chunk; 645 callback(); 646 } 647 }); 648 pipeline(async function*() { 649 await Promise.resolve(); 650 yield 'hello'; 651 yield 'world'; 652 }, w, common.mustSucceed(() => { 653 assert.strictEqual(res, 'helloworld'); 654 })); 655} 656 657{ 658 let res = ''; 659 pipeline(async function*() { 660 await Promise.resolve(); 661 yield 'hello'; 662 yield 'world'; 663 }, async function*(source) { 664 for await (const chunk of source) { 665 yield chunk.toUpperCase(); 666 } 667 }, async function(source) { 668 for await (const chunk of source) { 669 res += chunk; 670 } 671 }, common.mustSucceed(() => { 672 assert.strictEqual(res, 'HELLOWORLD'); 673 })); 674} 675 676{ 677 pipeline(async function*() { 678 await Promise.resolve(); 679 yield 'hello'; 680 yield 'world'; 681 }, async function*(source) { 682 for await (const chunk of source) { 683 yield chunk.toUpperCase(); 684 } 685 }, async function(source) { 686 let ret = ''; 687 for await (const chunk of source) { 688 ret += chunk; 689 } 690 return ret; 691 }, common.mustSucceed((val) => { 692 assert.strictEqual(val, 'HELLOWORLD'); 693 })); 694} 695 696{ 697 // AsyncIterable destination is returned and finalizes. 698 699 const ret = pipeline(async function*() { 700 await Promise.resolve(); 701 yield 'hello'; 702 }, async function*(source) { // eslint-disable-line require-yield 703 for await (const chunk of source) { } // eslint-disable-line no-unused-vars, no-empty 704 }, common.mustCall((err) => { 705 assert.strictEqual(err, undefined); 706 })); 707 ret.resume(); 708 assert.strictEqual(typeof ret.pipe, 'function'); 709} 710 711{ 712 // AsyncFunction destination is not returned and error is 713 // propagated. 714 715 const ret = pipeline(async function*() { // eslint-disable-line require-yield 716 await Promise.resolve(); 717 throw new Error('kaboom'); 718 }, async function*(source) { // eslint-disable-line require-yield 719 for await (const chunk of source) { } // eslint-disable-line no-unused-vars, no-empty 720 }, common.mustCall((err) => { 721 assert.strictEqual(err.message, 'kaboom'); 722 })); 723 ret.resume(); 724 assert.strictEqual(typeof ret.pipe, 'function'); 725} 726 727{ 728 const s = new PassThrough(); 729 pipeline(async function*() { // eslint-disable-line require-yield 730 throw new Error('kaboom'); 731 }, s, common.mustCall((err) => { 732 assert.strictEqual(err.message, 'kaboom'); 733 assert.strictEqual(s.destroyed, true); 734 })); 735} 736 737{ 738 const s = new PassThrough(); 739 pipeline(async function*() { // eslint-disable-line require-yield 740 throw new Error('kaboom'); 741 }(), s, common.mustCall((err) => { 742 assert.strictEqual(err.message, 'kaboom'); 743 assert.strictEqual(s.destroyed, true); 744 })); 745} 746 747{ 748 const s = new PassThrough(); 749 pipeline(function*() { // eslint-disable-line require-yield 750 throw new Error('kaboom'); 751 }, s, common.mustCall((err, val) => { 752 assert.strictEqual(err.message, 'kaboom'); 753 assert.strictEqual(s.destroyed, true); 754 })); 755} 756 757{ 758 const s = new PassThrough(); 759 pipeline(function*() { // eslint-disable-line require-yield 760 throw new Error('kaboom'); 761 }(), s, common.mustCall((err, val) => { 762 assert.strictEqual(err.message, 'kaboom'); 763 assert.strictEqual(s.destroyed, true); 764 })); 765} 766 767{ 768 const s = new PassThrough(); 769 pipeline(async function*() { 770 await Promise.resolve(); 771 yield 'hello'; 772 yield 'world'; 773 }, s, async function(source) { 774 for await (const chunk of source) { // eslint-disable-line no-unused-vars 775 throw new Error('kaboom'); 776 } 777 }, common.mustCall((err, val) => { 778 assert.strictEqual(err.message, 'kaboom'); 779 assert.strictEqual(s.destroyed, true); 780 })); 781} 782 783{ 784 const s = new PassThrough(); 785 const ret = pipeline(function() { 786 return ['hello', 'world']; 787 }, s, async function*(source) { // eslint-disable-line require-yield 788 for await (const chunk of source) { // eslint-disable-line no-unused-vars 789 throw new Error('kaboom'); 790 } 791 }, common.mustCall((err) => { 792 assert.strictEqual(err.message, 'kaboom'); 793 assert.strictEqual(s.destroyed, true); 794 })); 795 ret.resume(); 796 assert.strictEqual(typeof ret.pipe, 'function'); 797} 798 799{ 800 // Legacy streams without async iterator. 801 802 const s = new PassThrough(); 803 s.push('asd'); 804 s.push(null); 805 s[Symbol.asyncIterator] = null; 806 let ret = ''; 807 pipeline(s, async function(source) { 808 for await (const chunk of source) { 809 ret += chunk; 810 } 811 }, common.mustCall((err) => { 812 assert.strictEqual(err, undefined); 813 assert.strictEqual(ret, 'asd'); 814 })); 815} 816 817{ 818 // v1 streams without read(). 819 820 const s = new Stream(); 821 process.nextTick(() => { 822 s.emit('data', 'asd'); 823 s.emit('end'); 824 }); 825 // 'destroyer' can be called multiple times, 826 // once from stream wrapper and 827 // once from iterator wrapper. 828 s.close = common.mustCallAtLeast(1); 829 let ret = ''; 830 pipeline(s, async function(source) { 831 for await (const chunk of source) { 832 ret += chunk; 833 } 834 }, common.mustCall((err) => { 835 assert.strictEqual(err, undefined); 836 assert.strictEqual(ret, 'asd'); 837 })); 838} 839 840{ 841 // v1 error streams without read(). 842 843 const s = new Stream(); 844 process.nextTick(() => { 845 s.emit('error', new Error('kaboom')); 846 }); 847 s.destroy = common.mustCall(); 848 pipeline(s, async function(source) { 849 }, common.mustCall((err) => { 850 assert.strictEqual(err.message, 'kaboom'); 851 })); 852} 853 854{ 855 const s = new PassThrough(); 856 assert.throws(() => { 857 pipeline(function(source) { 858 }, s, () => {}); 859 }, (err) => { 860 assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); 861 assert.strictEqual(s.destroyed, false); 862 return true; 863 }); 864} 865 866{ 867 const s = new PassThrough(); 868 assert.throws(() => { 869 pipeline(s, function(source) { 870 }, s, () => {}); 871 }, (err) => { 872 assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); 873 assert.strictEqual(s.destroyed, false); 874 return true; 875 }); 876} 877 878{ 879 const s = new PassThrough(); 880 assert.throws(() => { 881 pipeline(s, function(source) { 882 }, () => {}); 883 }, (err) => { 884 assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); 885 assert.strictEqual(s.destroyed, false); 886 return true; 887 }); 888} 889 890{ 891 const s = new PassThrough(); 892 assert.throws(() => { 893 pipeline(s, function*(source) { 894 }, () => {}); 895 }, (err) => { 896 assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); 897 assert.strictEqual(s.destroyed, false); 898 return true; 899 }); 900} 901 902{ 903 let res = ''; 904 pipeline(async function*() { 905 await Promise.resolve(); 906 yield 'hello'; 907 yield 'world'; 908 }, new Transform({ 909 transform(chunk, encoding, cb) { 910 cb(new Error('kaboom')); 911 } 912 }), async function(source) { 913 for await (const chunk of source) { 914 res += chunk; 915 } 916 }, common.mustCall((err) => { 917 assert.strictEqual(err.message, 'kaboom'); 918 assert.strictEqual(res, ''); 919 })); 920} 921 922{ 923 let res = ''; 924 pipeline(async function*() { 925 await Promise.resolve(); 926 yield 'hello'; 927 yield 'world'; 928 }, new Transform({ 929 transform(chunk, encoding, cb) { 930 process.nextTick(cb, new Error('kaboom')); 931 } 932 }), async function(source) { 933 for await (const chunk of source) { 934 res += chunk; 935 } 936 }, common.mustCall((err) => { 937 assert.strictEqual(err.message, 'kaboom'); 938 assert.strictEqual(res, ''); 939 })); 940} 941 942{ 943 let res = ''; 944 pipeline(async function*() { 945 await Promise.resolve(); 946 yield 'hello'; 947 yield 'world'; 948 }, new Transform({ 949 decodeStrings: false, 950 transform(chunk, encoding, cb) { 951 cb(null, chunk.toUpperCase()); 952 } 953 }), async function(source) { 954 for await (const chunk of source) { 955 res += chunk; 956 } 957 }, common.mustSucceed(() => { 958 assert.strictEqual(res, 'HELLOWORLD'); 959 })); 960} 961 962{ 963 // Ensure no unhandled rejection from async function. 964 965 pipeline(async function*() { 966 yield 'hello'; 967 }, async function(source) { 968 throw new Error('kaboom'); 969 }, common.mustCall((err) => { 970 assert.strictEqual(err.message, 'kaboom'); 971 })); 972} 973 974{ 975 const src = new PassThrough({ autoDestroy: false }); 976 const dst = new PassThrough({ autoDestroy: false }); 977 pipeline(src, dst, common.mustCall(() => { 978 assert.strictEqual(src.destroyed, false); 979 assert.strictEqual(dst.destroyed, false); 980 })); 981 src.end(); 982} 983 984{ 985 // Make sure 'close' before 'end' finishes without error 986 // if readable has received eof. 987 // Ref: https://github.com/nodejs/node/issues/29699 988 const r = new Readable(); 989 const w = new Writable({ 990 write(chunk, encoding, cb) { 991 cb(); 992 } 993 }); 994 pipeline(r, w, (err) => { 995 assert.strictEqual(err, undefined); 996 }); 997 r.push('asd'); 998 r.push(null); 999 r.emit('close'); 1000} 1001 1002{ 1003 const server = http.createServer((req, res) => { 1004 }); 1005 1006 server.listen(0, () => { 1007 const req = http.request({ 1008 port: server.address().port 1009 }); 1010 1011 const body = new PassThrough(); 1012 pipeline( 1013 body, 1014 req, 1015 common.mustSucceed(() => { 1016 assert(!req.res); 1017 assert(!req.aborted); 1018 req.abort(); 1019 server.close(); 1020 }) 1021 ); 1022 body.end(); 1023 }); 1024} 1025 1026{ 1027 const src = new PassThrough(); 1028 const dst = new PassThrough(); 1029 pipeline(src, dst, common.mustSucceed(() => { 1030 assert.strictEqual(dst.destroyed, false); 1031 })); 1032 src.end(); 1033} 1034 1035{ 1036 const src = new PassThrough(); 1037 const dst = new PassThrough(); 1038 dst.readable = false; 1039 pipeline(src, dst, common.mustSucceed(() => { 1040 assert.strictEqual(dst.destroyed, true); 1041 })); 1042 src.end(); 1043} 1044 1045{ 1046 let res = ''; 1047 const rs = new Readable({ 1048 read() { 1049 setImmediate(() => { 1050 rs.push('hello'); 1051 }); 1052 } 1053 }); 1054 const ws = new Writable({ 1055 write: common.mustNotCall() 1056 }); 1057 pipeline(rs, async function*(stream) { // eslint-disable-line require-yield 1058 for await (const chunk of stream) { // eslint-disable-line no-unused-vars 1059 throw new Error('kaboom'); 1060 } 1061 }, async function *(source) { // eslint-disable-line require-yield 1062 for await (const chunk of source) { 1063 res += chunk; 1064 } 1065 }, ws, common.mustCall((err) => { 1066 assert.strictEqual(err.message, 'kaboom'); 1067 assert.strictEqual(res, ''); 1068 })); 1069} 1070 1071{ 1072 const server = http.createServer((req, res) => { 1073 req.socket.on('error', common.mustNotCall()); 1074 pipeline(req, new PassThrough(), (err) => { 1075 assert.ifError(err); 1076 res.end(); 1077 server.close(); 1078 }); 1079 }); 1080 1081 server.listen(0, () => { 1082 const req = http.request({ 1083 method: 'PUT', 1084 port: server.address().port 1085 }); 1086 req.end('asd123'); 1087 req.on('response', common.mustCall()); 1088 req.on('error', common.mustNotCall()); 1089 }); 1090} 1091 1092{ 1093 // Might still want to be able to use the writable side 1094 // of src. This is in the case where e.g. the Duplex input 1095 // is not directly connected to its output. Such a case could 1096 // happen when the Duplex is reading from a socket and then echos 1097 // the data back on the same socket. 1098 const src = new PassThrough(); 1099 assert.strictEqual(src.writable, true); 1100 const dst = new PassThrough(); 1101 pipeline(src, dst, common.mustCall((err) => { 1102 assert.strictEqual(src.writable, true); 1103 assert.strictEqual(src.destroyed, false); 1104 })); 1105 src.push(null); 1106} 1107 1108{ 1109 const src = new PassThrough(); 1110 const dst = pipeline( 1111 src, 1112 async function * (source) { 1113 for await (const chunk of source) { 1114 yield chunk; 1115 } 1116 }, 1117 common.mustCall((err) => { 1118 assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); 1119 }) 1120 ); 1121 src.push('asd'); 1122 dst.destroy(); 1123} 1124 1125{ 1126 pipeline(async function * () { 1127 yield 'asd'; 1128 }, async function * (source) { 1129 for await (const chunk of source) { 1130 yield { chunk }; 1131 } 1132 }, common.mustSucceed()); 1133} 1134 1135{ 1136 let closed = false; 1137 const src = new Readable({ 1138 read() {}, 1139 destroy(err, cb) { 1140 process.nextTick(cb); 1141 } 1142 }); 1143 const dst = new Writable({ 1144 write(chunk, encoding, callback) { 1145 callback(); 1146 } 1147 }); 1148 src.on('close', () => { 1149 closed = true; 1150 }); 1151 src.push(null); 1152 pipeline(src, dst, common.mustCall((err) => { 1153 assert.strictEqual(closed, true); 1154 })); 1155} 1156 1157{ 1158 let closed = false; 1159 const src = new Readable({ 1160 read() {}, 1161 destroy(err, cb) { 1162 process.nextTick(cb); 1163 } 1164 }); 1165 const dst = new Duplex({}); 1166 src.on('close', common.mustCall(() => { 1167 closed = true; 1168 })); 1169 src.push(null); 1170 pipeline(src, dst, common.mustCall((err) => { 1171 assert.strictEqual(closed, true); 1172 })); 1173} 1174 1175{ 1176 const server = net.createServer(common.mustCall((socket) => { 1177 // echo server 1178 pipeline(socket, socket, common.mustSucceed()); 1179 // 13 force destroys the socket before it has a chance to emit finish 1180 socket.on('finish', common.mustCall(() => { 1181 server.close(); 1182 })); 1183 })).listen(0, common.mustCall(() => { 1184 const socket = net.connect(server.address().port); 1185 socket.end(); 1186 })); 1187} 1188 1189{ 1190 const d = new Duplex({ 1191 autoDestroy: false, 1192 write: common.mustCall((data, enc, cb) => { 1193 d.push(data); 1194 cb(); 1195 }), 1196 read: common.mustCall(() => { 1197 d.push(null); 1198 }), 1199 final: common.mustCall((cb) => { 1200 setTimeout(() => { 1201 assert.strictEqual(d.destroyed, false); 1202 cb(); 1203 }, 1000); 1204 }), 1205 destroy: common.mustNotCall() 1206 }); 1207 1208 const sink = new Writable({ 1209 write: common.mustCall((data, enc, cb) => { 1210 cb(); 1211 }) 1212 }); 1213 1214 pipeline(d, sink, common.mustSucceed()); 1215 1216 d.write('test'); 1217 d.end(); 1218} 1219 1220{ 1221 const server = net.createServer(common.mustCall((socket) => { 1222 // echo server 1223 pipeline(socket, socket, common.mustSucceed()); 1224 socket.on('finish', common.mustCall(() => { 1225 server.close(); 1226 })); 1227 })).listen(0, common.mustCall(() => { 1228 const socket = net.connect(server.address().port); 1229 socket.end(); 1230 })); 1231} 1232 1233{ 1234 const d = new Duplex({ 1235 autoDestroy: false, 1236 write: common.mustCall((data, enc, cb) => { 1237 d.push(data); 1238 cb(); 1239 }), 1240 read: common.mustCall(() => { 1241 d.push(null); 1242 }), 1243 final: common.mustCall((cb) => { 1244 setTimeout(() => { 1245 assert.strictEqual(d.destroyed, false); 1246 cb(); 1247 }, 1000); 1248 }), 1249 // `destroy()` won't be invoked by pipeline since 1250 // the writable side has not completed when 1251 // the pipeline has completed. 1252 destroy: common.mustNotCall() 1253 }); 1254 1255 const sink = new Writable({ 1256 write: common.mustCall((data, enc, cb) => { 1257 cb(); 1258 }) 1259 }); 1260 1261 pipeline(d, sink, common.mustSucceed()); 1262 1263 d.write('test'); 1264 d.end(); 1265} 1266 1267{ 1268 const r = new Readable({ 1269 read() {} 1270 }); 1271 r.push('hello'); 1272 r.push('world'); 1273 r.push(null); 1274 let res = ''; 1275 const w = new Writable({ 1276 write(chunk, encoding, callback) { 1277 res += chunk; 1278 callback(); 1279 } 1280 }); 1281 pipeline([r, w], common.mustSucceed(() => { 1282 assert.strictEqual(res, 'helloworld'); 1283 })); 1284} 1285 1286{ 1287 let flushed = false; 1288 const makeStream = () => 1289 new Transform({ 1290 transform: (chunk, enc, cb) => cb(null, chunk), 1291 flush: (cb) => 1292 setTimeout(() => { 1293 flushed = true; 1294 cb(null); 1295 }, 1), 1296 }); 1297 1298 const input = new Readable(); 1299 input.push(null); 1300 1301 pipeline( 1302 input, 1303 makeStream(), 1304 common.mustCall(() => { 1305 assert.strictEqual(flushed, true); 1306 }), 1307 ); 1308} 1309{ 1310 function createThenable() { 1311 let counter = 0; 1312 return { 1313 get then() { 1314 if (counter++) { 1315 throw new Error('Cannot access `then` more than once'); 1316 } 1317 return Function.prototype; 1318 }, 1319 }; 1320 } 1321 1322 pipeline( 1323 function* () { 1324 yield 0; 1325 }, 1326 createThenable, 1327 () => common.mustNotCall(), 1328 ); 1329} 1330 1331 1332{ 1333 const ac = new AbortController(); 1334 const r = Readable.from(async function* () { 1335 for (let i = 0; i < 10; i++) { 1336 await Promise.resolve(); 1337 yield String(i); 1338 if (i === 5) { 1339 ac.abort(); 1340 } 1341 } 1342 }()); 1343 let res = ''; 1344 const w = new Writable({ 1345 write(chunk, encoding, callback) { 1346 res += chunk; 1347 callback(); 1348 } 1349 }); 1350 const cb = common.mustCall((err) => { 1351 assert.strictEqual(err.name, 'AbortError'); 1352 assert.strictEqual(res, '012345'); 1353 assert.strictEqual(w.destroyed, true); 1354 assert.strictEqual(r.destroyed, true); 1355 assert.strictEqual(pipelined.destroyed, true); 1356 }); 1357 const pipelined = addAbortSignal(ac.signal, pipeline([r, w], cb)); 1358} 1359 1360{ 1361 pipeline([1, 2, 3], PassThrough({ objectMode: true }), 1362 common.mustSucceed(() => {})); 1363 1364 let res = ''; 1365 const w = new Writable({ 1366 write(chunk, encoding, callback) { 1367 res += chunk; 1368 callback(); 1369 }, 1370 }); 1371 pipeline(['1', '2', '3'], w, common.mustSucceed(() => { 1372 assert.strictEqual(res, '123'); 1373 })); 1374} 1375 1376{ 1377 const content = 'abc'; 1378 pipeline(Buffer.from(content), PassThrough({ objectMode: true }), 1379 common.mustSucceed(() => {})); 1380 1381 let res = ''; 1382 pipeline(Buffer.from(content), async function*(previous) { 1383 for await (const val of previous) { 1384 res += String.fromCharCode(val); 1385 yield val; 1386 } 1387 }, common.mustSucceed(() => { 1388 assert.strictEqual(res, content); 1389 })); 1390} 1391 1392{ 1393 const ac = new AbortController(); 1394 const signal = ac.signal; 1395 pipelinep( 1396 async function * ({ signal }) { // eslint-disable-line require-yield 1397 await tsp.setTimeout(1e6, signal); 1398 }, 1399 async function(source) { 1400 1401 }, 1402 { signal } 1403 ).catch(common.mustCall((err) => { 1404 assert.strictEqual(err.name, 'AbortError'); 1405 })); 1406 ac.abort(); 1407} 1408 1409{ 1410 async function run() { 1411 let finished = false; 1412 let text = ''; 1413 const write = new Writable({ 1414 write(data, enc, cb) { 1415 text += data; 1416 cb(); 1417 } 1418 }); 1419 write.on('finish', () => { 1420 finished = true; 1421 }); 1422 1423 await pipelinep([Readable.from('Hello World!'), write]); 1424 assert(finished); 1425 assert.strictEqual(text, 'Hello World!'); 1426 } 1427 1428 run(); 1429} 1430 1431{ 1432 let finished = false; 1433 let text = ''; 1434 const write = new Writable({ 1435 write(data, enc, cb) { 1436 text += data; 1437 cb(); 1438 } 1439 }); 1440 write.on('finish', () => { 1441 finished = true; 1442 }); 1443 1444 pipeline([Readable.from('Hello World!'), write], common.mustSucceed(() => { 1445 assert(finished); 1446 assert.strictEqual(text, 'Hello World!'); 1447 })); 1448} 1449 1450{ 1451 const pipelinePromise = promisify(pipeline); 1452 1453 async function run() { 1454 const read = new Readable({ 1455 read() {} 1456 }); 1457 1458 const duplex = new PassThrough(); 1459 1460 read.push(null); 1461 1462 await pipelinePromise(read, duplex); 1463 1464 assert.strictEqual(duplex.destroyed, false); 1465 } 1466 1467 run().then(common.mustCall()); 1468} 1469 1470{ 1471 const pipelinePromise = promisify(pipeline); 1472 1473 async function run() { 1474 const read = new Readable({ 1475 read() {} 1476 }); 1477 1478 const duplex = new PassThrough(); 1479 1480 read.push(null); 1481 1482 await pipelinePromise(read, duplex, { end: false }); 1483 1484 assert.strictEqual(duplex.destroyed, false); 1485 assert.strictEqual(duplex.writableEnded, false); 1486 } 1487 1488 run().then(common.mustCall()); 1489} 1490 1491{ 1492 const s = new PassThrough({ objectMode: true }); 1493 pipeline(async function*() { 1494 await Promise.resolve(); 1495 yield 'hello'; 1496 yield 'world'; 1497 yield 'world'; 1498 }, s, async function(source) { 1499 let ret = ''; 1500 let n = 0; 1501 for await (const chunk of source) { 1502 if (n++ > 1) { 1503 break; 1504 } 1505 ret += chunk; 1506 } 1507 return ret; 1508 }, common.mustCall((err, val) => { 1509 assert.strictEqual(err, undefined); 1510 assert.strictEqual(val, 'helloworld'); 1511 assert.strictEqual(s.destroyed, true); 1512 })); 1513} 1514 1515{ 1516 const s = new PassThrough({ objectMode: true }); 1517 pipeline(async function*() { 1518 await Promise.resolve(); 1519 yield 'hello'; 1520 yield 'world'; 1521 yield 'world'; 1522 }, s, async function(source) { 1523 return null; 1524 }, common.mustCall((err, val) => { 1525 assert.strictEqual(err, undefined); 1526 assert.strictEqual(val, null); 1527 })); 1528} 1529 1530{ 1531 // Mimics a legacy stream without the .destroy method 1532 class LegacyWritable extends Stream { 1533 write(chunk, encoding, callback) { 1534 callback(); 1535 } 1536 } 1537 1538 const writable = new LegacyWritable(); 1539 writable.on('error', common.mustCall((err) => { 1540 assert.deepStrictEqual(err, new Error('stop')); 1541 })); 1542 1543 pipeline( 1544 Readable.from({ 1545 [Symbol.asyncIterator]() { 1546 return { 1547 next() { 1548 return Promise.reject(new Error('stop')); 1549 } 1550 }; 1551 } 1552 }), 1553 writable, 1554 common.mustCall((err) => { 1555 assert.deepStrictEqual(err, new Error('stop')); 1556 }) 1557 ); 1558} 1559 1560{ 1561 class CustomReadable extends Readable { 1562 _read() { 1563 this.push('asd'); 1564 this.push(null); 1565 } 1566 } 1567 1568 class CustomWritable extends Writable { 1569 constructor() { 1570 super(); 1571 this.endCount = 0; 1572 this.str = ''; 1573 } 1574 1575 _write(chunk, enc, cb) { 1576 this.str += chunk; 1577 cb(); 1578 } 1579 1580 end() { 1581 this.endCount += 1; 1582 super.end(); 1583 } 1584 } 1585 1586 const readable = new CustomReadable(); 1587 const writable = new CustomWritable(); 1588 1589 pipeline(readable, writable, common.mustSucceed(() => { 1590 assert.strictEqual(writable.str, 'asd'); 1591 assert.strictEqual(writable.endCount, 1); 1592 })); 1593} 1594 1595{ 1596 const readable = new Readable({ 1597 read() {} 1598 }); 1599 readable.on('end', common.mustCall(() => { 1600 pipeline(readable, new PassThrough(), common.mustSucceed()); 1601 })); 1602 readable.push(null); 1603 readable.read(); 1604} 1605 1606{ 1607 const dup = new Duplex({ 1608 read() {}, 1609 write(chunk, enc, cb) { 1610 cb(); 1611 } 1612 }); 1613 dup.on('end', common.mustCall(() => { 1614 pipeline(dup, new PassThrough(), common.mustSucceed()); 1615 })); 1616 dup.push(null); 1617 dup.read(); 1618} 1619 1620{ 1621 let res = ''; 1622 const writable = new Writable({ 1623 write(chunk, enc, cb) { 1624 res += chunk; 1625 cb(); 1626 } 1627 }); 1628 pipelinep(async function*() { 1629 yield 'hello'; 1630 await Promise.resolve(); 1631 yield 'world'; 1632 }, writable, { end: false }).then(common.mustCall(() => { 1633 assert.strictEqual(res, 'helloworld'); 1634 assert.strictEqual(writable.closed, false); 1635 })); 1636} 1637 1638{ 1639 const r = new Readable(); 1640 for (let i = 0; i < 4000; i++) { 1641 r.push('asdfdagljanfgkaljdfn'); 1642 } 1643 r.push(null); 1644 1645 let ended = false; 1646 r.on('end', () => { 1647 ended = true; 1648 }); 1649 1650 const w = new Writable({ 1651 write(chunk, enc, cb) { 1652 cb(null); 1653 }, 1654 final: common.mustCall((cb) => { 1655 assert.strictEqual(ended, true); 1656 cb(null); 1657 }) 1658 }); 1659 1660 pipeline(r, w, common.mustCall((err) => { 1661 assert.strictEqual(err, undefined); 1662 })); 1663 1664} 1665