11cb0ef41Sopenharmony_ci'use strict'; 21cb0ef41Sopenharmony_ci 31cb0ef41Sopenharmony_ciconst common = require('../common'); 41cb0ef41Sopenharmony_ciconst { 51cb0ef41Sopenharmony_ci Stream, 61cb0ef41Sopenharmony_ci Writable, 71cb0ef41Sopenharmony_ci Readable, 81cb0ef41Sopenharmony_ci Transform, 91cb0ef41Sopenharmony_ci pipeline, 101cb0ef41Sopenharmony_ci PassThrough, 111cb0ef41Sopenharmony_ci Duplex, 121cb0ef41Sopenharmony_ci addAbortSignal, 131cb0ef41Sopenharmony_ci} = require('stream'); 141cb0ef41Sopenharmony_ciconst pipelinep = require('stream/promises').pipeline; 151cb0ef41Sopenharmony_ciconst assert = require('assert'); 161cb0ef41Sopenharmony_ciconst http = require('http'); 171cb0ef41Sopenharmony_ciconst { promisify } = require('util'); 181cb0ef41Sopenharmony_ciconst net = require('net'); 191cb0ef41Sopenharmony_ciconst tsp = require('timers/promises'); 201cb0ef41Sopenharmony_ci 211cb0ef41Sopenharmony_ci{ 221cb0ef41Sopenharmony_ci let finished = false; 231cb0ef41Sopenharmony_ci const processed = []; 241cb0ef41Sopenharmony_ci const expected = [ 251cb0ef41Sopenharmony_ci Buffer.from('a'), 261cb0ef41Sopenharmony_ci Buffer.from('b'), 271cb0ef41Sopenharmony_ci Buffer.from('c'), 281cb0ef41Sopenharmony_ci ]; 291cb0ef41Sopenharmony_ci 301cb0ef41Sopenharmony_ci const read = new Readable({ 311cb0ef41Sopenharmony_ci read() {} 321cb0ef41Sopenharmony_ci }); 331cb0ef41Sopenharmony_ci 341cb0ef41Sopenharmony_ci const write = new Writable({ 351cb0ef41Sopenharmony_ci write(data, enc, cb) { 361cb0ef41Sopenharmony_ci processed.push(data); 371cb0ef41Sopenharmony_ci cb(); 381cb0ef41Sopenharmony_ci } 391cb0ef41Sopenharmony_ci }); 401cb0ef41Sopenharmony_ci 411cb0ef41Sopenharmony_ci write.on('finish', () => { 421cb0ef41Sopenharmony_ci finished = true; 431cb0ef41Sopenharmony_ci }); 441cb0ef41Sopenharmony_ci 451cb0ef41Sopenharmony_ci for (let i = 0; i < expected.length; i++) { 461cb0ef41Sopenharmony_ci read.push(expected[i]); 471cb0ef41Sopenharmony_ci } 481cb0ef41Sopenharmony_ci read.push(null); 491cb0ef41Sopenharmony_ci 501cb0ef41Sopenharmony_ci pipeline(read, write, common.mustSucceed(() => { 511cb0ef41Sopenharmony_ci assert.ok(finished); 521cb0ef41Sopenharmony_ci assert.deepStrictEqual(processed, expected); 531cb0ef41Sopenharmony_ci })); 541cb0ef41Sopenharmony_ci} 551cb0ef41Sopenharmony_ci 561cb0ef41Sopenharmony_ci{ 571cb0ef41Sopenharmony_ci const read = new Readable({ 581cb0ef41Sopenharmony_ci read() {} 591cb0ef41Sopenharmony_ci }); 601cb0ef41Sopenharmony_ci 611cb0ef41Sopenharmony_ci assert.throws(() => { 621cb0ef41Sopenharmony_ci pipeline(read, () => {}); 631cb0ef41Sopenharmony_ci }, /ERR_MISSING_ARGS/); 641cb0ef41Sopenharmony_ci assert.throws(() => { 651cb0ef41Sopenharmony_ci pipeline(() => {}); 661cb0ef41Sopenharmony_ci }, /ERR_MISSING_ARGS/); 671cb0ef41Sopenharmony_ci assert.throws(() => { 681cb0ef41Sopenharmony_ci pipeline(); 691cb0ef41Sopenharmony_ci }, /ERR_INVALID_ARG_TYPE/); 701cb0ef41Sopenharmony_ci} 711cb0ef41Sopenharmony_ci 721cb0ef41Sopenharmony_ci{ 731cb0ef41Sopenharmony_ci const read = new Readable({ 741cb0ef41Sopenharmony_ci read() {} 751cb0ef41Sopenharmony_ci }); 761cb0ef41Sopenharmony_ci 771cb0ef41Sopenharmony_ci const write = new Writable({ 781cb0ef41Sopenharmony_ci write(data, enc, cb) { 791cb0ef41Sopenharmony_ci cb(); 801cb0ef41Sopenharmony_ci } 811cb0ef41Sopenharmony_ci }); 821cb0ef41Sopenharmony_ci 831cb0ef41Sopenharmony_ci read.push('data'); 841cb0ef41Sopenharmony_ci setImmediate(() => read.destroy()); 851cb0ef41Sopenharmony_ci 861cb0ef41Sopenharmony_ci pipeline(read, write, common.mustCall((err) => { 871cb0ef41Sopenharmony_ci assert.ok(err, 'should have an error'); 881cb0ef41Sopenharmony_ci })); 891cb0ef41Sopenharmony_ci} 901cb0ef41Sopenharmony_ci 911cb0ef41Sopenharmony_ci{ 921cb0ef41Sopenharmony_ci const read = new Readable({ 931cb0ef41Sopenharmony_ci read() {} 941cb0ef41Sopenharmony_ci }); 951cb0ef41Sopenharmony_ci 961cb0ef41Sopenharmony_ci const write = new Writable({ 971cb0ef41Sopenharmony_ci write(data, enc, cb) { 981cb0ef41Sopenharmony_ci cb(); 991cb0ef41Sopenharmony_ci } 1001cb0ef41Sopenharmony_ci }); 1011cb0ef41Sopenharmony_ci 1021cb0ef41Sopenharmony_ci read.push('data'); 1031cb0ef41Sopenharmony_ci setImmediate(() => read.destroy(new Error('kaboom'))); 1041cb0ef41Sopenharmony_ci 1051cb0ef41Sopenharmony_ci const dst = pipeline(read, write, common.mustCall((err) => { 1061cb0ef41Sopenharmony_ci assert.deepStrictEqual(err, new Error('kaboom')); 1071cb0ef41Sopenharmony_ci })); 1081cb0ef41Sopenharmony_ci 1091cb0ef41Sopenharmony_ci assert.strictEqual(dst, write); 1101cb0ef41Sopenharmony_ci} 1111cb0ef41Sopenharmony_ci 1121cb0ef41Sopenharmony_ci{ 1131cb0ef41Sopenharmony_ci const read = new Readable({ 1141cb0ef41Sopenharmony_ci read() {} 1151cb0ef41Sopenharmony_ci }); 1161cb0ef41Sopenharmony_ci 1171cb0ef41Sopenharmony_ci const transform = new Transform({ 1181cb0ef41Sopenharmony_ci transform(data, enc, cb) { 1191cb0ef41Sopenharmony_ci cb(new Error('kaboom')); 1201cb0ef41Sopenharmony_ci } 1211cb0ef41Sopenharmony_ci }); 1221cb0ef41Sopenharmony_ci 1231cb0ef41Sopenharmony_ci const write = new Writable({ 1241cb0ef41Sopenharmony_ci write(data, enc, cb) { 1251cb0ef41Sopenharmony_ci cb(); 1261cb0ef41Sopenharmony_ci } 1271cb0ef41Sopenharmony_ci }); 1281cb0ef41Sopenharmony_ci 1291cb0ef41Sopenharmony_ci read.on('close', common.mustCall()); 1301cb0ef41Sopenharmony_ci transform.on('close', common.mustCall()); 1311cb0ef41Sopenharmony_ci write.on('close', common.mustCall()); 1321cb0ef41Sopenharmony_ci 1331cb0ef41Sopenharmony_ci [read, transform, write].forEach((stream) => { 1341cb0ef41Sopenharmony_ci stream.on('error', common.mustCall((err) => { 1351cb0ef41Sopenharmony_ci assert.deepStrictEqual(err, new Error('kaboom')); 1361cb0ef41Sopenharmony_ci })); 1371cb0ef41Sopenharmony_ci }); 1381cb0ef41Sopenharmony_ci 1391cb0ef41Sopenharmony_ci const dst = pipeline(read, transform, write, common.mustCall((err) => { 1401cb0ef41Sopenharmony_ci assert.deepStrictEqual(err, new Error('kaboom')); 1411cb0ef41Sopenharmony_ci })); 1421cb0ef41Sopenharmony_ci 1431cb0ef41Sopenharmony_ci assert.strictEqual(dst, write); 1441cb0ef41Sopenharmony_ci 1451cb0ef41Sopenharmony_ci read.push('hello'); 1461cb0ef41Sopenharmony_ci} 1471cb0ef41Sopenharmony_ci 1481cb0ef41Sopenharmony_ci{ 1491cb0ef41Sopenharmony_ci const server = http.createServer((req, res) => { 1501cb0ef41Sopenharmony_ci const rs = new Readable({ 1511cb0ef41Sopenharmony_ci read() { 1521cb0ef41Sopenharmony_ci rs.push('hello'); 1531cb0ef41Sopenharmony_ci rs.push(null); 1541cb0ef41Sopenharmony_ci } 1551cb0ef41Sopenharmony_ci }); 1561cb0ef41Sopenharmony_ci 1571cb0ef41Sopenharmony_ci pipeline(rs, res, () => {}); 1581cb0ef41Sopenharmony_ci }); 1591cb0ef41Sopenharmony_ci 1601cb0ef41Sopenharmony_ci server.listen(0, () => { 1611cb0ef41Sopenharmony_ci const req = http.request({ 1621cb0ef41Sopenharmony_ci port: server.address().port 1631cb0ef41Sopenharmony_ci }); 1641cb0ef41Sopenharmony_ci 1651cb0ef41Sopenharmony_ci req.end(); 1661cb0ef41Sopenharmony_ci req.on('response', (res) => { 1671cb0ef41Sopenharmony_ci const buf = []; 1681cb0ef41Sopenharmony_ci res.on('data', (data) => buf.push(data)); 1691cb0ef41Sopenharmony_ci res.on('end', common.mustCall(() => { 1701cb0ef41Sopenharmony_ci assert.deepStrictEqual( 1711cb0ef41Sopenharmony_ci Buffer.concat(buf), 1721cb0ef41Sopenharmony_ci Buffer.from('hello') 1731cb0ef41Sopenharmony_ci ); 1741cb0ef41Sopenharmony_ci server.close(); 1751cb0ef41Sopenharmony_ci })); 1761cb0ef41Sopenharmony_ci }); 1771cb0ef41Sopenharmony_ci }); 1781cb0ef41Sopenharmony_ci} 1791cb0ef41Sopenharmony_ci 1801cb0ef41Sopenharmony_ci{ 1811cb0ef41Sopenharmony_ci const server = http.createServer((req, res) => { 1821cb0ef41Sopenharmony_ci let sent = false; 1831cb0ef41Sopenharmony_ci const rs = new Readable({ 1841cb0ef41Sopenharmony_ci read() { 1851cb0ef41Sopenharmony_ci if (sent) { 1861cb0ef41Sopenharmony_ci return; 1871cb0ef41Sopenharmony_ci } 1881cb0ef41Sopenharmony_ci sent = true; 1891cb0ef41Sopenharmony_ci rs.push('hello'); 1901cb0ef41Sopenharmony_ci }, 1911cb0ef41Sopenharmony_ci destroy: common.mustCall((err, cb) => { 1921cb0ef41Sopenharmony_ci // Prevents fd leaks by destroying http pipelines 1931cb0ef41Sopenharmony_ci cb(); 1941cb0ef41Sopenharmony_ci }) 1951cb0ef41Sopenharmony_ci }); 1961cb0ef41Sopenharmony_ci 1971cb0ef41Sopenharmony_ci pipeline(rs, res, () => {}); 1981cb0ef41Sopenharmony_ci }); 1991cb0ef41Sopenharmony_ci 2001cb0ef41Sopenharmony_ci server.listen(0, () => { 2011cb0ef41Sopenharmony_ci const req = http.request({ 2021cb0ef41Sopenharmony_ci port: server.address().port 2031cb0ef41Sopenharmony_ci }); 2041cb0ef41Sopenharmony_ci 2051cb0ef41Sopenharmony_ci req.end(); 2061cb0ef41Sopenharmony_ci req.on('response', (res) => { 2071cb0ef41Sopenharmony_ci setImmediate(() => { 2081cb0ef41Sopenharmony_ci res.destroy(); 2091cb0ef41Sopenharmony_ci server.close(); 2101cb0ef41Sopenharmony_ci }); 2111cb0ef41Sopenharmony_ci }); 2121cb0ef41Sopenharmony_ci }); 2131cb0ef41Sopenharmony_ci} 2141cb0ef41Sopenharmony_ci 2151cb0ef41Sopenharmony_ci{ 2161cb0ef41Sopenharmony_ci const server = http.createServer((req, res) => { 2171cb0ef41Sopenharmony_ci let sent = 0; 2181cb0ef41Sopenharmony_ci const rs = new Readable({ 2191cb0ef41Sopenharmony_ci read() { 2201cb0ef41Sopenharmony_ci if (sent++ > 10) { 2211cb0ef41Sopenharmony_ci return; 2221cb0ef41Sopenharmony_ci } 2231cb0ef41Sopenharmony_ci rs.push('hello'); 2241cb0ef41Sopenharmony_ci }, 2251cb0ef41Sopenharmony_ci destroy: common.mustCall((err, cb) => { 2261cb0ef41Sopenharmony_ci cb(); 2271cb0ef41Sopenharmony_ci }) 2281cb0ef41Sopenharmony_ci }); 2291cb0ef41Sopenharmony_ci 2301cb0ef41Sopenharmony_ci pipeline(rs, res, () => {}); 2311cb0ef41Sopenharmony_ci }); 2321cb0ef41Sopenharmony_ci 2331cb0ef41Sopenharmony_ci let cnt = 10; 2341cb0ef41Sopenharmony_ci 2351cb0ef41Sopenharmony_ci const badSink = new Writable({ 2361cb0ef41Sopenharmony_ci write(data, enc, cb) { 2371cb0ef41Sopenharmony_ci cnt--; 2381cb0ef41Sopenharmony_ci if (cnt === 0) cb(new Error('kaboom')); 2391cb0ef41Sopenharmony_ci else cb(); 2401cb0ef41Sopenharmony_ci } 2411cb0ef41Sopenharmony_ci }); 2421cb0ef41Sopenharmony_ci 2431cb0ef41Sopenharmony_ci server.listen(0, () => { 2441cb0ef41Sopenharmony_ci const req = http.request({ 2451cb0ef41Sopenharmony_ci port: server.address().port 2461cb0ef41Sopenharmony_ci }); 2471cb0ef41Sopenharmony_ci 2481cb0ef41Sopenharmony_ci req.end(); 2491cb0ef41Sopenharmony_ci req.on('response', (res) => { 2501cb0ef41Sopenharmony_ci pipeline(res, badSink, common.mustCall((err) => { 2511cb0ef41Sopenharmony_ci assert.deepStrictEqual(err, new Error('kaboom')); 2521cb0ef41Sopenharmony_ci server.close(); 2531cb0ef41Sopenharmony_ci })); 2541cb0ef41Sopenharmony_ci }); 2551cb0ef41Sopenharmony_ci }); 2561cb0ef41Sopenharmony_ci} 2571cb0ef41Sopenharmony_ci 2581cb0ef41Sopenharmony_ci{ 2591cb0ef41Sopenharmony_ci const server = http.createServer((req, res) => { 2601cb0ef41Sopenharmony_ci pipeline(req, res, common.mustSucceed()); 2611cb0ef41Sopenharmony_ci }); 2621cb0ef41Sopenharmony_ci 2631cb0ef41Sopenharmony_ci server.listen(0, () => { 2641cb0ef41Sopenharmony_ci const req = http.request({ 2651cb0ef41Sopenharmony_ci port: server.address().port 2661cb0ef41Sopenharmony_ci }); 2671cb0ef41Sopenharmony_ci 2681cb0ef41Sopenharmony_ci let sent = 0; 2691cb0ef41Sopenharmony_ci const rs = new Readable({ 2701cb0ef41Sopenharmony_ci read() { 2711cb0ef41Sopenharmony_ci if (sent++ > 10) { 2721cb0ef41Sopenharmony_ci return; 2731cb0ef41Sopenharmony_ci } 2741cb0ef41Sopenharmony_ci rs.push('hello'); 2751cb0ef41Sopenharmony_ci } 2761cb0ef41Sopenharmony_ci }); 2771cb0ef41Sopenharmony_ci 2781cb0ef41Sopenharmony_ci pipeline(rs, req, common.mustCall(() => { 2791cb0ef41Sopenharmony_ci server.close(); 2801cb0ef41Sopenharmony_ci })); 2811cb0ef41Sopenharmony_ci 2821cb0ef41Sopenharmony_ci req.on('response', (res) => { 2831cb0ef41Sopenharmony_ci let cnt = 10; 2841cb0ef41Sopenharmony_ci res.on('data', () => { 2851cb0ef41Sopenharmony_ci cnt--; 2861cb0ef41Sopenharmony_ci if (cnt === 0) rs.destroy(); 2871cb0ef41Sopenharmony_ci }); 2881cb0ef41Sopenharmony_ci }); 2891cb0ef41Sopenharmony_ci }); 2901cb0ef41Sopenharmony_ci} 2911cb0ef41Sopenharmony_ci 2921cb0ef41Sopenharmony_ci{ 2931cb0ef41Sopenharmony_ci const makeTransform = () => { 2941cb0ef41Sopenharmony_ci const tr = new Transform({ 2951cb0ef41Sopenharmony_ci transform(data, enc, cb) { 2961cb0ef41Sopenharmony_ci cb(null, data); 2971cb0ef41Sopenharmony_ci } 2981cb0ef41Sopenharmony_ci }); 2991cb0ef41Sopenharmony_ci 3001cb0ef41Sopenharmony_ci tr.on('close', common.mustCall()); 3011cb0ef41Sopenharmony_ci return tr; 3021cb0ef41Sopenharmony_ci }; 3031cb0ef41Sopenharmony_ci 3041cb0ef41Sopenharmony_ci const rs = new Readable({ 3051cb0ef41Sopenharmony_ci read() { 3061cb0ef41Sopenharmony_ci rs.push('hello'); 3071cb0ef41Sopenharmony_ci } 3081cb0ef41Sopenharmony_ci }); 3091cb0ef41Sopenharmony_ci 3101cb0ef41Sopenharmony_ci let cnt = 10; 3111cb0ef41Sopenharmony_ci 3121cb0ef41Sopenharmony_ci const ws = new Writable({ 3131cb0ef41Sopenharmony_ci write(data, enc, cb) { 3141cb0ef41Sopenharmony_ci cnt--; 3151cb0ef41Sopenharmony_ci if (cnt === 0) return cb(new Error('kaboom')); 3161cb0ef41Sopenharmony_ci cb(); 3171cb0ef41Sopenharmony_ci } 3181cb0ef41Sopenharmony_ci }); 3191cb0ef41Sopenharmony_ci 3201cb0ef41Sopenharmony_ci rs.on('close', common.mustCall()); 3211cb0ef41Sopenharmony_ci ws.on('close', common.mustCall()); 3221cb0ef41Sopenharmony_ci 3231cb0ef41Sopenharmony_ci pipeline( 3241cb0ef41Sopenharmony_ci rs, 3251cb0ef41Sopenharmony_ci makeTransform(), 3261cb0ef41Sopenharmony_ci makeTransform(), 3271cb0ef41Sopenharmony_ci makeTransform(), 3281cb0ef41Sopenharmony_ci makeTransform(), 3291cb0ef41Sopenharmony_ci makeTransform(), 3301cb0ef41Sopenharmony_ci makeTransform(), 3311cb0ef41Sopenharmony_ci ws, 3321cb0ef41Sopenharmony_ci common.mustCall((err) => { 3331cb0ef41Sopenharmony_ci assert.deepStrictEqual(err, new Error('kaboom')); 3341cb0ef41Sopenharmony_ci }) 3351cb0ef41Sopenharmony_ci ); 3361cb0ef41Sopenharmony_ci} 3371cb0ef41Sopenharmony_ci 3381cb0ef41Sopenharmony_ci{ 3391cb0ef41Sopenharmony_ci const oldStream = new Stream(); 3401cb0ef41Sopenharmony_ci 3411cb0ef41Sopenharmony_ci oldStream.pause = oldStream.resume = () => {}; 3421cb0ef41Sopenharmony_ci oldStream.write = (data) => { 3431cb0ef41Sopenharmony_ci oldStream.emit('data', data); 3441cb0ef41Sopenharmony_ci return true; 3451cb0ef41Sopenharmony_ci }; 3461cb0ef41Sopenharmony_ci oldStream.end = () => { 3471cb0ef41Sopenharmony_ci oldStream.emit('end'); 3481cb0ef41Sopenharmony_ci }; 3491cb0ef41Sopenharmony_ci 3501cb0ef41Sopenharmony_ci const expected = [ 3511cb0ef41Sopenharmony_ci Buffer.from('hello'), 3521cb0ef41Sopenharmony_ci Buffer.from('world'), 3531cb0ef41Sopenharmony_ci ]; 3541cb0ef41Sopenharmony_ci 3551cb0ef41Sopenharmony_ci const rs = new Readable({ 3561cb0ef41Sopenharmony_ci read() { 3571cb0ef41Sopenharmony_ci for (let i = 0; i < expected.length; i++) { 3581cb0ef41Sopenharmony_ci rs.push(expected[i]); 3591cb0ef41Sopenharmony_ci } 3601cb0ef41Sopenharmony_ci rs.push(null); 3611cb0ef41Sopenharmony_ci } 3621cb0ef41Sopenharmony_ci }); 3631cb0ef41Sopenharmony_ci 3641cb0ef41Sopenharmony_ci const ws = new Writable({ 3651cb0ef41Sopenharmony_ci write(data, enc, cb) { 3661cb0ef41Sopenharmony_ci assert.deepStrictEqual(data, expected.shift()); 3671cb0ef41Sopenharmony_ci cb(); 3681cb0ef41Sopenharmony_ci } 3691cb0ef41Sopenharmony_ci }); 3701cb0ef41Sopenharmony_ci 3711cb0ef41Sopenharmony_ci let finished = false; 3721cb0ef41Sopenharmony_ci 3731cb0ef41Sopenharmony_ci ws.on('finish', () => { 3741cb0ef41Sopenharmony_ci finished = true; 3751cb0ef41Sopenharmony_ci }); 3761cb0ef41Sopenharmony_ci 3771cb0ef41Sopenharmony_ci pipeline( 3781cb0ef41Sopenharmony_ci rs, 3791cb0ef41Sopenharmony_ci oldStream, 3801cb0ef41Sopenharmony_ci ws, 3811cb0ef41Sopenharmony_ci common.mustSucceed(() => { 3821cb0ef41Sopenharmony_ci assert(finished, 'last stream finished'); 3831cb0ef41Sopenharmony_ci }) 3841cb0ef41Sopenharmony_ci ); 3851cb0ef41Sopenharmony_ci} 3861cb0ef41Sopenharmony_ci 3871cb0ef41Sopenharmony_ci{ 3881cb0ef41Sopenharmony_ci const oldStream = new Stream(); 3891cb0ef41Sopenharmony_ci 3901cb0ef41Sopenharmony_ci oldStream.pause = oldStream.resume = () => {}; 3911cb0ef41Sopenharmony_ci oldStream.write = (data) => { 3921cb0ef41Sopenharmony_ci oldStream.emit('data', data); 3931cb0ef41Sopenharmony_ci return true; 3941cb0ef41Sopenharmony_ci }; 3951cb0ef41Sopenharmony_ci oldStream.end = () => { 3961cb0ef41Sopenharmony_ci oldStream.emit('end'); 3971cb0ef41Sopenharmony_ci }; 3981cb0ef41Sopenharmony_ci 3991cb0ef41Sopenharmony_ci const destroyableOldStream = new Stream(); 4001cb0ef41Sopenharmony_ci 4011cb0ef41Sopenharmony_ci destroyableOldStream.pause = destroyableOldStream.resume = () => {}; 4021cb0ef41Sopenharmony_ci destroyableOldStream.destroy = common.mustCall(() => { 4031cb0ef41Sopenharmony_ci destroyableOldStream.emit('close'); 4041cb0ef41Sopenharmony_ci }); 4051cb0ef41Sopenharmony_ci destroyableOldStream.write = (data) => { 4061cb0ef41Sopenharmony_ci destroyableOldStream.emit('data', data); 4071cb0ef41Sopenharmony_ci return true; 4081cb0ef41Sopenharmony_ci }; 4091cb0ef41Sopenharmony_ci destroyableOldStream.end = () => { 4101cb0ef41Sopenharmony_ci destroyableOldStream.emit('end'); 4111cb0ef41Sopenharmony_ci }; 4121cb0ef41Sopenharmony_ci 4131cb0ef41Sopenharmony_ci const rs = new Readable({ 4141cb0ef41Sopenharmony_ci read() { 4151cb0ef41Sopenharmony_ci rs.destroy(new Error('stop')); 4161cb0ef41Sopenharmony_ci } 4171cb0ef41Sopenharmony_ci }); 4181cb0ef41Sopenharmony_ci 4191cb0ef41Sopenharmony_ci const ws = new Writable({ 4201cb0ef41Sopenharmony_ci write(data, enc, cb) { 4211cb0ef41Sopenharmony_ci cb(); 4221cb0ef41Sopenharmony_ci } 4231cb0ef41Sopenharmony_ci }); 4241cb0ef41Sopenharmony_ci 4251cb0ef41Sopenharmony_ci let finished = false; 4261cb0ef41Sopenharmony_ci 4271cb0ef41Sopenharmony_ci ws.on('finish', () => { 4281cb0ef41Sopenharmony_ci finished = true; 4291cb0ef41Sopenharmony_ci }); 4301cb0ef41Sopenharmony_ci 4311cb0ef41Sopenharmony_ci pipeline( 4321cb0ef41Sopenharmony_ci rs, 4331cb0ef41Sopenharmony_ci oldStream, 4341cb0ef41Sopenharmony_ci destroyableOldStream, 4351cb0ef41Sopenharmony_ci ws, 4361cb0ef41Sopenharmony_ci common.mustCall((err) => { 4371cb0ef41Sopenharmony_ci assert.deepStrictEqual(err, new Error('stop')); 4381cb0ef41Sopenharmony_ci assert(!finished, 'should not finish'); 4391cb0ef41Sopenharmony_ci }) 4401cb0ef41Sopenharmony_ci ); 4411cb0ef41Sopenharmony_ci} 4421cb0ef41Sopenharmony_ci 4431cb0ef41Sopenharmony_ci{ 4441cb0ef41Sopenharmony_ci const pipelinePromise = promisify(pipeline); 4451cb0ef41Sopenharmony_ci 4461cb0ef41Sopenharmony_ci async function run() { 4471cb0ef41Sopenharmony_ci const read = new Readable({ 4481cb0ef41Sopenharmony_ci read() {} 4491cb0ef41Sopenharmony_ci }); 4501cb0ef41Sopenharmony_ci 4511cb0ef41Sopenharmony_ci const write = new Writable({ 4521cb0ef41Sopenharmony_ci write(data, enc, cb) { 4531cb0ef41Sopenharmony_ci cb(); 4541cb0ef41Sopenharmony_ci } 4551cb0ef41Sopenharmony_ci }); 4561cb0ef41Sopenharmony_ci 4571cb0ef41Sopenharmony_ci read.push('data'); 4581cb0ef41Sopenharmony_ci read.push(null); 4591cb0ef41Sopenharmony_ci 4601cb0ef41Sopenharmony_ci let finished = false; 4611cb0ef41Sopenharmony_ci 4621cb0ef41Sopenharmony_ci write.on('finish', () => { 4631cb0ef41Sopenharmony_ci finished = true; 4641cb0ef41Sopenharmony_ci }); 4651cb0ef41Sopenharmony_ci 4661cb0ef41Sopenharmony_ci await pipelinePromise(read, write); 4671cb0ef41Sopenharmony_ci 4681cb0ef41Sopenharmony_ci assert(finished); 4691cb0ef41Sopenharmony_ci } 4701cb0ef41Sopenharmony_ci 4711cb0ef41Sopenharmony_ci run(); 4721cb0ef41Sopenharmony_ci} 4731cb0ef41Sopenharmony_ci 4741cb0ef41Sopenharmony_ci{ 4751cb0ef41Sopenharmony_ci // Check aborted signal without values 4761cb0ef41Sopenharmony_ci const pipelinePromise = promisify(pipeline); 4771cb0ef41Sopenharmony_ci async function run() { 4781cb0ef41Sopenharmony_ci const ac = new AbortController(); 4791cb0ef41Sopenharmony_ci const { signal } = ac; 4801cb0ef41Sopenharmony_ci async function* producer() { 4811cb0ef41Sopenharmony_ci ac.abort(); 4821cb0ef41Sopenharmony_ci await Promise.resolve(); 4831cb0ef41Sopenharmony_ci yield '8'; 4841cb0ef41Sopenharmony_ci } 4851cb0ef41Sopenharmony_ci 4861cb0ef41Sopenharmony_ci const w = new Writable({ 4871cb0ef41Sopenharmony_ci write(chunk, encoding, callback) { 4881cb0ef41Sopenharmony_ci callback(); 4891cb0ef41Sopenharmony_ci } 4901cb0ef41Sopenharmony_ci }); 4911cb0ef41Sopenharmony_ci await pipelinePromise(producer, w, { signal }); 4921cb0ef41Sopenharmony_ci } 4931cb0ef41Sopenharmony_ci 4941cb0ef41Sopenharmony_ci assert.rejects(run, { name: 'AbortError' }).then(common.mustCall()); 4951cb0ef41Sopenharmony_ci} 4961cb0ef41Sopenharmony_ci 4971cb0ef41Sopenharmony_ci{ 4981cb0ef41Sopenharmony_ci // Check aborted signal after init. 4991cb0ef41Sopenharmony_ci const pipelinePromise = promisify(pipeline); 5001cb0ef41Sopenharmony_ci async function run() { 5011cb0ef41Sopenharmony_ci const ac = new AbortController(); 5021cb0ef41Sopenharmony_ci const { signal } = ac; 5031cb0ef41Sopenharmony_ci async function* producer() { 5041cb0ef41Sopenharmony_ci yield '5'; 5051cb0ef41Sopenharmony_ci await Promise.resolve(); 5061cb0ef41Sopenharmony_ci ac.abort(); 5071cb0ef41Sopenharmony_ci await Promise.resolve(); 5081cb0ef41Sopenharmony_ci yield '8'; 5091cb0ef41Sopenharmony_ci } 5101cb0ef41Sopenharmony_ci 5111cb0ef41Sopenharmony_ci const w = new Writable({ 5121cb0ef41Sopenharmony_ci write(chunk, encoding, callback) { 5131cb0ef41Sopenharmony_ci callback(); 5141cb0ef41Sopenharmony_ci } 5151cb0ef41Sopenharmony_ci }); 5161cb0ef41Sopenharmony_ci await pipelinePromise(producer, w, { signal }); 5171cb0ef41Sopenharmony_ci } 5181cb0ef41Sopenharmony_ci 5191cb0ef41Sopenharmony_ci assert.rejects(run, { name: 'AbortError' }).then(common.mustCall()); 5201cb0ef41Sopenharmony_ci} 5211cb0ef41Sopenharmony_ci 5221cb0ef41Sopenharmony_ci{ 5231cb0ef41Sopenharmony_ci // Check pre-aborted signal 5241cb0ef41Sopenharmony_ci const pipelinePromise = promisify(pipeline); 5251cb0ef41Sopenharmony_ci async function run() { 5261cb0ef41Sopenharmony_ci const signal = AbortSignal.abort(); 5271cb0ef41Sopenharmony_ci async function* producer() { 5281cb0ef41Sopenharmony_ci yield '5'; 5291cb0ef41Sopenharmony_ci await Promise.resolve(); 5301cb0ef41Sopenharmony_ci yield '8'; 5311cb0ef41Sopenharmony_ci } 5321cb0ef41Sopenharmony_ci 5331cb0ef41Sopenharmony_ci const w = new Writable({ 5341cb0ef41Sopenharmony_ci write(chunk, encoding, callback) { 5351cb0ef41Sopenharmony_ci callback(); 5361cb0ef41Sopenharmony_ci } 5371cb0ef41Sopenharmony_ci }); 5381cb0ef41Sopenharmony_ci await pipelinePromise(producer, w, { signal }); 5391cb0ef41Sopenharmony_ci } 5401cb0ef41Sopenharmony_ci 5411cb0ef41Sopenharmony_ci assert.rejects(run, { name: 'AbortError' }).then(common.mustCall()); 5421cb0ef41Sopenharmony_ci} 5431cb0ef41Sopenharmony_ci 5441cb0ef41Sopenharmony_ci{ 5451cb0ef41Sopenharmony_ci const read = new Readable({ 5461cb0ef41Sopenharmony_ci read() {} 5471cb0ef41Sopenharmony_ci }); 5481cb0ef41Sopenharmony_ci 5491cb0ef41Sopenharmony_ci const transform = new Transform({ 5501cb0ef41Sopenharmony_ci transform(data, enc, cb) { 5511cb0ef41Sopenharmony_ci cb(new Error('kaboom')); 5521cb0ef41Sopenharmony_ci } 5531cb0ef41Sopenharmony_ci }); 5541cb0ef41Sopenharmony_ci 5551cb0ef41Sopenharmony_ci const write = new Writable({ 5561cb0ef41Sopenharmony_ci write(data, enc, cb) { 5571cb0ef41Sopenharmony_ci cb(); 5581cb0ef41Sopenharmony_ci } 5591cb0ef41Sopenharmony_ci }); 5601cb0ef41Sopenharmony_ci 5611cb0ef41Sopenharmony_ci assert.throws( 5621cb0ef41Sopenharmony_ci () => pipeline(read, transform, write), 5631cb0ef41Sopenharmony_ci { code: 'ERR_INVALID_ARG_TYPE' } 5641cb0ef41Sopenharmony_ci ); 5651cb0ef41Sopenharmony_ci} 5661cb0ef41Sopenharmony_ci 5671cb0ef41Sopenharmony_ci{ 5681cb0ef41Sopenharmony_ci const server = http.Server(function(req, res) { 5691cb0ef41Sopenharmony_ci res.write('asd'); 5701cb0ef41Sopenharmony_ci }); 5711cb0ef41Sopenharmony_ci server.listen(0, function() { 5721cb0ef41Sopenharmony_ci http.get({ port: this.address().port }, (res) => { 5731cb0ef41Sopenharmony_ci const stream = new PassThrough(); 5741cb0ef41Sopenharmony_ci 5751cb0ef41Sopenharmony_ci stream.on('error', common.mustCall()); 5761cb0ef41Sopenharmony_ci 5771cb0ef41Sopenharmony_ci pipeline( 5781cb0ef41Sopenharmony_ci res, 5791cb0ef41Sopenharmony_ci stream, 5801cb0ef41Sopenharmony_ci common.mustCall((err) => { 5811cb0ef41Sopenharmony_ci assert.strictEqual(err.message, 'oh no'); 5821cb0ef41Sopenharmony_ci server.close(); 5831cb0ef41Sopenharmony_ci }) 5841cb0ef41Sopenharmony_ci ); 5851cb0ef41Sopenharmony_ci 5861cb0ef41Sopenharmony_ci stream.destroy(new Error('oh no')); 5871cb0ef41Sopenharmony_ci }).on('error', common.mustNotCall()); 5881cb0ef41Sopenharmony_ci }); 5891cb0ef41Sopenharmony_ci} 5901cb0ef41Sopenharmony_ci 5911cb0ef41Sopenharmony_ci{ 5921cb0ef41Sopenharmony_ci let res = ''; 5931cb0ef41Sopenharmony_ci const w = new Writable({ 5941cb0ef41Sopenharmony_ci write(chunk, encoding, callback) { 5951cb0ef41Sopenharmony_ci res += chunk; 5961cb0ef41Sopenharmony_ci callback(); 5971cb0ef41Sopenharmony_ci } 5981cb0ef41Sopenharmony_ci }); 5991cb0ef41Sopenharmony_ci pipeline(function*() { 6001cb0ef41Sopenharmony_ci yield 'hello'; 6011cb0ef41Sopenharmony_ci yield 'world'; 6021cb0ef41Sopenharmony_ci }(), w, common.mustSucceed(() => { 6031cb0ef41Sopenharmony_ci assert.strictEqual(res, 'helloworld'); 6041cb0ef41Sopenharmony_ci })); 6051cb0ef41Sopenharmony_ci} 6061cb0ef41Sopenharmony_ci 6071cb0ef41Sopenharmony_ci{ 6081cb0ef41Sopenharmony_ci let res = ''; 6091cb0ef41Sopenharmony_ci const w = new Writable({ 6101cb0ef41Sopenharmony_ci write(chunk, encoding, callback) { 6111cb0ef41Sopenharmony_ci res += chunk; 6121cb0ef41Sopenharmony_ci callback(); 6131cb0ef41Sopenharmony_ci } 6141cb0ef41Sopenharmony_ci }); 6151cb0ef41Sopenharmony_ci pipeline(async function*() { 6161cb0ef41Sopenharmony_ci await Promise.resolve(); 6171cb0ef41Sopenharmony_ci yield 'hello'; 6181cb0ef41Sopenharmony_ci yield 'world'; 6191cb0ef41Sopenharmony_ci }(), w, common.mustSucceed(() => { 6201cb0ef41Sopenharmony_ci assert.strictEqual(res, 'helloworld'); 6211cb0ef41Sopenharmony_ci })); 6221cb0ef41Sopenharmony_ci} 6231cb0ef41Sopenharmony_ci 6241cb0ef41Sopenharmony_ci{ 6251cb0ef41Sopenharmony_ci let res = ''; 6261cb0ef41Sopenharmony_ci const w = new Writable({ 6271cb0ef41Sopenharmony_ci write(chunk, encoding, callback) { 6281cb0ef41Sopenharmony_ci res += chunk; 6291cb0ef41Sopenharmony_ci callback(); 6301cb0ef41Sopenharmony_ci } 6311cb0ef41Sopenharmony_ci }); 6321cb0ef41Sopenharmony_ci pipeline(function*() { 6331cb0ef41Sopenharmony_ci yield 'hello'; 6341cb0ef41Sopenharmony_ci yield 'world'; 6351cb0ef41Sopenharmony_ci }, w, common.mustSucceed(() => { 6361cb0ef41Sopenharmony_ci assert.strictEqual(res, 'helloworld'); 6371cb0ef41Sopenharmony_ci })); 6381cb0ef41Sopenharmony_ci} 6391cb0ef41Sopenharmony_ci 6401cb0ef41Sopenharmony_ci{ 6411cb0ef41Sopenharmony_ci let res = ''; 6421cb0ef41Sopenharmony_ci const w = new Writable({ 6431cb0ef41Sopenharmony_ci write(chunk, encoding, callback) { 6441cb0ef41Sopenharmony_ci res += chunk; 6451cb0ef41Sopenharmony_ci callback(); 6461cb0ef41Sopenharmony_ci } 6471cb0ef41Sopenharmony_ci }); 6481cb0ef41Sopenharmony_ci pipeline(async function*() { 6491cb0ef41Sopenharmony_ci await Promise.resolve(); 6501cb0ef41Sopenharmony_ci yield 'hello'; 6511cb0ef41Sopenharmony_ci yield 'world'; 6521cb0ef41Sopenharmony_ci }, w, common.mustSucceed(() => { 6531cb0ef41Sopenharmony_ci assert.strictEqual(res, 'helloworld'); 6541cb0ef41Sopenharmony_ci })); 6551cb0ef41Sopenharmony_ci} 6561cb0ef41Sopenharmony_ci 6571cb0ef41Sopenharmony_ci{ 6581cb0ef41Sopenharmony_ci let res = ''; 6591cb0ef41Sopenharmony_ci pipeline(async function*() { 6601cb0ef41Sopenharmony_ci await Promise.resolve(); 6611cb0ef41Sopenharmony_ci yield 'hello'; 6621cb0ef41Sopenharmony_ci yield 'world'; 6631cb0ef41Sopenharmony_ci }, async function*(source) { 6641cb0ef41Sopenharmony_ci for await (const chunk of source) { 6651cb0ef41Sopenharmony_ci yield chunk.toUpperCase(); 6661cb0ef41Sopenharmony_ci } 6671cb0ef41Sopenharmony_ci }, async function(source) { 6681cb0ef41Sopenharmony_ci for await (const chunk of source) { 6691cb0ef41Sopenharmony_ci res += chunk; 6701cb0ef41Sopenharmony_ci } 6711cb0ef41Sopenharmony_ci }, common.mustSucceed(() => { 6721cb0ef41Sopenharmony_ci assert.strictEqual(res, 'HELLOWORLD'); 6731cb0ef41Sopenharmony_ci })); 6741cb0ef41Sopenharmony_ci} 6751cb0ef41Sopenharmony_ci 6761cb0ef41Sopenharmony_ci{ 6771cb0ef41Sopenharmony_ci pipeline(async function*() { 6781cb0ef41Sopenharmony_ci await Promise.resolve(); 6791cb0ef41Sopenharmony_ci yield 'hello'; 6801cb0ef41Sopenharmony_ci yield 'world'; 6811cb0ef41Sopenharmony_ci }, async function*(source) { 6821cb0ef41Sopenharmony_ci for await (const chunk of source) { 6831cb0ef41Sopenharmony_ci yield chunk.toUpperCase(); 6841cb0ef41Sopenharmony_ci } 6851cb0ef41Sopenharmony_ci }, async function(source) { 6861cb0ef41Sopenharmony_ci let ret = ''; 6871cb0ef41Sopenharmony_ci for await (const chunk of source) { 6881cb0ef41Sopenharmony_ci ret += chunk; 6891cb0ef41Sopenharmony_ci } 6901cb0ef41Sopenharmony_ci return ret; 6911cb0ef41Sopenharmony_ci }, common.mustSucceed((val) => { 6921cb0ef41Sopenharmony_ci assert.strictEqual(val, 'HELLOWORLD'); 6931cb0ef41Sopenharmony_ci })); 6941cb0ef41Sopenharmony_ci} 6951cb0ef41Sopenharmony_ci 6961cb0ef41Sopenharmony_ci{ 6971cb0ef41Sopenharmony_ci // AsyncIterable destination is returned and finalizes. 6981cb0ef41Sopenharmony_ci 6991cb0ef41Sopenharmony_ci const ret = pipeline(async function*() { 7001cb0ef41Sopenharmony_ci await Promise.resolve(); 7011cb0ef41Sopenharmony_ci yield 'hello'; 7021cb0ef41Sopenharmony_ci }, async function*(source) { // eslint-disable-line require-yield 7031cb0ef41Sopenharmony_ci for await (const chunk of source) { } // eslint-disable-line no-unused-vars, no-empty 7041cb0ef41Sopenharmony_ci }, common.mustCall((err) => { 7051cb0ef41Sopenharmony_ci assert.strictEqual(err, undefined); 7061cb0ef41Sopenharmony_ci })); 7071cb0ef41Sopenharmony_ci ret.resume(); 7081cb0ef41Sopenharmony_ci assert.strictEqual(typeof ret.pipe, 'function'); 7091cb0ef41Sopenharmony_ci} 7101cb0ef41Sopenharmony_ci 7111cb0ef41Sopenharmony_ci{ 7121cb0ef41Sopenharmony_ci // AsyncFunction destination is not returned and error is 7131cb0ef41Sopenharmony_ci // propagated. 7141cb0ef41Sopenharmony_ci 7151cb0ef41Sopenharmony_ci const ret = pipeline(async function*() { // eslint-disable-line require-yield 7161cb0ef41Sopenharmony_ci await Promise.resolve(); 7171cb0ef41Sopenharmony_ci throw new Error('kaboom'); 7181cb0ef41Sopenharmony_ci }, async function*(source) { // eslint-disable-line require-yield 7191cb0ef41Sopenharmony_ci for await (const chunk of source) { } // eslint-disable-line no-unused-vars, no-empty 7201cb0ef41Sopenharmony_ci }, common.mustCall((err) => { 7211cb0ef41Sopenharmony_ci assert.strictEqual(err.message, 'kaboom'); 7221cb0ef41Sopenharmony_ci })); 7231cb0ef41Sopenharmony_ci ret.resume(); 7241cb0ef41Sopenharmony_ci assert.strictEqual(typeof ret.pipe, 'function'); 7251cb0ef41Sopenharmony_ci} 7261cb0ef41Sopenharmony_ci 7271cb0ef41Sopenharmony_ci{ 7281cb0ef41Sopenharmony_ci const s = new PassThrough(); 7291cb0ef41Sopenharmony_ci pipeline(async function*() { // eslint-disable-line require-yield 7301cb0ef41Sopenharmony_ci throw new Error('kaboom'); 7311cb0ef41Sopenharmony_ci }, s, common.mustCall((err) => { 7321cb0ef41Sopenharmony_ci assert.strictEqual(err.message, 'kaboom'); 7331cb0ef41Sopenharmony_ci assert.strictEqual(s.destroyed, true); 7341cb0ef41Sopenharmony_ci })); 7351cb0ef41Sopenharmony_ci} 7361cb0ef41Sopenharmony_ci 7371cb0ef41Sopenharmony_ci{ 7381cb0ef41Sopenharmony_ci const s = new PassThrough(); 7391cb0ef41Sopenharmony_ci pipeline(async function*() { // eslint-disable-line require-yield 7401cb0ef41Sopenharmony_ci throw new Error('kaboom'); 7411cb0ef41Sopenharmony_ci }(), s, common.mustCall((err) => { 7421cb0ef41Sopenharmony_ci assert.strictEqual(err.message, 'kaboom'); 7431cb0ef41Sopenharmony_ci assert.strictEqual(s.destroyed, true); 7441cb0ef41Sopenharmony_ci })); 7451cb0ef41Sopenharmony_ci} 7461cb0ef41Sopenharmony_ci 7471cb0ef41Sopenharmony_ci{ 7481cb0ef41Sopenharmony_ci const s = new PassThrough(); 7491cb0ef41Sopenharmony_ci pipeline(function*() { // eslint-disable-line require-yield 7501cb0ef41Sopenharmony_ci throw new Error('kaboom'); 7511cb0ef41Sopenharmony_ci }, s, common.mustCall((err, val) => { 7521cb0ef41Sopenharmony_ci assert.strictEqual(err.message, 'kaboom'); 7531cb0ef41Sopenharmony_ci assert.strictEqual(s.destroyed, true); 7541cb0ef41Sopenharmony_ci })); 7551cb0ef41Sopenharmony_ci} 7561cb0ef41Sopenharmony_ci 7571cb0ef41Sopenharmony_ci{ 7581cb0ef41Sopenharmony_ci const s = new PassThrough(); 7591cb0ef41Sopenharmony_ci pipeline(function*() { // eslint-disable-line require-yield 7601cb0ef41Sopenharmony_ci throw new Error('kaboom'); 7611cb0ef41Sopenharmony_ci }(), s, common.mustCall((err, val) => { 7621cb0ef41Sopenharmony_ci assert.strictEqual(err.message, 'kaboom'); 7631cb0ef41Sopenharmony_ci assert.strictEqual(s.destroyed, true); 7641cb0ef41Sopenharmony_ci })); 7651cb0ef41Sopenharmony_ci} 7661cb0ef41Sopenharmony_ci 7671cb0ef41Sopenharmony_ci{ 7681cb0ef41Sopenharmony_ci const s = new PassThrough(); 7691cb0ef41Sopenharmony_ci pipeline(async function*() { 7701cb0ef41Sopenharmony_ci await Promise.resolve(); 7711cb0ef41Sopenharmony_ci yield 'hello'; 7721cb0ef41Sopenharmony_ci yield 'world'; 7731cb0ef41Sopenharmony_ci }, s, async function(source) { 7741cb0ef41Sopenharmony_ci for await (const chunk of source) { // eslint-disable-line no-unused-vars 7751cb0ef41Sopenharmony_ci throw new Error('kaboom'); 7761cb0ef41Sopenharmony_ci } 7771cb0ef41Sopenharmony_ci }, common.mustCall((err, val) => { 7781cb0ef41Sopenharmony_ci assert.strictEqual(err.message, 'kaboom'); 7791cb0ef41Sopenharmony_ci assert.strictEqual(s.destroyed, true); 7801cb0ef41Sopenharmony_ci })); 7811cb0ef41Sopenharmony_ci} 7821cb0ef41Sopenharmony_ci 7831cb0ef41Sopenharmony_ci{ 7841cb0ef41Sopenharmony_ci const s = new PassThrough(); 7851cb0ef41Sopenharmony_ci const ret = pipeline(function() { 7861cb0ef41Sopenharmony_ci return ['hello', 'world']; 7871cb0ef41Sopenharmony_ci }, s, async function*(source) { // eslint-disable-line require-yield 7881cb0ef41Sopenharmony_ci for await (const chunk of source) { // eslint-disable-line no-unused-vars 7891cb0ef41Sopenharmony_ci throw new Error('kaboom'); 7901cb0ef41Sopenharmony_ci } 7911cb0ef41Sopenharmony_ci }, common.mustCall((err) => { 7921cb0ef41Sopenharmony_ci assert.strictEqual(err.message, 'kaboom'); 7931cb0ef41Sopenharmony_ci assert.strictEqual(s.destroyed, true); 7941cb0ef41Sopenharmony_ci })); 7951cb0ef41Sopenharmony_ci ret.resume(); 7961cb0ef41Sopenharmony_ci assert.strictEqual(typeof ret.pipe, 'function'); 7971cb0ef41Sopenharmony_ci} 7981cb0ef41Sopenharmony_ci 7991cb0ef41Sopenharmony_ci{ 8001cb0ef41Sopenharmony_ci // Legacy streams without async iterator. 8011cb0ef41Sopenharmony_ci 8021cb0ef41Sopenharmony_ci const s = new PassThrough(); 8031cb0ef41Sopenharmony_ci s.push('asd'); 8041cb0ef41Sopenharmony_ci s.push(null); 8051cb0ef41Sopenharmony_ci s[Symbol.asyncIterator] = null; 8061cb0ef41Sopenharmony_ci let ret = ''; 8071cb0ef41Sopenharmony_ci pipeline(s, async function(source) { 8081cb0ef41Sopenharmony_ci for await (const chunk of source) { 8091cb0ef41Sopenharmony_ci ret += chunk; 8101cb0ef41Sopenharmony_ci } 8111cb0ef41Sopenharmony_ci }, common.mustCall((err) => { 8121cb0ef41Sopenharmony_ci assert.strictEqual(err, undefined); 8131cb0ef41Sopenharmony_ci assert.strictEqual(ret, 'asd'); 8141cb0ef41Sopenharmony_ci })); 8151cb0ef41Sopenharmony_ci} 8161cb0ef41Sopenharmony_ci 8171cb0ef41Sopenharmony_ci{ 8181cb0ef41Sopenharmony_ci // v1 streams without read(). 8191cb0ef41Sopenharmony_ci 8201cb0ef41Sopenharmony_ci const s = new Stream(); 8211cb0ef41Sopenharmony_ci process.nextTick(() => { 8221cb0ef41Sopenharmony_ci s.emit('data', 'asd'); 8231cb0ef41Sopenharmony_ci s.emit('end'); 8241cb0ef41Sopenharmony_ci }); 8251cb0ef41Sopenharmony_ci // 'destroyer' can be called multiple times, 8261cb0ef41Sopenharmony_ci // once from stream wrapper and 8271cb0ef41Sopenharmony_ci // once from iterator wrapper. 8281cb0ef41Sopenharmony_ci s.close = common.mustCallAtLeast(1); 8291cb0ef41Sopenharmony_ci let ret = ''; 8301cb0ef41Sopenharmony_ci pipeline(s, async function(source) { 8311cb0ef41Sopenharmony_ci for await (const chunk of source) { 8321cb0ef41Sopenharmony_ci ret += chunk; 8331cb0ef41Sopenharmony_ci } 8341cb0ef41Sopenharmony_ci }, common.mustCall((err) => { 8351cb0ef41Sopenharmony_ci assert.strictEqual(err, undefined); 8361cb0ef41Sopenharmony_ci assert.strictEqual(ret, 'asd'); 8371cb0ef41Sopenharmony_ci })); 8381cb0ef41Sopenharmony_ci} 8391cb0ef41Sopenharmony_ci 8401cb0ef41Sopenharmony_ci{ 8411cb0ef41Sopenharmony_ci // v1 error streams without read(). 8421cb0ef41Sopenharmony_ci 8431cb0ef41Sopenharmony_ci const s = new Stream(); 8441cb0ef41Sopenharmony_ci process.nextTick(() => { 8451cb0ef41Sopenharmony_ci s.emit('error', new Error('kaboom')); 8461cb0ef41Sopenharmony_ci }); 8471cb0ef41Sopenharmony_ci s.destroy = common.mustCall(); 8481cb0ef41Sopenharmony_ci pipeline(s, async function(source) { 8491cb0ef41Sopenharmony_ci }, common.mustCall((err) => { 8501cb0ef41Sopenharmony_ci assert.strictEqual(err.message, 'kaboom'); 8511cb0ef41Sopenharmony_ci })); 8521cb0ef41Sopenharmony_ci} 8531cb0ef41Sopenharmony_ci 8541cb0ef41Sopenharmony_ci{ 8551cb0ef41Sopenharmony_ci const s = new PassThrough(); 8561cb0ef41Sopenharmony_ci assert.throws(() => { 8571cb0ef41Sopenharmony_ci pipeline(function(source) { 8581cb0ef41Sopenharmony_ci }, s, () => {}); 8591cb0ef41Sopenharmony_ci }, (err) => { 8601cb0ef41Sopenharmony_ci assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); 8611cb0ef41Sopenharmony_ci assert.strictEqual(s.destroyed, false); 8621cb0ef41Sopenharmony_ci return true; 8631cb0ef41Sopenharmony_ci }); 8641cb0ef41Sopenharmony_ci} 8651cb0ef41Sopenharmony_ci 8661cb0ef41Sopenharmony_ci{ 8671cb0ef41Sopenharmony_ci const s = new PassThrough(); 8681cb0ef41Sopenharmony_ci assert.throws(() => { 8691cb0ef41Sopenharmony_ci pipeline(s, function(source) { 8701cb0ef41Sopenharmony_ci }, s, () => {}); 8711cb0ef41Sopenharmony_ci }, (err) => { 8721cb0ef41Sopenharmony_ci assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); 8731cb0ef41Sopenharmony_ci assert.strictEqual(s.destroyed, false); 8741cb0ef41Sopenharmony_ci return true; 8751cb0ef41Sopenharmony_ci }); 8761cb0ef41Sopenharmony_ci} 8771cb0ef41Sopenharmony_ci 8781cb0ef41Sopenharmony_ci{ 8791cb0ef41Sopenharmony_ci const s = new PassThrough(); 8801cb0ef41Sopenharmony_ci assert.throws(() => { 8811cb0ef41Sopenharmony_ci pipeline(s, function(source) { 8821cb0ef41Sopenharmony_ci }, () => {}); 8831cb0ef41Sopenharmony_ci }, (err) => { 8841cb0ef41Sopenharmony_ci assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); 8851cb0ef41Sopenharmony_ci assert.strictEqual(s.destroyed, false); 8861cb0ef41Sopenharmony_ci return true; 8871cb0ef41Sopenharmony_ci }); 8881cb0ef41Sopenharmony_ci} 8891cb0ef41Sopenharmony_ci 8901cb0ef41Sopenharmony_ci{ 8911cb0ef41Sopenharmony_ci const s = new PassThrough(); 8921cb0ef41Sopenharmony_ci assert.throws(() => { 8931cb0ef41Sopenharmony_ci pipeline(s, function*(source) { 8941cb0ef41Sopenharmony_ci }, () => {}); 8951cb0ef41Sopenharmony_ci }, (err) => { 8961cb0ef41Sopenharmony_ci assert.strictEqual(err.code, 'ERR_INVALID_RETURN_VALUE'); 8971cb0ef41Sopenharmony_ci assert.strictEqual(s.destroyed, false); 8981cb0ef41Sopenharmony_ci return true; 8991cb0ef41Sopenharmony_ci }); 9001cb0ef41Sopenharmony_ci} 9011cb0ef41Sopenharmony_ci 9021cb0ef41Sopenharmony_ci{ 9031cb0ef41Sopenharmony_ci let res = ''; 9041cb0ef41Sopenharmony_ci pipeline(async function*() { 9051cb0ef41Sopenharmony_ci await Promise.resolve(); 9061cb0ef41Sopenharmony_ci yield 'hello'; 9071cb0ef41Sopenharmony_ci yield 'world'; 9081cb0ef41Sopenharmony_ci }, new Transform({ 9091cb0ef41Sopenharmony_ci transform(chunk, encoding, cb) { 9101cb0ef41Sopenharmony_ci cb(new Error('kaboom')); 9111cb0ef41Sopenharmony_ci } 9121cb0ef41Sopenharmony_ci }), async function(source) { 9131cb0ef41Sopenharmony_ci for await (const chunk of source) { 9141cb0ef41Sopenharmony_ci res += chunk; 9151cb0ef41Sopenharmony_ci } 9161cb0ef41Sopenharmony_ci }, common.mustCall((err) => { 9171cb0ef41Sopenharmony_ci assert.strictEqual(err.message, 'kaboom'); 9181cb0ef41Sopenharmony_ci assert.strictEqual(res, ''); 9191cb0ef41Sopenharmony_ci })); 9201cb0ef41Sopenharmony_ci} 9211cb0ef41Sopenharmony_ci 9221cb0ef41Sopenharmony_ci{ 9231cb0ef41Sopenharmony_ci let res = ''; 9241cb0ef41Sopenharmony_ci pipeline(async function*() { 9251cb0ef41Sopenharmony_ci await Promise.resolve(); 9261cb0ef41Sopenharmony_ci yield 'hello'; 9271cb0ef41Sopenharmony_ci yield 'world'; 9281cb0ef41Sopenharmony_ci }, new Transform({ 9291cb0ef41Sopenharmony_ci transform(chunk, encoding, cb) { 9301cb0ef41Sopenharmony_ci process.nextTick(cb, new Error('kaboom')); 9311cb0ef41Sopenharmony_ci } 9321cb0ef41Sopenharmony_ci }), async function(source) { 9331cb0ef41Sopenharmony_ci for await (const chunk of source) { 9341cb0ef41Sopenharmony_ci res += chunk; 9351cb0ef41Sopenharmony_ci } 9361cb0ef41Sopenharmony_ci }, common.mustCall((err) => { 9371cb0ef41Sopenharmony_ci assert.strictEqual(err.message, 'kaboom'); 9381cb0ef41Sopenharmony_ci assert.strictEqual(res, ''); 9391cb0ef41Sopenharmony_ci })); 9401cb0ef41Sopenharmony_ci} 9411cb0ef41Sopenharmony_ci 9421cb0ef41Sopenharmony_ci{ 9431cb0ef41Sopenharmony_ci let res = ''; 9441cb0ef41Sopenharmony_ci pipeline(async function*() { 9451cb0ef41Sopenharmony_ci await Promise.resolve(); 9461cb0ef41Sopenharmony_ci yield 'hello'; 9471cb0ef41Sopenharmony_ci yield 'world'; 9481cb0ef41Sopenharmony_ci }, new Transform({ 9491cb0ef41Sopenharmony_ci decodeStrings: false, 9501cb0ef41Sopenharmony_ci transform(chunk, encoding, cb) { 9511cb0ef41Sopenharmony_ci cb(null, chunk.toUpperCase()); 9521cb0ef41Sopenharmony_ci } 9531cb0ef41Sopenharmony_ci }), async function(source) { 9541cb0ef41Sopenharmony_ci for await (const chunk of source) { 9551cb0ef41Sopenharmony_ci res += chunk; 9561cb0ef41Sopenharmony_ci } 9571cb0ef41Sopenharmony_ci }, common.mustSucceed(() => { 9581cb0ef41Sopenharmony_ci assert.strictEqual(res, 'HELLOWORLD'); 9591cb0ef41Sopenharmony_ci })); 9601cb0ef41Sopenharmony_ci} 9611cb0ef41Sopenharmony_ci 9621cb0ef41Sopenharmony_ci{ 9631cb0ef41Sopenharmony_ci // Ensure no unhandled rejection from async function. 9641cb0ef41Sopenharmony_ci 9651cb0ef41Sopenharmony_ci pipeline(async function*() { 9661cb0ef41Sopenharmony_ci yield 'hello'; 9671cb0ef41Sopenharmony_ci }, async function(source) { 9681cb0ef41Sopenharmony_ci throw new Error('kaboom'); 9691cb0ef41Sopenharmony_ci }, common.mustCall((err) => { 9701cb0ef41Sopenharmony_ci assert.strictEqual(err.message, 'kaboom'); 9711cb0ef41Sopenharmony_ci })); 9721cb0ef41Sopenharmony_ci} 9731cb0ef41Sopenharmony_ci 9741cb0ef41Sopenharmony_ci{ 9751cb0ef41Sopenharmony_ci const src = new PassThrough({ autoDestroy: false }); 9761cb0ef41Sopenharmony_ci const dst = new PassThrough({ autoDestroy: false }); 9771cb0ef41Sopenharmony_ci pipeline(src, dst, common.mustCall(() => { 9781cb0ef41Sopenharmony_ci assert.strictEqual(src.destroyed, false); 9791cb0ef41Sopenharmony_ci assert.strictEqual(dst.destroyed, false); 9801cb0ef41Sopenharmony_ci })); 9811cb0ef41Sopenharmony_ci src.end(); 9821cb0ef41Sopenharmony_ci} 9831cb0ef41Sopenharmony_ci 9841cb0ef41Sopenharmony_ci{ 9851cb0ef41Sopenharmony_ci // Make sure 'close' before 'end' finishes without error 9861cb0ef41Sopenharmony_ci // if readable has received eof. 9871cb0ef41Sopenharmony_ci // Ref: https://github.com/nodejs/node/issues/29699 9881cb0ef41Sopenharmony_ci const r = new Readable(); 9891cb0ef41Sopenharmony_ci const w = new Writable({ 9901cb0ef41Sopenharmony_ci write(chunk, encoding, cb) { 9911cb0ef41Sopenharmony_ci cb(); 9921cb0ef41Sopenharmony_ci } 9931cb0ef41Sopenharmony_ci }); 9941cb0ef41Sopenharmony_ci pipeline(r, w, (err) => { 9951cb0ef41Sopenharmony_ci assert.strictEqual(err, undefined); 9961cb0ef41Sopenharmony_ci }); 9971cb0ef41Sopenharmony_ci r.push('asd'); 9981cb0ef41Sopenharmony_ci r.push(null); 9991cb0ef41Sopenharmony_ci r.emit('close'); 10001cb0ef41Sopenharmony_ci} 10011cb0ef41Sopenharmony_ci 10021cb0ef41Sopenharmony_ci{ 10031cb0ef41Sopenharmony_ci const server = http.createServer((req, res) => { 10041cb0ef41Sopenharmony_ci }); 10051cb0ef41Sopenharmony_ci 10061cb0ef41Sopenharmony_ci server.listen(0, () => { 10071cb0ef41Sopenharmony_ci const req = http.request({ 10081cb0ef41Sopenharmony_ci port: server.address().port 10091cb0ef41Sopenharmony_ci }); 10101cb0ef41Sopenharmony_ci 10111cb0ef41Sopenharmony_ci const body = new PassThrough(); 10121cb0ef41Sopenharmony_ci pipeline( 10131cb0ef41Sopenharmony_ci body, 10141cb0ef41Sopenharmony_ci req, 10151cb0ef41Sopenharmony_ci common.mustSucceed(() => { 10161cb0ef41Sopenharmony_ci assert(!req.res); 10171cb0ef41Sopenharmony_ci assert(!req.aborted); 10181cb0ef41Sopenharmony_ci req.abort(); 10191cb0ef41Sopenharmony_ci server.close(); 10201cb0ef41Sopenharmony_ci }) 10211cb0ef41Sopenharmony_ci ); 10221cb0ef41Sopenharmony_ci body.end(); 10231cb0ef41Sopenharmony_ci }); 10241cb0ef41Sopenharmony_ci} 10251cb0ef41Sopenharmony_ci 10261cb0ef41Sopenharmony_ci{ 10271cb0ef41Sopenharmony_ci const src = new PassThrough(); 10281cb0ef41Sopenharmony_ci const dst = new PassThrough(); 10291cb0ef41Sopenharmony_ci pipeline(src, dst, common.mustSucceed(() => { 10301cb0ef41Sopenharmony_ci assert.strictEqual(dst.destroyed, false); 10311cb0ef41Sopenharmony_ci })); 10321cb0ef41Sopenharmony_ci src.end(); 10331cb0ef41Sopenharmony_ci} 10341cb0ef41Sopenharmony_ci 10351cb0ef41Sopenharmony_ci{ 10361cb0ef41Sopenharmony_ci const src = new PassThrough(); 10371cb0ef41Sopenharmony_ci const dst = new PassThrough(); 10381cb0ef41Sopenharmony_ci dst.readable = false; 10391cb0ef41Sopenharmony_ci pipeline(src, dst, common.mustSucceed(() => { 10401cb0ef41Sopenharmony_ci assert.strictEqual(dst.destroyed, true); 10411cb0ef41Sopenharmony_ci })); 10421cb0ef41Sopenharmony_ci src.end(); 10431cb0ef41Sopenharmony_ci} 10441cb0ef41Sopenharmony_ci 10451cb0ef41Sopenharmony_ci{ 10461cb0ef41Sopenharmony_ci let res = ''; 10471cb0ef41Sopenharmony_ci const rs = new Readable({ 10481cb0ef41Sopenharmony_ci read() { 10491cb0ef41Sopenharmony_ci setImmediate(() => { 10501cb0ef41Sopenharmony_ci rs.push('hello'); 10511cb0ef41Sopenharmony_ci }); 10521cb0ef41Sopenharmony_ci } 10531cb0ef41Sopenharmony_ci }); 10541cb0ef41Sopenharmony_ci const ws = new Writable({ 10551cb0ef41Sopenharmony_ci write: common.mustNotCall() 10561cb0ef41Sopenharmony_ci }); 10571cb0ef41Sopenharmony_ci pipeline(rs, async function*(stream) { // eslint-disable-line require-yield 10581cb0ef41Sopenharmony_ci for await (const chunk of stream) { // eslint-disable-line no-unused-vars 10591cb0ef41Sopenharmony_ci throw new Error('kaboom'); 10601cb0ef41Sopenharmony_ci } 10611cb0ef41Sopenharmony_ci }, async function *(source) { // eslint-disable-line require-yield 10621cb0ef41Sopenharmony_ci for await (const chunk of source) { 10631cb0ef41Sopenharmony_ci res += chunk; 10641cb0ef41Sopenharmony_ci } 10651cb0ef41Sopenharmony_ci }, ws, common.mustCall((err) => { 10661cb0ef41Sopenharmony_ci assert.strictEqual(err.message, 'kaboom'); 10671cb0ef41Sopenharmony_ci assert.strictEqual(res, ''); 10681cb0ef41Sopenharmony_ci })); 10691cb0ef41Sopenharmony_ci} 10701cb0ef41Sopenharmony_ci 10711cb0ef41Sopenharmony_ci{ 10721cb0ef41Sopenharmony_ci const server = http.createServer((req, res) => { 10731cb0ef41Sopenharmony_ci req.socket.on('error', common.mustNotCall()); 10741cb0ef41Sopenharmony_ci pipeline(req, new PassThrough(), (err) => { 10751cb0ef41Sopenharmony_ci assert.ifError(err); 10761cb0ef41Sopenharmony_ci res.end(); 10771cb0ef41Sopenharmony_ci server.close(); 10781cb0ef41Sopenharmony_ci }); 10791cb0ef41Sopenharmony_ci }); 10801cb0ef41Sopenharmony_ci 10811cb0ef41Sopenharmony_ci server.listen(0, () => { 10821cb0ef41Sopenharmony_ci const req = http.request({ 10831cb0ef41Sopenharmony_ci method: 'PUT', 10841cb0ef41Sopenharmony_ci port: server.address().port 10851cb0ef41Sopenharmony_ci }); 10861cb0ef41Sopenharmony_ci req.end('asd123'); 10871cb0ef41Sopenharmony_ci req.on('response', common.mustCall()); 10881cb0ef41Sopenharmony_ci req.on('error', common.mustNotCall()); 10891cb0ef41Sopenharmony_ci }); 10901cb0ef41Sopenharmony_ci} 10911cb0ef41Sopenharmony_ci 10921cb0ef41Sopenharmony_ci{ 10931cb0ef41Sopenharmony_ci // Might still want to be able to use the writable side 10941cb0ef41Sopenharmony_ci // of src. This is in the case where e.g. the Duplex input 10951cb0ef41Sopenharmony_ci // is not directly connected to its output. Such a case could 10961cb0ef41Sopenharmony_ci // happen when the Duplex is reading from a socket and then echos 10971cb0ef41Sopenharmony_ci // the data back on the same socket. 10981cb0ef41Sopenharmony_ci const src = new PassThrough(); 10991cb0ef41Sopenharmony_ci assert.strictEqual(src.writable, true); 11001cb0ef41Sopenharmony_ci const dst = new PassThrough(); 11011cb0ef41Sopenharmony_ci pipeline(src, dst, common.mustCall((err) => { 11021cb0ef41Sopenharmony_ci assert.strictEqual(src.writable, true); 11031cb0ef41Sopenharmony_ci assert.strictEqual(src.destroyed, false); 11041cb0ef41Sopenharmony_ci })); 11051cb0ef41Sopenharmony_ci src.push(null); 11061cb0ef41Sopenharmony_ci} 11071cb0ef41Sopenharmony_ci 11081cb0ef41Sopenharmony_ci{ 11091cb0ef41Sopenharmony_ci const src = new PassThrough(); 11101cb0ef41Sopenharmony_ci const dst = pipeline( 11111cb0ef41Sopenharmony_ci src, 11121cb0ef41Sopenharmony_ci async function * (source) { 11131cb0ef41Sopenharmony_ci for await (const chunk of source) { 11141cb0ef41Sopenharmony_ci yield chunk; 11151cb0ef41Sopenharmony_ci } 11161cb0ef41Sopenharmony_ci }, 11171cb0ef41Sopenharmony_ci common.mustCall((err) => { 11181cb0ef41Sopenharmony_ci assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE'); 11191cb0ef41Sopenharmony_ci }) 11201cb0ef41Sopenharmony_ci ); 11211cb0ef41Sopenharmony_ci src.push('asd'); 11221cb0ef41Sopenharmony_ci dst.destroy(); 11231cb0ef41Sopenharmony_ci} 11241cb0ef41Sopenharmony_ci 11251cb0ef41Sopenharmony_ci{ 11261cb0ef41Sopenharmony_ci pipeline(async function * () { 11271cb0ef41Sopenharmony_ci yield 'asd'; 11281cb0ef41Sopenharmony_ci }, async function * (source) { 11291cb0ef41Sopenharmony_ci for await (const chunk of source) { 11301cb0ef41Sopenharmony_ci yield { chunk }; 11311cb0ef41Sopenharmony_ci } 11321cb0ef41Sopenharmony_ci }, common.mustSucceed()); 11331cb0ef41Sopenharmony_ci} 11341cb0ef41Sopenharmony_ci 11351cb0ef41Sopenharmony_ci{ 11361cb0ef41Sopenharmony_ci let closed = false; 11371cb0ef41Sopenharmony_ci const src = new Readable({ 11381cb0ef41Sopenharmony_ci read() {}, 11391cb0ef41Sopenharmony_ci destroy(err, cb) { 11401cb0ef41Sopenharmony_ci process.nextTick(cb); 11411cb0ef41Sopenharmony_ci } 11421cb0ef41Sopenharmony_ci }); 11431cb0ef41Sopenharmony_ci const dst = new Writable({ 11441cb0ef41Sopenharmony_ci write(chunk, encoding, callback) { 11451cb0ef41Sopenharmony_ci callback(); 11461cb0ef41Sopenharmony_ci } 11471cb0ef41Sopenharmony_ci }); 11481cb0ef41Sopenharmony_ci src.on('close', () => { 11491cb0ef41Sopenharmony_ci closed = true; 11501cb0ef41Sopenharmony_ci }); 11511cb0ef41Sopenharmony_ci src.push(null); 11521cb0ef41Sopenharmony_ci pipeline(src, dst, common.mustCall((err) => { 11531cb0ef41Sopenharmony_ci assert.strictEqual(closed, true); 11541cb0ef41Sopenharmony_ci })); 11551cb0ef41Sopenharmony_ci} 11561cb0ef41Sopenharmony_ci 11571cb0ef41Sopenharmony_ci{ 11581cb0ef41Sopenharmony_ci let closed = false; 11591cb0ef41Sopenharmony_ci const src = new Readable({ 11601cb0ef41Sopenharmony_ci read() {}, 11611cb0ef41Sopenharmony_ci destroy(err, cb) { 11621cb0ef41Sopenharmony_ci process.nextTick(cb); 11631cb0ef41Sopenharmony_ci } 11641cb0ef41Sopenharmony_ci }); 11651cb0ef41Sopenharmony_ci const dst = new Duplex({}); 11661cb0ef41Sopenharmony_ci src.on('close', common.mustCall(() => { 11671cb0ef41Sopenharmony_ci closed = true; 11681cb0ef41Sopenharmony_ci })); 11691cb0ef41Sopenharmony_ci src.push(null); 11701cb0ef41Sopenharmony_ci pipeline(src, dst, common.mustCall((err) => { 11711cb0ef41Sopenharmony_ci assert.strictEqual(closed, true); 11721cb0ef41Sopenharmony_ci })); 11731cb0ef41Sopenharmony_ci} 11741cb0ef41Sopenharmony_ci 11751cb0ef41Sopenharmony_ci{ 11761cb0ef41Sopenharmony_ci const server = net.createServer(common.mustCall((socket) => { 11771cb0ef41Sopenharmony_ci // echo server 11781cb0ef41Sopenharmony_ci pipeline(socket, socket, common.mustSucceed()); 11791cb0ef41Sopenharmony_ci // 13 force destroys the socket before it has a chance to emit finish 11801cb0ef41Sopenharmony_ci socket.on('finish', common.mustCall(() => { 11811cb0ef41Sopenharmony_ci server.close(); 11821cb0ef41Sopenharmony_ci })); 11831cb0ef41Sopenharmony_ci })).listen(0, common.mustCall(() => { 11841cb0ef41Sopenharmony_ci const socket = net.connect(server.address().port); 11851cb0ef41Sopenharmony_ci socket.end(); 11861cb0ef41Sopenharmony_ci })); 11871cb0ef41Sopenharmony_ci} 11881cb0ef41Sopenharmony_ci 11891cb0ef41Sopenharmony_ci{ 11901cb0ef41Sopenharmony_ci const d = new Duplex({ 11911cb0ef41Sopenharmony_ci autoDestroy: false, 11921cb0ef41Sopenharmony_ci write: common.mustCall((data, enc, cb) => { 11931cb0ef41Sopenharmony_ci d.push(data); 11941cb0ef41Sopenharmony_ci cb(); 11951cb0ef41Sopenharmony_ci }), 11961cb0ef41Sopenharmony_ci read: common.mustCall(() => { 11971cb0ef41Sopenharmony_ci d.push(null); 11981cb0ef41Sopenharmony_ci }), 11991cb0ef41Sopenharmony_ci final: common.mustCall((cb) => { 12001cb0ef41Sopenharmony_ci setTimeout(() => { 12011cb0ef41Sopenharmony_ci assert.strictEqual(d.destroyed, false); 12021cb0ef41Sopenharmony_ci cb(); 12031cb0ef41Sopenharmony_ci }, 1000); 12041cb0ef41Sopenharmony_ci }), 12051cb0ef41Sopenharmony_ci destroy: common.mustNotCall() 12061cb0ef41Sopenharmony_ci }); 12071cb0ef41Sopenharmony_ci 12081cb0ef41Sopenharmony_ci const sink = new Writable({ 12091cb0ef41Sopenharmony_ci write: common.mustCall((data, enc, cb) => { 12101cb0ef41Sopenharmony_ci cb(); 12111cb0ef41Sopenharmony_ci }) 12121cb0ef41Sopenharmony_ci }); 12131cb0ef41Sopenharmony_ci 12141cb0ef41Sopenharmony_ci pipeline(d, sink, common.mustSucceed()); 12151cb0ef41Sopenharmony_ci 12161cb0ef41Sopenharmony_ci d.write('test'); 12171cb0ef41Sopenharmony_ci d.end(); 12181cb0ef41Sopenharmony_ci} 12191cb0ef41Sopenharmony_ci 12201cb0ef41Sopenharmony_ci{ 12211cb0ef41Sopenharmony_ci const server = net.createServer(common.mustCall((socket) => { 12221cb0ef41Sopenharmony_ci // echo server 12231cb0ef41Sopenharmony_ci pipeline(socket, socket, common.mustSucceed()); 12241cb0ef41Sopenharmony_ci socket.on('finish', common.mustCall(() => { 12251cb0ef41Sopenharmony_ci server.close(); 12261cb0ef41Sopenharmony_ci })); 12271cb0ef41Sopenharmony_ci })).listen(0, common.mustCall(() => { 12281cb0ef41Sopenharmony_ci const socket = net.connect(server.address().port); 12291cb0ef41Sopenharmony_ci socket.end(); 12301cb0ef41Sopenharmony_ci })); 12311cb0ef41Sopenharmony_ci} 12321cb0ef41Sopenharmony_ci 12331cb0ef41Sopenharmony_ci{ 12341cb0ef41Sopenharmony_ci const d = new Duplex({ 12351cb0ef41Sopenharmony_ci autoDestroy: false, 12361cb0ef41Sopenharmony_ci write: common.mustCall((data, enc, cb) => { 12371cb0ef41Sopenharmony_ci d.push(data); 12381cb0ef41Sopenharmony_ci cb(); 12391cb0ef41Sopenharmony_ci }), 12401cb0ef41Sopenharmony_ci read: common.mustCall(() => { 12411cb0ef41Sopenharmony_ci d.push(null); 12421cb0ef41Sopenharmony_ci }), 12431cb0ef41Sopenharmony_ci final: common.mustCall((cb) => { 12441cb0ef41Sopenharmony_ci setTimeout(() => { 12451cb0ef41Sopenharmony_ci assert.strictEqual(d.destroyed, false); 12461cb0ef41Sopenharmony_ci cb(); 12471cb0ef41Sopenharmony_ci }, 1000); 12481cb0ef41Sopenharmony_ci }), 12491cb0ef41Sopenharmony_ci // `destroy()` won't be invoked by pipeline since 12501cb0ef41Sopenharmony_ci // the writable side has not completed when 12511cb0ef41Sopenharmony_ci // the pipeline has completed. 12521cb0ef41Sopenharmony_ci destroy: common.mustNotCall() 12531cb0ef41Sopenharmony_ci }); 12541cb0ef41Sopenharmony_ci 12551cb0ef41Sopenharmony_ci const sink = new Writable({ 12561cb0ef41Sopenharmony_ci write: common.mustCall((data, enc, cb) => { 12571cb0ef41Sopenharmony_ci cb(); 12581cb0ef41Sopenharmony_ci }) 12591cb0ef41Sopenharmony_ci }); 12601cb0ef41Sopenharmony_ci 12611cb0ef41Sopenharmony_ci pipeline(d, sink, common.mustSucceed()); 12621cb0ef41Sopenharmony_ci 12631cb0ef41Sopenharmony_ci d.write('test'); 12641cb0ef41Sopenharmony_ci d.end(); 12651cb0ef41Sopenharmony_ci} 12661cb0ef41Sopenharmony_ci 12671cb0ef41Sopenharmony_ci{ 12681cb0ef41Sopenharmony_ci const r = new Readable({ 12691cb0ef41Sopenharmony_ci read() {} 12701cb0ef41Sopenharmony_ci }); 12711cb0ef41Sopenharmony_ci r.push('hello'); 12721cb0ef41Sopenharmony_ci r.push('world'); 12731cb0ef41Sopenharmony_ci r.push(null); 12741cb0ef41Sopenharmony_ci let res = ''; 12751cb0ef41Sopenharmony_ci const w = new Writable({ 12761cb0ef41Sopenharmony_ci write(chunk, encoding, callback) { 12771cb0ef41Sopenharmony_ci res += chunk; 12781cb0ef41Sopenharmony_ci callback(); 12791cb0ef41Sopenharmony_ci } 12801cb0ef41Sopenharmony_ci }); 12811cb0ef41Sopenharmony_ci pipeline([r, w], common.mustSucceed(() => { 12821cb0ef41Sopenharmony_ci assert.strictEqual(res, 'helloworld'); 12831cb0ef41Sopenharmony_ci })); 12841cb0ef41Sopenharmony_ci} 12851cb0ef41Sopenharmony_ci 12861cb0ef41Sopenharmony_ci{ 12871cb0ef41Sopenharmony_ci let flushed = false; 12881cb0ef41Sopenharmony_ci const makeStream = () => 12891cb0ef41Sopenharmony_ci new Transform({ 12901cb0ef41Sopenharmony_ci transform: (chunk, enc, cb) => cb(null, chunk), 12911cb0ef41Sopenharmony_ci flush: (cb) => 12921cb0ef41Sopenharmony_ci setTimeout(() => { 12931cb0ef41Sopenharmony_ci flushed = true; 12941cb0ef41Sopenharmony_ci cb(null); 12951cb0ef41Sopenharmony_ci }, 1), 12961cb0ef41Sopenharmony_ci }); 12971cb0ef41Sopenharmony_ci 12981cb0ef41Sopenharmony_ci const input = new Readable(); 12991cb0ef41Sopenharmony_ci input.push(null); 13001cb0ef41Sopenharmony_ci 13011cb0ef41Sopenharmony_ci pipeline( 13021cb0ef41Sopenharmony_ci input, 13031cb0ef41Sopenharmony_ci makeStream(), 13041cb0ef41Sopenharmony_ci common.mustCall(() => { 13051cb0ef41Sopenharmony_ci assert.strictEqual(flushed, true); 13061cb0ef41Sopenharmony_ci }), 13071cb0ef41Sopenharmony_ci ); 13081cb0ef41Sopenharmony_ci} 13091cb0ef41Sopenharmony_ci{ 13101cb0ef41Sopenharmony_ci function createThenable() { 13111cb0ef41Sopenharmony_ci let counter = 0; 13121cb0ef41Sopenharmony_ci return { 13131cb0ef41Sopenharmony_ci get then() { 13141cb0ef41Sopenharmony_ci if (counter++) { 13151cb0ef41Sopenharmony_ci throw new Error('Cannot access `then` more than once'); 13161cb0ef41Sopenharmony_ci } 13171cb0ef41Sopenharmony_ci return Function.prototype; 13181cb0ef41Sopenharmony_ci }, 13191cb0ef41Sopenharmony_ci }; 13201cb0ef41Sopenharmony_ci } 13211cb0ef41Sopenharmony_ci 13221cb0ef41Sopenharmony_ci pipeline( 13231cb0ef41Sopenharmony_ci function* () { 13241cb0ef41Sopenharmony_ci yield 0; 13251cb0ef41Sopenharmony_ci }, 13261cb0ef41Sopenharmony_ci createThenable, 13271cb0ef41Sopenharmony_ci () => common.mustNotCall(), 13281cb0ef41Sopenharmony_ci ); 13291cb0ef41Sopenharmony_ci} 13301cb0ef41Sopenharmony_ci 13311cb0ef41Sopenharmony_ci 13321cb0ef41Sopenharmony_ci{ 13331cb0ef41Sopenharmony_ci const ac = new AbortController(); 13341cb0ef41Sopenharmony_ci const r = Readable.from(async function* () { 13351cb0ef41Sopenharmony_ci for (let i = 0; i < 10; i++) { 13361cb0ef41Sopenharmony_ci await Promise.resolve(); 13371cb0ef41Sopenharmony_ci yield String(i); 13381cb0ef41Sopenharmony_ci if (i === 5) { 13391cb0ef41Sopenharmony_ci ac.abort(); 13401cb0ef41Sopenharmony_ci } 13411cb0ef41Sopenharmony_ci } 13421cb0ef41Sopenharmony_ci }()); 13431cb0ef41Sopenharmony_ci let res = ''; 13441cb0ef41Sopenharmony_ci const w = new Writable({ 13451cb0ef41Sopenharmony_ci write(chunk, encoding, callback) { 13461cb0ef41Sopenharmony_ci res += chunk; 13471cb0ef41Sopenharmony_ci callback(); 13481cb0ef41Sopenharmony_ci } 13491cb0ef41Sopenharmony_ci }); 13501cb0ef41Sopenharmony_ci const cb = common.mustCall((err) => { 13511cb0ef41Sopenharmony_ci assert.strictEqual(err.name, 'AbortError'); 13521cb0ef41Sopenharmony_ci assert.strictEqual(res, '012345'); 13531cb0ef41Sopenharmony_ci assert.strictEqual(w.destroyed, true); 13541cb0ef41Sopenharmony_ci assert.strictEqual(r.destroyed, true); 13551cb0ef41Sopenharmony_ci assert.strictEqual(pipelined.destroyed, true); 13561cb0ef41Sopenharmony_ci }); 13571cb0ef41Sopenharmony_ci const pipelined = addAbortSignal(ac.signal, pipeline([r, w], cb)); 13581cb0ef41Sopenharmony_ci} 13591cb0ef41Sopenharmony_ci 13601cb0ef41Sopenharmony_ci{ 13611cb0ef41Sopenharmony_ci pipeline([1, 2, 3], PassThrough({ objectMode: true }), 13621cb0ef41Sopenharmony_ci common.mustSucceed(() => {})); 13631cb0ef41Sopenharmony_ci 13641cb0ef41Sopenharmony_ci let res = ''; 13651cb0ef41Sopenharmony_ci const w = new Writable({ 13661cb0ef41Sopenharmony_ci write(chunk, encoding, callback) { 13671cb0ef41Sopenharmony_ci res += chunk; 13681cb0ef41Sopenharmony_ci callback(); 13691cb0ef41Sopenharmony_ci }, 13701cb0ef41Sopenharmony_ci }); 13711cb0ef41Sopenharmony_ci pipeline(['1', '2', '3'], w, common.mustSucceed(() => { 13721cb0ef41Sopenharmony_ci assert.strictEqual(res, '123'); 13731cb0ef41Sopenharmony_ci })); 13741cb0ef41Sopenharmony_ci} 13751cb0ef41Sopenharmony_ci 13761cb0ef41Sopenharmony_ci{ 13771cb0ef41Sopenharmony_ci const content = 'abc'; 13781cb0ef41Sopenharmony_ci pipeline(Buffer.from(content), PassThrough({ objectMode: true }), 13791cb0ef41Sopenharmony_ci common.mustSucceed(() => {})); 13801cb0ef41Sopenharmony_ci 13811cb0ef41Sopenharmony_ci let res = ''; 13821cb0ef41Sopenharmony_ci pipeline(Buffer.from(content), async function*(previous) { 13831cb0ef41Sopenharmony_ci for await (const val of previous) { 13841cb0ef41Sopenharmony_ci res += String.fromCharCode(val); 13851cb0ef41Sopenharmony_ci yield val; 13861cb0ef41Sopenharmony_ci } 13871cb0ef41Sopenharmony_ci }, common.mustSucceed(() => { 13881cb0ef41Sopenharmony_ci assert.strictEqual(res, content); 13891cb0ef41Sopenharmony_ci })); 13901cb0ef41Sopenharmony_ci} 13911cb0ef41Sopenharmony_ci 13921cb0ef41Sopenharmony_ci{ 13931cb0ef41Sopenharmony_ci const ac = new AbortController(); 13941cb0ef41Sopenharmony_ci const signal = ac.signal; 13951cb0ef41Sopenharmony_ci pipelinep( 13961cb0ef41Sopenharmony_ci async function * ({ signal }) { // eslint-disable-line require-yield 13971cb0ef41Sopenharmony_ci await tsp.setTimeout(1e6, signal); 13981cb0ef41Sopenharmony_ci }, 13991cb0ef41Sopenharmony_ci async function(source) { 14001cb0ef41Sopenharmony_ci 14011cb0ef41Sopenharmony_ci }, 14021cb0ef41Sopenharmony_ci { signal } 14031cb0ef41Sopenharmony_ci ).catch(common.mustCall((err) => { 14041cb0ef41Sopenharmony_ci assert.strictEqual(err.name, 'AbortError'); 14051cb0ef41Sopenharmony_ci })); 14061cb0ef41Sopenharmony_ci ac.abort(); 14071cb0ef41Sopenharmony_ci} 14081cb0ef41Sopenharmony_ci 14091cb0ef41Sopenharmony_ci{ 14101cb0ef41Sopenharmony_ci async function run() { 14111cb0ef41Sopenharmony_ci let finished = false; 14121cb0ef41Sopenharmony_ci let text = ''; 14131cb0ef41Sopenharmony_ci const write = new Writable({ 14141cb0ef41Sopenharmony_ci write(data, enc, cb) { 14151cb0ef41Sopenharmony_ci text += data; 14161cb0ef41Sopenharmony_ci cb(); 14171cb0ef41Sopenharmony_ci } 14181cb0ef41Sopenharmony_ci }); 14191cb0ef41Sopenharmony_ci write.on('finish', () => { 14201cb0ef41Sopenharmony_ci finished = true; 14211cb0ef41Sopenharmony_ci }); 14221cb0ef41Sopenharmony_ci 14231cb0ef41Sopenharmony_ci await pipelinep([Readable.from('Hello World!'), write]); 14241cb0ef41Sopenharmony_ci assert(finished); 14251cb0ef41Sopenharmony_ci assert.strictEqual(text, 'Hello World!'); 14261cb0ef41Sopenharmony_ci } 14271cb0ef41Sopenharmony_ci 14281cb0ef41Sopenharmony_ci run(); 14291cb0ef41Sopenharmony_ci} 14301cb0ef41Sopenharmony_ci 14311cb0ef41Sopenharmony_ci{ 14321cb0ef41Sopenharmony_ci let finished = false; 14331cb0ef41Sopenharmony_ci let text = ''; 14341cb0ef41Sopenharmony_ci const write = new Writable({ 14351cb0ef41Sopenharmony_ci write(data, enc, cb) { 14361cb0ef41Sopenharmony_ci text += data; 14371cb0ef41Sopenharmony_ci cb(); 14381cb0ef41Sopenharmony_ci } 14391cb0ef41Sopenharmony_ci }); 14401cb0ef41Sopenharmony_ci write.on('finish', () => { 14411cb0ef41Sopenharmony_ci finished = true; 14421cb0ef41Sopenharmony_ci }); 14431cb0ef41Sopenharmony_ci 14441cb0ef41Sopenharmony_ci pipeline([Readable.from('Hello World!'), write], common.mustSucceed(() => { 14451cb0ef41Sopenharmony_ci assert(finished); 14461cb0ef41Sopenharmony_ci assert.strictEqual(text, 'Hello World!'); 14471cb0ef41Sopenharmony_ci })); 14481cb0ef41Sopenharmony_ci} 14491cb0ef41Sopenharmony_ci 14501cb0ef41Sopenharmony_ci{ 14511cb0ef41Sopenharmony_ci const pipelinePromise = promisify(pipeline); 14521cb0ef41Sopenharmony_ci 14531cb0ef41Sopenharmony_ci async function run() { 14541cb0ef41Sopenharmony_ci const read = new Readable({ 14551cb0ef41Sopenharmony_ci read() {} 14561cb0ef41Sopenharmony_ci }); 14571cb0ef41Sopenharmony_ci 14581cb0ef41Sopenharmony_ci const duplex = new PassThrough(); 14591cb0ef41Sopenharmony_ci 14601cb0ef41Sopenharmony_ci read.push(null); 14611cb0ef41Sopenharmony_ci 14621cb0ef41Sopenharmony_ci await pipelinePromise(read, duplex); 14631cb0ef41Sopenharmony_ci 14641cb0ef41Sopenharmony_ci assert.strictEqual(duplex.destroyed, false); 14651cb0ef41Sopenharmony_ci } 14661cb0ef41Sopenharmony_ci 14671cb0ef41Sopenharmony_ci run().then(common.mustCall()); 14681cb0ef41Sopenharmony_ci} 14691cb0ef41Sopenharmony_ci 14701cb0ef41Sopenharmony_ci{ 14711cb0ef41Sopenharmony_ci const pipelinePromise = promisify(pipeline); 14721cb0ef41Sopenharmony_ci 14731cb0ef41Sopenharmony_ci async function run() { 14741cb0ef41Sopenharmony_ci const read = new Readable({ 14751cb0ef41Sopenharmony_ci read() {} 14761cb0ef41Sopenharmony_ci }); 14771cb0ef41Sopenharmony_ci 14781cb0ef41Sopenharmony_ci const duplex = new PassThrough(); 14791cb0ef41Sopenharmony_ci 14801cb0ef41Sopenharmony_ci read.push(null); 14811cb0ef41Sopenharmony_ci 14821cb0ef41Sopenharmony_ci await pipelinePromise(read, duplex, { end: false }); 14831cb0ef41Sopenharmony_ci 14841cb0ef41Sopenharmony_ci assert.strictEqual(duplex.destroyed, false); 14851cb0ef41Sopenharmony_ci assert.strictEqual(duplex.writableEnded, false); 14861cb0ef41Sopenharmony_ci } 14871cb0ef41Sopenharmony_ci 14881cb0ef41Sopenharmony_ci run().then(common.mustCall()); 14891cb0ef41Sopenharmony_ci} 14901cb0ef41Sopenharmony_ci 14911cb0ef41Sopenharmony_ci{ 14921cb0ef41Sopenharmony_ci const s = new PassThrough({ objectMode: true }); 14931cb0ef41Sopenharmony_ci pipeline(async function*() { 14941cb0ef41Sopenharmony_ci await Promise.resolve(); 14951cb0ef41Sopenharmony_ci yield 'hello'; 14961cb0ef41Sopenharmony_ci yield 'world'; 14971cb0ef41Sopenharmony_ci yield 'world'; 14981cb0ef41Sopenharmony_ci }, s, async function(source) { 14991cb0ef41Sopenharmony_ci let ret = ''; 15001cb0ef41Sopenharmony_ci let n = 0; 15011cb0ef41Sopenharmony_ci for await (const chunk of source) { 15021cb0ef41Sopenharmony_ci if (n++ > 1) { 15031cb0ef41Sopenharmony_ci break; 15041cb0ef41Sopenharmony_ci } 15051cb0ef41Sopenharmony_ci ret += chunk; 15061cb0ef41Sopenharmony_ci } 15071cb0ef41Sopenharmony_ci return ret; 15081cb0ef41Sopenharmony_ci }, common.mustCall((err, val) => { 15091cb0ef41Sopenharmony_ci assert.strictEqual(err, undefined); 15101cb0ef41Sopenharmony_ci assert.strictEqual(val, 'helloworld'); 15111cb0ef41Sopenharmony_ci assert.strictEqual(s.destroyed, true); 15121cb0ef41Sopenharmony_ci })); 15131cb0ef41Sopenharmony_ci} 15141cb0ef41Sopenharmony_ci 15151cb0ef41Sopenharmony_ci{ 15161cb0ef41Sopenharmony_ci const s = new PassThrough({ objectMode: true }); 15171cb0ef41Sopenharmony_ci pipeline(async function*() { 15181cb0ef41Sopenharmony_ci await Promise.resolve(); 15191cb0ef41Sopenharmony_ci yield 'hello'; 15201cb0ef41Sopenharmony_ci yield 'world'; 15211cb0ef41Sopenharmony_ci yield 'world'; 15221cb0ef41Sopenharmony_ci }, s, async function(source) { 15231cb0ef41Sopenharmony_ci return null; 15241cb0ef41Sopenharmony_ci }, common.mustCall((err, val) => { 15251cb0ef41Sopenharmony_ci assert.strictEqual(err, undefined); 15261cb0ef41Sopenharmony_ci assert.strictEqual(val, null); 15271cb0ef41Sopenharmony_ci })); 15281cb0ef41Sopenharmony_ci} 15291cb0ef41Sopenharmony_ci 15301cb0ef41Sopenharmony_ci{ 15311cb0ef41Sopenharmony_ci // Mimics a legacy stream without the .destroy method 15321cb0ef41Sopenharmony_ci class LegacyWritable extends Stream { 15331cb0ef41Sopenharmony_ci write(chunk, encoding, callback) { 15341cb0ef41Sopenharmony_ci callback(); 15351cb0ef41Sopenharmony_ci } 15361cb0ef41Sopenharmony_ci } 15371cb0ef41Sopenharmony_ci 15381cb0ef41Sopenharmony_ci const writable = new LegacyWritable(); 15391cb0ef41Sopenharmony_ci writable.on('error', common.mustCall((err) => { 15401cb0ef41Sopenharmony_ci assert.deepStrictEqual(err, new Error('stop')); 15411cb0ef41Sopenharmony_ci })); 15421cb0ef41Sopenharmony_ci 15431cb0ef41Sopenharmony_ci pipeline( 15441cb0ef41Sopenharmony_ci Readable.from({ 15451cb0ef41Sopenharmony_ci [Symbol.asyncIterator]() { 15461cb0ef41Sopenharmony_ci return { 15471cb0ef41Sopenharmony_ci next() { 15481cb0ef41Sopenharmony_ci return Promise.reject(new Error('stop')); 15491cb0ef41Sopenharmony_ci } 15501cb0ef41Sopenharmony_ci }; 15511cb0ef41Sopenharmony_ci } 15521cb0ef41Sopenharmony_ci }), 15531cb0ef41Sopenharmony_ci writable, 15541cb0ef41Sopenharmony_ci common.mustCall((err) => { 15551cb0ef41Sopenharmony_ci assert.deepStrictEqual(err, new Error('stop')); 15561cb0ef41Sopenharmony_ci }) 15571cb0ef41Sopenharmony_ci ); 15581cb0ef41Sopenharmony_ci} 15591cb0ef41Sopenharmony_ci 15601cb0ef41Sopenharmony_ci{ 15611cb0ef41Sopenharmony_ci class CustomReadable extends Readable { 15621cb0ef41Sopenharmony_ci _read() { 15631cb0ef41Sopenharmony_ci this.push('asd'); 15641cb0ef41Sopenharmony_ci this.push(null); 15651cb0ef41Sopenharmony_ci } 15661cb0ef41Sopenharmony_ci } 15671cb0ef41Sopenharmony_ci 15681cb0ef41Sopenharmony_ci class CustomWritable extends Writable { 15691cb0ef41Sopenharmony_ci constructor() { 15701cb0ef41Sopenharmony_ci super(); 15711cb0ef41Sopenharmony_ci this.endCount = 0; 15721cb0ef41Sopenharmony_ci this.str = ''; 15731cb0ef41Sopenharmony_ci } 15741cb0ef41Sopenharmony_ci 15751cb0ef41Sopenharmony_ci _write(chunk, enc, cb) { 15761cb0ef41Sopenharmony_ci this.str += chunk; 15771cb0ef41Sopenharmony_ci cb(); 15781cb0ef41Sopenharmony_ci } 15791cb0ef41Sopenharmony_ci 15801cb0ef41Sopenharmony_ci end() { 15811cb0ef41Sopenharmony_ci this.endCount += 1; 15821cb0ef41Sopenharmony_ci super.end(); 15831cb0ef41Sopenharmony_ci } 15841cb0ef41Sopenharmony_ci } 15851cb0ef41Sopenharmony_ci 15861cb0ef41Sopenharmony_ci const readable = new CustomReadable(); 15871cb0ef41Sopenharmony_ci const writable = new CustomWritable(); 15881cb0ef41Sopenharmony_ci 15891cb0ef41Sopenharmony_ci pipeline(readable, writable, common.mustSucceed(() => { 15901cb0ef41Sopenharmony_ci assert.strictEqual(writable.str, 'asd'); 15911cb0ef41Sopenharmony_ci assert.strictEqual(writable.endCount, 1); 15921cb0ef41Sopenharmony_ci })); 15931cb0ef41Sopenharmony_ci} 15941cb0ef41Sopenharmony_ci 15951cb0ef41Sopenharmony_ci{ 15961cb0ef41Sopenharmony_ci const readable = new Readable({ 15971cb0ef41Sopenharmony_ci read() {} 15981cb0ef41Sopenharmony_ci }); 15991cb0ef41Sopenharmony_ci readable.on('end', common.mustCall(() => { 16001cb0ef41Sopenharmony_ci pipeline(readable, new PassThrough(), common.mustSucceed()); 16011cb0ef41Sopenharmony_ci })); 16021cb0ef41Sopenharmony_ci readable.push(null); 16031cb0ef41Sopenharmony_ci readable.read(); 16041cb0ef41Sopenharmony_ci} 16051cb0ef41Sopenharmony_ci 16061cb0ef41Sopenharmony_ci{ 16071cb0ef41Sopenharmony_ci const dup = new Duplex({ 16081cb0ef41Sopenharmony_ci read() {}, 16091cb0ef41Sopenharmony_ci write(chunk, enc, cb) { 16101cb0ef41Sopenharmony_ci cb(); 16111cb0ef41Sopenharmony_ci } 16121cb0ef41Sopenharmony_ci }); 16131cb0ef41Sopenharmony_ci dup.on('end', common.mustCall(() => { 16141cb0ef41Sopenharmony_ci pipeline(dup, new PassThrough(), common.mustSucceed()); 16151cb0ef41Sopenharmony_ci })); 16161cb0ef41Sopenharmony_ci dup.push(null); 16171cb0ef41Sopenharmony_ci dup.read(); 16181cb0ef41Sopenharmony_ci} 16191cb0ef41Sopenharmony_ci 16201cb0ef41Sopenharmony_ci{ 16211cb0ef41Sopenharmony_ci let res = ''; 16221cb0ef41Sopenharmony_ci const writable = new Writable({ 16231cb0ef41Sopenharmony_ci write(chunk, enc, cb) { 16241cb0ef41Sopenharmony_ci res += chunk; 16251cb0ef41Sopenharmony_ci cb(); 16261cb0ef41Sopenharmony_ci } 16271cb0ef41Sopenharmony_ci }); 16281cb0ef41Sopenharmony_ci pipelinep(async function*() { 16291cb0ef41Sopenharmony_ci yield 'hello'; 16301cb0ef41Sopenharmony_ci await Promise.resolve(); 16311cb0ef41Sopenharmony_ci yield 'world'; 16321cb0ef41Sopenharmony_ci }, writable, { end: false }).then(common.mustCall(() => { 16331cb0ef41Sopenharmony_ci assert.strictEqual(res, 'helloworld'); 16341cb0ef41Sopenharmony_ci assert.strictEqual(writable.closed, false); 16351cb0ef41Sopenharmony_ci })); 16361cb0ef41Sopenharmony_ci} 16371cb0ef41Sopenharmony_ci 16381cb0ef41Sopenharmony_ci{ 16391cb0ef41Sopenharmony_ci const r = new Readable(); 16401cb0ef41Sopenharmony_ci for (let i = 0; i < 4000; i++) { 16411cb0ef41Sopenharmony_ci r.push('asdfdagljanfgkaljdfn'); 16421cb0ef41Sopenharmony_ci } 16431cb0ef41Sopenharmony_ci r.push(null); 16441cb0ef41Sopenharmony_ci 16451cb0ef41Sopenharmony_ci let ended = false; 16461cb0ef41Sopenharmony_ci r.on('end', () => { 16471cb0ef41Sopenharmony_ci ended = true; 16481cb0ef41Sopenharmony_ci }); 16491cb0ef41Sopenharmony_ci 16501cb0ef41Sopenharmony_ci const w = new Writable({ 16511cb0ef41Sopenharmony_ci write(chunk, enc, cb) { 16521cb0ef41Sopenharmony_ci cb(null); 16531cb0ef41Sopenharmony_ci }, 16541cb0ef41Sopenharmony_ci final: common.mustCall((cb) => { 16551cb0ef41Sopenharmony_ci assert.strictEqual(ended, true); 16561cb0ef41Sopenharmony_ci cb(null); 16571cb0ef41Sopenharmony_ci }) 16581cb0ef41Sopenharmony_ci }); 16591cb0ef41Sopenharmony_ci 16601cb0ef41Sopenharmony_ci pipeline(r, w, common.mustCall((err) => { 16611cb0ef41Sopenharmony_ci assert.strictEqual(err, undefined); 16621cb0ef41Sopenharmony_ci })); 16631cb0ef41Sopenharmony_ci 16641cb0ef41Sopenharmony_ci} 1665