11cb0ef41Sopenharmony_ci'use strict';
21cb0ef41Sopenharmony_ci
31cb0ef41Sopenharmony_ciconst common = require('../common');
41cb0ef41Sopenharmony_ciconst {
51cb0ef41Sopenharmony_ci  Stream,
61cb0ef41Sopenharmony_ci  Writable,
71cb0ef41Sopenharmony_ci  Readable,
81cb0ef41Sopenharmony_ci  Transform,
91cb0ef41Sopenharmony_ci  pipeline,
101cb0ef41Sopenharmony_ci  PassThrough,
111cb0ef41Sopenharmony_ci  Duplex,
121cb0ef41Sopenharmony_ci  addAbortSignal,
131cb0ef41Sopenharmony_ci} = require('stream');
141cb0ef41Sopenharmony_ciconst pipelinep = require('stream/promises').pipeline;
151cb0ef41Sopenharmony_ciconst assert = require('assert');
161cb0ef41Sopenharmony_ciconst http = require('http');
171cb0ef41Sopenharmony_ciconst { promisify } = require('util');
181cb0ef41Sopenharmony_ciconst net = require('net');
191cb0ef41Sopenharmony_ciconst tsp = require('timers/promises');
201cb0ef41Sopenharmony_ci
211cb0ef41Sopenharmony_ci{
221cb0ef41Sopenharmony_ci  let finished = false;
231cb0ef41Sopenharmony_ci  const processed = [];
241cb0ef41Sopenharmony_ci  const expected = [
251cb0ef41Sopenharmony_ci    Buffer.from('a'),
261cb0ef41Sopenharmony_ci    Buffer.from('b'),
271cb0ef41Sopenharmony_ci    Buffer.from('c'),
281cb0ef41Sopenharmony_ci  ];
291cb0ef41Sopenharmony_ci
301cb0ef41Sopenharmony_ci  const read = new Readable({
311cb0ef41Sopenharmony_ci    read() {}
321cb0ef41Sopenharmony_ci  });
331cb0ef41Sopenharmony_ci
341cb0ef41Sopenharmony_ci  const write = new Writable({
351cb0ef41Sopenharmony_ci    write(data, enc, cb) {
361cb0ef41Sopenharmony_ci      processed.push(data);
371cb0ef41Sopenharmony_ci      cb();
381cb0ef41Sopenharmony_ci    }
391cb0ef41Sopenharmony_ci  });
401cb0ef41Sopenharmony_ci
411cb0ef41Sopenharmony_ci  write.on('finish', () => {
421cb0ef41Sopenharmony_ci    finished = true;
431cb0ef41Sopenharmony_ci  });
441cb0ef41Sopenharmony_ci
451cb0ef41Sopenharmony_ci  for (let i = 0; i < expected.length; i++) {
461cb0ef41Sopenharmony_ci    read.push(expected[i]);
471cb0ef41Sopenharmony_ci  }
481cb0ef41Sopenharmony_ci  read.push(null);
491cb0ef41Sopenharmony_ci
501cb0ef41Sopenharmony_ci  pipeline(read, write, common.mustSucceed(() => {
511cb0ef41Sopenharmony_ci    assert.ok(finished);
521cb0ef41Sopenharmony_ci    assert.deepStrictEqual(processed, expected);
531cb0ef41Sopenharmony_ci  }));
541cb0ef41Sopenharmony_ci}
551cb0ef41Sopenharmony_ci
561cb0ef41Sopenharmony_ci{
571cb0ef41Sopenharmony_ci  const read = new Readable({
581cb0ef41Sopenharmony_ci    read() {}
591cb0ef41Sopenharmony_ci  });
601cb0ef41Sopenharmony_ci
611cb0ef41Sopenharmony_ci  assert.throws(() => {
621cb0ef41Sopenharmony_ci    pipeline(read, () => {});
631cb0ef41Sopenharmony_ci  }, /ERR_MISSING_ARGS/);
641cb0ef41Sopenharmony_ci  assert.throws(() => {
651cb0ef41Sopenharmony_ci    pipeline(() => {});
661cb0ef41Sopenharmony_ci  }, /ERR_MISSING_ARGS/);
671cb0ef41Sopenharmony_ci  assert.throws(() => {
681cb0ef41Sopenharmony_ci    pipeline();
691cb0ef41Sopenharmony_ci  }, /ERR_INVALID_ARG_TYPE/);
701cb0ef41Sopenharmony_ci}
711cb0ef41Sopenharmony_ci
721cb0ef41Sopenharmony_ci{
731cb0ef41Sopenharmony_ci  const read = new Readable({
741cb0ef41Sopenharmony_ci    read() {}
751cb0ef41Sopenharmony_ci  });
761cb0ef41Sopenharmony_ci
771cb0ef41Sopenharmony_ci  const write = new Writable({
781cb0ef41Sopenharmony_ci    write(data, enc, cb) {
791cb0ef41Sopenharmony_ci      cb();
801cb0ef41Sopenharmony_ci    }
811cb0ef41Sopenharmony_ci  });
821cb0ef41Sopenharmony_ci
831cb0ef41Sopenharmony_ci  read.push('data');
841cb0ef41Sopenharmony_ci  setImmediate(() => read.destroy());
851cb0ef41Sopenharmony_ci
861cb0ef41Sopenharmony_ci  pipeline(read, write, common.mustCall((err) => {
871cb0ef41Sopenharmony_ci    assert.ok(err, 'should have an error');
881cb0ef41Sopenharmony_ci  }));
891cb0ef41Sopenharmony_ci}
901cb0ef41Sopenharmony_ci
911cb0ef41Sopenharmony_ci{
921cb0ef41Sopenharmony_ci  const read = new Readable({
931cb0ef41Sopenharmony_ci    read() {}
941cb0ef41Sopenharmony_ci  });
951cb0ef41Sopenharmony_ci
961cb0ef41Sopenharmony_ci  const write = new Writable({
971cb0ef41Sopenharmony_ci    write(data, enc, cb) {
981cb0ef41Sopenharmony_ci      cb();
991cb0ef41Sopenharmony_ci    }
1001cb0ef41Sopenharmony_ci  });
1011cb0ef41Sopenharmony_ci
1021cb0ef41Sopenharmony_ci  read.push('data');
1031cb0ef41Sopenharmony_ci  setImmediate(() => read.destroy(new Error('kaboom')));
1041cb0ef41Sopenharmony_ci
1051cb0ef41Sopenharmony_ci  const dst = pipeline(read, write, common.mustCall((err) => {
1061cb0ef41Sopenharmony_ci    assert.deepStrictEqual(err, new Error('kaboom'));
1071cb0ef41Sopenharmony_ci  }));
1081cb0ef41Sopenharmony_ci
1091cb0ef41Sopenharmony_ci  assert.strictEqual(dst, write);
1101cb0ef41Sopenharmony_ci}
1111cb0ef41Sopenharmony_ci
1121cb0ef41Sopenharmony_ci{
1131cb0ef41Sopenharmony_ci  const read = new Readable({
1141cb0ef41Sopenharmony_ci    read() {}
1151cb0ef41Sopenharmony_ci  });
1161cb0ef41Sopenharmony_ci
1171cb0ef41Sopenharmony_ci  const transform = new Transform({
1181cb0ef41Sopenharmony_ci    transform(data, enc, cb) {
1191cb0ef41Sopenharmony_ci      cb(new Error('kaboom'));
1201cb0ef41Sopenharmony_ci    }
1211cb0ef41Sopenharmony_ci  });
1221cb0ef41Sopenharmony_ci
1231cb0ef41Sopenharmony_ci  const write = new Writable({
1241cb0ef41Sopenharmony_ci    write(data, enc, cb) {
1251cb0ef41Sopenharmony_ci      cb();
1261cb0ef41Sopenharmony_ci    }
1271cb0ef41Sopenharmony_ci  });
1281cb0ef41Sopenharmony_ci
1291cb0ef41Sopenharmony_ci  read.on('close', common.mustCall());
1301cb0ef41Sopenharmony_ci  transform.on('close', common.mustCall());
1311cb0ef41Sopenharmony_ci  write.on('close', common.mustCall());
1321cb0ef41Sopenharmony_ci
1331cb0ef41Sopenharmony_ci  [read, transform, write].forEach((stream) => {
1341cb0ef41Sopenharmony_ci    stream.on('error', common.mustCall((err) => {
1351cb0ef41Sopenharmony_ci      assert.deepStrictEqual(err, new Error('kaboom'));
1361cb0ef41Sopenharmony_ci    }));
1371cb0ef41Sopenharmony_ci  });
1381cb0ef41Sopenharmony_ci
1391cb0ef41Sopenharmony_ci  const dst = pipeline(read, transform, write, common.mustCall((err) => {
1401cb0ef41Sopenharmony_ci    assert.deepStrictEqual(err, new Error('kaboom'));
1411cb0ef41Sopenharmony_ci  }));
1421cb0ef41Sopenharmony_ci
1431cb0ef41Sopenharmony_ci  assert.strictEqual(dst, write);
1441cb0ef41Sopenharmony_ci
1451cb0ef41Sopenharmony_ci  read.push('hello');
1461cb0ef41Sopenharmony_ci}
1471cb0ef41Sopenharmony_ci
1481cb0ef41Sopenharmony_ci{
1491cb0ef41Sopenharmony_ci  const server = http.createServer((req, res) => {
1501cb0ef41Sopenharmony_ci    const rs = new Readable({
1511cb0ef41Sopenharmony_ci      read() {
1521cb0ef41Sopenharmony_ci        rs.push('hello');
1531cb0ef41Sopenharmony_ci        rs.push(null);
1541cb0ef41Sopenharmony_ci      }
1551cb0ef41Sopenharmony_ci    });
1561cb0ef41Sopenharmony_ci
1571cb0ef41Sopenharmony_ci    pipeline(rs, res, () => {});
1581cb0ef41Sopenharmony_ci  });
1591cb0ef41Sopenharmony_ci
1601cb0ef41Sopenharmony_ci  server.listen(0, () => {
1611cb0ef41Sopenharmony_ci    const req = http.request({
1621cb0ef41Sopenharmony_ci      port: server.address().port
1631cb0ef41Sopenharmony_ci    });
1641cb0ef41Sopenharmony_ci
1651cb0ef41Sopenharmony_ci    req.end();
1661cb0ef41Sopenharmony_ci    req.on('response', (res) => {
1671cb0ef41Sopenharmony_ci      const buf = [];
1681cb0ef41Sopenharmony_ci      res.on('data', (data) => buf.push(data));
1691cb0ef41Sopenharmony_ci      res.on('end', common.mustCall(() => {
1701cb0ef41Sopenharmony_ci        assert.deepStrictEqual(
1711cb0ef41Sopenharmony_ci          Buffer.concat(buf),
1721cb0ef41Sopenharmony_ci          Buffer.from('hello')
1731cb0ef41Sopenharmony_ci        );
1741cb0ef41Sopenharmony_ci        server.close();
1751cb0ef41Sopenharmony_ci      }));
1761cb0ef41Sopenharmony_ci    });
1771cb0ef41Sopenharmony_ci  });
1781cb0ef41Sopenharmony_ci}
1791cb0ef41Sopenharmony_ci
1801cb0ef41Sopenharmony_ci{
1811cb0ef41Sopenharmony_ci  const server = http.createServer((req, res) => {
1821cb0ef41Sopenharmony_ci    let sent = false;
1831cb0ef41Sopenharmony_ci    const rs = new Readable({
1841cb0ef41Sopenharmony_ci      read() {
1851cb0ef41Sopenharmony_ci        if (sent) {
1861cb0ef41Sopenharmony_ci          return;
1871cb0ef41Sopenharmony_ci        }
1881cb0ef41Sopenharmony_ci        sent = true;
1891cb0ef41Sopenharmony_ci        rs.push('hello');
1901cb0ef41Sopenharmony_ci      },
1911cb0ef41Sopenharmony_ci      destroy: common.mustCall((err, cb) => {
1921cb0ef41Sopenharmony_ci        // Prevents fd leaks by destroying http pipelines
1931cb0ef41Sopenharmony_ci        cb();
1941cb0ef41Sopenharmony_ci      })
1951cb0ef41Sopenharmony_ci    });
1961cb0ef41Sopenharmony_ci
1971cb0ef41Sopenharmony_ci    pipeline(rs, res, () => {});
1981cb0ef41Sopenharmony_ci  });
1991cb0ef41Sopenharmony_ci
2001cb0ef41Sopenharmony_ci  server.listen(0, () => {
2011cb0ef41Sopenharmony_ci    const req = http.request({
2021cb0ef41Sopenharmony_ci      port: server.address().port
2031cb0ef41Sopenharmony_ci    });
2041cb0ef41Sopenharmony_ci
2051cb0ef41Sopenharmony_ci    req.end();
2061cb0ef41Sopenharmony_ci    req.on('response', (res) => {
2071cb0ef41Sopenharmony_ci      setImmediate(() => {
2081cb0ef41Sopenharmony_ci        res.destroy();
2091cb0ef41Sopenharmony_ci        server.close();
2101cb0ef41Sopenharmony_ci      });
2111cb0ef41Sopenharmony_ci    });
2121cb0ef41Sopenharmony_ci  });
2131cb0ef41Sopenharmony_ci}
2141cb0ef41Sopenharmony_ci
2151cb0ef41Sopenharmony_ci{
2161cb0ef41Sopenharmony_ci  const server = http.createServer((req, res) => {
2171cb0ef41Sopenharmony_ci    let sent = 0;
2181cb0ef41Sopenharmony_ci    const rs = new Readable({
2191cb0ef41Sopenharmony_ci      read() {
2201cb0ef41Sopenharmony_ci        if (sent++ > 10) {
2211cb0ef41Sopenharmony_ci          return;
2221cb0ef41Sopenharmony_ci        }
2231cb0ef41Sopenharmony_ci        rs.push('hello');
2241cb0ef41Sopenharmony_ci      },
2251cb0ef41Sopenharmony_ci      destroy: common.mustCall((err, cb) => {
2261cb0ef41Sopenharmony_ci        cb();
2271cb0ef41Sopenharmony_ci      })
2281cb0ef41Sopenharmony_ci    });
2291cb0ef41Sopenharmony_ci
2301cb0ef41Sopenharmony_ci    pipeline(rs, res, () => {});
2311cb0ef41Sopenharmony_ci  });
2321cb0ef41Sopenharmony_ci
2331cb0ef41Sopenharmony_ci  let cnt = 10;
2341cb0ef41Sopenharmony_ci
2351cb0ef41Sopenharmony_ci  const badSink = new Writable({
2361cb0ef41Sopenharmony_ci    write(data, enc, cb) {
2371cb0ef41Sopenharmony_ci      cnt--;
2381cb0ef41Sopenharmony_ci      if (cnt === 0) cb(new Error('kaboom'));
2391cb0ef41Sopenharmony_ci      else cb();
2401cb0ef41Sopenharmony_ci    }
2411cb0ef41Sopenharmony_ci  });
2421cb0ef41Sopenharmony_ci
2431cb0ef41Sopenharmony_ci  server.listen(0, () => {
2441cb0ef41Sopenharmony_ci    const req = http.request({
2451cb0ef41Sopenharmony_ci      port: server.address().port
2461cb0ef41Sopenharmony_ci    });
2471cb0ef41Sopenharmony_ci
2481cb0ef41Sopenharmony_ci    req.end();
2491cb0ef41Sopenharmony_ci    req.on('response', (res) => {
2501cb0ef41Sopenharmony_ci      pipeline(res, badSink, common.mustCall((err) => {
2511cb0ef41Sopenharmony_ci        assert.deepStrictEqual(err, new Error('kaboom'));
2521cb0ef41Sopenharmony_ci        server.close();
2531cb0ef41Sopenharmony_ci      }));
2541cb0ef41Sopenharmony_ci    });
2551cb0ef41Sopenharmony_ci  });
2561cb0ef41Sopenharmony_ci}
2571cb0ef41Sopenharmony_ci
2581cb0ef41Sopenharmony_ci{
2591cb0ef41Sopenharmony_ci  const server = http.createServer((req, res) => {
2601cb0ef41Sopenharmony_ci    pipeline(req, res, common.mustSucceed());
2611cb0ef41Sopenharmony_ci  });
2621cb0ef41Sopenharmony_ci
2631cb0ef41Sopenharmony_ci  server.listen(0, () => {
2641cb0ef41Sopenharmony_ci    const req = http.request({
2651cb0ef41Sopenharmony_ci      port: server.address().port
2661cb0ef41Sopenharmony_ci    });
2671cb0ef41Sopenharmony_ci
2681cb0ef41Sopenharmony_ci    let sent = 0;
2691cb0ef41Sopenharmony_ci    const rs = new Readable({
2701cb0ef41Sopenharmony_ci      read() {
2711cb0ef41Sopenharmony_ci        if (sent++ > 10) {
2721cb0ef41Sopenharmony_ci          return;
2731cb0ef41Sopenharmony_ci        }
2741cb0ef41Sopenharmony_ci        rs.push('hello');
2751cb0ef41Sopenharmony_ci      }
2761cb0ef41Sopenharmony_ci    });
2771cb0ef41Sopenharmony_ci
2781cb0ef41Sopenharmony_ci    pipeline(rs, req, common.mustCall(() => {
2791cb0ef41Sopenharmony_ci      server.close();
2801cb0ef41Sopenharmony_ci    }));
2811cb0ef41Sopenharmony_ci
2821cb0ef41Sopenharmony_ci    req.on('response', (res) => {
2831cb0ef41Sopenharmony_ci      let cnt = 10;
2841cb0ef41Sopenharmony_ci      res.on('data', () => {
2851cb0ef41Sopenharmony_ci        cnt--;
2861cb0ef41Sopenharmony_ci        if (cnt === 0) rs.destroy();
2871cb0ef41Sopenharmony_ci      });
2881cb0ef41Sopenharmony_ci    });
2891cb0ef41Sopenharmony_ci  });
2901cb0ef41Sopenharmony_ci}
2911cb0ef41Sopenharmony_ci
2921cb0ef41Sopenharmony_ci{
2931cb0ef41Sopenharmony_ci  const makeTransform = () => {
2941cb0ef41Sopenharmony_ci    const tr = new Transform({
2951cb0ef41Sopenharmony_ci      transform(data, enc, cb) {
2961cb0ef41Sopenharmony_ci        cb(null, data);
2971cb0ef41Sopenharmony_ci      }
2981cb0ef41Sopenharmony_ci    });
2991cb0ef41Sopenharmony_ci
3001cb0ef41Sopenharmony_ci    tr.on('close', common.mustCall());
3011cb0ef41Sopenharmony_ci    return tr;
3021cb0ef41Sopenharmony_ci  };
3031cb0ef41Sopenharmony_ci
3041cb0ef41Sopenharmony_ci  const rs = new Readable({
3051cb0ef41Sopenharmony_ci    read() {
3061cb0ef41Sopenharmony_ci      rs.push('hello');
3071cb0ef41Sopenharmony_ci    }
3081cb0ef41Sopenharmony_ci  });
3091cb0ef41Sopenharmony_ci
3101cb0ef41Sopenharmony_ci  let cnt = 10;
3111cb0ef41Sopenharmony_ci
3121cb0ef41Sopenharmony_ci  const ws = new Writable({
3131cb0ef41Sopenharmony_ci    write(data, enc, cb) {
3141cb0ef41Sopenharmony_ci      cnt--;
3151cb0ef41Sopenharmony_ci      if (cnt === 0) return cb(new Error('kaboom'));
3161cb0ef41Sopenharmony_ci      cb();
3171cb0ef41Sopenharmony_ci    }
3181cb0ef41Sopenharmony_ci  });
3191cb0ef41Sopenharmony_ci
3201cb0ef41Sopenharmony_ci  rs.on('close', common.mustCall());
3211cb0ef41Sopenharmony_ci  ws.on('close', common.mustCall());
3221cb0ef41Sopenharmony_ci
3231cb0ef41Sopenharmony_ci  pipeline(
3241cb0ef41Sopenharmony_ci    rs,
3251cb0ef41Sopenharmony_ci    makeTransform(),
3261cb0ef41Sopenharmony_ci    makeTransform(),
3271cb0ef41Sopenharmony_ci    makeTransform(),
3281cb0ef41Sopenharmony_ci    makeTransform(),
3291cb0ef41Sopenharmony_ci    makeTransform(),
3301cb0ef41Sopenharmony_ci    makeTransform(),
3311cb0ef41Sopenharmony_ci    ws,
3321cb0ef41Sopenharmony_ci    common.mustCall((err) => {
3331cb0ef41Sopenharmony_ci      assert.deepStrictEqual(err, new Error('kaboom'));
3341cb0ef41Sopenharmony_ci    })
3351cb0ef41Sopenharmony_ci  );
3361cb0ef41Sopenharmony_ci}
3371cb0ef41Sopenharmony_ci
3381cb0ef41Sopenharmony_ci{
3391cb0ef41Sopenharmony_ci  const oldStream = new Stream();
3401cb0ef41Sopenharmony_ci
3411cb0ef41Sopenharmony_ci  oldStream.pause = oldStream.resume = () => {};
3421cb0ef41Sopenharmony_ci  oldStream.write = (data) => {
3431cb0ef41Sopenharmony_ci    oldStream.emit('data', data);
3441cb0ef41Sopenharmony_ci    return true;
3451cb0ef41Sopenharmony_ci  };
3461cb0ef41Sopenharmony_ci  oldStream.end = () => {
3471cb0ef41Sopenharmony_ci    oldStream.emit('end');
3481cb0ef41Sopenharmony_ci  };
3491cb0ef41Sopenharmony_ci
3501cb0ef41Sopenharmony_ci  const expected = [
3511cb0ef41Sopenharmony_ci    Buffer.from('hello'),
3521cb0ef41Sopenharmony_ci    Buffer.from('world'),
3531cb0ef41Sopenharmony_ci  ];
3541cb0ef41Sopenharmony_ci
3551cb0ef41Sopenharmony_ci  const rs = new Readable({
3561cb0ef41Sopenharmony_ci    read() {
3571cb0ef41Sopenharmony_ci      for (let i = 0; i < expected.length; i++) {
3581cb0ef41Sopenharmony_ci        rs.push(expected[i]);
3591cb0ef41Sopenharmony_ci      }
3601cb0ef41Sopenharmony_ci      rs.push(null);
3611cb0ef41Sopenharmony_ci    }
3621cb0ef41Sopenharmony_ci  });
3631cb0ef41Sopenharmony_ci
3641cb0ef41Sopenharmony_ci  const ws = new Writable({
3651cb0ef41Sopenharmony_ci    write(data, enc, cb) {
3661cb0ef41Sopenharmony_ci      assert.deepStrictEqual(data, expected.shift());
3671cb0ef41Sopenharmony_ci      cb();
3681cb0ef41Sopenharmony_ci    }
3691cb0ef41Sopenharmony_ci  });
3701cb0ef41Sopenharmony_ci
3711cb0ef41Sopenharmony_ci  let finished = false;
3721cb0ef41Sopenharmony_ci
3731cb0ef41Sopenharmony_ci  ws.on('finish', () => {
3741cb0ef41Sopenharmony_ci    finished = true;
3751cb0ef41Sopenharmony_ci  });
3761cb0ef41Sopenharmony_ci
3771cb0ef41Sopenharmony_ci  pipeline(
3781cb0ef41Sopenharmony_ci    rs,
3791cb0ef41Sopenharmony_ci    oldStream,
3801cb0ef41Sopenharmony_ci    ws,
3811cb0ef41Sopenharmony_ci    common.mustSucceed(() => {
3821cb0ef41Sopenharmony_ci      assert(finished, 'last stream finished');
3831cb0ef41Sopenharmony_ci    })
3841cb0ef41Sopenharmony_ci  );
3851cb0ef41Sopenharmony_ci}
3861cb0ef41Sopenharmony_ci
3871cb0ef41Sopenharmony_ci{
3881cb0ef41Sopenharmony_ci  const oldStream = new Stream();
3891cb0ef41Sopenharmony_ci
3901cb0ef41Sopenharmony_ci  oldStream.pause = oldStream.resume = () => {};
3911cb0ef41Sopenharmony_ci  oldStream.write = (data) => {
3921cb0ef41Sopenharmony_ci    oldStream.emit('data', data);
3931cb0ef41Sopenharmony_ci    return true;
3941cb0ef41Sopenharmony_ci  };
3951cb0ef41Sopenharmony_ci  oldStream.end = () => {
3961cb0ef41Sopenharmony_ci    oldStream.emit('end');
3971cb0ef41Sopenharmony_ci  };
3981cb0ef41Sopenharmony_ci
3991cb0ef41Sopenharmony_ci  const destroyableOldStream = new Stream();
4001cb0ef41Sopenharmony_ci
4011cb0ef41Sopenharmony_ci  destroyableOldStream.pause = destroyableOldStream.resume = () => {};
4021cb0ef41Sopenharmony_ci  destroyableOldStream.destroy = common.mustCall(() => {
4031cb0ef41Sopenharmony_ci    destroyableOldStream.emit('close');
4041cb0ef41Sopenharmony_ci  });
4051cb0ef41Sopenharmony_ci  destroyableOldStream.write = (data) => {
4061cb0ef41Sopenharmony_ci    destroyableOldStream.emit('data', data);
4071cb0ef41Sopenharmony_ci    return true;
4081cb0ef41Sopenharmony_ci  };
4091cb0ef41Sopenharmony_ci  destroyableOldStream.end = () => {
4101cb0ef41Sopenharmony_ci    destroyableOldStream.emit('end');
4111cb0ef41Sopenharmony_ci  };
4121cb0ef41Sopenharmony_ci
4131cb0ef41Sopenharmony_ci  const rs = new Readable({
4141cb0ef41Sopenharmony_ci    read() {
4151cb0ef41Sopenharmony_ci      rs.destroy(new Error('stop'));
4161cb0ef41Sopenharmony_ci    }
4171cb0ef41Sopenharmony_ci  });
4181cb0ef41Sopenharmony_ci
4191cb0ef41Sopenharmony_ci  const ws = new Writable({
4201cb0ef41Sopenharmony_ci    write(data, enc, cb) {
4211cb0ef41Sopenharmony_ci      cb();
4221cb0ef41Sopenharmony_ci    }
4231cb0ef41Sopenharmony_ci  });
4241cb0ef41Sopenharmony_ci
4251cb0ef41Sopenharmony_ci  let finished = false;
4261cb0ef41Sopenharmony_ci
4271cb0ef41Sopenharmony_ci  ws.on('finish', () => {
4281cb0ef41Sopenharmony_ci    finished = true;
4291cb0ef41Sopenharmony_ci  });
4301cb0ef41Sopenharmony_ci
4311cb0ef41Sopenharmony_ci  pipeline(
4321cb0ef41Sopenharmony_ci    rs,
4331cb0ef41Sopenharmony_ci    oldStream,
4341cb0ef41Sopenharmony_ci    destroyableOldStream,
4351cb0ef41Sopenharmony_ci    ws,
4361cb0ef41Sopenharmony_ci    common.mustCall((err) => {
4371cb0ef41Sopenharmony_ci      assert.deepStrictEqual(err, new Error('stop'));
4381cb0ef41Sopenharmony_ci      assert(!finished, 'should not finish');
4391cb0ef41Sopenharmony_ci    })
4401cb0ef41Sopenharmony_ci  );
4411cb0ef41Sopenharmony_ci}
4421cb0ef41Sopenharmony_ci
4431cb0ef41Sopenharmony_ci{
4441cb0ef41Sopenharmony_ci  const pipelinePromise = promisify(pipeline);
4451cb0ef41Sopenharmony_ci
4461cb0ef41Sopenharmony_ci  async function run() {
4471cb0ef41Sopenharmony_ci    const read = new Readable({
4481cb0ef41Sopenharmony_ci      read() {}
4491cb0ef41Sopenharmony_ci    });
4501cb0ef41Sopenharmony_ci
4511cb0ef41Sopenharmony_ci    const write = new Writable({
4521cb0ef41Sopenharmony_ci      write(data, enc, cb) {
4531cb0ef41Sopenharmony_ci        cb();
4541cb0ef41Sopenharmony_ci      }
4551cb0ef41Sopenharmony_ci    });
4561cb0ef41Sopenharmony_ci
4571cb0ef41Sopenharmony_ci    read.push('data');
4581cb0ef41Sopenharmony_ci    read.push(null);
4591cb0ef41Sopenharmony_ci
4601cb0ef41Sopenharmony_ci    let finished = false;
4611cb0ef41Sopenharmony_ci
4621cb0ef41Sopenharmony_ci    write.on('finish', () => {
4631cb0ef41Sopenharmony_ci      finished = true;
4641cb0ef41Sopenharmony_ci    });
4651cb0ef41Sopenharmony_ci
4661cb0ef41Sopenharmony_ci    await pipelinePromise(read, write);
4671cb0ef41Sopenharmony_ci
4681cb0ef41Sopenharmony_ci    assert(finished);
4691cb0ef41Sopenharmony_ci  }
4701cb0ef41Sopenharmony_ci
4711cb0ef41Sopenharmony_ci  run();
4721cb0ef41Sopenharmony_ci}
4731cb0ef41Sopenharmony_ci
4741cb0ef41Sopenharmony_ci{
4751cb0ef41Sopenharmony_ci  // Check aborted signal without values
4761cb0ef41Sopenharmony_ci  const pipelinePromise = promisify(pipeline);
4771cb0ef41Sopenharmony_ci  async function run() {
4781cb0ef41Sopenharmony_ci    const ac = new AbortController();
4791cb0ef41Sopenharmony_ci    const { signal } = ac;
4801cb0ef41Sopenharmony_ci    async function* producer() {
4811cb0ef41Sopenharmony_ci      ac.abort();
4821cb0ef41Sopenharmony_ci      await Promise.resolve();
4831cb0ef41Sopenharmony_ci      yield '8';
4841cb0ef41Sopenharmony_ci    }
4851cb0ef41Sopenharmony_ci
4861cb0ef41Sopenharmony_ci    const w = new Writable({
4871cb0ef41Sopenharmony_ci      write(chunk, encoding, callback) {
4881cb0ef41Sopenharmony_ci        callback();
4891cb0ef41Sopenharmony_ci      }
4901cb0ef41Sopenharmony_ci    });
4911cb0ef41Sopenharmony_ci    await pipelinePromise(producer, w, { signal });
4921cb0ef41Sopenharmony_ci  }
4931cb0ef41Sopenharmony_ci
4941cb0ef41Sopenharmony_ci  assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
4951cb0ef41Sopenharmony_ci}
4961cb0ef41Sopenharmony_ci
4971cb0ef41Sopenharmony_ci{
4981cb0ef41Sopenharmony_ci  // Check aborted signal after init.
4991cb0ef41Sopenharmony_ci  const pipelinePromise = promisify(pipeline);
5001cb0ef41Sopenharmony_ci  async function run() {
5011cb0ef41Sopenharmony_ci    const ac = new AbortController();
5021cb0ef41Sopenharmony_ci    const { signal } = ac;
5031cb0ef41Sopenharmony_ci    async function* producer() {
5041cb0ef41Sopenharmony_ci      yield '5';
5051cb0ef41Sopenharmony_ci      await Promise.resolve();
5061cb0ef41Sopenharmony_ci      ac.abort();
5071cb0ef41Sopenharmony_ci      await Promise.resolve();
5081cb0ef41Sopenharmony_ci      yield '8';
5091cb0ef41Sopenharmony_ci    }
5101cb0ef41Sopenharmony_ci
5111cb0ef41Sopenharmony_ci    const w = new Writable({
5121cb0ef41Sopenharmony_ci      write(chunk, encoding, callback) {
5131cb0ef41Sopenharmony_ci        callback();
5141cb0ef41Sopenharmony_ci      }
5151cb0ef41Sopenharmony_ci    });
5161cb0ef41Sopenharmony_ci    await pipelinePromise(producer, w, { signal });
5171cb0ef41Sopenharmony_ci  }
5181cb0ef41Sopenharmony_ci
5191cb0ef41Sopenharmony_ci  assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
5201cb0ef41Sopenharmony_ci}
5211cb0ef41Sopenharmony_ci
5221cb0ef41Sopenharmony_ci{
5231cb0ef41Sopenharmony_ci  // Check pre-aborted signal
5241cb0ef41Sopenharmony_ci  const pipelinePromise = promisify(pipeline);
5251cb0ef41Sopenharmony_ci  async function run() {
5261cb0ef41Sopenharmony_ci    const signal = AbortSignal.abort();
5271cb0ef41Sopenharmony_ci    async function* producer() {
5281cb0ef41Sopenharmony_ci      yield '5';
5291cb0ef41Sopenharmony_ci      await Promise.resolve();
5301cb0ef41Sopenharmony_ci      yield '8';
5311cb0ef41Sopenharmony_ci    }
5321cb0ef41Sopenharmony_ci
5331cb0ef41Sopenharmony_ci    const w = new Writable({
5341cb0ef41Sopenharmony_ci      write(chunk, encoding, callback) {
5351cb0ef41Sopenharmony_ci        callback();
5361cb0ef41Sopenharmony_ci      }
5371cb0ef41Sopenharmony_ci    });
5381cb0ef41Sopenharmony_ci    await pipelinePromise(producer, w, { signal });
5391cb0ef41Sopenharmony_ci  }
5401cb0ef41Sopenharmony_ci
5411cb0ef41Sopenharmony_ci  assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
5421cb0ef41Sopenharmony_ci}
5431cb0ef41Sopenharmony_ci
5441cb0ef41Sopenharmony_ci{
5451cb0ef41Sopenharmony_ci  const read = new Readable({
5461cb0ef41Sopenharmony_ci    read() {}
5471cb0ef41Sopenharmony_ci  });
5481cb0ef41Sopenharmony_ci
5491cb0ef41Sopenharmony_ci  const transform = new Transform({
5501cb0ef41Sopenharmony_ci    transform(data, enc, cb) {
5511cb0ef41Sopenharmony_ci      cb(new Error('kaboom'));
5521cb0ef41Sopenharmony_ci    }
5531cb0ef41Sopenharmony_ci  });
5541cb0ef41Sopenharmony_ci
5551cb0ef41Sopenharmony_ci  const write = new Writable({
5561cb0ef41Sopenharmony_ci    write(data, enc, cb) {
5571cb0ef41Sopenharmony_ci      cb();
5581cb0ef41Sopenharmony_ci    }
5591cb0ef41Sopenharmony_ci  });
5601cb0ef41Sopenharmony_ci
5611cb0ef41Sopenharmony_ci  assert.throws(
5621cb0ef41Sopenharmony_ci    () => pipeline(read, transform, write),
5631cb0ef41Sopenharmony_ci    { code: 'ERR_INVALID_ARG_TYPE' }
5641cb0ef41Sopenharmony_ci  );
5651cb0ef41Sopenharmony_ci}
5661cb0ef41Sopenharmony_ci
5671cb0ef41Sopenharmony_ci{
5681cb0ef41Sopenharmony_ci  const server = http.Server(function(req, res) {
5691cb0ef41Sopenharmony_ci    res.write('asd');
5701cb0ef41Sopenharmony_ci  });
5711cb0ef41Sopenharmony_ci  server.listen(0, function() {
5721cb0ef41Sopenharmony_ci    http.get({ port: this.address().port }, (res) => {
5731cb0ef41Sopenharmony_ci      const stream = new PassThrough();
5741cb0ef41Sopenharmony_ci
5751cb0ef41Sopenharmony_ci      stream.on('error', common.mustCall());
5761cb0ef41Sopenharmony_ci
5771cb0ef41Sopenharmony_ci      pipeline(
5781cb0ef41Sopenharmony_ci        res,
5791cb0ef41Sopenharmony_ci        stream,
5801cb0ef41Sopenharmony_ci        common.mustCall((err) => {
5811cb0ef41Sopenharmony_ci          assert.strictEqual(err.message, 'oh no');
5821cb0ef41Sopenharmony_ci          server.close();
5831cb0ef41Sopenharmony_ci        })
5841cb0ef41Sopenharmony_ci      );
5851cb0ef41Sopenharmony_ci
5861cb0ef41Sopenharmony_ci      stream.destroy(new Error('oh no'));
5871cb0ef41Sopenharmony_ci    }).on('error', common.mustNotCall());
5881cb0ef41Sopenharmony_ci  });
5891cb0ef41Sopenharmony_ci}
5901cb0ef41Sopenharmony_ci
5911cb0ef41Sopenharmony_ci{
5921cb0ef41Sopenharmony_ci  let res = '';
5931cb0ef41Sopenharmony_ci  const w = new Writable({
5941cb0ef41Sopenharmony_ci    write(chunk, encoding, callback) {
5951cb0ef41Sopenharmony_ci      res += chunk;
5961cb0ef41Sopenharmony_ci      callback();
5971cb0ef41Sopenharmony_ci    }
5981cb0ef41Sopenharmony_ci  });
5991cb0ef41Sopenharmony_ci  pipeline(function*() {
6001cb0ef41Sopenharmony_ci    yield 'hello';
6011cb0ef41Sopenharmony_ci    yield 'world';
6021cb0ef41Sopenharmony_ci  }(), w, common.mustSucceed(() => {
6031cb0ef41Sopenharmony_ci    assert.strictEqual(res, 'helloworld');
6041cb0ef41Sopenharmony_ci  }));
6051cb0ef41Sopenharmony_ci}
6061cb0ef41Sopenharmony_ci
6071cb0ef41Sopenharmony_ci{
6081cb0ef41Sopenharmony_ci  let res = '';
6091cb0ef41Sopenharmony_ci  const w = new Writable({
6101cb0ef41Sopenharmony_ci    write(chunk, encoding, callback) {
6111cb0ef41Sopenharmony_ci      res += chunk;
6121cb0ef41Sopenharmony_ci      callback();
6131cb0ef41Sopenharmony_ci    }
6141cb0ef41Sopenharmony_ci  });
6151cb0ef41Sopenharmony_ci  pipeline(async function*() {
6161cb0ef41Sopenharmony_ci    await Promise.resolve();
6171cb0ef41Sopenharmony_ci    yield 'hello';
6181cb0ef41Sopenharmony_ci    yield 'world';
6191cb0ef41Sopenharmony_ci  }(), w, common.mustSucceed(() => {
6201cb0ef41Sopenharmony_ci    assert.strictEqual(res, 'helloworld');
6211cb0ef41Sopenharmony_ci  }));
6221cb0ef41Sopenharmony_ci}
6231cb0ef41Sopenharmony_ci
6241cb0ef41Sopenharmony_ci{
6251cb0ef41Sopenharmony_ci  let res = '';
6261cb0ef41Sopenharmony_ci  const w = new Writable({
6271cb0ef41Sopenharmony_ci    write(chunk, encoding, callback) {
6281cb0ef41Sopenharmony_ci      res += chunk;
6291cb0ef41Sopenharmony_ci      callback();
6301cb0ef41Sopenharmony_ci    }
6311cb0ef41Sopenharmony_ci  });
6321cb0ef41Sopenharmony_ci  pipeline(function*() {
6331cb0ef41Sopenharmony_ci    yield 'hello';
6341cb0ef41Sopenharmony_ci    yield 'world';
6351cb0ef41Sopenharmony_ci  }, w, common.mustSucceed(() => {
6361cb0ef41Sopenharmony_ci    assert.strictEqual(res, 'helloworld');
6371cb0ef41Sopenharmony_ci  }));
6381cb0ef41Sopenharmony_ci}
6391cb0ef41Sopenharmony_ci
6401cb0ef41Sopenharmony_ci{
6411cb0ef41Sopenharmony_ci  let res = '';
6421cb0ef41Sopenharmony_ci  const w = new Writable({
6431cb0ef41Sopenharmony_ci    write(chunk, encoding, callback) {
6441cb0ef41Sopenharmony_ci      res += chunk;
6451cb0ef41Sopenharmony_ci      callback();
6461cb0ef41Sopenharmony_ci    }
6471cb0ef41Sopenharmony_ci  });
6481cb0ef41Sopenharmony_ci  pipeline(async function*() {
6491cb0ef41Sopenharmony_ci    await Promise.resolve();
6501cb0ef41Sopenharmony_ci    yield 'hello';
6511cb0ef41Sopenharmony_ci    yield 'world';
6521cb0ef41Sopenharmony_ci  }, w, common.mustSucceed(() => {
6531cb0ef41Sopenharmony_ci    assert.strictEqual(res, 'helloworld');
6541cb0ef41Sopenharmony_ci  }));
6551cb0ef41Sopenharmony_ci}
6561cb0ef41Sopenharmony_ci
6571cb0ef41Sopenharmony_ci{
6581cb0ef41Sopenharmony_ci  let res = '';
6591cb0ef41Sopenharmony_ci  pipeline(async function*() {
6601cb0ef41Sopenharmony_ci    await Promise.resolve();
6611cb0ef41Sopenharmony_ci    yield 'hello';
6621cb0ef41Sopenharmony_ci    yield 'world';
6631cb0ef41Sopenharmony_ci  }, async function*(source) {
6641cb0ef41Sopenharmony_ci    for await (const chunk of source) {
6651cb0ef41Sopenharmony_ci      yield chunk.toUpperCase();
6661cb0ef41Sopenharmony_ci    }
6671cb0ef41Sopenharmony_ci  }, async function(source) {
6681cb0ef41Sopenharmony_ci    for await (const chunk of source) {
6691cb0ef41Sopenharmony_ci      res += chunk;
6701cb0ef41Sopenharmony_ci    }
6711cb0ef41Sopenharmony_ci  }, common.mustSucceed(() => {
6721cb0ef41Sopenharmony_ci    assert.strictEqual(res, 'HELLOWORLD');
6731cb0ef41Sopenharmony_ci  }));
6741cb0ef41Sopenharmony_ci}
6751cb0ef41Sopenharmony_ci
6761cb0ef41Sopenharmony_ci{
6771cb0ef41Sopenharmony_ci  pipeline(async function*() {
6781cb0ef41Sopenharmony_ci    await Promise.resolve();
6791cb0ef41Sopenharmony_ci    yield 'hello';
6801cb0ef41Sopenharmony_ci    yield 'world';
6811cb0ef41Sopenharmony_ci  }, async function*(source) {
6821cb0ef41Sopenharmony_ci    for await (const chunk of source) {
6831cb0ef41Sopenharmony_ci      yield chunk.toUpperCase();
6841cb0ef41Sopenharmony_ci    }
6851cb0ef41Sopenharmony_ci  }, async function(source) {
6861cb0ef41Sopenharmony_ci    let ret = '';
6871cb0ef41Sopenharmony_ci    for await (const chunk of source) {
6881cb0ef41Sopenharmony_ci      ret += chunk;
6891cb0ef41Sopenharmony_ci    }
6901cb0ef41Sopenharmony_ci    return ret;
6911cb0ef41Sopenharmony_ci  }, common.mustSucceed((val) => {
6921cb0ef41Sopenharmony_ci    assert.strictEqual(val, 'HELLOWORLD');
6931cb0ef41Sopenharmony_ci  }));
6941cb0ef41Sopenharmony_ci}
6951cb0ef41Sopenharmony_ci
6961cb0ef41Sopenharmony_ci{
6971cb0ef41Sopenharmony_ci  // AsyncIterable destination is returned and finalizes.
6981cb0ef41Sopenharmony_ci
6991cb0ef41Sopenharmony_ci  const ret = pipeline(async function*() {
7001cb0ef41Sopenharmony_ci    await Promise.resolve();
7011cb0ef41Sopenharmony_ci    yield 'hello';
7021cb0ef41Sopenharmony_ci  }, async function*(source) { // eslint-disable-line require-yield
7031cb0ef41Sopenharmony_ci    for await (const chunk of source) { } // eslint-disable-line no-unused-vars, no-empty
7041cb0ef41Sopenharmony_ci  }, common.mustCall((err) => {
7051cb0ef41Sopenharmony_ci    assert.strictEqual(err, undefined);
7061cb0ef41Sopenharmony_ci  }));
7071cb0ef41Sopenharmony_ci  ret.resume();
7081cb0ef41Sopenharmony_ci  assert.strictEqual(typeof ret.pipe, 'function');
7091cb0ef41Sopenharmony_ci}
7101cb0ef41Sopenharmony_ci
7111cb0ef41Sopenharmony_ci{
7121cb0ef41Sopenharmony_ci  // AsyncFunction destination is not returned and error is
7131cb0ef41Sopenharmony_ci  // propagated.
7141cb0ef41Sopenharmony_ci
7151cb0ef41Sopenharmony_ci  const ret = pipeline(async function*() { // eslint-disable-line require-yield
7161cb0ef41Sopenharmony_ci    await Promise.resolve();
7171cb0ef41Sopenharmony_ci    throw new Error('kaboom');
7181cb0ef41Sopenharmony_ci  }, async function*(source) { // eslint-disable-line require-yield
7191cb0ef41Sopenharmony_ci    for await (const chunk of source) { } // eslint-disable-line no-unused-vars, no-empty
7201cb0ef41Sopenharmony_ci  }, common.mustCall((err) => {
7211cb0ef41Sopenharmony_ci    assert.strictEqual(err.message, 'kaboom');
7221cb0ef41Sopenharmony_ci  }));
7231cb0ef41Sopenharmony_ci  ret.resume();
7241cb0ef41Sopenharmony_ci  assert.strictEqual(typeof ret.pipe, 'function');
7251cb0ef41Sopenharmony_ci}
7261cb0ef41Sopenharmony_ci
7271cb0ef41Sopenharmony_ci{
7281cb0ef41Sopenharmony_ci  const s = new PassThrough();
7291cb0ef41Sopenharmony_ci  pipeline(async function*() { // eslint-disable-line require-yield
7301cb0ef41Sopenharmony_ci    throw new Error('kaboom');
7311cb0ef41Sopenharmony_ci  }, s, common.mustCall((err) => {
7321cb0ef41Sopenharmony_ci    assert.strictEqual(err.message, 'kaboom');
7331cb0ef41Sopenharmony_ci    assert.strictEqual(s.destroyed, true);
7341cb0ef41Sopenharmony_ci  }));
7351cb0ef41Sopenharmony_ci}
7361cb0ef41Sopenharmony_ci
7371cb0ef41Sopenharmony_ci{
7381cb0ef41Sopenharmony_ci  const s = new PassThrough();
7391cb0ef41Sopenharmony_ci  pipeline(async function*() { // eslint-disable-line require-yield
7401cb0ef41Sopenharmony_ci    throw new Error('kaboom');
7411cb0ef41Sopenharmony_ci  }(), s, common.mustCall((err) => {
7421cb0ef41Sopenharmony_ci    assert.strictEqual(err.message, 'kaboom');
7431cb0ef41Sopenharmony_ci    assert.strictEqual(s.destroyed, true);
7441cb0ef41Sopenharmony_ci  }));
7451cb0ef41Sopenharmony_ci}
7461cb0ef41Sopenharmony_ci
7471cb0ef41Sopenharmony_ci{
7481cb0ef41Sopenharmony_ci  const s = new PassThrough();
7491cb0ef41Sopenharmony_ci  pipeline(function*() { // eslint-disable-line require-yield
7501cb0ef41Sopenharmony_ci    throw new Error('kaboom');
7511cb0ef41Sopenharmony_ci  }, s, common.mustCall((err, val) => {
7521cb0ef41Sopenharmony_ci    assert.strictEqual(err.message, 'kaboom');
7531cb0ef41Sopenharmony_ci    assert.strictEqual(s.destroyed, true);
7541cb0ef41Sopenharmony_ci  }));
7551cb0ef41Sopenharmony_ci}
7561cb0ef41Sopenharmony_ci
7571cb0ef41Sopenharmony_ci{
7581cb0ef41Sopenharmony_ci  const s = new PassThrough();
7591cb0ef41Sopenharmony_ci  pipeline(function*() { // eslint-disable-line require-yield
7601cb0ef41Sopenharmony_ci    throw new Error('kaboom');
7611cb0ef41Sopenharmony_ci  }(), s, common.mustCall((err, val) => {
7621cb0ef41Sopenharmony_ci    assert.strictEqual(err.message, 'kaboom');
7631cb0ef41Sopenharmony_ci    assert.strictEqual(s.destroyed, true);
7641cb0ef41Sopenharmony_ci  }));
7651cb0ef41Sopenharmony_ci}
7661cb0ef41Sopenharmony_ci
7671cb0ef41Sopenharmony_ci{
7681cb0ef41Sopenharmony_ci  const s = new PassThrough();
7691cb0ef41Sopenharmony_ci  pipeline(async function*() {
7701cb0ef41Sopenharmony_ci    await Promise.resolve();
7711cb0ef41Sopenharmony_ci    yield 'hello';
7721cb0ef41Sopenharmony_ci    yield 'world';
7731cb0ef41Sopenharmony_ci  }, s, async function(source) {
7741cb0ef41Sopenharmony_ci    for await (const chunk of source) { // eslint-disable-line no-unused-vars
7751cb0ef41Sopenharmony_ci      throw new Error('kaboom');
7761cb0ef41Sopenharmony_ci    }
7771cb0ef41Sopenharmony_ci  }, common.mustCall((err, val) => {
7781cb0ef41Sopenharmony_ci    assert.strictEqual(err.message, 'kaboom');
7791cb0ef41Sopenharmony_ci    assert.strictEqual(s.destroyed, true);
7801cb0ef41Sopenharmony_ci  }));
7811cb0ef41Sopenharmony_ci}
7821cb0ef41Sopenharmony_ci
7831cb0ef41Sopenharmony_ci{
7841cb0ef41Sopenharmony_ci  const s = new PassThrough();
7851cb0ef41Sopenharmony_ci  const ret = pipeline(function() {
7861cb0ef41Sopenharmony_ci    return ['hello', 'world'];
7871cb0ef41Sopenharmony_ci  }, s, async function*(source) { // eslint-disable-line require-yield
7881cb0ef41Sopenharmony_ci    for await (const chunk of source) { // eslint-disable-line no-unused-vars
7891cb0ef41Sopenharmony_ci      throw new Error('kaboom');
7901cb0ef41Sopenharmony_ci    }
7911cb0ef41Sopenharmony_ci  }, common.mustCall((err) => {
7921cb0ef41Sopenharmony_ci    assert.strictEqual(err.message, 'kaboom');
7931cb0ef41Sopenharmony_ci    assert.strictEqual(s.destroyed, true);
7941cb0ef41Sopenharmony_ci  }));
7951cb0ef41Sopenharmony_ci  ret.resume();
7961cb0ef41Sopenharmony_ci  assert.strictEqual(typeof ret.pipe, 'function');
7971cb0ef41Sopenharmony_ci}
7981cb0ef41Sopenharmony_ci
7991cb0ef41Sopenharmony_ci{
8001cb0ef41Sopenharmony_ci  // Legacy streams without async iterator.
8011cb0ef41Sopenharmony_ci
8021cb0ef41Sopenharmony_ci  const s = new PassThrough();
8031cb0ef41Sopenharmony_ci  s.push('asd');
8041cb0ef41Sopenharmony_ci  s.push(null);
8051cb0ef41Sopenharmony_ci  s[Symbol.asyncIterator] = null;
8061cb0ef41Sopenharmony_ci  let ret = '';
8071cb0ef41Sopenharmony_ci  pipeline(s, async function(source) {
8081cb0ef41Sopenharmony_ci    for await (const chunk of source) {
8091cb0ef41Sopenharmony_ci      ret += chunk;
8101cb0ef41Sopenharmony_ci    }
8111cb0ef41Sopenharmony_ci  }, common.mustCall((err) => {
8121cb0ef41Sopenharmony_ci    assert.strictEqual(err, undefined);
8131cb0ef41Sopenharmony_ci    assert.strictEqual(ret, 'asd');
8141cb0ef41Sopenharmony_ci  }));
8151cb0ef41Sopenharmony_ci}
8161cb0ef41Sopenharmony_ci
8171cb0ef41Sopenharmony_ci{
8181cb0ef41Sopenharmony_ci  // v1 streams without read().
8191cb0ef41Sopenharmony_ci
8201cb0ef41Sopenharmony_ci  const s = new Stream();
8211cb0ef41Sopenharmony_ci  process.nextTick(() => {
8221cb0ef41Sopenharmony_ci    s.emit('data', 'asd');
8231cb0ef41Sopenharmony_ci    s.emit('end');
8241cb0ef41Sopenharmony_ci  });
8251cb0ef41Sopenharmony_ci  // 'destroyer' can be called multiple times,
8261cb0ef41Sopenharmony_ci  // once from stream wrapper and
8271cb0ef41Sopenharmony_ci  // once from iterator wrapper.
8281cb0ef41Sopenharmony_ci  s.close = common.mustCallAtLeast(1);
8291cb0ef41Sopenharmony_ci  let ret = '';
8301cb0ef41Sopenharmony_ci  pipeline(s, async function(source) {
8311cb0ef41Sopenharmony_ci    for await (const chunk of source) {
8321cb0ef41Sopenharmony_ci      ret += chunk;
8331cb0ef41Sopenharmony_ci    }
8341cb0ef41Sopenharmony_ci  }, common.mustCall((err) => {
8351cb0ef41Sopenharmony_ci    assert.strictEqual(err, undefined);
8361cb0ef41Sopenharmony_ci    assert.strictEqual(ret, 'asd');
8371cb0ef41Sopenharmony_ci  }));
8381cb0ef41Sopenharmony_ci}
8391cb0ef41Sopenharmony_ci
8401cb0ef41Sopenharmony_ci{
8411cb0ef41Sopenharmony_ci  // v1 error streams without read().
8421cb0ef41Sopenharmony_ci
8431cb0ef41Sopenharmony_ci  const s = new Stream();
8441cb0ef41Sopenharmony_ci  process.nextTick(() => {
8451cb0ef41Sopenharmony_ci    s.emit('error', new Error('kaboom'));
8461cb0ef41Sopenharmony_ci  });
8471cb0ef41Sopenharmony_ci  s.destroy = common.mustCall();
8481cb0ef41Sopenharmony_ci  pipeline(s, async function(source) {
8491cb0ef41Sopenharmony_ci  }, common.mustCall((err) => {
8501cb0ef41Sopenharmony_ci    assert.strictEqual(err.message, 'kaboom');
8511cb0ef41Sopenharmony_ci  }));
8521cb0ef41Sopenharmony_ci}
8531cb0ef41Sopenharmony_ci
8541cb0ef41Sopenharmony_ci{
8551cb0ef41Sopenharmony_ci  const s = new PassThrough();
8561cb0ef41Sopenharmony_ci  assert.throws(() => {
8571cb0ef41Sopenharmony_ci    pipeline(function(source) {
8581cb0ef41Sopenharmony_ci    }, s, () => {});
8591cb0ef41Sopenharmony_ci  }, (err) => {
8601cb0ef41Sopenharmony_ci    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
8611cb0ef41Sopenharmony_ci    assert.strictEqual(s.destroyed, false);
8621cb0ef41Sopenharmony_ci    return true;
8631cb0ef41Sopenharmony_ci  });
8641cb0ef41Sopenharmony_ci}
8651cb0ef41Sopenharmony_ci
8661cb0ef41Sopenharmony_ci{
8671cb0ef41Sopenharmony_ci  const s = new PassThrough();
8681cb0ef41Sopenharmony_ci  assert.throws(() => {
8691cb0ef41Sopenharmony_ci    pipeline(s, function(source) {
8701cb0ef41Sopenharmony_ci    }, s, () => {});
8711cb0ef41Sopenharmony_ci  }, (err) => {
8721cb0ef41Sopenharmony_ci    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
8731cb0ef41Sopenharmony_ci    assert.strictEqual(s.destroyed, false);
8741cb0ef41Sopenharmony_ci    return true;
8751cb0ef41Sopenharmony_ci  });
8761cb0ef41Sopenharmony_ci}
8771cb0ef41Sopenharmony_ci
8781cb0ef41Sopenharmony_ci{
8791cb0ef41Sopenharmony_ci  const s = new PassThrough();
8801cb0ef41Sopenharmony_ci  assert.throws(() => {
8811cb0ef41Sopenharmony_ci    pipeline(s, function(source) {
8821cb0ef41Sopenharmony_ci    }, () => {});
8831cb0ef41Sopenharmony_ci  }, (err) => {
8841cb0ef41Sopenharmony_ci    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
8851cb0ef41Sopenharmony_ci    assert.strictEqual(s.destroyed, false);
8861cb0ef41Sopenharmony_ci    return true;
8871cb0ef41Sopenharmony_ci  });
8881cb0ef41Sopenharmony_ci}
8891cb0ef41Sopenharmony_ci
8901cb0ef41Sopenharmony_ci{
8911cb0ef41Sopenharmony_ci  const s = new PassThrough();
8921cb0ef41Sopenharmony_ci  assert.throws(() => {
8931cb0ef41Sopenharmony_ci    pipeline(s, function*(source) {
8941cb0ef41Sopenharmony_ci    }, () => {});
8951cb0ef41Sopenharmony_ci  }, (err) => {
8961cb0ef41Sopenharmony_ci    assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE');
8971cb0ef41Sopenharmony_ci    assert.strictEqual(s.destroyed, false);
8981cb0ef41Sopenharmony_ci    return true;
8991cb0ef41Sopenharmony_ci  });
9001cb0ef41Sopenharmony_ci}
9011cb0ef41Sopenharmony_ci
9021cb0ef41Sopenharmony_ci{
9031cb0ef41Sopenharmony_ci  let res = '';
9041cb0ef41Sopenharmony_ci  pipeline(async function*() {
9051cb0ef41Sopenharmony_ci    await Promise.resolve();
9061cb0ef41Sopenharmony_ci    yield 'hello';
9071cb0ef41Sopenharmony_ci    yield 'world';
9081cb0ef41Sopenharmony_ci  }, new Transform({
9091cb0ef41Sopenharmony_ci    transform(chunk, encoding, cb) {
9101cb0ef41Sopenharmony_ci      cb(new Error('kaboom'));
9111cb0ef41Sopenharmony_ci    }
9121cb0ef41Sopenharmony_ci  }), async function(source) {
9131cb0ef41Sopenharmony_ci    for await (const chunk of source) {
9141cb0ef41Sopenharmony_ci      res += chunk;
9151cb0ef41Sopenharmony_ci    }
9161cb0ef41Sopenharmony_ci  }, common.mustCall((err) => {
9171cb0ef41Sopenharmony_ci    assert.strictEqual(err.message, 'kaboom');
9181cb0ef41Sopenharmony_ci    assert.strictEqual(res, '');
9191cb0ef41Sopenharmony_ci  }));
9201cb0ef41Sopenharmony_ci}
9211cb0ef41Sopenharmony_ci
9221cb0ef41Sopenharmony_ci{
9231cb0ef41Sopenharmony_ci  let res = '';
9241cb0ef41Sopenharmony_ci  pipeline(async function*() {
9251cb0ef41Sopenharmony_ci    await Promise.resolve();
9261cb0ef41Sopenharmony_ci    yield 'hello';
9271cb0ef41Sopenharmony_ci    yield 'world';
9281cb0ef41Sopenharmony_ci  }, new Transform({
9291cb0ef41Sopenharmony_ci    transform(chunk, encoding, cb) {
9301cb0ef41Sopenharmony_ci      process.nextTick(cb, new Error('kaboom'));
9311cb0ef41Sopenharmony_ci    }
9321cb0ef41Sopenharmony_ci  }), async function(source) {
9331cb0ef41Sopenharmony_ci    for await (const chunk of source) {
9341cb0ef41Sopenharmony_ci      res += chunk;
9351cb0ef41Sopenharmony_ci    }
9361cb0ef41Sopenharmony_ci  }, common.mustCall((err) => {
9371cb0ef41Sopenharmony_ci    assert.strictEqual(err.message, 'kaboom');
9381cb0ef41Sopenharmony_ci    assert.strictEqual(res, '');
9391cb0ef41Sopenharmony_ci  }));
9401cb0ef41Sopenharmony_ci}
9411cb0ef41Sopenharmony_ci
9421cb0ef41Sopenharmony_ci{
9431cb0ef41Sopenharmony_ci  let res = '';
9441cb0ef41Sopenharmony_ci  pipeline(async function*() {
9451cb0ef41Sopenharmony_ci    await Promise.resolve();
9461cb0ef41Sopenharmony_ci    yield 'hello';
9471cb0ef41Sopenharmony_ci    yield 'world';
9481cb0ef41Sopenharmony_ci  }, new Transform({
9491cb0ef41Sopenharmony_ci    decodeStrings: false,
9501cb0ef41Sopenharmony_ci    transform(chunk, encoding, cb) {
9511cb0ef41Sopenharmony_ci      cb(null, chunk.toUpperCase());
9521cb0ef41Sopenharmony_ci    }
9531cb0ef41Sopenharmony_ci  }), async function(source) {
9541cb0ef41Sopenharmony_ci    for await (const chunk of source) {
9551cb0ef41Sopenharmony_ci      res += chunk;
9561cb0ef41Sopenharmony_ci    }
9571cb0ef41Sopenharmony_ci  }, common.mustSucceed(() => {
9581cb0ef41Sopenharmony_ci    assert.strictEqual(res, 'HELLOWORLD');
9591cb0ef41Sopenharmony_ci  }));
9601cb0ef41Sopenharmony_ci}
9611cb0ef41Sopenharmony_ci
9621cb0ef41Sopenharmony_ci{
9631cb0ef41Sopenharmony_ci  // Ensure no unhandled rejection from async function.
9641cb0ef41Sopenharmony_ci
9651cb0ef41Sopenharmony_ci  pipeline(async function*() {
9661cb0ef41Sopenharmony_ci    yield 'hello';
9671cb0ef41Sopenharmony_ci  }, async function(source) {
9681cb0ef41Sopenharmony_ci    throw new Error('kaboom');
9691cb0ef41Sopenharmony_ci  }, common.mustCall((err) => {
9701cb0ef41Sopenharmony_ci    assert.strictEqual(err.message, 'kaboom');
9711cb0ef41Sopenharmony_ci  }));
9721cb0ef41Sopenharmony_ci}
9731cb0ef41Sopenharmony_ci
9741cb0ef41Sopenharmony_ci{
9751cb0ef41Sopenharmony_ci  const src = new PassThrough({ autoDestroy: false });
9761cb0ef41Sopenharmony_ci  const dst = new PassThrough({ autoDestroy: false });
9771cb0ef41Sopenharmony_ci  pipeline(src, dst, common.mustCall(() => {
9781cb0ef41Sopenharmony_ci    assert.strictEqual(src.destroyed, false);
9791cb0ef41Sopenharmony_ci    assert.strictEqual(dst.destroyed, false);
9801cb0ef41Sopenharmony_ci  }));
9811cb0ef41Sopenharmony_ci  src.end();
9821cb0ef41Sopenharmony_ci}
9831cb0ef41Sopenharmony_ci
9841cb0ef41Sopenharmony_ci{
9851cb0ef41Sopenharmony_ci  // Make sure 'close' before 'end' finishes without error
9861cb0ef41Sopenharmony_ci  // if readable has received eof.
9871cb0ef41Sopenharmony_ci  // Ref: https://github.com/nodejs/node/issues/29699
9881cb0ef41Sopenharmony_ci  const r = new Readable();
9891cb0ef41Sopenharmony_ci  const w = new Writable({
9901cb0ef41Sopenharmony_ci    write(chunk, encoding, cb) {
9911cb0ef41Sopenharmony_ci      cb();
9921cb0ef41Sopenharmony_ci    }
9931cb0ef41Sopenharmony_ci  });
9941cb0ef41Sopenharmony_ci  pipeline(r, w, (err) => {
9951cb0ef41Sopenharmony_ci    assert.strictEqual(err, undefined);
9961cb0ef41Sopenharmony_ci  });
9971cb0ef41Sopenharmony_ci  r.push('asd');
9981cb0ef41Sopenharmony_ci  r.push(null);
9991cb0ef41Sopenharmony_ci  r.emit('close');
10001cb0ef41Sopenharmony_ci}
10011cb0ef41Sopenharmony_ci
10021cb0ef41Sopenharmony_ci{
10031cb0ef41Sopenharmony_ci  const server = http.createServer((req, res) => {
10041cb0ef41Sopenharmony_ci  });
10051cb0ef41Sopenharmony_ci
10061cb0ef41Sopenharmony_ci  server.listen(0, () => {
10071cb0ef41Sopenharmony_ci    const req = http.request({
10081cb0ef41Sopenharmony_ci      port: server.address().port
10091cb0ef41Sopenharmony_ci    });
10101cb0ef41Sopenharmony_ci
10111cb0ef41Sopenharmony_ci    const body = new PassThrough();
10121cb0ef41Sopenharmony_ci    pipeline(
10131cb0ef41Sopenharmony_ci      body,
10141cb0ef41Sopenharmony_ci      req,
10151cb0ef41Sopenharmony_ci      common.mustSucceed(() => {
10161cb0ef41Sopenharmony_ci        assert(!req.res);
10171cb0ef41Sopenharmony_ci        assert(!req.aborted);
10181cb0ef41Sopenharmony_ci        req.abort();
10191cb0ef41Sopenharmony_ci        server.close();
10201cb0ef41Sopenharmony_ci      })
10211cb0ef41Sopenharmony_ci    );
10221cb0ef41Sopenharmony_ci    body.end();
10231cb0ef41Sopenharmony_ci  });
10241cb0ef41Sopenharmony_ci}
10251cb0ef41Sopenharmony_ci
10261cb0ef41Sopenharmony_ci{
10271cb0ef41Sopenharmony_ci  const src = new PassThrough();
10281cb0ef41Sopenharmony_ci  const dst = new PassThrough();
10291cb0ef41Sopenharmony_ci  pipeline(src, dst, common.mustSucceed(() => {
10301cb0ef41Sopenharmony_ci    assert.strictEqual(dst.destroyed, false);
10311cb0ef41Sopenharmony_ci  }));
10321cb0ef41Sopenharmony_ci  src.end();
10331cb0ef41Sopenharmony_ci}
10341cb0ef41Sopenharmony_ci
10351cb0ef41Sopenharmony_ci{
10361cb0ef41Sopenharmony_ci  const src = new PassThrough();
10371cb0ef41Sopenharmony_ci  const dst = new PassThrough();
10381cb0ef41Sopenharmony_ci  dst.readable = false;
10391cb0ef41Sopenharmony_ci  pipeline(src, dst, common.mustSucceed(() => {
10401cb0ef41Sopenharmony_ci    assert.strictEqual(dst.destroyed, true);
10411cb0ef41Sopenharmony_ci  }));
10421cb0ef41Sopenharmony_ci  src.end();
10431cb0ef41Sopenharmony_ci}
10441cb0ef41Sopenharmony_ci
10451cb0ef41Sopenharmony_ci{
10461cb0ef41Sopenharmony_ci  let res = '';
10471cb0ef41Sopenharmony_ci  const rs = new Readable({
10481cb0ef41Sopenharmony_ci    read() {
10491cb0ef41Sopenharmony_ci      setImmediate(() => {
10501cb0ef41Sopenharmony_ci        rs.push('hello');
10511cb0ef41Sopenharmony_ci      });
10521cb0ef41Sopenharmony_ci    }
10531cb0ef41Sopenharmony_ci  });
10541cb0ef41Sopenharmony_ci  const ws = new Writable({
10551cb0ef41Sopenharmony_ci    write: common.mustNotCall()
10561cb0ef41Sopenharmony_ci  });
10571cb0ef41Sopenharmony_ci  pipeline(rs, async function*(stream) { // eslint-disable-line require-yield
10581cb0ef41Sopenharmony_ci    for await (const chunk of stream) { // eslint-disable-line no-unused-vars
10591cb0ef41Sopenharmony_ci      throw new Error('kaboom');
10601cb0ef41Sopenharmony_ci    }
10611cb0ef41Sopenharmony_ci  }, async function *(source) { // eslint-disable-line require-yield
10621cb0ef41Sopenharmony_ci    for await (const chunk of source) {
10631cb0ef41Sopenharmony_ci      res += chunk;
10641cb0ef41Sopenharmony_ci    }
10651cb0ef41Sopenharmony_ci  }, ws, common.mustCall((err) => {
10661cb0ef41Sopenharmony_ci    assert.strictEqual(err.message, 'kaboom');
10671cb0ef41Sopenharmony_ci    assert.strictEqual(res, '');
10681cb0ef41Sopenharmony_ci  }));
10691cb0ef41Sopenharmony_ci}
10701cb0ef41Sopenharmony_ci
10711cb0ef41Sopenharmony_ci{
10721cb0ef41Sopenharmony_ci  const server = http.createServer((req, res) => {
10731cb0ef41Sopenharmony_ci    req.socket.on('error', common.mustNotCall());
10741cb0ef41Sopenharmony_ci    pipeline(req, new PassThrough(), (err) => {
10751cb0ef41Sopenharmony_ci      assert.ifError(err);
10761cb0ef41Sopenharmony_ci      res.end();
10771cb0ef41Sopenharmony_ci      server.close();
10781cb0ef41Sopenharmony_ci    });
10791cb0ef41Sopenharmony_ci  });
10801cb0ef41Sopenharmony_ci
10811cb0ef41Sopenharmony_ci  server.listen(0, () => {
10821cb0ef41Sopenharmony_ci    const req = http.request({
10831cb0ef41Sopenharmony_ci      method: 'PUT',
10841cb0ef41Sopenharmony_ci      port: server.address().port
10851cb0ef41Sopenharmony_ci    });
10861cb0ef41Sopenharmony_ci    req.end('asd123');
10871cb0ef41Sopenharmony_ci    req.on('response', common.mustCall());
10881cb0ef41Sopenharmony_ci    req.on('error', common.mustNotCall());
10891cb0ef41Sopenharmony_ci  });
10901cb0ef41Sopenharmony_ci}
10911cb0ef41Sopenharmony_ci
10921cb0ef41Sopenharmony_ci{
10931cb0ef41Sopenharmony_ci  // Might still want to be able to use the writable side
10941cb0ef41Sopenharmony_ci  // of src. This is in the case where e.g. the Duplex input
10951cb0ef41Sopenharmony_ci  // is not directly connected to its output. Such a case could
10961cb0ef41Sopenharmony_ci  // happen when the Duplex is reading from a socket and then echos
10971cb0ef41Sopenharmony_ci  // the data back on the same socket.
10981cb0ef41Sopenharmony_ci  const src = new PassThrough();
10991cb0ef41Sopenharmony_ci  assert.strictEqual(src.writable, true);
11001cb0ef41Sopenharmony_ci  const dst = new PassThrough();
11011cb0ef41Sopenharmony_ci  pipeline(src, dst, common.mustCall((err) => {
11021cb0ef41Sopenharmony_ci    assert.strictEqual(src.writable, true);
11031cb0ef41Sopenharmony_ci    assert.strictEqual(src.destroyed, false);
11041cb0ef41Sopenharmony_ci  }));
11051cb0ef41Sopenharmony_ci  src.push(null);
11061cb0ef41Sopenharmony_ci}
11071cb0ef41Sopenharmony_ci
11081cb0ef41Sopenharmony_ci{
11091cb0ef41Sopenharmony_ci  const src = new PassThrough();
11101cb0ef41Sopenharmony_ci  const dst = pipeline(
11111cb0ef41Sopenharmony_ci    src,
11121cb0ef41Sopenharmony_ci    async function * (source) {
11131cb0ef41Sopenharmony_ci      for await (const chunk of source) {
11141cb0ef41Sopenharmony_ci        yield chunk;
11151cb0ef41Sopenharmony_ci      }
11161cb0ef41Sopenharmony_ci    },
11171cb0ef41Sopenharmony_ci    common.mustCall((err) => {
11181cb0ef41Sopenharmony_ci      assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
11191cb0ef41Sopenharmony_ci    })
11201cb0ef41Sopenharmony_ci  );
11211cb0ef41Sopenharmony_ci  src.push('asd');
11221cb0ef41Sopenharmony_ci  dst.destroy();
11231cb0ef41Sopenharmony_ci}
11241cb0ef41Sopenharmony_ci
11251cb0ef41Sopenharmony_ci{
11261cb0ef41Sopenharmony_ci  pipeline(async function * () {
11271cb0ef41Sopenharmony_ci    yield 'asd';
11281cb0ef41Sopenharmony_ci  }, async function * (source) {
11291cb0ef41Sopenharmony_ci    for await (const chunk of source) {
11301cb0ef41Sopenharmony_ci      yield { chunk };
11311cb0ef41Sopenharmony_ci    }
11321cb0ef41Sopenharmony_ci  }, common.mustSucceed());
11331cb0ef41Sopenharmony_ci}
11341cb0ef41Sopenharmony_ci
11351cb0ef41Sopenharmony_ci{
11361cb0ef41Sopenharmony_ci  let closed = false;
11371cb0ef41Sopenharmony_ci  const src = new Readable({
11381cb0ef41Sopenharmony_ci    read() {},
11391cb0ef41Sopenharmony_ci    destroy(err, cb) {
11401cb0ef41Sopenharmony_ci      process.nextTick(cb);
11411cb0ef41Sopenharmony_ci    }
11421cb0ef41Sopenharmony_ci  });
11431cb0ef41Sopenharmony_ci  const dst = new Writable({
11441cb0ef41Sopenharmony_ci    write(chunk, encoding, callback) {
11451cb0ef41Sopenharmony_ci      callback();
11461cb0ef41Sopenharmony_ci    }
11471cb0ef41Sopenharmony_ci  });
11481cb0ef41Sopenharmony_ci  src.on('close', () => {
11491cb0ef41Sopenharmony_ci    closed = true;
11501cb0ef41Sopenharmony_ci  });
11511cb0ef41Sopenharmony_ci  src.push(null);
11521cb0ef41Sopenharmony_ci  pipeline(src, dst, common.mustCall((err) => {
11531cb0ef41Sopenharmony_ci    assert.strictEqual(closed, true);
11541cb0ef41Sopenharmony_ci  }));
11551cb0ef41Sopenharmony_ci}
11561cb0ef41Sopenharmony_ci
11571cb0ef41Sopenharmony_ci{
11581cb0ef41Sopenharmony_ci  let closed = false;
11591cb0ef41Sopenharmony_ci  const src = new Readable({
11601cb0ef41Sopenharmony_ci    read() {},
11611cb0ef41Sopenharmony_ci    destroy(err, cb) {
11621cb0ef41Sopenharmony_ci      process.nextTick(cb);
11631cb0ef41Sopenharmony_ci    }
11641cb0ef41Sopenharmony_ci  });
11651cb0ef41Sopenharmony_ci  const dst = new Duplex({});
11661cb0ef41Sopenharmony_ci  src.on('close', common.mustCall(() => {
11671cb0ef41Sopenharmony_ci    closed = true;
11681cb0ef41Sopenharmony_ci  }));
11691cb0ef41Sopenharmony_ci  src.push(null);
11701cb0ef41Sopenharmony_ci  pipeline(src, dst, common.mustCall((err) => {
11711cb0ef41Sopenharmony_ci    assert.strictEqual(closed, true);
11721cb0ef41Sopenharmony_ci  }));
11731cb0ef41Sopenharmony_ci}
11741cb0ef41Sopenharmony_ci
11751cb0ef41Sopenharmony_ci{
11761cb0ef41Sopenharmony_ci  const server = net.createServer(common.mustCall((socket) => {
11771cb0ef41Sopenharmony_ci    // echo server
11781cb0ef41Sopenharmony_ci    pipeline(socket, socket, common.mustSucceed());
11791cb0ef41Sopenharmony_ci    // 13 force destroys the socket before it has a chance to emit finish
11801cb0ef41Sopenharmony_ci    socket.on('finish', common.mustCall(() => {
11811cb0ef41Sopenharmony_ci      server.close();
11821cb0ef41Sopenharmony_ci    }));
11831cb0ef41Sopenharmony_ci  })).listen(0, common.mustCall(() => {
11841cb0ef41Sopenharmony_ci    const socket = net.connect(server.address().port);
11851cb0ef41Sopenharmony_ci    socket.end();
11861cb0ef41Sopenharmony_ci  }));
11871cb0ef41Sopenharmony_ci}
11881cb0ef41Sopenharmony_ci
11891cb0ef41Sopenharmony_ci{
11901cb0ef41Sopenharmony_ci  const d = new Duplex({
11911cb0ef41Sopenharmony_ci    autoDestroy: false,
11921cb0ef41Sopenharmony_ci    write: common.mustCall((data, enc, cb) => {
11931cb0ef41Sopenharmony_ci      d.push(data);
11941cb0ef41Sopenharmony_ci      cb();
11951cb0ef41Sopenharmony_ci    }),
11961cb0ef41Sopenharmony_ci    read: common.mustCall(() => {
11971cb0ef41Sopenharmony_ci      d.push(null);
11981cb0ef41Sopenharmony_ci    }),
11991cb0ef41Sopenharmony_ci    final: common.mustCall((cb) => {
12001cb0ef41Sopenharmony_ci      setTimeout(() => {
12011cb0ef41Sopenharmony_ci        assert.strictEqual(d.destroyed, false);
12021cb0ef41Sopenharmony_ci        cb();
12031cb0ef41Sopenharmony_ci      }, 1000);
12041cb0ef41Sopenharmony_ci    }),
12051cb0ef41Sopenharmony_ci    destroy: common.mustNotCall()
12061cb0ef41Sopenharmony_ci  });
12071cb0ef41Sopenharmony_ci
12081cb0ef41Sopenharmony_ci  const sink = new Writable({
12091cb0ef41Sopenharmony_ci    write: common.mustCall((data, enc, cb) => {
12101cb0ef41Sopenharmony_ci      cb();
12111cb0ef41Sopenharmony_ci    })
12121cb0ef41Sopenharmony_ci  });
12131cb0ef41Sopenharmony_ci
12141cb0ef41Sopenharmony_ci  pipeline(d, sink, common.mustSucceed());
12151cb0ef41Sopenharmony_ci
12161cb0ef41Sopenharmony_ci  d.write('test');
12171cb0ef41Sopenharmony_ci  d.end();
12181cb0ef41Sopenharmony_ci}
12191cb0ef41Sopenharmony_ci
12201cb0ef41Sopenharmony_ci{
12211cb0ef41Sopenharmony_ci  const server = net.createServer(common.mustCall((socket) => {
12221cb0ef41Sopenharmony_ci    // echo server
12231cb0ef41Sopenharmony_ci    pipeline(socket, socket, common.mustSucceed());
12241cb0ef41Sopenharmony_ci    socket.on('finish', common.mustCall(() => {
12251cb0ef41Sopenharmony_ci      server.close();
12261cb0ef41Sopenharmony_ci    }));
12271cb0ef41Sopenharmony_ci  })).listen(0, common.mustCall(() => {
12281cb0ef41Sopenharmony_ci    const socket = net.connect(server.address().port);
12291cb0ef41Sopenharmony_ci    socket.end();
12301cb0ef41Sopenharmony_ci  }));
12311cb0ef41Sopenharmony_ci}
12321cb0ef41Sopenharmony_ci
12331cb0ef41Sopenharmony_ci{
12341cb0ef41Sopenharmony_ci  const d = new Duplex({
12351cb0ef41Sopenharmony_ci    autoDestroy: false,
12361cb0ef41Sopenharmony_ci    write: common.mustCall((data, enc, cb) => {
12371cb0ef41Sopenharmony_ci      d.push(data);
12381cb0ef41Sopenharmony_ci      cb();
12391cb0ef41Sopenharmony_ci    }),
12401cb0ef41Sopenharmony_ci    read: common.mustCall(() => {
12411cb0ef41Sopenharmony_ci      d.push(null);
12421cb0ef41Sopenharmony_ci    }),
12431cb0ef41Sopenharmony_ci    final: common.mustCall((cb) => {
12441cb0ef41Sopenharmony_ci      setTimeout(() => {
12451cb0ef41Sopenharmony_ci        assert.strictEqual(d.destroyed, false);
12461cb0ef41Sopenharmony_ci        cb();
12471cb0ef41Sopenharmony_ci      }, 1000);
12481cb0ef41Sopenharmony_ci    }),
12491cb0ef41Sopenharmony_ci    // `destroy()` won't be invoked by pipeline since
12501cb0ef41Sopenharmony_ci    // the writable side has not completed when
12511cb0ef41Sopenharmony_ci    // the pipeline has completed.
12521cb0ef41Sopenharmony_ci    destroy: common.mustNotCall()
12531cb0ef41Sopenharmony_ci  });
12541cb0ef41Sopenharmony_ci
12551cb0ef41Sopenharmony_ci  const sink = new Writable({
12561cb0ef41Sopenharmony_ci    write: common.mustCall((data, enc, cb) => {
12571cb0ef41Sopenharmony_ci      cb();
12581cb0ef41Sopenharmony_ci    })
12591cb0ef41Sopenharmony_ci  });
12601cb0ef41Sopenharmony_ci
12611cb0ef41Sopenharmony_ci  pipeline(d, sink, common.mustSucceed());
12621cb0ef41Sopenharmony_ci
12631cb0ef41Sopenharmony_ci  d.write('test');
12641cb0ef41Sopenharmony_ci  d.end();
12651cb0ef41Sopenharmony_ci}
12661cb0ef41Sopenharmony_ci
12671cb0ef41Sopenharmony_ci{
12681cb0ef41Sopenharmony_ci  const r = new Readable({
12691cb0ef41Sopenharmony_ci    read() {}
12701cb0ef41Sopenharmony_ci  });
12711cb0ef41Sopenharmony_ci  r.push('hello');
12721cb0ef41Sopenharmony_ci  r.push('world');
12731cb0ef41Sopenharmony_ci  r.push(null);
12741cb0ef41Sopenharmony_ci  let res = '';
12751cb0ef41Sopenharmony_ci  const w = new Writable({
12761cb0ef41Sopenharmony_ci    write(chunk, encoding, callback) {
12771cb0ef41Sopenharmony_ci      res += chunk;
12781cb0ef41Sopenharmony_ci      callback();
12791cb0ef41Sopenharmony_ci    }
12801cb0ef41Sopenharmony_ci  });
12811cb0ef41Sopenharmony_ci  pipeline([r, w], common.mustSucceed(() => {
12821cb0ef41Sopenharmony_ci    assert.strictEqual(res, 'helloworld');
12831cb0ef41Sopenharmony_ci  }));
12841cb0ef41Sopenharmony_ci}
12851cb0ef41Sopenharmony_ci
12861cb0ef41Sopenharmony_ci{
12871cb0ef41Sopenharmony_ci  let flushed = false;
12881cb0ef41Sopenharmony_ci  const makeStream = () =>
12891cb0ef41Sopenharmony_ci    new Transform({
12901cb0ef41Sopenharmony_ci      transform: (chunk, enc, cb) => cb(null, chunk),
12911cb0ef41Sopenharmony_ci      flush: (cb) =>
12921cb0ef41Sopenharmony_ci        setTimeout(() => {
12931cb0ef41Sopenharmony_ci          flushed = true;
12941cb0ef41Sopenharmony_ci          cb(null);
12951cb0ef41Sopenharmony_ci        }, 1),
12961cb0ef41Sopenharmony_ci    });
12971cb0ef41Sopenharmony_ci
12981cb0ef41Sopenharmony_ci  const input = new Readable();
12991cb0ef41Sopenharmony_ci  input.push(null);
13001cb0ef41Sopenharmony_ci
13011cb0ef41Sopenharmony_ci  pipeline(
13021cb0ef41Sopenharmony_ci    input,
13031cb0ef41Sopenharmony_ci    makeStream(),
13041cb0ef41Sopenharmony_ci    common.mustCall(() => {
13051cb0ef41Sopenharmony_ci      assert.strictEqual(flushed, true);
13061cb0ef41Sopenharmony_ci    }),
13071cb0ef41Sopenharmony_ci  );
13081cb0ef41Sopenharmony_ci}
13091cb0ef41Sopenharmony_ci{
13101cb0ef41Sopenharmony_ci  function createThenable() {
13111cb0ef41Sopenharmony_ci    let counter = 0;
13121cb0ef41Sopenharmony_ci    return {
13131cb0ef41Sopenharmony_ci      get then() {
13141cb0ef41Sopenharmony_ci        if (counter++) {
13151cb0ef41Sopenharmony_ci          throw new Error('Cannot access `then` more than once');
13161cb0ef41Sopenharmony_ci        }
13171cb0ef41Sopenharmony_ci        return Function.prototype;
13181cb0ef41Sopenharmony_ci      },
13191cb0ef41Sopenharmony_ci    };
13201cb0ef41Sopenharmony_ci  }
13211cb0ef41Sopenharmony_ci
13221cb0ef41Sopenharmony_ci  pipeline(
13231cb0ef41Sopenharmony_ci    function* () {
13241cb0ef41Sopenharmony_ci      yield 0;
13251cb0ef41Sopenharmony_ci    },
13261cb0ef41Sopenharmony_ci    createThenable,
13271cb0ef41Sopenharmony_ci    () => common.mustNotCall(),
13281cb0ef41Sopenharmony_ci  );
13291cb0ef41Sopenharmony_ci}
13301cb0ef41Sopenharmony_ci
13311cb0ef41Sopenharmony_ci
13321cb0ef41Sopenharmony_ci{
13331cb0ef41Sopenharmony_ci  const ac = new AbortController();
13341cb0ef41Sopenharmony_ci  const r = Readable.from(async function* () {
13351cb0ef41Sopenharmony_ci    for (let i = 0; i < 10; i++) {
13361cb0ef41Sopenharmony_ci      await Promise.resolve();
13371cb0ef41Sopenharmony_ci      yield String(i);
13381cb0ef41Sopenharmony_ci      if (i === 5) {
13391cb0ef41Sopenharmony_ci        ac.abort();
13401cb0ef41Sopenharmony_ci      }
13411cb0ef41Sopenharmony_ci    }
13421cb0ef41Sopenharmony_ci  }());
13431cb0ef41Sopenharmony_ci  let res = '';
13441cb0ef41Sopenharmony_ci  const w = new Writable({
13451cb0ef41Sopenharmony_ci    write(chunk, encoding, callback) {
13461cb0ef41Sopenharmony_ci      res += chunk;
13471cb0ef41Sopenharmony_ci      callback();
13481cb0ef41Sopenharmony_ci    }
13491cb0ef41Sopenharmony_ci  });
13501cb0ef41Sopenharmony_ci  const cb = common.mustCall((err) => {
13511cb0ef41Sopenharmony_ci    assert.strictEqual(err.name, 'AbortError');
13521cb0ef41Sopenharmony_ci    assert.strictEqual(res, '012345');
13531cb0ef41Sopenharmony_ci    assert.strictEqual(w.destroyed, true);
13541cb0ef41Sopenharmony_ci    assert.strictEqual(r.destroyed, true);
13551cb0ef41Sopenharmony_ci    assert.strictEqual(pipelined.destroyed, true);
13561cb0ef41Sopenharmony_ci  });
13571cb0ef41Sopenharmony_ci  const pipelined = addAbortSignal(ac.signal, pipeline([r, w], cb));
13581cb0ef41Sopenharmony_ci}
13591cb0ef41Sopenharmony_ci
13601cb0ef41Sopenharmony_ci{
13611cb0ef41Sopenharmony_ci  pipeline([1, 2, 3], PassThrough({ objectMode: true }),
13621cb0ef41Sopenharmony_ci           common.mustSucceed(() => {}));
13631cb0ef41Sopenharmony_ci
13641cb0ef41Sopenharmony_ci  let res = '';
13651cb0ef41Sopenharmony_ci  const w = new Writable({
13661cb0ef41Sopenharmony_ci    write(chunk, encoding, callback) {
13671cb0ef41Sopenharmony_ci      res += chunk;
13681cb0ef41Sopenharmony_ci      callback();
13691cb0ef41Sopenharmony_ci    },
13701cb0ef41Sopenharmony_ci  });
13711cb0ef41Sopenharmony_ci  pipeline(['1', '2', '3'], w, common.mustSucceed(() => {
13721cb0ef41Sopenharmony_ci    assert.strictEqual(res, '123');
13731cb0ef41Sopenharmony_ci  }));
13741cb0ef41Sopenharmony_ci}
13751cb0ef41Sopenharmony_ci
13761cb0ef41Sopenharmony_ci{
13771cb0ef41Sopenharmony_ci  const content = 'abc';
13781cb0ef41Sopenharmony_ci  pipeline(Buffer.from(content), PassThrough({ objectMode: true }),
13791cb0ef41Sopenharmony_ci           common.mustSucceed(() => {}));
13801cb0ef41Sopenharmony_ci
13811cb0ef41Sopenharmony_ci  let res = '';
13821cb0ef41Sopenharmony_ci  pipeline(Buffer.from(content), async function*(previous) {
13831cb0ef41Sopenharmony_ci    for await (const val of previous) {
13841cb0ef41Sopenharmony_ci      res += String.fromCharCode(val);
13851cb0ef41Sopenharmony_ci      yield val;
13861cb0ef41Sopenharmony_ci    }
13871cb0ef41Sopenharmony_ci  }, common.mustSucceed(() => {
13881cb0ef41Sopenharmony_ci    assert.strictEqual(res, content);
13891cb0ef41Sopenharmony_ci  }));
13901cb0ef41Sopenharmony_ci}
13911cb0ef41Sopenharmony_ci
13921cb0ef41Sopenharmony_ci{
13931cb0ef41Sopenharmony_ci  const ac = new AbortController();
13941cb0ef41Sopenharmony_ci  const signal = ac.signal;
13951cb0ef41Sopenharmony_ci  pipelinep(
13961cb0ef41Sopenharmony_ci    async function * ({ signal }) { // eslint-disable-line require-yield
13971cb0ef41Sopenharmony_ci      await tsp.setTimeout(1e6, signal);
13981cb0ef41Sopenharmony_ci    },
13991cb0ef41Sopenharmony_ci    async function(source) {
14001cb0ef41Sopenharmony_ci
14011cb0ef41Sopenharmony_ci    },
14021cb0ef41Sopenharmony_ci    { signal }
14031cb0ef41Sopenharmony_ci  ).catch(common.mustCall((err) => {
14041cb0ef41Sopenharmony_ci    assert.strictEqual(err.name, 'AbortError');
14051cb0ef41Sopenharmony_ci  }));
14061cb0ef41Sopenharmony_ci  ac.abort();
14071cb0ef41Sopenharmony_ci}
14081cb0ef41Sopenharmony_ci
14091cb0ef41Sopenharmony_ci{
14101cb0ef41Sopenharmony_ci  async function run() {
14111cb0ef41Sopenharmony_ci    let finished = false;
14121cb0ef41Sopenharmony_ci    let text = '';
14131cb0ef41Sopenharmony_ci    const write = new Writable({
14141cb0ef41Sopenharmony_ci      write(data, enc, cb) {
14151cb0ef41Sopenharmony_ci        text += data;
14161cb0ef41Sopenharmony_ci        cb();
14171cb0ef41Sopenharmony_ci      }
14181cb0ef41Sopenharmony_ci    });
14191cb0ef41Sopenharmony_ci    write.on('finish', () => {
14201cb0ef41Sopenharmony_ci      finished = true;
14211cb0ef41Sopenharmony_ci    });
14221cb0ef41Sopenharmony_ci
14231cb0ef41Sopenharmony_ci    await pipelinep([Readable.from('Hello World!'), write]);
14241cb0ef41Sopenharmony_ci    assert(finished);
14251cb0ef41Sopenharmony_ci    assert.strictEqual(text, 'Hello World!');
14261cb0ef41Sopenharmony_ci  }
14271cb0ef41Sopenharmony_ci
14281cb0ef41Sopenharmony_ci  run();
14291cb0ef41Sopenharmony_ci}
14301cb0ef41Sopenharmony_ci
14311cb0ef41Sopenharmony_ci{
14321cb0ef41Sopenharmony_ci  let finished = false;
14331cb0ef41Sopenharmony_ci  let text = '';
14341cb0ef41Sopenharmony_ci  const write = new Writable({
14351cb0ef41Sopenharmony_ci    write(data, enc, cb) {
14361cb0ef41Sopenharmony_ci      text += data;
14371cb0ef41Sopenharmony_ci      cb();
14381cb0ef41Sopenharmony_ci    }
14391cb0ef41Sopenharmony_ci  });
14401cb0ef41Sopenharmony_ci  write.on('finish', () => {
14411cb0ef41Sopenharmony_ci    finished = true;
14421cb0ef41Sopenharmony_ci  });
14431cb0ef41Sopenharmony_ci
14441cb0ef41Sopenharmony_ci  pipeline([Readable.from('Hello World!'), write], common.mustSucceed(() => {
14451cb0ef41Sopenharmony_ci    assert(finished);
14461cb0ef41Sopenharmony_ci    assert.strictEqual(text, 'Hello World!');
14471cb0ef41Sopenharmony_ci  }));
14481cb0ef41Sopenharmony_ci}
14491cb0ef41Sopenharmony_ci
14501cb0ef41Sopenharmony_ci{
14511cb0ef41Sopenharmony_ci  const pipelinePromise = promisify(pipeline);
14521cb0ef41Sopenharmony_ci
14531cb0ef41Sopenharmony_ci  async function run() {
14541cb0ef41Sopenharmony_ci    const read = new Readable({
14551cb0ef41Sopenharmony_ci      read() {}
14561cb0ef41Sopenharmony_ci    });
14571cb0ef41Sopenharmony_ci
14581cb0ef41Sopenharmony_ci    const duplex = new PassThrough();
14591cb0ef41Sopenharmony_ci
14601cb0ef41Sopenharmony_ci    read.push(null);
14611cb0ef41Sopenharmony_ci
14621cb0ef41Sopenharmony_ci    await pipelinePromise(read, duplex);
14631cb0ef41Sopenharmony_ci
14641cb0ef41Sopenharmony_ci    assert.strictEqual(duplex.destroyed, false);
14651cb0ef41Sopenharmony_ci  }
14661cb0ef41Sopenharmony_ci
14671cb0ef41Sopenharmony_ci  run().then(common.mustCall());
14681cb0ef41Sopenharmony_ci}
14691cb0ef41Sopenharmony_ci
14701cb0ef41Sopenharmony_ci{
14711cb0ef41Sopenharmony_ci  const pipelinePromise = promisify(pipeline);
14721cb0ef41Sopenharmony_ci
14731cb0ef41Sopenharmony_ci  async function run() {
14741cb0ef41Sopenharmony_ci    const read = new Readable({
14751cb0ef41Sopenharmony_ci      read() {}
14761cb0ef41Sopenharmony_ci    });
14771cb0ef41Sopenharmony_ci
14781cb0ef41Sopenharmony_ci    const duplex = new PassThrough();
14791cb0ef41Sopenharmony_ci
14801cb0ef41Sopenharmony_ci    read.push(null);
14811cb0ef41Sopenharmony_ci
14821cb0ef41Sopenharmony_ci    await pipelinePromise(read, duplex, { end: false });
14831cb0ef41Sopenharmony_ci
14841cb0ef41Sopenharmony_ci    assert.strictEqual(duplex.destroyed, false);
14851cb0ef41Sopenharmony_ci    assert.strictEqual(duplex.writableEnded, false);
14861cb0ef41Sopenharmony_ci  }
14871cb0ef41Sopenharmony_ci
14881cb0ef41Sopenharmony_ci  run().then(common.mustCall());
14891cb0ef41Sopenharmony_ci}
14901cb0ef41Sopenharmony_ci
14911cb0ef41Sopenharmony_ci{
14921cb0ef41Sopenharmony_ci  const s = new PassThrough({ objectMode: true });
14931cb0ef41Sopenharmony_ci  pipeline(async function*() {
14941cb0ef41Sopenharmony_ci    await Promise.resolve();
14951cb0ef41Sopenharmony_ci    yield 'hello';
14961cb0ef41Sopenharmony_ci    yield 'world';
14971cb0ef41Sopenharmony_ci    yield 'world';
14981cb0ef41Sopenharmony_ci  }, s, async function(source) {
14991cb0ef41Sopenharmony_ci    let ret = '';
15001cb0ef41Sopenharmony_ci    let n = 0;
15011cb0ef41Sopenharmony_ci    for await (const chunk of source) {
15021cb0ef41Sopenharmony_ci      if (n++ > 1) {
15031cb0ef41Sopenharmony_ci        break;
15041cb0ef41Sopenharmony_ci      }
15051cb0ef41Sopenharmony_ci      ret += chunk;
15061cb0ef41Sopenharmony_ci    }
15071cb0ef41Sopenharmony_ci    return ret;
15081cb0ef41Sopenharmony_ci  }, common.mustCall((err, val) => {
15091cb0ef41Sopenharmony_ci    assert.strictEqual(err, undefined);
15101cb0ef41Sopenharmony_ci    assert.strictEqual(val, 'helloworld');
15111cb0ef41Sopenharmony_ci    assert.strictEqual(s.destroyed, true);
15121cb0ef41Sopenharmony_ci  }));
15131cb0ef41Sopenharmony_ci}
15141cb0ef41Sopenharmony_ci
15151cb0ef41Sopenharmony_ci{
15161cb0ef41Sopenharmony_ci  const s = new PassThrough({ objectMode: true });
15171cb0ef41Sopenharmony_ci  pipeline(async function*() {
15181cb0ef41Sopenharmony_ci    await Promise.resolve();
15191cb0ef41Sopenharmony_ci    yield 'hello';
15201cb0ef41Sopenharmony_ci    yield 'world';
15211cb0ef41Sopenharmony_ci    yield 'world';
15221cb0ef41Sopenharmony_ci  }, s, async function(source) {
15231cb0ef41Sopenharmony_ci    return null;
15241cb0ef41Sopenharmony_ci  }, common.mustCall((err, val) => {
15251cb0ef41Sopenharmony_ci    assert.strictEqual(err, undefined);
15261cb0ef41Sopenharmony_ci    assert.strictEqual(val, null);
15271cb0ef41Sopenharmony_ci  }));
15281cb0ef41Sopenharmony_ci}
15291cb0ef41Sopenharmony_ci
15301cb0ef41Sopenharmony_ci{
15311cb0ef41Sopenharmony_ci  // Mimics a legacy stream without the .destroy method
15321cb0ef41Sopenharmony_ci  class LegacyWritable extends Stream {
15331cb0ef41Sopenharmony_ci    write(chunk, encoding, callback) {
15341cb0ef41Sopenharmony_ci      callback();
15351cb0ef41Sopenharmony_ci    }
15361cb0ef41Sopenharmony_ci  }
15371cb0ef41Sopenharmony_ci
15381cb0ef41Sopenharmony_ci  const writable = new LegacyWritable();
15391cb0ef41Sopenharmony_ci  writable.on('error', common.mustCall((err) => {
15401cb0ef41Sopenharmony_ci    assert.deepStrictEqual(err, new Error('stop'));
15411cb0ef41Sopenharmony_ci  }));
15421cb0ef41Sopenharmony_ci
15431cb0ef41Sopenharmony_ci  pipeline(
15441cb0ef41Sopenharmony_ci    Readable.from({
15451cb0ef41Sopenharmony_ci      [Symbol.asyncIterator]() {
15461cb0ef41Sopenharmony_ci        return {
15471cb0ef41Sopenharmony_ci          next() {
15481cb0ef41Sopenharmony_ci            return Promise.reject(new Error('stop'));
15491cb0ef41Sopenharmony_ci          }
15501cb0ef41Sopenharmony_ci        };
15511cb0ef41Sopenharmony_ci      }
15521cb0ef41Sopenharmony_ci    }),
15531cb0ef41Sopenharmony_ci    writable,
15541cb0ef41Sopenharmony_ci    common.mustCall((err) => {
15551cb0ef41Sopenharmony_ci      assert.deepStrictEqual(err, new Error('stop'));
15561cb0ef41Sopenharmony_ci    })
15571cb0ef41Sopenharmony_ci  );
15581cb0ef41Sopenharmony_ci}
15591cb0ef41Sopenharmony_ci
15601cb0ef41Sopenharmony_ci{
15611cb0ef41Sopenharmony_ci  class CustomReadable extends Readable {
15621cb0ef41Sopenharmony_ci    _read() {
15631cb0ef41Sopenharmony_ci      this.push('asd');
15641cb0ef41Sopenharmony_ci      this.push(null);
15651cb0ef41Sopenharmony_ci    }
15661cb0ef41Sopenharmony_ci  }
15671cb0ef41Sopenharmony_ci
15681cb0ef41Sopenharmony_ci  class CustomWritable extends Writable {
15691cb0ef41Sopenharmony_ci    constructor() {
15701cb0ef41Sopenharmony_ci      super();
15711cb0ef41Sopenharmony_ci      this.endCount = 0;
15721cb0ef41Sopenharmony_ci      this.str = '';
15731cb0ef41Sopenharmony_ci    }
15741cb0ef41Sopenharmony_ci
15751cb0ef41Sopenharmony_ci    _write(chunk, enc, cb) {
15761cb0ef41Sopenharmony_ci      this.str += chunk;
15771cb0ef41Sopenharmony_ci      cb();
15781cb0ef41Sopenharmony_ci    }
15791cb0ef41Sopenharmony_ci
15801cb0ef41Sopenharmony_ci    end() {
15811cb0ef41Sopenharmony_ci      this.endCount += 1;
15821cb0ef41Sopenharmony_ci      super.end();
15831cb0ef41Sopenharmony_ci    }
15841cb0ef41Sopenharmony_ci  }
15851cb0ef41Sopenharmony_ci
15861cb0ef41Sopenharmony_ci  const readable = new CustomReadable();
15871cb0ef41Sopenharmony_ci  const writable = new CustomWritable();
15881cb0ef41Sopenharmony_ci
15891cb0ef41Sopenharmony_ci  pipeline(readable, writable, common.mustSucceed(() => {
15901cb0ef41Sopenharmony_ci    assert.strictEqual(writable.str, 'asd');
15911cb0ef41Sopenharmony_ci    assert.strictEqual(writable.endCount, 1);
15921cb0ef41Sopenharmony_ci  }));
15931cb0ef41Sopenharmony_ci}
15941cb0ef41Sopenharmony_ci
15951cb0ef41Sopenharmony_ci{
15961cb0ef41Sopenharmony_ci  const readable = new Readable({
15971cb0ef41Sopenharmony_ci    read() {}
15981cb0ef41Sopenharmony_ci  });
15991cb0ef41Sopenharmony_ci  readable.on('end', common.mustCall(() => {
16001cb0ef41Sopenharmony_ci    pipeline(readable, new PassThrough(), common.mustSucceed());
16011cb0ef41Sopenharmony_ci  }));
16021cb0ef41Sopenharmony_ci  readable.push(null);
16031cb0ef41Sopenharmony_ci  readable.read();
16041cb0ef41Sopenharmony_ci}
16051cb0ef41Sopenharmony_ci
16061cb0ef41Sopenharmony_ci{
16071cb0ef41Sopenharmony_ci  const dup = new Duplex({
16081cb0ef41Sopenharmony_ci    read() {},
16091cb0ef41Sopenharmony_ci    write(chunk, enc, cb) {
16101cb0ef41Sopenharmony_ci      cb();
16111cb0ef41Sopenharmony_ci    }
16121cb0ef41Sopenharmony_ci  });
16131cb0ef41Sopenharmony_ci  dup.on('end', common.mustCall(() => {
16141cb0ef41Sopenharmony_ci    pipeline(dup, new PassThrough(), common.mustSucceed());
16151cb0ef41Sopenharmony_ci  }));
16161cb0ef41Sopenharmony_ci  dup.push(null);
16171cb0ef41Sopenharmony_ci  dup.read();
16181cb0ef41Sopenharmony_ci}
16191cb0ef41Sopenharmony_ci
16201cb0ef41Sopenharmony_ci{
16211cb0ef41Sopenharmony_ci  let res = '';
16221cb0ef41Sopenharmony_ci  const writable = new Writable({
16231cb0ef41Sopenharmony_ci    write(chunk, enc, cb) {
16241cb0ef41Sopenharmony_ci      res += chunk;
16251cb0ef41Sopenharmony_ci      cb();
16261cb0ef41Sopenharmony_ci    }
16271cb0ef41Sopenharmony_ci  });
16281cb0ef41Sopenharmony_ci  pipelinep(async function*() {
16291cb0ef41Sopenharmony_ci    yield 'hello';
16301cb0ef41Sopenharmony_ci    await Promise.resolve();
16311cb0ef41Sopenharmony_ci    yield 'world';
16321cb0ef41Sopenharmony_ci  }, writable, { end: false }).then(common.mustCall(() => {
16331cb0ef41Sopenharmony_ci    assert.strictEqual(res, 'helloworld');
16341cb0ef41Sopenharmony_ci    assert.strictEqual(writable.closed, false);
16351cb0ef41Sopenharmony_ci  }));
16361cb0ef41Sopenharmony_ci}
16371cb0ef41Sopenharmony_ci
16381cb0ef41Sopenharmony_ci{
16391cb0ef41Sopenharmony_ci  const r = new Readable();
16401cb0ef41Sopenharmony_ci  for (let i = 0; i < 4000; i++) {
16411cb0ef41Sopenharmony_ci    r.push('asdfdagljanfgkaljdfn');
16421cb0ef41Sopenharmony_ci  }
16431cb0ef41Sopenharmony_ci  r.push(null);
16441cb0ef41Sopenharmony_ci
16451cb0ef41Sopenharmony_ci  let ended = false;
16461cb0ef41Sopenharmony_ci  r.on('end', () => {
16471cb0ef41Sopenharmony_ci    ended = true;
16481cb0ef41Sopenharmony_ci  });
16491cb0ef41Sopenharmony_ci
16501cb0ef41Sopenharmony_ci  const w = new Writable({
16511cb0ef41Sopenharmony_ci    write(chunk, enc, cb) {
16521cb0ef41Sopenharmony_ci      cb(null);
16531cb0ef41Sopenharmony_ci    },
16541cb0ef41Sopenharmony_ci    final: common.mustCall((cb) => {
16551cb0ef41Sopenharmony_ci      assert.strictEqual(ended, true);
16561cb0ef41Sopenharmony_ci      cb(null);
16571cb0ef41Sopenharmony_ci    })
16581cb0ef41Sopenharmony_ci  });
16591cb0ef41Sopenharmony_ci
16601cb0ef41Sopenharmony_ci  pipeline(r, w, common.mustCall((err) => {
16611cb0ef41Sopenharmony_ci    assert.strictEqual(err, undefined);
16621cb0ef41Sopenharmony_ci  }));
16631cb0ef41Sopenharmony_ci
16641cb0ef41Sopenharmony_ci}
1665