'use strict';

// Shared test helpers (mustCall, mustSucceed, mustNotCall, ...).
const common = require('../common');
const {
  Stream,
  Writable,
  Readable,
  Transform,
  pipeline,
  PassThrough,
  Duplex,
  addAbortSignal,
} = require('stream');
const pipelinep = require('stream/promises').pipeline;
const assert = require('assert');
const http = require('http');
const { promisify } = require('util');
const net = require('net');
const tsp = require('timers/promises');
20
{
  // Happy path: every chunk buffered in the Readable reaches the Writable in
  // order, and 'finish' has already fired when the pipeline callback runs.
  let finished = false;
  const processed = [];
  const expected = [
    Buffer.from('a'),
    Buffer.from('b'),
    Buffer.from('c'),
  ];

  const read = new Readable({
    read() {}
  });

  const write = new Writable({
    write(data, enc, cb) {
      processed.push(data);
      cb();
    }
  });

  write.on('finish', () => {
    finished = true;
  });

  for (const chunk of expected) {
    read.push(chunk);
  }
  read.push(null);

  pipeline(read, write, common.mustSucceed(() => {
    assert.ok(finished);
    assert.deepStrictEqual(processed, expected);
  }));
}
55
{
  // Argument validation: too few streams or no arguments at all must throw
  // synchronously.
  const read = new Readable({
    read() {}
  });

  // A single stream plus a callback is not enough to build a pipeline.
  assert.throws(() => {
    pipeline(read, () => {});
  }, /ERR_MISSING_ARGS/);
  // Only a callback, no streams.
  assert.throws(() => {
    pipeline(() => {});
  }, /ERR_MISSING_ARGS/);
  // Nothing at all.
  assert.throws(() => {
    pipeline();
  }, /ERR_INVALID_ARG_TYPE/);
}
71
{
  // Destroying the source (without an error) before it has ended must make
  // the pipeline callback receive an error.
  const read = new Readable({
    read() {}
  });

  const write = new Writable({
    write(data, enc, cb) {
      cb();
    }
  });

  read.push('data');
  setImmediate(() => read.destroy());

  pipeline(read, write, common.mustCall((err) => {
    assert.ok(err, 'should have an error');
  }));
}
90
{
  // Destroying the source with an explicit error forwards that error to the
  // callback; pipeline() returns the last stream.
  const read = new Readable({
    read() {}
  });

  const write = new Writable({
    write(data, enc, cb) {
      cb();
    }
  });

  read.push('data');
  setImmediate(() => read.destroy(new Error('kaboom')));

  const dst = pipeline(read, write, common.mustCall((err) => {
    assert.deepStrictEqual(err, new Error('kaboom'));
  }));

  assert.strictEqual(dst, write);
}
111
{
  // An error raised by a Transform in the middle of the chain: every stream
  // is expected to emit 'error' (with that error) and 'close', and the
  // callback receives the same error.
  const read = new Readable({
    read() {}
  });

  const transform = new Transform({
    transform(data, enc, cb) {
      cb(new Error('kaboom'));
    }
  });

  const write = new Writable({
    write(data, enc, cb) {
      cb();
    }
  });

  read.on('close', common.mustCall());
  transform.on('close', common.mustCall());
  write.on('close', common.mustCall());

  [read, transform, write].forEach((stream) => {
    stream.on('error', common.mustCall((err) => {
      assert.deepStrictEqual(err, new Error('kaboom'));
    }));
  });

  const dst = pipeline(read, transform, write, common.mustCall((err) => {
    assert.deepStrictEqual(err, new Error('kaboom'));
  }));

  assert.strictEqual(dst, write);

  // Push only after the pipeline is wired up so the transform sees the chunk.
  read.push('hello');
}
147
{
  // An HTTP server response used as pipeline destination: the client must
  // receive the complete body.
  const server = http.createServer((req, res) => {
    const rs = new Readable({
      read() {
        rs.push('hello');
        rs.push(null);
      }
    });

    pipeline(rs, res, () => {});
  });

  server.listen(0, () => {
    const req = http.request({
      port: server.address().port
    });

    req.end();
    req.on('response', (res) => {
      const buf = [];
      res.on('data', (data) => buf.push(data));
      res.on('end', common.mustCall(() => {
        assert.deepStrictEqual(
          Buffer.concat(buf),
          Buffer.from('hello')
        );
        server.close();
      }));
    });
  });
}
179
{
  // The client tears down the response early; the server-side source stream
  // must then have its destroy() invoked.
  const server = http.createServer((req, res) => {
    let sent = false;
    const rs = new Readable({
      read() {
        // Push exactly one chunk and never end, so the pipeline stays open.
        if (sent) {
          return;
        }
        sent = true;
        rs.push('hello');
      },
      destroy: common.mustCall((err, cb) => {
        // Prevents fd leaks by destroying http pipelines
        cb();
      })
    });

    pipeline(rs, res, () => {});
  });

  server.listen(0, () => {
    const req = http.request({
      port: server.address().port
    });

    req.end();
    req.on('response', (res) => {
      setImmediate(() => {
        res.destroy();
        server.close();
      });
    });
  });
}
214
{
  // The client pipes the HTTP response into a sink that fails on its 10th
  // write: the client pipeline reports that error, and the server-side
  // source must have its destroy() invoked.
  const server = http.createServer((req, res) => {
    let sent = 0;
    const rs = new Readable({
      read() {
        if (sent++ > 10) {
          return;
        }
        rs.push('hello');
      },
      destroy: common.mustCall((err, cb) => {
        cb();
      })
    });

    pipeline(rs, res, () => {});
  });

  let cnt = 10;

  const badSink = new Writable({
    write(data, enc, cb) {
      cnt--;
      if (cnt === 0) cb(new Error('kaboom'));
      else cb();
    }
  });

  server.listen(0, () => {
    const req = http.request({
      port: server.address().port
    });

    req.end();
    req.on('response', (res) => {
      pipeline(res, badSink, common.mustCall((err) => {
        assert.deepStrictEqual(err, new Error('kaboom'));
        server.close();
      }));
    });
  });
}
257
{
  // Echo server (req piped back into res). The client feeds the request from
  // a Readable and destroys that Readable after ten 'data' events on the
  // response; the client pipeline callback must still fire.
  const server = http.createServer((req, res) => {
    pipeline(req, res, common.mustSucceed());
  });

  server.listen(0, () => {
    const req = http.request({
      port: server.address().port
    });

    let sent = 0;
    const rs = new Readable({
      read() {
        if (sent++ > 10) {
          return;
        }
        rs.push('hello');
      }
    });

    pipeline(rs, req, common.mustCall(() => {
      server.close();
    }));

    req.on('response', (res) => {
      let cnt = 10;
      res.on('data', () => {
        cnt--;
        if (cnt === 0) rs.destroy();
      });
    });
  });
}
291
{
  // A long chain (source, six pass-through transforms, sink) in which the
  // sink fails on its 10th write: every stream must emit 'close' and the
  // callback must get the sink's error.
  const makeTransform = () => {
    const tr = new Transform({
      transform(data, enc, cb) {
        cb(null, data);
      }
    });

    tr.on('close', common.mustCall());
    return tr;
  };

  const rs = new Readable({
    read() {
      rs.push('hello');
    }
  });

  let cnt = 10;

  const ws = new Writable({
    write(data, enc, cb) {
      cnt--;
      if (cnt === 0) return cb(new Error('kaboom'));
      cb();
    }
  });

  rs.on('close', common.mustCall());
  ws.on('close', common.mustCall());

  pipeline(
    rs,
    makeTransform(),
    makeTransform(),
    makeTransform(),
    makeTransform(),
    makeTransform(),
    makeTransform(),
    ws,
    common.mustCall((err) => {
      assert.deepStrictEqual(err, new Error('kaboom'));
    })
  );
}
337
{
  // A bare `Stream` with hand-written write()/end() sits in the middle of the
  // chain; data must still flow through it and the sink must finish.
  const oldStream = new Stream();

  oldStream.pause = oldStream.resume = () => {};
  oldStream.write = (data) => {
    oldStream.emit('data', data);
    return true;
  };
  oldStream.end = () => {
    oldStream.emit('end');
  };

  const expected = [
    Buffer.from('hello'),
    Buffer.from('world'),
  ];

  const rs = new Readable({
    read() {
      for (let i = 0; i < expected.length; i++) {
        rs.push(expected[i]);
      }
      rs.push(null);
    }
  });

  const ws = new Writable({
    write(data, enc, cb) {
      // Consumes `expected` front to back, so order is checked as well.
      assert.deepStrictEqual(data, expected.shift());
      cb();
    }
  });

  let finished = false;

  ws.on('finish', () => {
    finished = true;
  });

  pipeline(
    rs,
    oldStream,
    ws,
    common.mustSucceed(() => {
      assert(finished, 'last stream finished');
    })
  );
}
386
{
  // Two bare `Stream` objects in the chain, one with and one without a
  // destroy() method. The source fails on first read; the destroyable old
  // stream must have destroy() called, and the sink must not finish.
  const oldStream = new Stream();

  oldStream.pause = oldStream.resume = () => {};
  oldStream.write = (data) => {
    oldStream.emit('data', data);
    return true;
  };
  oldStream.end = () => {
    oldStream.emit('end');
  };

  const destroyableOldStream = new Stream();

  destroyableOldStream.pause = destroyableOldStream.resume = () => {};
  destroyableOldStream.destroy = common.mustCall(() => {
    destroyableOldStream.emit('close');
  });
  destroyableOldStream.write = (data) => {
    destroyableOldStream.emit('data', data);
    return true;
  };
  destroyableOldStream.end = () => {
    destroyableOldStream.emit('end');
  };

  const rs = new Readable({
    read() {
      rs.destroy(new Error('stop'));
    }
  });

  const ws = new Writable({
    write(data, enc, cb) {
      cb();
    }
  });

  let finished = false;

  ws.on('finish', () => {
    finished = true;
  });

  pipeline(
    rs,
    oldStream,
    destroyableOldStream,
    ws,
    common.mustCall((err) => {
      assert.deepStrictEqual(err, new Error('stop'));
      assert(!finished, 'should not finish');
    })
  );
}
442
{
  // util.promisify(pipeline) resolves once the destination has finished.
  const pipelinePromise = promisify(pipeline);

  async function run() {
    const read = new Readable({
      read() {}
    });

    const write = new Writable({
      write(data, enc, cb) {
        cb();
      }
    });

    read.push('data');
    read.push(null);

    let finished = false;

    write.on('finish', () => {
      finished = true;
    });

    await pipelinePromise(read, write);

    assert(finished);
  }

  // Require the promise to settle successfully; otherwise a pipeline that
  // never completes would go unnoticed.
  run().then(common.mustCall());
}
473
{
  // Check aborted signal without values: the producer aborts before it has
  // yielded anything, and the promisified pipeline must reject with an
  // AbortError.
  const pipelinePromise = promisify(pipeline);
  async function run() {
    const ac = new AbortController();
    const { signal } = ac;
    async function* producer() {
      ac.abort();
      await Promise.resolve();
      yield '8';
    }

    const w = new Writable({
      write(chunk, encoding, callback) {
        callback();
      }
    });
    await pipelinePromise(producer, w, { signal });
  }

  assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
}
496
{
  // Check aborted signal after init: the producer yields one value, then
  // aborts; the promisified pipeline must reject with an AbortError.
  const pipelinePromise = promisify(pipeline);
  async function run() {
    const ac = new AbortController();
    const { signal } = ac;
    async function* producer() {
      yield '5';
      await Promise.resolve();
      ac.abort();
      await Promise.resolve();
      yield '8';
    }

    const w = new Writable({
      write(chunk, encoding, callback) {
        callback();
      }
    });
    await pipelinePromise(producer, w, { signal });
  }

  assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
}
521
{
  // Check pre-aborted signal: the signal is already aborted when pipeline is
  // called, and the promisified pipeline must reject with an AbortError.
  const pipelinePromise = promisify(pipeline);
  async function run() {
    const signal = AbortSignal.abort();
    async function* producer() {
      yield '5';
      await Promise.resolve();
      yield '8';
    }

    const w = new Writable({
      write(chunk, encoding, callback) {
        callback();
      }
    });
    await pipelinePromise(producer, w, { signal });
  }

  assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
}
543
{
  // Calling pipeline() without a trailing callback must throw synchronously
  // with ERR_INVALID_ARG_TYPE.
  const read = new Readable({
    read() {}
  });

  const transform = new Transform({
    transform(data, enc, cb) {
      cb(new Error('kaboom'));
    }
  });

  const write = new Writable({
    write(data, enc, cb) {
      cb();
    }
  });

  assert.throws(
    () => pipeline(read, transform, write),
    { code: 'ERR_INVALID_ARG_TYPE' }
  );
}
566
{
  // An HTTP client response is piped into a PassThrough that is destroyed
  // with an error right away: the callback must get that error and the
  // request object itself must not emit 'error'.
  const server = http.Server(function(req, res) {
    res.write('asd');
  });
  server.listen(0, function() {
    // `this` is the server inside the listen callback.
    http.get({ port: this.address().port }, (res) => {
      const stream = new PassThrough();

      stream.on('error', common.mustCall());

      pipeline(
        res,
        stream,
        common.mustCall((err) => {
          assert.strictEqual(err.message, 'oh no');
          server.close();
        })
      );

      stream.destroy(new Error('oh no'));
    }).on('error', common.mustNotCall());
  });
}
590
{
  // Source is an already-started synchronous generator object.
  let res = '';
  const w = new Writable({
    write(chunk, encoding, callback) {
      res += chunk;
      callback();
    }
  });
  pipeline(function*() {
    yield 'hello';
    yield 'world';
  }(), w, common.mustSucceed(() => {
    assert.strictEqual(res, 'helloworld');
  }));
}
606
{
  // Source is an already-started async generator object.
  let res = '';
  const w = new Writable({
    write(chunk, encoding, callback) {
      res += chunk;
      callback();
    }
  });
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }(), w, common.mustSucceed(() => {
    assert.strictEqual(res, 'helloworld');
  }));
}
623
{
  // Source is a synchronous generator *function* (not yet invoked).
  let res = '';
  const w = new Writable({
    write(chunk, encoding, callback) {
      res += chunk;
      callback();
    }
  });
  pipeline(function*() {
    yield 'hello';
    yield 'world';
  }, w, common.mustSucceed(() => {
    assert.strictEqual(res, 'helloworld');
  }));
}
639
{
  // Source is an async generator *function* (not yet invoked).
  let res = '';
  const w = new Writable({
    write(chunk, encoding, callback) {
      res += chunk;
      callback();
    }
  });
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }, w, common.mustSucceed(() => {
    assert.strictEqual(res, 'helloworld');
  }));
}
656
{
  // A chain built purely from functions: async generator source, async
  // generator transform, async function sink.
  let res = '';
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }, async function*(source) {
    for await (const chunk of source) {
      yield chunk.toUpperCase();
    }
  }, async function(source) {
    for await (const chunk of source) {
      res += chunk;
    }
  }, common.mustSucceed(() => {
    assert.strictEqual(res, 'HELLOWORLD');
  }));
}
675
{
  // The value an async function sink resolves with is handed to the callback
  // as its second argument.
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }, async function*(source) {
    for await (const chunk of source) {
      yield chunk.toUpperCase();
    }
  }, async function(source) {
    let ret = '';
    for await (const chunk of source) {
      ret += chunk;
    }
    return ret;
  }, common.mustSucceed((val) => {
    assert.strictEqual(val, 'HELLOWORLD');
  }));
}
695
{
  // AsyncIterable destination is returned and finalizes.

  const ret = pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
  }, async function*(source) { // eslint-disable-line require-yield
    for await (const chunk of source) { } // eslint-disable-line no-unused-vars, no-empty
  }, common.mustCall((err) => {
    assert.strictEqual(err, undefined);
  }));
  // The returned object is stream-like: it can be resumed and has pipe().
  ret.resume();
  assert.strictEqual(typeof ret.pipe, 'function');
}
710
{
  // AsyncIterable destination is still returned when the source throws, and
  // the source's error is propagated to the callback.

  const ret = pipeline(async function*() { // eslint-disable-line require-yield
    await Promise.resolve();
    throw new Error('kaboom');
  }, async function*(source) { // eslint-disable-line require-yield
    for await (const chunk of source) { } // eslint-disable-line no-unused-vars, no-empty
  }, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
  }));
  ret.resume();
  assert.strictEqual(typeof ret.pipe, 'function');
}
726
{
  // An async generator function source that throws: the error reaches the
  // callback and the destination is destroyed.
  const s = new PassThrough();
  pipeline(async function*() { // eslint-disable-line require-yield
    throw new Error('kaboom');
  }, s, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(s.destroyed, true);
  }));
}
736
{
  // Same as the generator-function case, but the source is an
  // already-created async generator object.
  const s = new PassThrough();
  pipeline(async function*() { // eslint-disable-line require-yield
    throw new Error('kaboom');
  }(), s, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(s.destroyed, true);
  }));
}
746
{
  // A synchronous generator function source that throws: the error reaches
  // the callback and the destination is destroyed.
  const s = new PassThrough();
  pipeline(function*() { // eslint-disable-line require-yield
    throw new Error('kaboom');
  }, s, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(s.destroyed, true);
  }));
}
756
{
  // A synchronous generator object source that throws: the error reaches the
  // callback and the destination is destroyed.
  const s = new PassThrough();
  pipeline(function*() { // eslint-disable-line require-yield
    throw new Error('kaboom');
  }(), s, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(s.destroyed, true);
  }));
}
766
{
  // The async function sink throws on the first chunk: the error reaches the
  // callback and the intermediate stream is destroyed.
  const s = new PassThrough();
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }, s, async function(source) {
    for await (const chunk of source) { // eslint-disable-line no-unused-vars
      throw new Error('kaboom');
    }
  }, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(s.destroyed, true);
  }));
}
782
{
  // The source is a plain function returning an array; the async generator
  // at the end throws on the first chunk. The error reaches the callback, the
  // intermediate stream is destroyed, and a stream-like object is returned.
  const s = new PassThrough();
  const ret = pipeline(function() {
    return ['hello', 'world'];
  }, s, async function*(source) { // eslint-disable-line require-yield
    for await (const chunk of source) { // eslint-disable-line no-unused-vars
      throw new Error('kaboom');
    }
  }, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(s.destroyed, true);
  }));
  ret.resume();
  assert.strictEqual(typeof ret.pipe, 'function');
}
798
{
  // Legacy streams without async iterator.

  const s = new PassThrough();
  s.push('asd');
  s.push(null);
  // Mask the async iterator so the stream looks like one that lacks it.
  s[Symbol.asyncIterator] = null;
  let ret = '';
  pipeline(s, async function(source) {
    for await (const chunk of source) {
      ret += chunk;
    }
  }, common.mustCall((err) => {
    assert.strictEqual(err, undefined);
    assert.strictEqual(ret, 'asd');
  }));
}
816
{
  // v1 streams without read().

  const s = new Stream();
  // Emit after the pipeline has been set up.
  process.nextTick(() => {
    s.emit('data', 'asd');
    s.emit('end');
  });
  // 'destroyer' can be called multiple times,
  // once from stream wrapper and
  // once from iterator wrapper.
  s.close = common.mustCallAtLeast(1);
  let ret = '';
  pipeline(s, async function(source) {
    for await (const chunk of source) {
      ret += chunk;
    }
  }, common.mustCall((err) => {
    assert.strictEqual(err, undefined);
    assert.strictEqual(ret, 'asd');
  }));
}
839
{
  // v1 error streams without read().

  const s = new Stream();
  process.nextTick(() => {
    s.emit('error', new Error('kaboom'));
  });
  // destroy() must be invoked exactly once on the failing stream.
  s.destroy = common.mustCall();
  pipeline(s, async function(source) {
  }, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
  }));
}
853
{
  // A source function that returns nothing makes pipeline() throw
  // ERR_INVALID_RETURN_VALUE synchronously; the downstream stream is left
  // undestroyed.
  const s = new PassThrough();
  assert.throws(() => {
    pipeline(function(source) {
    }, s, () => {});
  }, (err) => {
    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
    assert.strictEqual(s.destroyed, false);
    return true;
  });
}
865
{
  // A function in the middle of the chain that returns nothing makes
  // pipeline() throw ERR_INVALID_RETURN_VALUE synchronously; the stream is
  // left undestroyed.
  const s = new PassThrough();
  assert.throws(() => {
    pipeline(s, function(source) {
    }, s, () => {});
  }, (err) => {
    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
    assert.strictEqual(s.destroyed, false);
    return true;
  });
}
877
{
  // A trailing plain function that returns nothing makes pipeline() throw
  // ERR_INVALID_RETURN_VALUE synchronously; the source is left undestroyed.
  const s = new PassThrough();
  assert.throws(() => {
    pipeline(s, function(source) {
    }, () => {});
  }, (err) => {
    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
    assert.strictEqual(s.destroyed, false);
    return true;
  });
}
889
{
  // A trailing *synchronous* generator function is rejected with
  // ERR_INVALID_RETURN_VALUE as well; the source is left undestroyed.
  const s = new PassThrough();
  assert.throws(() => {
    pipeline(s, function*(source) {
    }, () => {});
  }, (err) => {
    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
    assert.strictEqual(s.destroyed, false);
    return true;
  });
}
901
{
  // A Transform that fails synchronously, placed between a generator source
  // and an async function sink: the sink must receive nothing and the
  // callback gets the error.
  let res = '';
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }, new Transform({
    transform(chunk, encoding, cb) {
      cb(new Error('kaboom'));
    }
  }), async function(source) {
    for await (const chunk of source) {
      res += chunk;
    }
  }, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(res, '');
  }));
}
921
{
  // Same as the synchronous-failure case, but the Transform reports its
  // error on the next tick.
  let res = '';
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }, new Transform({
    transform(chunk, encoding, cb) {
      process.nextTick(cb, new Error('kaboom'));
    }
  }), async function(source) {
    for await (const chunk of source) {
      res += chunk;
    }
  }, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(res, '');
  }));
}
941
{
  // A working Transform between a generator source and an async function
  // sink. `decodeStrings: false` keeps chunks as strings so toUpperCase()
  // can be called on them.
  let res = '';
  pipeline(async function*() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
  }, new Transform({
    decodeStrings: false,
    transform(chunk, encoding, cb) {
      cb(null, chunk.toUpperCase());
    }
  }), async function(source) {
    for await (const chunk of source) {
      res += chunk;
    }
  }, common.mustSucceed(() => {
    assert.strictEqual(res, 'HELLOWORLD');
  }));
}
961
{
  // Ensure no unhandled rejection from async function: the rejection must be
  // delivered through the pipeline callback instead.

  pipeline(async function*() {
    yield 'hello';
  }, async function(source) {
    throw new Error('kaboom');
  }, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
  }));
}
973
{
  // Streams created with `autoDestroy: false` are not destroyed by a
  // pipeline that completes normally.
  const src = new PassThrough({ autoDestroy: false });
  const dst = new PassThrough({ autoDestroy: false });
  pipeline(src, dst, common.mustCall(() => {
    assert.strictEqual(src.destroyed, false);
    assert.strictEqual(dst.destroyed, false);
  }));
  src.end();
}
983
{
  // Make sure 'close' before 'end' finishes without error
  // if readable has received eof.
  // Ref: https://github.com/nodejs/node/issues/29699
  const r = new Readable();
  const w = new Writable({
    write(chunk, encoding, cb) {
      cb();
    }
  });
  // Wrapped in mustCall so the test fails if the callback never runs, rather
  // than silently skipping the assertion.
  pipeline(r, w, common.mustCall((err) => {
    assert.strictEqual(err, undefined);
  }));
  r.push('asd');
  r.push(null);
  // Simulate a 'close' that arrives before 'end' has been emitted.
  r.emit('close');
}
1001
{
  // An HTTP client request used as destination: the pipeline completes once
  // the body has been written, even though the server never responds.
  const server = http.createServer((req, res) => {
  });

  server.listen(0, () => {
    const req = http.request({
      port: server.address().port
    });

    const body = new PassThrough();
    pipeline(
      body,
      req,
      common.mustSucceed(() => {
        // No response has arrived and the request has not been aborted yet.
        assert(!req.res);
        assert(!req.aborted);
        req.abort();
        server.close();
      })
    );
    body.end();
  });
}
1025
{
  // A destination that is still readable (a PassThrough) is not destroyed
  // when the pipeline completes.
  const src = new PassThrough();
  const dst = new PassThrough();
  pipeline(src, dst, common.mustSucceed(() => {
    assert.strictEqual(dst.destroyed, false);
  }));
  src.end();
}
1034
{
  // With `readable` forced to false on the destination, it is destroyed once
  // the pipeline completes.
  const src = new PassThrough();
  const dst = new PassThrough();
  dst.readable = false;
  pipeline(src, dst, common.mustSucceed(() => {
    assert.strictEqual(dst.destroyed, true);
  }));
  src.end();
}
1044
{
  // The first generator stage throws on its first chunk: the following
  // generator must see no data, the final Writable must never be written to,
  // and the callback gets the error.
  let res = '';
  const rs = new Readable({
    read() {
      setImmediate(() => {
        rs.push('hello');
      });
    }
  });
  const ws = new Writable({
    write: common.mustNotCall()
  });
  pipeline(rs, async function*(stream) { // eslint-disable-line require-yield
    for await (const chunk of stream) { // eslint-disable-line no-unused-vars
      throw new Error('kaboom');
    }
  }, async function *(source) { // eslint-disable-line require-yield
    for await (const chunk of source) {
      res += chunk;
    }
  }, ws, common.mustCall((err) => {
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(res, '');
  }));
}
1070
{
  // An incoming HTTP request piped into a PassThrough: the pipeline must
  // complete without error, without an 'error' on the socket, and the
  // response must still be deliverable afterwards.
  const server = http.createServer((req, res) => {
    req.socket.on('error', common.mustNotCall());
    pipeline(req, new PassThrough(), (err) => {
      assert.ifError(err);
      res.end();
      server.close();
    });
  });

  server.listen(0, () => {
    const req = http.request({
      method: 'PUT',
      port: server.address().port
    });
    req.end('asd123');
    // The response is only sent from inside the server's pipeline callback,
    // so this also proves that callback ran.
    req.on('response', common.mustCall());
    req.on('error', common.mustNotCall());
  });
}
1091
{
  // Might still want to be able to use the writable side
  // of src. This is in the case where e.g. the Duplex input
  // is not directly connected to its output. Such a case could
  // happen when the Duplex is reading from a socket and then echos
  // the data back on the same socket.
  const src = new PassThrough();
  assert.strictEqual(src.writable, true);
  const dst = new PassThrough();
  pipeline(src, dst, common.mustCall((err) => {
    assert.strictEqual(src.writable, true);
    assert.strictEqual(src.destroyed, false);
  }));
  // End only the readable side of src.
  src.push(null);
}
1107
{
  // Destroying the stream returned for an async generator destination before
  // the source has ended yields ERR_STREAM_PREMATURE_CLOSE.
  const src = new PassThrough();
  const dst = pipeline(
    src,
    async function * (source) {
      for await (const chunk of source) {
        yield chunk;
      }
    },
    common.mustCall((err) => {
      assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
    })
  );
  src.push('asd');
  dst.destroy();
}
1124
{
  // An async generator destination may yield non-string values (objects)
  // without making the pipeline fail.
  pipeline(async function * () {
    yield 'asd';
  }, async function * (source) {
    for await (const chunk of source) {
      yield { chunk };
    }
  }, common.mustSucceed());
}
1134
{
  // The source finishes destroying asynchronously (next tick); the pipeline
  // callback must not run before the source has emitted 'close'.
  let closed = false;
  const src = new Readable({
    read() {},
    destroy(err, cb) {
      process.nextTick(cb);
    }
  });
  const dst = new Writable({
    write(chunk, encoding, callback) {
      callback();
    }
  });
  src.on('close', () => {
    closed = true;
  });
  src.push(null);
  pipeline(src, dst, common.mustCall((err) => {
    assert.strictEqual(closed, true);
  }));
}
1156
{
  // Same ordering check as the Writable-destination case, with a bare Duplex
  // as destination: 'close' on the source must precede the callback.
  let closed = false;
  const src = new Readable({
    read() {},
    destroy(err, cb) {
      process.nextTick(cb);
    }
  });
  const dst = new Duplex({});
  src.on('close', common.mustCall(() => {
    closed = true;
  }));
  src.push(null);
  pipeline(src, dst, common.mustCall((err) => {
    assert.strictEqual(closed, true);
  }));
}
1174
{
  // A socket piped into itself (echo server): the pipeline must succeed and
  // the socket must still emit 'finish'.
  const server = net.createServer(common.mustCall((socket) => {
    // echo server
    pipeline(socket, socket, common.mustSucceed());
    // 13 force destroys the socket before it has a chance to emit finish
    // NOTE(review): "13" presumably refers to Node.js v13 - confirm.
    socket.on('finish', common.mustCall(() => {
      server.close();
    }));
  })).listen(0, common.mustCall(() => {
    const socket = net.connect(server.address().port);
    socket.end();
  }));
}
1188
{
  // A Duplex source with `autoDestroy: false` whose final() takes a second:
  // the pipeline must succeed without ever calling destroy() on it.
  const d = new Duplex({
    autoDestroy: false,
    write: common.mustCall((data, enc, cb) => {
      d.push(data);
      cb();
    }),
    read: common.mustCall(() => {
      d.push(null);
    }),
    final: common.mustCall((cb) => {
      setTimeout(() => {
        // Still alive when the delayed final() completes.
        assert.strictEqual(d.destroyed, false);
        cb();
      }, 1000);
    }),
    destroy: common.mustNotCall()
  });

  const sink = new Writable({
    write: common.mustCall((data, enc, cb) => {
      cb();
    })
  });

  pipeline(d, sink, common.mustSucceed());

  d.write('test');
  d.end();
}
1219
{
  // Echo server: a socket piped into itself must complete successfully and
  // emit 'finish'.
  const server = net.createServer(common.mustCall((socket) => {
    // echo server
    pipeline(socket, socket, common.mustSucceed());
    socket.on('finish', common.mustCall(() => {
      server.close();
    }));
  })).listen(0, common.mustCall(() => {
    const socket = net.connect(server.address().port);
    socket.end();
  }));
}
1232
{
  // A Duplex source with `autoDestroy: false` and a delayed final(): the
  // pipeline completes while the writable side is still finishing.
  const d = new Duplex({
    autoDestroy: false,
    write: common.mustCall((data, enc, cb) => {
      d.push(data);
      cb();
    }),
    read: common.mustCall(() => {
      d.push(null);
    }),
    final: common.mustCall((cb) => {
      setTimeout(() => {
        assert.strictEqual(d.destroyed, false);
        cb();
      }, 1000);
    }),
    // `destroy()` won't be invoked by pipeline since
    // the writable side has not completed when
    // the pipeline has completed.
    destroy: common.mustNotCall()
  });

  const sink = new Writable({
    write: common.mustCall((data, enc, cb) => {
      cb();
    })
  });

  pipeline(d, sink, common.mustSucceed());

  d.write('test');
  d.end();
}
1266
{
  // The streams may be passed as a single array argument.
  const r = new Readable({
    read() {}
  });
  r.push('hello');
  r.push('world');
  r.push(null);
  let res = '';
  const w = new Writable({
    write(chunk, encoding, callback) {
      res += chunk;
      callback();
    }
  });
  pipeline([r, w], common.mustSucceed(() => {
    assert.strictEqual(res, 'helloworld');
  }));
}
1285
{
  // With an empty source and a Transform whose flush() is asynchronous, the
  // callback must not run before flush() has completed.
  let flushed = false;
  const makeStream = () =>
    new Transform({
      transform: (chunk, enc, cb) => cb(null, chunk),
      flush: (cb) =>
        setTimeout(() => {
          flushed = true;
          cb(null);
        }, 1),
    });

  const input = new Readable();
  input.push(null);

  pipeline(
    input,
    makeStream(),
    common.mustCall(() => {
      assert.strictEqual(flushed, true);
    }),
  );
}
{
  // The destination function returns a thenable whose `then` getter throws
  // on a second access, so pipeline() must read `then` at most once.
  function createThenable() {
    let counter = 0;
    return {
      get then() {
        if (counter++) {
          throw new Error('Cannot access `then` more than once');
        }
        // A no-op `then`: it never invokes the callbacks handed to it, so
        // the thenable never settles.
        return Function.prototype;
      },
    };
  }

  pipeline(
    function* () {
      yield 0;
    },
    createThenable,
    // Previously `() => common.mustNotCall()`, which only built the checker
    // inside a no-op callback and therefore could never fail.
    common.mustNotCall(),
  );
}
1330
1331
{
  // addAbortSignal() applied to the stream returned by pipeline(): aborting
  // after the sixth chunk ends the pipeline with an AbortError and leaves
  // every stream destroyed.
  const ac = new AbortController();
  const r = Readable.from(async function* () {
    for (let i = 0; i < 10; i++) {
      await Promise.resolve();
      yield String(i);
      if (i === 5) {
        ac.abort();
      }
    }
  }());
  let res = '';
  const w = new Writable({
    write(chunk, encoding, callback) {
      res += chunk;
      callback();
    }
  });
  // `pipelined` is declared below; it is initialised before this callback
  // runs because the callback is only invoked asynchronously.
  const cb = common.mustCall((err) => {
    assert.strictEqual(err.name, 'AbortError');
    assert.strictEqual(res, '012345');
    assert.strictEqual(w.destroyed, true);
    assert.strictEqual(r.destroyed, true);
    assert.strictEqual(pipelined.destroyed, true);
  });
  const pipelined = addAbortSignal(ac.signal, pipeline([r, w], cb));
}
1359
{
  // Plain arrays are accepted as the first element of a pipeline.
  const objectSink = PassThrough({ objectMode: true });
  pipeline([1, 2, 3], objectSink, common.mustSucceed(() => {}));

  const parts = [];
  const sink = new Writable({
    write: (chunk, encoding, done) => {
      parts.push(chunk);
      done();
    },
  });
  const verify = common.mustSucceed(() => {
    assert.strictEqual(parts.join(''), '123');
  });
  pipeline(['1', '2', '3'], sink, verify);
}
1375
{
  // A Buffer can be used as the pipeline source; the generator below sees
  // it one byte value at a time.
  const content = 'abc';
  const objectSink = PassThrough({ objectMode: true });
  pipeline(Buffer.from(content), objectSink, common.mustSucceed(() => {}));

  const codes = [];
  async function* collect(previous) {
    for await (const val of previous) {
      codes.push(val);
      yield val;
    }
  }

  const verify = common.mustSucceed(() => {
    assert.strictEqual(String.fromCharCode(...codes), content);
  });
  pipeline(Buffer.from(content), collect, verify);
}
1391
{
  // Aborting the signal passed in the options rejects the promise returned
  // by the stream/promises pipeline() with an AbortError.
  const ac = new AbortController();
  const signal = ac.signal;
  pipelinep(
    async function * ({ signal }) { // eslint-disable-line require-yield
      // timers/promises setTimeout() takes (delay, value, options): the
      // signal has to go in the options bag, not in the `value` slot,
      // otherwise the timer cannot be cancelled by the abort.
      await tsp.setTimeout(1e6, undefined, { signal });
    },
    async function(source) {

    },
    { signal }
  ).catch(common.mustCall((err) => {
    assert.strictEqual(err.name, 'AbortError');
  }));
  ac.abort();
}
1408
{
  // The stream/promises pipeline() accepts its streams as a single array.
  async function run() {
    let finished = false;
    let text = '';
    const write = new Writable({
      write(data, enc, cb) {
        text += data;
        cb();
      }
    });
    write.on('finish', () => {
      finished = true;
    });

    await pipelinep([Readable.from('Hello World!'), write]);
    assert(finished);
    assert.strictEqual(text, 'Hello World!');
  }

  // Require the promise to fulfil; a pipeline that never settles would
  // otherwise let the test pass without running the assertions above.
  run().then(common.mustCall());
}
1430
{
  // Callback flavour of the array form, fed by Readable.from().
  const chunks = [];
  let sawFinish = false;

  const sink = new Writable({
    write: (data, enc, done) => {
      chunks.push(data);
      done();
    },
  });
  sink.on('finish', () => {
    sawFinish = true;
  });

  const source = Readable.from('Hello World!');
  const verify = common.mustSucceed(() => {
    assert(sawFinish);
    assert.strictEqual(chunks.join(''), 'Hello World!');
  });
  pipeline([source, sink], verify);
}
1449
{
  // Once a promisified pipeline() has settled, the trailing stream must not
  // have been destroyed.
  const runPipeline = promisify(pipeline);

  const check = async () => {
    const source = new Readable({ read() {} });
    const tail = new PassThrough();

    source.push(null);
    await runPipeline(source, tail);

    assert.strictEqual(tail.destroyed, false);
  };

  check().then(common.mustCall());
}
1469
{
  // With `{ end: false }` the trailing stream is neither destroyed nor ended
  // when the promisified pipeline() settles.
  const runPipeline = promisify(pipeline);

  const check = async () => {
    const source = new Readable({ read() {} });
    const tail = new PassThrough();

    source.push(null);
    await runPipeline(source, tail, { end: false });

    assert.strictEqual(tail.destroyed, false);
    assert.strictEqual(tail.writableEnded, false);
  };

  check().then(common.mustCall());
}
1490
{
  // The async consumer stops after two chunks; its return value is handed
  // to the callback and the intermediate stream ends up destroyed.
  const middle = new PassThrough({ objectMode: true });

  async function* produce() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
    yield 'world';
  }

  async function consume(source) {
    const taken = [];
    for await (const chunk of source) {
      if (taken.length === 2) {
        break;
      }
      taken.push(chunk);
    }
    return taken.join('');
  }

  const verify = common.mustCall((err, val) => {
    assert.strictEqual(err, undefined);
    assert.strictEqual(val, 'helloworld');
    assert.strictEqual(middle.destroyed, true);
  });
  pipeline(produce, middle, consume, verify);
}
1514
{
  // An async consumer that returns null without reading still completes the
  // pipeline, and null is passed through as the result value.
  const middle = new PassThrough({ objectMode: true });

  async function* produce() {
    await Promise.resolve();
    yield 'hello';
    yield 'world';
    yield 'world';
  }

  const consume = async function(source) {
    return null;
  };

  const verify = common.mustCall((err, val) => {
    assert.strictEqual(err, undefined);
    assert.strictEqual(val, null);
  });
  pipeline(produce, middle, consume, verify);
}
1529
{
  // Stand-in for a legacy stream: it extends the bare Stream base class and
  // defines no .destroy method of its own.
  class LegacyWritable extends Stream {
    write(chunk, encoding, callback) {
      callback();
    }
  }

  // Both the destination's 'error' event and the pipeline callback must see
  // the error produced by the source.
  const expectStop = (err) => {
    assert.deepStrictEqual(err, new Error('stop'));
  };

  const writable = new LegacyWritable();
  writable.on('error', common.mustCall(expectStop));

  // Async iterable whose very first next() call rejects.
  const failing = {
    [Symbol.asyncIterator]: () => ({
      next: () => Promise.reject(new Error('stop')),
    }),
  };

  pipeline(Readable.from(failing), writable, common.mustCall(expectStop));
}
1559
{
  // A Writable subclass that overrides end() must have end() invoked exactly
  // once by pipeline().
  class CustomReadable extends Readable {
    _read() {
      for (const item of ['asd', null]) {
        this.push(item);
      }
    }
  }

  class CustomWritable extends Writable {
    constructor() {
      super();
      this.endCount = 0;
      this.str = '';
    }

    _write(chunk, enc, cb) {
      this.str = `${this.str}${chunk}`;
      cb();
    }

    // Counts invocations, then delegates to Writable#end() without
    // forwarding any arguments.
    end() {
      this.endCount++;
      super.end();
    }
  }

  const source = new CustomReadable();
  const sink = new CustomWritable();

  const verify = common.mustSucceed(() => {
    assert.strictEqual(sink.str, 'asd');
    assert.strictEqual(sink.endCount, 1);
  });
  pipeline(source, sink, verify);
}
1594
{
  // Starting a pipeline from inside the source's own 'end' handler must
  // still complete successfully.
  const source = new Readable({ read() {} });

  const onEnd = common.mustCall(() => {
    const onDone = common.mustSucceed();
    pipeline(source, new PassThrough(), onDone);
  });
  source.on('end', onEnd);

  source.push(null);
  source.read();
}
1605
{
  // Same scenario with a Duplex as the source: the pipeline is started from
  // the 'end' handler of its readable side.
  const source = new Duplex({
    read() {},
    write: (chunk, enc, done) => {
      done();
    },
  });

  const onEnd = common.mustCall(() => {
    const onDone = common.mustSucceed();
    pipeline(source, new PassThrough(), onDone);
  });
  source.on('end', onEnd);

  source.push(null);
  source.read();
}
1619
{
  // With `{ end: false }` the promise fulfils after all data was written
  // while the destination has not been closed.
  const pieces = [];
  const writable = new Writable({
    write: (chunk, enc, done) => {
      pieces.push(chunk);
      done();
    },
  });

  async function* produce() {
    yield 'hello';
    await Promise.resolve();
    yield 'world';
  }

  const verify = common.mustCall(() => {
    assert.strictEqual(pieces.join(''), 'helloworld');
    assert.strictEqual(writable.closed, false);
  });
  pipelinep(produce, writable, { end: false }).then(verify);
}
1637
{
  // The destination's final() must not run before the source has emitted
  // 'end', even when a large amount of data is buffered up front.
  const source = new Readable();
  let remaining = 4000;
  while (remaining-- > 0) {
    source.push('asdfdagljanfgkaljdfn');
  }
  source.push(null);

  let ended = false;
  source.on('end', () => {
    ended = true;
  });

  const onFinal = common.mustCall((done) => {
    assert.strictEqual(ended, true);
    done(null);
  });
  const sink = new Writable({
    write: (chunk, enc, done) => {
      done(null);
    },
    final: onFinal,
  });

  const verify = common.mustCall((err) => {
    assert.strictEqual(err, undefined);
  });
  pipeline(source, sink, verify);
}
1665