'use strict';

const common = require('../common');
const {
  Stream,
  Readable,
  Transform,
  PassThrough,
  pipeline
} = require('stream');
const assert = require('assert');
const http = require('http');
const fs = require('fs');
14
/**
 * Runs the async-iterator scenarios that must execute one after another.
 *
 * Each scenario lives in its own block scope so that local names can be
 * reused. Scenarios either await their own completion or register
 * `common.mustCall` / `common.mustNotCall` expectations for callbacks that
 * settle after the scenario's block has been left.
 *
 * @returns {Promise<void>} Resolves once every awaited scenario has finished;
 *   rejects with the first assertion failure raised by an awaited scenario.
 */
async function tests() {
  {
    // v1 stream

    const stream = new Stream();
    stream.destroy = common.mustCall();
    process.nextTick(() => {
      stream.emit('data', 'hello');
      stream.emit('data', 'world');
      stream.emit('end');
    });

    let res = '';
    // Borrow Readable's async iterator implementation for the legacy stream.
    stream[Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator];
    for await (const d of stream) {
      res += d;
    }
    assert.strictEqual(res, 'helloworld');
  }

  {
    // v1 stream error

    const stream = new Stream();
    // Only `close` is provided here (no `destroy`); it must be called once.
    stream.close = common.mustCall();
    process.nextTick(() => {
      stream.emit('data', 0);
      stream.emit('data', 1);
      stream.emit('error', new Error('asd'));
    });

    const iter = Readable.prototype[Symbol.asyncIterator].call(stream);
    await iter.next()
      .then(common.mustNotCall())
      .catch(common.mustCall((err) => {
        assert.strictEqual(err.message, 'asd');
      }));
  }

  {
    // Non standard stream cleanup

    const readable = new Readable({ autoDestroy: false, read() {} });
    readable.push('asd');
    readable.push('asd');
    // With `destroy` removed, leaving the loop early must fall back to
    // `close()`.
    readable.destroy = null;
    readable.close = common.mustCall(() => {
      readable.emit('close');
    });

    await (async () => {
      for await (const d of readable) {
        return;
      }
    })();
  }

  {
    // Mixing a manual next() with for await on the same iterator.
    const readable = new Readable({ objectMode: true, read() {} });
    readable.push(0);
    readable.push(1);
    readable.push(null);

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual((await iter.next()).value, 0);
    for await (const d of iter) {
      assert.strictEqual(d, 1);
    }
  }

  {
    console.log('read without for..await');
    const max = 5;
    const readable = new Readable({
      objectMode: true,
      read() {}
    });

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual(iter.stream, readable);
    // Request every value up front, before any data has been pushed.
    const values = [];
    for (let i = 0; i < max; i++) {
      values.push(iter.next());
    }
    Promise.all(values).then(common.mustCall((values) => {
      values.forEach(common.mustCall(
        (item, i) => assert.strictEqual(item.value, `hello-${i}`), max));
    }));

    readable.push('hello-0');
    readable.push('hello-1');
    readable.push('hello-2');
    readable.push('hello-3');
    readable.push('hello-4');
    readable.push(null);

    const last = await iter.next();
    assert.strictEqual(last.done, true);
  }

  {
    console.log('read without for..await deferred');
    const readable = new Readable({
      objectMode: true,
      read() {}
    });

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual(iter.stream, readable);
    let values = [];
    for (let i = 0; i < 3; i++) {
      values.push(iter.next());
    }

    readable.push('hello-0');
    readable.push('hello-1');
    readable.push('hello-2');

    // `k` keeps counting across both batches so that the second batch is
    // checked against 'hello-3' and 'hello-4'.
    let k = 0;
    const results1 = await Promise.all(values);
    results1.forEach(common.mustCall(
      (item) => assert.strictEqual(item.value, `hello-${k++}`), 3));

    values = [];
    for (let i = 0; i < 2; i++) {
      values.push(iter.next());
    }

    readable.push('hello-3');
    readable.push('hello-4');
    readable.push(null);

    const results2 = await Promise.all(values);
    results2.forEach(common.mustCall(
      (item) => assert.strictEqual(item.value, `hello-${k++}`), 2));

    const last = await iter.next();
    assert.strictEqual(last.done, true);
  }

  {
    console.log('read without for..await with errors');
    const max = 3;
    const readable = new Readable({
      objectMode: true,
      read() {}
    });

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual(iter.stream, readable);
    const values = [];
    const errors = [];
    let i;
    for (i = 0; i < max; i++) {
      values.push(iter.next());
    }
    // Two extra pending reads that are still outstanding when the stream is
    // destroyed below.
    for (i = 0; i < 2; i++) {
      errors.push(iter.next());
    }

    readable.push('hello-0');
    readable.push('hello-1');
    readable.push('hello-2');

    const resolved = await Promise.all(values);

    resolved.forEach(common.mustCall(
      (item, i) => assert.strictEqual(item.value, `hello-${i}`), max));

    // The first outstanding read is expected to reject with the destroy
    // error...
    errors.slice(0, 1).forEach((promise) => {
      promise.catch(common.mustCall((err) => {
        assert.strictEqual(err.message, 'kaboom');
      }));
    });

    // ...while the following one is expected to report completion.
    errors.slice(1).forEach((promise) => {
      promise.then(common.mustCall(({ done, value }) => {
        assert.strictEqual(done, true);
        assert.strictEqual(value, undefined);
      }));
    });

    readable.destroy(new Error('kaboom'));
  }

  {
    console.log('call next() after error');
    const readable = new Readable({
      read() {}
    });
    const iterator = readable[Symbol.asyncIterator]();

    const err = new Error('kaboom');
    readable.destroy(err);
    await assert.rejects(iterator.next.bind(iterator), err);
  }

  {
    console.log('read object mode');
    const max = 42;
    let readed = 0;
    let received = 0;
    const readable = new Readable({
      objectMode: true,
      read() {
        this.push('hello');
        if (++readed === max) {
          this.push(null);
        }
      }
    });

    for await (const k of readable) {
      received++;
      assert.strictEqual(k, 'hello');
    }

    assert.strictEqual(readed, received);
  }

  {
    console.log('destroy sync');
    const readable = new Readable({
      objectMode: true,
      read() {
        this.destroy(new Error('kaboom from read'));
      }
    });

    let err;
    try {
      // eslint-disable-next-line no-unused-vars, no-empty
      for await (const k of readable) { }
    } catch (e) {
      err = e;
    }
    assert.strictEqual(err.message, 'kaboom from read');
  }

  {
    console.log('destroy async');
    const readable = new Readable({
      objectMode: true,
      read() {
        // Push a single chunk, then destroy with an error on a later turn.
        if (!this.pushed) {
          this.push('hello');
          this.pushed = true;

          setImmediate(() => {
            this.destroy(new Error('kaboom'));
          });
        }
      }
    });

    let received = 0;

    let err = null;
    try {
      // eslint-disable-next-line no-unused-vars
      for await (const k of readable) {
        received++;
      }
    } catch (e) {
      err = e;
    }

    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(received, 1);
  }

  {
    console.log('destroyed by throw');
    const readable = new Readable({
      objectMode: true,
      read() {
        this.push('hello');
      }
    });

    let err = null;
    try {
      for await (const k of readable) {
        assert.strictEqual(k, 'hello');
        throw new Error('kaboom');
      }
    } catch (e) {
      err = e;
    }

    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(readable.destroyed, true);
  }

  {
    console.log('destroyed sync after push');
    const readable = new Readable({
      objectMode: true,
      read() {
        this.push('hello');
        this.destroy(new Error('kaboom'));
      }
    });

    let received = 0;

    let err = null;
    try {
      for await (const k of readable) {
        assert.strictEqual(k, 'hello');
        received++;
      }
    } catch (e) {
      err = e;
    }

    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(received, 1);
  }

  {
    console.log('destroyed will not deadlock');
    const readable = new Readable();
    readable.destroy();
    // Not awaited by tests(): the checks run from a nextTick callback.
    process.nextTick(async () => {
      readable.on('close', common.mustNotCall());
      let received = 0;
      let err = null;
      try {
        for await (const k of readable) {
          // Just make linting pass. This should never run.
          assert.strictEqual(k, 'hello');
          received++;
        }
      } catch (_err) {
        err = _err;
      }
      assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
      assert.strictEqual(received, 0);
    });
  }

  {
    console.log('push async');
    const max = 42;
    let readed = 0;
    let received = 0;
    const readable = new Readable({
      objectMode: true,
      read() {
        setImmediate(() => {
          this.push('hello');
          if (++readed === max) {
            this.push(null);
          }
        });
      }
    });

    for await (const k of readable) {
      received++;
      assert.strictEqual(k, 'hello');
    }

    assert.strictEqual(readed, received);
  }

  {
    console.log('push binary async');
    const max = 42;
    let readed = 0;
    const readable = new Readable({
      read() {
        setImmediate(() => {
          this.push('hello');
          if (++readed === max) {
            this.push(null);
          }
        });
      }
    });

    // A 'data' listener is attached on the explicitly paused stream; whatever
    // it collects must match what the async iterator yields.
    let expected = '';
    readable.setEncoding('utf8');
    readable.pause();
    readable.on('data', (chunk) => {
      expected += chunk;
    });

    let data = '';
    for await (const k of readable) {
      data += k;
    }

    assert.strictEqual(data, expected);
  }

  {
    console.log('.next() on destroyed stream');
    const readable = new Readable({
      read() {
        // no-op
      }
    });

    readable.destroy();

    const it = await readable[Symbol.asyncIterator]();
    const next = it.next();
    next
      .then(common.mustNotCall())
      .catch(common.mustCall((err) => {
        assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
      }));
  }

  {
    console.log('.next() on pipelined stream');
    const readable = new Readable({
      read() {
        // no-op
      }
    });

    const passthrough = new PassThrough();
    const err = new Error('kaboom');
    pipeline(readable, passthrough, common.mustCall((e) => {
      assert.strictEqual(e, err);
    }));
    readable.destroy(err);
    await assert.rejects(
      readable[Symbol.asyncIterator]().next(),
      (e) => {
        assert.strictEqual(e, err);
        return true;
      }
    );
  }

  {
    console.log('iterating on an ended stream completes');
    const r = new Readable({
      objectMode: true,
      read() {
        this.push('asdf');
        this.push('hehe');
        this.push(null);
      }
    });
    // eslint-disable-next-line no-unused-vars, no-empty
    for await (const a of r) { }
    // The second loop over the already-ended stream must also finish.
    // eslint-disable-next-line no-unused-vars, no-empty
    for await (const b of r) { }
  }

  {
    console.log('destroy mid-stream errors');
    const r = new Readable({
      objectMode: true,
      read() {
        this.push('asdf');
        this.push('hehe');
      }
    });

    let err = null;
    try {
      // eslint-disable-next-line no-unused-vars
      for await (const a of r) {
        r.destroy(null);
      }
    } catch (_err) {
      err = _err;
    }
    assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
  }

  {
    console.log('readable side of a transform stream pushes null');
    const transform = new Transform({
      objectMode: true,
      transform: (chunk, enc, cb) => { cb(null, chunk); }
    });
    transform.push(0);
    transform.push(1);
    process.nextTick(() => {
      transform.push(null);
    });

    // [0] must run once inside the loop body, [1] once after the loop.
    const mustReach = [ common.mustCall(), common.mustCall() ];

    const iter = transform[Symbol.asyncIterator]();
    assert.strictEqual((await iter.next()).value, 0);

    for await (const d of iter) {
      assert.strictEqual(d, 1);
      mustReach[0]();
    }
    mustReach[1]();
  }

  {
    console.log('all next promises must be resolved on end');
    const r = new Readable({
      objectMode: true,
      read() {
      }
    });

    const b = r[Symbol.asyncIterator]();
    const c = b.next();
    const d = b.next();
    r.push(null);
    assert.deepStrictEqual(await c, { done: true, value: undefined });
    assert.deepStrictEqual(await d, { done: true, value: undefined });
  }

  {
    console.log('all next promises must be rejected on destroy');
    const r = new Readable({
      objectMode: true,
      read() {
      }
    });

    const b = r[Symbol.asyncIterator]();
    const c = b.next();
    const d = b.next();
    r.destroy();
    c
      .then(common.mustNotCall())
      .catch(common.mustCall((err) => {
        assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
      }));
    assert.deepStrictEqual(await d, { done: true, value: undefined });
  }

  {
    console.log('all next promises must be resolved on destroy with error');
    const r = new Readable({
      objectMode: true,
      read() {
      }
    });

    const b = r[Symbol.asyncIterator]();
    const c = b.next();
    const d = b.next();
    const err = new Error('kaboom');
    r.destroy(err);

    await Promise.all([(async () => {
      // The first pending read must reject with the destroy error.
      let e;
      try {
        await c;
      } catch (_e) {
        e = _e;
      }
      assert.strictEqual(e, err);
    })(), (async () => {
      // The second pending read must settle as done, without an error.
      let e;
      let x;
      try {
        x = await d;
      } catch (_e) {
        e = _e;
      }
      assert.strictEqual(e, undefined);
      assert.strictEqual(x.done, true);
      assert.strictEqual(x.value, undefined);
    })()]);
  }

  {
    // The error handed to the destroy callback after a delay must be the
    // rejection reason of next().
    const _err = new Error('asd');
    const r = new Readable({
      read() {
      },
      destroy(err, callback) {
        setTimeout(() => callback(_err), 1);
      }
    });

    r.destroy();
    const it = r[Symbol.asyncIterator]();
    it.next()
      .then(common.mustNotCall())
      .catch(common.mustCall((err) => {
        assert.strictEqual(err, _err);
      }));
  }

  {
    // Don't destroy if no auto destroy.
    // https://github.com/nodejs/node/issues/35116

    const r = new Readable({
      autoDestroy: false,
      read() {
        this.push('asd');
        this.push(null);
      }
    });

    for await (const chunk of r) { } // eslint-disable-line no-unused-vars, no-empty
    assert.strictEqual(r.destroyed, false);
  }

  {
    // Destroy if no auto destroy and premature break.
    // https://github.com/nodejs/node/pull/35122/files#r485678318

    const r = new Readable({
      autoDestroy: false,
      read() {
        this.push('asd');
      }
    });

    for await (const chunk of r) { // eslint-disable-line no-unused-vars
      break;
    }
    assert.strictEqual(r.destroyed, true);
  }

  {
    // Don't destroy before 'end'.

    const r = new Readable({
      read() {
        this.push('asd');
        this.push(null);
      }
    }).on('end', () => {
      assert.strictEqual(r.destroyed, false);
    });

    for await (const chunk of r) { } // eslint-disable-line no-unused-vars, no-empty
    assert.strictEqual(r.destroyed, true);
  }
}
654
{
  // AsyncIterator return should end even when destroy
  // does not implement the callback API.

  const r = new Readable({
    objectMode: true,
    read() {
    }
  });

  // Replace destroy() with a wrapper that only forwards the error argument,
  // i.e. one that accepts no completion callback.
  const originalDestroy = r.destroy;
  r.destroy = (err) => {
    originalDestroy.call(r, err);
  };
  const it = r[Symbol.asyncIterator]();
  const p = it.return();
  r.push(null);
  // The promise returned by return() must still fulfill.
  p.then(common.mustCall());
}
674
675
{
  // AsyncIterator return should not error with
  // premature close.

  const r = new Readable({
    objectMode: true,
    read() {
    }
  });

  // Replace destroy() with a wrapper that only forwards the error argument,
  // i.e. one that accepts no completion callback.
  const originalDestroy = r.destroy;
  r.destroy = (err) => {
    originalDestroy.call(r, err);
  };
  const it = r[Symbol.asyncIterator]();
  const p = it.return();
  // Emit 'close' by hand right after return() was requested.
  r.emit('close');
  p.then(common.mustCall()).catch(common.mustNotCall());
}
695
{
  // AsyncIterator should not finish correctly if destroyed.

  const r = new Readable({
    objectMode: true,
    read() {
    }
  });

  r.destroy();
  // mustCall: without it the assertions below would be skipped silently if
  // 'close' were never emitted.
  r.on('close', common.mustCall(() => {
    const it = r[Symbol.asyncIterator]();
    const next = it.next();
    next
      .then(common.mustNotCall())
      .catch(common.mustCall((err) => {
        assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
      }));
  }));
}
716
{
  // AsyncIterator should throw if prematurely closed
  // before end has been emitted.
  (async function() {
    const readable = fs.createReadStream(__filename);

    try {
      // eslint-disable-next-line no-unused-vars
      for await (const chunk of readable) {
        // Close the stream from inside the loop, before it has ended.
        readable.close();
      }

      // If the loop finishes normally, this AssertionError lands in the catch
      // below and fails the `code` check there.
      assert.fail('should have thrown');
    } catch (err) {
      assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
    }

    assert.ok(readable.destroyed);
  })().then(common.mustCall());
}
737
// AsyncIterator non-destroying iterator
{
  /**
   * Builds a fresh Readable from an async generator that yields 5, then 7,
   * with an awaited resolved promise before each yield and before finishing.
   *
   * @returns {Readable} A new stream produced by `Readable.from()`.
   */
  function createReadable() {
    return Readable.from((async function* () {
      await Promise.resolve();
      yield 5;
      await Promise.resolve();
      yield 7;
      await Promise.resolve();
    })());
  }

  // Check explicit destroying on return
  (async function() {
    const readable = createReadable();
    for await (const chunk of readable.iterator({ destroyOnReturn: true })) {
      assert.strictEqual(chunk, 5);
      break;
    }

    assert.ok(readable.destroyed);
  })().then(common.mustCall());

  // Check explicit non-destroy on return (destroyOnReturn: false)
  (async function() {
    const readable = createReadable();
    const opts = { destroyOnReturn: false };
    for await (const chunk of readable.iterator(opts)) {
      assert.strictEqual(chunk, 5);
      break;
    }

    // Breaking out of the loop must leave the stream usable...
    assert.ok(!readable.destroyed);

    // ...so a second iterator picks up at the next value.
    for await (const chunk of readable.iterator(opts)) {
      assert.strictEqual(chunk, 7);
    }

    // Iterating through to the end is expected to leave the stream destroyed.
    assert.ok(readable.destroyed);
  })().then(common.mustCall());

  // Check non-object options.
  {
    const readable = createReadable();
    assert.throws(
      () => readable.iterator(42),
      {
        code: 'ERR_INVALID_ARG_TYPE',
        name: 'TypeError',
        message: 'The "options" argument must be of type object. Received ' +
                 'type number (42)',
      }
    );
  }

  // Check for dangling listeners
  (async function() {
    const readable = createReadable();
    const opts = { destroyOnReturn: false };
    // Repeatedly create an iterator and abandon it after the first chunk.
    while (readable.readable) {
      // eslint-disable-next-line no-unused-vars
      for await (const chunk of readable.iterator(opts)) {
        break;
      }
    }

    // None of the abandoned iterators may leave a listener behind.
    assert.deepStrictEqual(readable.eventNames(), []);
  })().then(common.mustCall());
}
807
{
  // Iterating an http response whose request is destroyed with an error:
  // the for await loop must throw, and the caught error's code is checked.
  const server = http.createServer((request, response) => {
    response.statusCode = 404;
    // Write without ever calling end(), so the response never completes.
    response.write('never ends');
  });

  server.listen(() => {
    // `req` is only read from callbacks that run after this assignment.
    const req = http.request(`http://localhost:${server.address().port}`)
      .on('response', common.mustCall(async (res) => {
        setTimeout(() => {
          req.destroy(new Error('something happened'));
        }, 100);

        res.on('error', common.mustCall());

        let _err;
        try {
          // eslint-disable-next-line no-unused-vars, no-empty
          for await (const chunk of res) { }
        } catch (err) {
          _err = err;
        }

        assert.strictEqual(_err.code, 'ECONNRESET');
        server.close();
      }))
      .on('error', common.mustCall())
      .end();
  });
}
839
{
  /**
   * Collects the whole request body through async iteration and parses it as
   * JSON.
   *
   * @param {AsyncIterable} request Source of body chunks; each chunk is
   *   appended to a string with `+=`.
   * @returns {Promise<any>} The parsed JSON value, or `{}` when the collected
   *   text is not valid JSON.
   */
  async function getParsedBody(request) {
    let body = '';

    for await (const data of request) {
      body += data;
    }

    try {
      return JSON.parse(body);
    } catch {
      // Deliberate best-effort: unparsable input yields an empty object.
      return {};
    }
  }

  const str = JSON.stringify({ asd: true });
  const server = http.createServer(async (request, response) => {
    const body = await getParsedBody(request);
    response.statusCode = 200;
    assert.strictEqual(JSON.stringify(body), str);
    response.end(JSON.stringify(body));
  }).listen(() => {
    http
      .request({
        method: 'POST',
        hostname: 'localhost',
        port: server.address().port,
      })
      .end(str)
      // mustCall: a response that never arrives is then reported as a missed
      // call.
      .on('response', common.mustCall(async (res) => {
        let body = '';
        for await (const chunk of res) {
          body += chunk;
        }
        assert.strictEqual(body, str);
        server.close();
      }));
  });
}
879
// Run the sequential scenarios. Wrapping the continuation in mustCall makes
// the test fail if tests() never resolves, so scenarios placed after a hung
// await cannot be skipped unnoticed.
tests().then(common.mustCall());
882