'use strict';

// Exercises async iteration over readable streams: legacy (v1) `Stream`
// objects borrowing `Readable.prototype[Symbol.asyncIterator]`, plain
// `Readable`s, `Transform`s, `fs` read streams, `Readable.from()` with
// `readable.iterator()`, and HTTP request/response bodies.
//
// `common.mustCall()` / `common.mustNotCall()` come from the shared test
// helper module; they are used here to make the test fail when a callback is
// (not) invoked as expected.

const common = require('../common');
const {
  Stream,
  Readable,
  Transform,
  PassThrough,
  pipeline
} = require('stream');
const assert = require('assert');
const http = require('http');
const fs = require('fs');

// Runs the scenarios that need `await` / `for await` one after another.
// Each scenario lives in its own block so that its locals do not leak into
// the next one. The returned promise is checked at the bottom of the file so
// that a scenario whose `await` never settles is reported as a failure.
async function tests() {
  {
    // v1 stream

    // A bare legacy Stream only emits events; it borrows the Readable
    // async iterator. `destroy` is stubbed and expected to be called.
    const stream = new Stream();
    stream.destroy = common.mustCall();
    process.nextTick(() => {
      stream.emit('data', 'hello');
      stream.emit('data', 'world');
      stream.emit('end');
    });

    let res = '';
    stream[Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator];
    for await (const d of stream) {
      res += d;
    }
    assert.strictEqual(res, 'helloworld');
  }

  {
    // v1 stream error

    // Same as above, but the stream emits 'error' and only exposes a
    // stubbed `close()`, which is expected to be called.
    const stream = new Stream();
    stream.close = common.mustCall();
    process.nextTick(() => {
      stream.emit('data', 0);
      stream.emit('data', 1);
      stream.emit('error', new Error('asd'));
    });

    const iter = Readable.prototype[Symbol.asyncIterator].call(stream);
    await iter.next()
      .then(common.mustNotCall())
      .catch(common.mustCall((err) => {
        assert.strictEqual(err.message, 'asd');
      }));
  }

  {
    // Non standard stream cleanup

    // `destroy` is removed, so leaving the loop early is expected to fall
    // back to the stubbed `close()`, which emits 'close' itself.
    const readable = new Readable({ autoDestroy: false, read() {} });
    readable.push('asd');
    readable.push('asd');
    readable.destroy = null;
    readable.close = common.mustCall(() => {
      readable.emit('close');
    });

    await (async () => {
      for await (const d of readable) {
        return;
      }
    })();
  }

  {
    // Mixing a manual next() with for..await on the same iterator: the loop
    // must continue after the chunk that was already consumed.
    const readable = new Readable({ objectMode: true, read() {} });
    readable.push(0);
    readable.push(1);
    readable.push(null);

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual((await iter.next()).value, 0);
    for await (const d of iter) {
      assert.strictEqual(d, 1);
    }
  }

  {
    console.log('read without for..await');
    const max = 5;
    const readable = new Readable({
      objectMode: true,
      read() {}
    });

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual(iter.stream, readable);

    // Queue all next() calls before any data has been pushed.
    const values = [];
    for (let i = 0; i < max; i++) {
      values.push(iter.next());
    }
    Promise.all(values).then(common.mustCall((results) => {
      results.forEach(common.mustCall(
        (item, i) => assert.strictEqual(item.value, `hello-${i}`), max));
    }));

    readable.push('hello-0');
    readable.push('hello-1');
    readable.push('hello-2');
    readable.push('hello-3');
    readable.push('hello-4');
    readable.push(null);

    const last = await iter.next();
    assert.strictEqual(last.done, true);
  }

  {
    console.log('read without for..await deferred');
    const readable = new Readable({
      objectMode: true,
      read() {}
    });

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual(iter.stream, readable);

    // First batch: three pending next() calls, then three pushes.
    let values = [];
    for (let i = 0; i < 3; i++) {
      values.push(iter.next());
    }

    readable.push('hello-0');
    readable.push('hello-1');
    readable.push('hello-2');

    // `k` keeps counting across both batches.
    let k = 0;
    const results1 = await Promise.all(values);
    results1.forEach(common.mustCall(
      (item) => assert.strictEqual(item.value, `hello-${k++}`), 3));

    // Second batch: two pending next() calls, two pushes, then end.
    values = [];
    for (let i = 0; i < 2; i++) {
      values.push(iter.next());
    }

    readable.push('hello-3');
    readable.push('hello-4');
    readable.push(null);

    const results2 = await Promise.all(values);
    results2.forEach(common.mustCall(
      (item) => assert.strictEqual(item.value, `hello-${k++}`), 2));

    const last = await iter.next();
    assert.strictEqual(last.done, true);
  }

  {
    console.log('read without for..await with errors');
    const max = 3;
    const readable = new Readable({
      objectMode: true,
      read() {}
    });

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual(iter.stream, readable);

    // `values` are satisfied by the pushes below; `errors` are still pending
    // when the stream is destroyed with an error.
    const values = [];
    const errors = [];
    let i;
    for (i = 0; i < max; i++) {
      values.push(iter.next());
    }
    for (i = 0; i < 2; i++) {
      errors.push(iter.next());
    }

    readable.push('hello-0');
    readable.push('hello-1');
    readable.push('hello-2');

    const resolved = await Promise.all(values);

    resolved.forEach(common.mustCall(
      (item, i) => assert.strictEqual(item.value, `hello-${i}`), max));

    // Only the first pending next() is expected to reject with the error...
    errors.slice(0, 1).forEach((promise) => {
      promise.catch(common.mustCall((err) => {
        assert.strictEqual(err.message, 'kaboom');
      }));
    });

    // ...the remaining ones are expected to resolve as "done".
    errors.slice(1).forEach((promise) => {
      promise.then(common.mustCall(({ done, value }) => {
        assert.strictEqual(done, true);
        assert.strictEqual(value, undefined);
      }));
    });

    readable.destroy(new Error('kaboom'));
  }

  {
    console.log('call next() after error');
    const readable = new Readable({
      read() {}
    });
    const iterator = readable[Symbol.asyncIterator]();

    const err = new Error('kaboom');
    readable.destroy(err);
    await assert.rejects(iterator.next.bind(iterator), err);
  }

  {
    console.log('read object mode');
    const max = 42;
    let readed = 0;
    let received = 0;
    const readable = new Readable({
      objectMode: true,
      read() {
        this.push('hello');
        if (++readed === max) {
          this.push(null);
        }
      }
    });

    for await (const k of readable) {
      received++;
      assert.strictEqual(k, 'hello');
    }

    assert.strictEqual(readed, received);
  }

  {
    console.log('destroy sync');
    const readable = new Readable({
      objectMode: true,
      read() {
        this.destroy(new Error('kaboom from read'));
      }
    });

    let err;
    try {
      // eslint-disable-next-line no-unused-vars, no-empty
      for await (const k of readable) { }
    } catch (e) {
      err = e;
    }
    assert.strictEqual(err.message, 'kaboom from read');
  }

  {
    console.log('destroy async');
    const readable = new Readable({
      objectMode: true,
      read() {
        // Push a single chunk, then destroy with an error on a later tick.
        if (!this.pushed) {
          this.push('hello');
          this.pushed = true;

          setImmediate(() => {
            this.destroy(new Error('kaboom'));
          });
        }
      }
    });

    let received = 0;

    let err = null;
    try {
      // eslint-disable-next-line no-unused-vars
      for await (const k of readable) {
        received++;
      }
    } catch (e) {
      err = e;
    }

    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(received, 1);
  }

  {
    console.log('destroyed by throw');
    const readable = new Readable({
      objectMode: true,
      read() {
        this.push('hello');
      }
    });

    let err = null;
    try {
      for await (const k of readable) {
        assert.strictEqual(k, 'hello');
        throw new Error('kaboom');
      }
    } catch (e) {
      err = e;
    }

    // Throwing out of the loop body must destroy the stream.
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(readable.destroyed, true);
  }

  {
    console.log('destroyed sync after push');
    const readable = new Readable({
      objectMode: true,
      read() {
        this.push('hello');
        this.destroy(new Error('kaboom'));
      }
    });

    let received = 0;

    let err = null;
    try {
      for await (const k of readable) {
        assert.strictEqual(k, 'hello');
        received++;
      }
    } catch (e) {
      err = e;
    }

    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(received, 1);
  }

  {
    console.log('destroyed will not deadlock');
    const readable = new Readable();
    readable.destroy();
    process.nextTick(() => {
      // This scenario is intentionally not awaited by tests(). The completion
      // check at the end makes sure that an iteration that never settles (the
      // deadlock this scenario guards against) fails the test instead of
      // silently skipping the assertions below.
      (async () => {
        readable.on('close', common.mustNotCall());
        let received = 0;
        let err = null;
        try {
          for await (const k of readable) {
            // Just make linting pass. This should never run.
            assert.strictEqual(k, 'hello');
            received++;
          }
        } catch (_err) {
          err = _err;
        }
        assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
        assert.strictEqual(received, 0);
      })().then(common.mustCall());
    });
  }

  {
    console.log('push async');
    const max = 42;
    let readed = 0;
    let received = 0;
    const readable = new Readable({
      objectMode: true,
      read() {
        setImmediate(() => {
          this.push('hello');
          if (++readed === max) {
            this.push(null);
          }
        });
      }
    });

    for await (const k of readable) {
      received++;
      assert.strictEqual(k, 'hello');
    }

    assert.strictEqual(readed, received);
  }

  {
    console.log('push binary async');
    const max = 42;
    let readed = 0;
    const readable = new Readable({
      read() {
        setImmediate(() => {
          this.push('hello');
          if (++readed === max) {
            this.push(null);
          }
        });
      }
    });

    // A 'data' listener is attached while the stream is paused; what it
    // collects is compared with what the async iterator yields.
    let expected = '';
    readable.setEncoding('utf8');
    readable.pause();
    readable.on('data', (chunk) => {
      expected += chunk;
    });

    let data = '';
    for await (const k of readable) {
      data += k;
    }

    assert.strictEqual(data, expected);
  }

  {
    console.log('.next() on destroyed stream');
    const readable = new Readable({
      read() {
        // no-op
      }
    });

    readable.destroy();

    const it = await readable[Symbol.asyncIterator]();
    const next = it.next();
    next
      .then(common.mustNotCall())
      .catch(common.mustCall((err) => {
        assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
      }));
  }

  {
    console.log('.next() on pipelined stream');
    const readable = new Readable({
      read() {
        // no-op
      }
    });

    // Both the pipeline callback and the iterator must observe the very
    // same error object.
    const passthrough = new PassThrough();
    const err = new Error('kaboom');
    pipeline(readable, passthrough, common.mustCall((e) => {
      assert.strictEqual(e, err);
    }));
    readable.destroy(err);
    await assert.rejects(
      readable[Symbol.asyncIterator]().next(),
      (e) => {
        assert.strictEqual(e, err);
        return true;
      }
    );
  }

  {
    console.log('iterating on an ended stream completes');
    const r = new Readable({
      objectMode: true,
      read() {
        this.push('asdf');
        this.push('hehe');
        this.push(null);
      }
    });
    // The second loop runs over the already-consumed stream and must finish.
    // eslint-disable-next-line no-unused-vars, no-empty
    for await (const a of r) { }
    // eslint-disable-next-line no-unused-vars, no-empty
    for await (const b of r) { }
  }

  {
    console.log('destroy mid-stream errors');
    const r = new Readable({
      objectMode: true,
      read() {
        this.push('asdf');
        this.push('hehe');
      }
    });

    let err = null;
    try {
      // eslint-disable-next-line no-unused-vars
      for await (const a of r) {
        r.destroy(null);
      }
    } catch (_err) {
      err = _err;
    }
    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
  }

  {
    console.log('readable side of a transform stream pushes null');
    const transform = new Transform({
      objectMode: true,
      transform: (chunk, enc, cb) => { cb(null, chunk); }
    });
    transform.push(0);
    transform.push(1);
    process.nextTick(() => {
      transform.push(null);
    });

    // mustReach[0]: the loop body ran; mustReach[1]: the loop terminated.
    const mustReach = [ common.mustCall(), common.mustCall() ];

    const iter = transform[Symbol.asyncIterator]();
    assert.strictEqual((await iter.next()).value, 0);

    for await (const d of iter) {
      assert.strictEqual(d, 1);
      mustReach[0]();
    }
    mustReach[1]();
  }

  {
    console.log('all next promises must be resolved on end');
    const r = new Readable({
      objectMode: true,
      read() {
      }
    });

    const b = r[Symbol.asyncIterator]();
    const c = b.next();
    const d = b.next();
    r.push(null);
    assert.deepStrictEqual(await c, { done: true, value: undefined });
    assert.deepStrictEqual(await d, { done: true, value: undefined });
  }

  {
    console.log('all next promises must be rejected on destroy');
    const r = new Readable({
      objectMode: true,
      read() {
      }
    });

    // The first pending next() is expected to reject with a premature close
    // error; the second one is expected to resolve as "done".
    const b = r[Symbol.asyncIterator]();
    const c = b.next();
    const d = b.next();
    r.destroy();
    c
      .then(common.mustNotCall())
      .catch(common.mustCall((err) => {
        assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
      }));
    assert.deepStrictEqual(await d, { done: true, value: undefined });
  }

  {
    console.log('all next promises must be resolved on destroy with error');
    const r = new Readable({
      objectMode: true,
      read() {
      }
    });

    const b = r[Symbol.asyncIterator]();
    const c = b.next();
    const d = b.next();
    const err = new Error('kaboom');
    r.destroy(err);

    await Promise.all([(async () => {
      // The first pending next() rejects with the destroy error.
      let e;
      try {
        await c;
      } catch (_e) {
        e = _e;
      }
      assert.strictEqual(e, err);
    })(), (async () => {
      // The second pending next() resolves as "done" without an error.
      let e;
      let x;
      try {
        x = await d;
      } catch (_e) {
        e = _e;
      }
      assert.strictEqual(e, undefined);
      assert.strictEqual(x.done, true);
      assert.strictEqual(x.value, undefined);
    })()]);
  }

  {
    // An error reported asynchronously by a custom destroy() implementation
    // must surface through next().
    const _err = new Error('asd');
    const r = new Readable({
      read() {
      },
      destroy(err, callback) {
        setTimeout(() => callback(_err), 1);
      }
    });

    r.destroy();
    const it = r[Symbol.asyncIterator]();
    it.next().catch(common.mustCall((err) => {
      assert.strictEqual(err, _err);
    }));
  }

  {
    // Don't destroy if no auto destroy.
    // https://github.com/nodejs/node/issues/35116

    const r = new Readable({
      autoDestroy: false,
      read() {
        this.push('asd');
        this.push(null);
      }
    });

    for await (const chunk of r) { } // eslint-disable-line no-unused-vars, no-empty
    assert.strictEqual(r.destroyed, false);
  }

  {
    // Destroy if no auto destroy and premature break.
    // https://github.com/nodejs/node/pull/35122/files#r485678318

    const r = new Readable({
      autoDestroy: false,
      read() {
        this.push('asd');
      }
    });

    for await (const chunk of r) { // eslint-disable-line no-unused-vars
      break;
    }
    assert.strictEqual(r.destroyed, true);
  }

  {
    // Don't destroy before 'end'.

    const r = new Readable({
      read() {
        this.push('asd');
        this.push(null);
      }
    }).on('end', () => {
      assert.strictEqual(r.destroyed, false);
    });

    for await (const chunk of r) { } // eslint-disable-line no-unused-vars, no-empty
    assert.strictEqual(r.destroyed, true);
  }
}

{
  // AsyncIterator return should end even when destroy
  // does not implement the callback API.

  const r = new Readable({
    objectMode: true,
    read() {
    }
  });

  // Replace destroy() with a wrapper that only forwards the error argument.
  const originalDestroy = r.destroy;
  r.destroy = (err) => {
    originalDestroy.call(r, err);
  };
  const it = r[Symbol.asyncIterator]();
  const p = it.return();
  r.push(null);
  p.then(common.mustCall());
}


{
  // AsyncIterator return should not error with
  // premature close.

  const r = new Readable({
    objectMode: true,
    read() {
    }
  });

  // Replace destroy() with a wrapper that only forwards the error argument.
  const originalDestroy = r.destroy;
  r.destroy = (err) => {
    originalDestroy.call(r, err);
  };
  const it = r[Symbol.asyncIterator]();
  const p = it.return();
  r.emit('close');
  p.then(common.mustCall()).catch(common.mustNotCall());
}

{
  // AsyncIterator should not finish correctly if destroyed.

  const r = new Readable({
    objectMode: true,
    read() {
    }
  });

  // The iterator is only created once 'close' has been emitted.
  r.destroy();
  r.on('close', () => {
    const it = r[Symbol.asyncIterator]();
    const next = it.next();
    next
      .then(common.mustNotCall())
      .catch(common.mustCall((err) => {
        assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
      }));
  });
}

{
  // AsyncIterator should throw if prematurely closed
  // before end has been emitted.
  (async function() {
    const readable = fs.createReadStream(__filename);

    try {
      // eslint-disable-next-line no-unused-vars
      for await (const chunk of readable) {
        readable.close();
      }

      assert.fail('should have thrown');
    } catch (err) {
      assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
    }

    assert.ok(readable.destroyed);
  })().then(common.mustCall());
}

// AsyncIterator non-destroying iterator
{
  // Builds a fresh Readable that yields 5 and then 7, with an async hop
  // before each value and before finishing.
  function createReadable() {
    return Readable.from((async function* () {
      await Promise.resolve();
      yield 5;
      await Promise.resolve();
      yield 7;
      await Promise.resolve();
    })());
  }

  // Check explicit destroying on return
  (async function() {
    const readable = createReadable();
    for await (const chunk of readable.iterator({ destroyOnReturn: true })) {
      assert.strictEqual(chunk, 5);
      break;
    }

    assert.ok(readable.destroyed);
  })().then(common.mustCall());

  // Check explicit non-destroy on return (destroyOnReturn: false)
  (async function() {
    const readable = createReadable();
    const opts = { destroyOnReturn: false };
    for await (const chunk of readable.iterator(opts)) {
      assert.strictEqual(chunk, 5);
      break;
    }

    assert.ok(!readable.destroyed);

    // A second iterator picks up where the first one stopped.
    for await (const chunk of readable.iterator(opts)) {
      assert.strictEqual(chunk, 7);
    }

    assert.ok(readable.destroyed);
  })().then(common.mustCall());

  // Check non-object options.
  {
    const readable = createReadable();
    assert.throws(
      () => readable.iterator(42),
      {
        code: 'ERR_INVALID_ARG_TYPE',
        name: 'TypeError',
        message: 'The "options" argument must be of type object. Received ' +
                 'type number (42)',
      }
    );
  }

  // Check for dangling listeners
  (async function() {
    const readable = createReadable();
    const opts = { destroyOnReturn: false };
    // Repeatedly create an iterator and abandon it after one chunk.
    while (readable.readable) {
      // eslint-disable-next-line no-unused-vars
      for await (const chunk of readable.iterator(opts)) {
        break;
      }
    }

    assert.deepStrictEqual(readable.eventNames(), []);
  })().then(common.mustCall());
}

{
  // Iterating an HTTP response whose request is destroyed mid-body must
  // throw ECONNRESET out of the for..await loop.
  let _req;
  const server = http.createServer((request, response) => {
    response.statusCode = 404;
    response.write('never ends');
  });

  server.listen(() => {
    _req = http.request(`http://localhost:${server.address().port}`)
      .on('response', common.mustCall(async (res) => {
        setTimeout(() => {
          _req.destroy(new Error('something happened'));
        }, 100);

        res.on('error', common.mustCall());

        let _err;
        try {
          // eslint-disable-next-line no-unused-vars, no-empty
          for await (const chunk of res) { }
        } catch (err) {
          _err = err;
        }

        assert.strictEqual(_err.code, 'ECONNRESET');
        server.close();
      }))
      .on('error', common.mustCall())
      .end();
  });
}

{
  // Round-trips a JSON body: the server reads the request with for..await
  // and echoes it; the client reads the response with for..await.

  // Collects the whole request body and parses it as JSON; falls back to an
  // empty object when the body is not valid JSON.
  async function getParsedBody(request) {
    let body = '';

    for await (const data of request) {
      body += data;
    }

    try {
      return JSON.parse(body);
    } catch {
      return {};
    }
  }

  const str = JSON.stringify({ asd: true });
  const server = http.createServer(async (request, response) => {
    const body = await getParsedBody(request);
    response.statusCode = 200;
    assert.strictEqual(JSON.stringify(body), str);
    response.end(JSON.stringify(body));
  }).listen(() => {
    http
      .request({
        method: 'POST',
        hostname: 'localhost',
        port: server.address().port,
      })
      .end(str)
      // Guarded like the 'response' handler of the previous scenario so a
      // missing response is reported explicitly.
      .on('response', common.mustCall(async (res) => {
        let body = '';
        for await (const chunk of res) {
          body += chunk;
        }
        assert.strictEqual(body, str);
        server.close();
      }));
  });
}

// To avoid missing some tests if a promise does not resolve
tests().then(common.mustCall());