11cb0ef41Sopenharmony_ci// Ported from https://github.com/mafintosh/pump with 21cb0ef41Sopenharmony_ci// permission from the author, Mathias Buus (@mafintosh). 31cb0ef41Sopenharmony_ci 41cb0ef41Sopenharmony_ci'use strict'; 51cb0ef41Sopenharmony_ci 61cb0ef41Sopenharmony_ciconst { 71cb0ef41Sopenharmony_ci ArrayIsArray, 81cb0ef41Sopenharmony_ci Promise, 91cb0ef41Sopenharmony_ci SymbolAsyncIterator, 101cb0ef41Sopenharmony_ci SymbolDispose, 111cb0ef41Sopenharmony_ci} = primordials; 121cb0ef41Sopenharmony_ci 131cb0ef41Sopenharmony_ciconst eos = require('internal/streams/end-of-stream'); 141cb0ef41Sopenharmony_ciconst { once } = require('internal/util'); 151cb0ef41Sopenharmony_ciconst destroyImpl = require('internal/streams/destroy'); 161cb0ef41Sopenharmony_ciconst Duplex = require('internal/streams/duplex'); 171cb0ef41Sopenharmony_ciconst { 181cb0ef41Sopenharmony_ci aggregateTwoErrors, 191cb0ef41Sopenharmony_ci codes: { 201cb0ef41Sopenharmony_ci ERR_INVALID_ARG_TYPE, 211cb0ef41Sopenharmony_ci ERR_INVALID_RETURN_VALUE, 221cb0ef41Sopenharmony_ci ERR_MISSING_ARGS, 231cb0ef41Sopenharmony_ci ERR_STREAM_DESTROYED, 241cb0ef41Sopenharmony_ci ERR_STREAM_PREMATURE_CLOSE, 251cb0ef41Sopenharmony_ci }, 261cb0ef41Sopenharmony_ci AbortError, 271cb0ef41Sopenharmony_ci} = require('internal/errors'); 281cb0ef41Sopenharmony_ci 291cb0ef41Sopenharmony_ciconst { 301cb0ef41Sopenharmony_ci validateFunction, 311cb0ef41Sopenharmony_ci validateAbortSignal, 321cb0ef41Sopenharmony_ci} = require('internal/validators'); 331cb0ef41Sopenharmony_ci 341cb0ef41Sopenharmony_ciconst { 351cb0ef41Sopenharmony_ci isIterable, 361cb0ef41Sopenharmony_ci isReadable, 371cb0ef41Sopenharmony_ci isReadableNodeStream, 381cb0ef41Sopenharmony_ci isNodeStream, 391cb0ef41Sopenharmony_ci isTransformStream, 401cb0ef41Sopenharmony_ci isWebStream, 411cb0ef41Sopenharmony_ci isReadableStream, 421cb0ef41Sopenharmony_ci isReadableFinished, 431cb0ef41Sopenharmony_ci} = require('internal/streams/utils'); 441cb0ef41Sopenharmony_ciconst { AbortController } = require('internal/abort_controller'); 451cb0ef41Sopenharmony_ci 461cb0ef41Sopenharmony_cilet PassThrough; 471cb0ef41Sopenharmony_cilet Readable; 481cb0ef41Sopenharmony_cilet addAbortListener; 491cb0ef41Sopenharmony_ci 501cb0ef41Sopenharmony_cifunction destroyer(stream, reading, writing) { 511cb0ef41Sopenharmony_ci let finished = false; 521cb0ef41Sopenharmony_ci stream.on('close', () => { 531cb0ef41Sopenharmony_ci finished = true; 541cb0ef41Sopenharmony_ci }); 551cb0ef41Sopenharmony_ci 561cb0ef41Sopenharmony_ci const cleanup = eos(stream, { readable: reading, writable: writing }, (err) => { 571cb0ef41Sopenharmony_ci finished = !err; 581cb0ef41Sopenharmony_ci }); 591cb0ef41Sopenharmony_ci 601cb0ef41Sopenharmony_ci return { 611cb0ef41Sopenharmony_ci destroy: (err) => { 621cb0ef41Sopenharmony_ci if (finished) return; 631cb0ef41Sopenharmony_ci finished = true; 641cb0ef41Sopenharmony_ci destroyImpl.destroyer(stream, err || new ERR_STREAM_DESTROYED('pipe')); 651cb0ef41Sopenharmony_ci }, 661cb0ef41Sopenharmony_ci cleanup, 671cb0ef41Sopenharmony_ci }; 681cb0ef41Sopenharmony_ci} 691cb0ef41Sopenharmony_ci 701cb0ef41Sopenharmony_cifunction popCallback(streams) { 711cb0ef41Sopenharmony_ci // Streams should never be an empty array. It should always contain at least 721cb0ef41Sopenharmony_ci // a single stream. Therefore optimize for the average case instead of 731cb0ef41Sopenharmony_ci // checking for length === 0 as well. 741cb0ef41Sopenharmony_ci validateFunction(streams[streams.length - 1], 'streams[stream.length - 1]'); 751cb0ef41Sopenharmony_ci return streams.pop(); 761cb0ef41Sopenharmony_ci} 771cb0ef41Sopenharmony_ci 781cb0ef41Sopenharmony_cifunction makeAsyncIterable(val) { 791cb0ef41Sopenharmony_ci if (isIterable(val)) { 801cb0ef41Sopenharmony_ci return val; 811cb0ef41Sopenharmony_ci } else if (isReadableNodeStream(val)) { 821cb0ef41Sopenharmony_ci // Legacy streams are not Iterable. 831cb0ef41Sopenharmony_ci return fromReadable(val); 841cb0ef41Sopenharmony_ci } 851cb0ef41Sopenharmony_ci throw new ERR_INVALID_ARG_TYPE( 861cb0ef41Sopenharmony_ci 'val', ['Readable', 'Iterable', 'AsyncIterable'], val); 871cb0ef41Sopenharmony_ci} 881cb0ef41Sopenharmony_ci 891cb0ef41Sopenharmony_ciasync function* fromReadable(val) { 901cb0ef41Sopenharmony_ci if (!Readable) { 911cb0ef41Sopenharmony_ci Readable = require('internal/streams/readable'); 921cb0ef41Sopenharmony_ci } 931cb0ef41Sopenharmony_ci 941cb0ef41Sopenharmony_ci yield* Readable.prototype[SymbolAsyncIterator].call(val); 951cb0ef41Sopenharmony_ci} 961cb0ef41Sopenharmony_ci 971cb0ef41Sopenharmony_ciasync function pumpToNode(iterable, writable, finish, { end }) { 981cb0ef41Sopenharmony_ci let error; 991cb0ef41Sopenharmony_ci let onresolve = null; 1001cb0ef41Sopenharmony_ci 1011cb0ef41Sopenharmony_ci const resume = (err) => { 1021cb0ef41Sopenharmony_ci if (err) { 1031cb0ef41Sopenharmony_ci error = err; 1041cb0ef41Sopenharmony_ci } 1051cb0ef41Sopenharmony_ci 1061cb0ef41Sopenharmony_ci if (onresolve) { 1071cb0ef41Sopenharmony_ci const callback = onresolve; 1081cb0ef41Sopenharmony_ci onresolve = null; 1091cb0ef41Sopenharmony_ci callback(); 1101cb0ef41Sopenharmony_ci } 1111cb0ef41Sopenharmony_ci }; 1121cb0ef41Sopenharmony_ci 1131cb0ef41Sopenharmony_ci const wait = () => new Promise((resolve, reject) => { 1141cb0ef41Sopenharmony_ci if (error) { 1151cb0ef41Sopenharmony_ci reject(error); 1161cb0ef41Sopenharmony_ci } else { 1171cb0ef41Sopenharmony_ci onresolve = () => { 1181cb0ef41Sopenharmony_ci if (error) { 1191cb0ef41Sopenharmony_ci reject(error); 1201cb0ef41Sopenharmony_ci } else { 1211cb0ef41Sopenharmony_ci resolve(); 1221cb0ef41Sopenharmony_ci } 1231cb0ef41Sopenharmony_ci }; 1241cb0ef41Sopenharmony_ci } 1251cb0ef41Sopenharmony_ci }); 1261cb0ef41Sopenharmony_ci 1271cb0ef41Sopenharmony_ci writable.on('drain', resume); 1281cb0ef41Sopenharmony_ci const cleanup = eos(writable, { readable: false }, resume); 1291cb0ef41Sopenharmony_ci 1301cb0ef41Sopenharmony_ci try { 1311cb0ef41Sopenharmony_ci if (writable.writableNeedDrain) { 1321cb0ef41Sopenharmony_ci await wait(); 1331cb0ef41Sopenharmony_ci } 1341cb0ef41Sopenharmony_ci 1351cb0ef41Sopenharmony_ci for await (const chunk of iterable) { 1361cb0ef41Sopenharmony_ci if (!writable.write(chunk)) { 1371cb0ef41Sopenharmony_ci await wait(); 1381cb0ef41Sopenharmony_ci } 1391cb0ef41Sopenharmony_ci } 1401cb0ef41Sopenharmony_ci 1411cb0ef41Sopenharmony_ci if (end) { 1421cb0ef41Sopenharmony_ci writable.end(); 1431cb0ef41Sopenharmony_ci await wait(); 1441cb0ef41Sopenharmony_ci } 1451cb0ef41Sopenharmony_ci 1461cb0ef41Sopenharmony_ci finish(); 1471cb0ef41Sopenharmony_ci } catch (err) { 1481cb0ef41Sopenharmony_ci finish(error !== err ? aggregateTwoErrors(error, err) : err); 1491cb0ef41Sopenharmony_ci } finally { 1501cb0ef41Sopenharmony_ci cleanup(); 1511cb0ef41Sopenharmony_ci writable.off('drain', resume); 1521cb0ef41Sopenharmony_ci } 1531cb0ef41Sopenharmony_ci} 1541cb0ef41Sopenharmony_ci 1551cb0ef41Sopenharmony_ciasync function pumpToWeb(readable, writable, finish, { end }) { 1561cb0ef41Sopenharmony_ci if (isTransformStream(writable)) { 1571cb0ef41Sopenharmony_ci writable = writable.writable; 1581cb0ef41Sopenharmony_ci } 1591cb0ef41Sopenharmony_ci // https://streams.spec.whatwg.org/#example-manual-write-with-backpressure 1601cb0ef41Sopenharmony_ci const writer = writable.getWriter(); 1611cb0ef41Sopenharmony_ci try { 1621cb0ef41Sopenharmony_ci for await (const chunk of readable) { 1631cb0ef41Sopenharmony_ci await writer.ready; 1641cb0ef41Sopenharmony_ci writer.write(chunk).catch(() => {}); 1651cb0ef41Sopenharmony_ci } 1661cb0ef41Sopenharmony_ci 1671cb0ef41Sopenharmony_ci await writer.ready; 1681cb0ef41Sopenharmony_ci 1691cb0ef41Sopenharmony_ci if (end) { 1701cb0ef41Sopenharmony_ci await writer.close(); 1711cb0ef41Sopenharmony_ci } 1721cb0ef41Sopenharmony_ci 1731cb0ef41Sopenharmony_ci finish(); 1741cb0ef41Sopenharmony_ci } catch (err) { 1751cb0ef41Sopenharmony_ci try { 1761cb0ef41Sopenharmony_ci await writer.abort(err); 1771cb0ef41Sopenharmony_ci finish(err); 1781cb0ef41Sopenharmony_ci } catch (err) { 1791cb0ef41Sopenharmony_ci finish(err); 1801cb0ef41Sopenharmony_ci } 1811cb0ef41Sopenharmony_ci } 1821cb0ef41Sopenharmony_ci} 1831cb0ef41Sopenharmony_ci 1841cb0ef41Sopenharmony_cifunction pipeline(...streams) { 1851cb0ef41Sopenharmony_ci return pipelineImpl(streams, once(popCallback(streams))); 1861cb0ef41Sopenharmony_ci} 1871cb0ef41Sopenharmony_ci 1881cb0ef41Sopenharmony_cifunction pipelineImpl(streams, callback, opts) { 1891cb0ef41Sopenharmony_ci if (streams.length === 1 && ArrayIsArray(streams[0])) { 1901cb0ef41Sopenharmony_ci streams = streams[0]; 1911cb0ef41Sopenharmony_ci } 1921cb0ef41Sopenharmony_ci 1931cb0ef41Sopenharmony_ci if (streams.length < 2) { 1941cb0ef41Sopenharmony_ci throw new ERR_MISSING_ARGS('streams'); 1951cb0ef41Sopenharmony_ci } 1961cb0ef41Sopenharmony_ci 1971cb0ef41Sopenharmony_ci const ac = new AbortController(); 1981cb0ef41Sopenharmony_ci const signal = ac.signal; 1991cb0ef41Sopenharmony_ci const outerSignal = opts?.signal; 2001cb0ef41Sopenharmony_ci 2011cb0ef41Sopenharmony_ci // Need to cleanup event listeners if last stream is readable 2021cb0ef41Sopenharmony_ci // https://github.com/nodejs/node/issues/35452 2031cb0ef41Sopenharmony_ci const lastStreamCleanup = []; 2041cb0ef41Sopenharmony_ci 2051cb0ef41Sopenharmony_ci validateAbortSignal(outerSignal, 'options.signal'); 2061cb0ef41Sopenharmony_ci 2071cb0ef41Sopenharmony_ci function abort() { 2081cb0ef41Sopenharmony_ci finishImpl(new AbortError()); 2091cb0ef41Sopenharmony_ci } 2101cb0ef41Sopenharmony_ci 2111cb0ef41Sopenharmony_ci addAbortListener ??= require('events').addAbortListener; 2121cb0ef41Sopenharmony_ci let disposable; 2131cb0ef41Sopenharmony_ci if (outerSignal) { 2141cb0ef41Sopenharmony_ci disposable = addAbortListener(outerSignal, abort); 2151cb0ef41Sopenharmony_ci } 2161cb0ef41Sopenharmony_ci 2171cb0ef41Sopenharmony_ci let error; 2181cb0ef41Sopenharmony_ci let value; 2191cb0ef41Sopenharmony_ci const destroys = []; 2201cb0ef41Sopenharmony_ci 2211cb0ef41Sopenharmony_ci let finishCount = 0; 2221cb0ef41Sopenharmony_ci 2231cb0ef41Sopenharmony_ci function finish(err) { 2241cb0ef41Sopenharmony_ci finishImpl(err, --finishCount === 0); 2251cb0ef41Sopenharmony_ci } 2261cb0ef41Sopenharmony_ci 2271cb0ef41Sopenharmony_ci function finishImpl(err, final) { 2281cb0ef41Sopenharmony_ci if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) { 2291cb0ef41Sopenharmony_ci error = err; 2301cb0ef41Sopenharmony_ci } 2311cb0ef41Sopenharmony_ci 2321cb0ef41Sopenharmony_ci if (!error && !final) { 2331cb0ef41Sopenharmony_ci return; 2341cb0ef41Sopenharmony_ci } 2351cb0ef41Sopenharmony_ci 2361cb0ef41Sopenharmony_ci while (destroys.length) { 2371cb0ef41Sopenharmony_ci destroys.shift()(error); 2381cb0ef41Sopenharmony_ci } 2391cb0ef41Sopenharmony_ci 2401cb0ef41Sopenharmony_ci disposable?.[SymbolDispose](); 2411cb0ef41Sopenharmony_ci ac.abort(); 2421cb0ef41Sopenharmony_ci 2431cb0ef41Sopenharmony_ci if (final) { 2441cb0ef41Sopenharmony_ci if (!error) { 2451cb0ef41Sopenharmony_ci lastStreamCleanup.forEach((fn) => fn()); 2461cb0ef41Sopenharmony_ci } 2471cb0ef41Sopenharmony_ci process.nextTick(callback, error, value); 2481cb0ef41Sopenharmony_ci } 2491cb0ef41Sopenharmony_ci } 2501cb0ef41Sopenharmony_ci 2511cb0ef41Sopenharmony_ci let ret; 2521cb0ef41Sopenharmony_ci for (let i = 0; i < streams.length; i++) { 2531cb0ef41Sopenharmony_ci const stream = streams[i]; 2541cb0ef41Sopenharmony_ci const reading = i < streams.length - 1; 2551cb0ef41Sopenharmony_ci const writing = i > 0; 2561cb0ef41Sopenharmony_ci const end = reading || opts?.end !== false; 2571cb0ef41Sopenharmony_ci const isLastStream = i === streams.length - 1; 2581cb0ef41Sopenharmony_ci 2591cb0ef41Sopenharmony_ci if (isNodeStream(stream)) { 2601cb0ef41Sopenharmony_ci if (end) { 2611cb0ef41Sopenharmony_ci const { destroy, cleanup } = destroyer(stream, reading, writing); 2621cb0ef41Sopenharmony_ci destroys.push(destroy); 2631cb0ef41Sopenharmony_ci 2641cb0ef41Sopenharmony_ci if (isReadable(stream) && isLastStream) { 2651cb0ef41Sopenharmony_ci lastStreamCleanup.push(cleanup); 2661cb0ef41Sopenharmony_ci } 2671cb0ef41Sopenharmony_ci } 2681cb0ef41Sopenharmony_ci 2691cb0ef41Sopenharmony_ci // Catch stream errors that occur after pipe/pump has completed. 2701cb0ef41Sopenharmony_ci function onError(err) { 2711cb0ef41Sopenharmony_ci if ( 2721cb0ef41Sopenharmony_ci err && 2731cb0ef41Sopenharmony_ci err.name !== 'AbortError' && 2741cb0ef41Sopenharmony_ci err.code !== 'ERR_STREAM_PREMATURE_CLOSE' 2751cb0ef41Sopenharmony_ci ) { 2761cb0ef41Sopenharmony_ci finish(err); 2771cb0ef41Sopenharmony_ci } 2781cb0ef41Sopenharmony_ci } 2791cb0ef41Sopenharmony_ci stream.on('error', onError); 2801cb0ef41Sopenharmony_ci if (isReadable(stream) && isLastStream) { 2811cb0ef41Sopenharmony_ci lastStreamCleanup.push(() => { 2821cb0ef41Sopenharmony_ci stream.removeListener('error', onError); 2831cb0ef41Sopenharmony_ci }); 2841cb0ef41Sopenharmony_ci } 2851cb0ef41Sopenharmony_ci } 2861cb0ef41Sopenharmony_ci 2871cb0ef41Sopenharmony_ci if (i === 0) { 2881cb0ef41Sopenharmony_ci if (typeof stream === 'function') { 2891cb0ef41Sopenharmony_ci ret = stream({ signal }); 2901cb0ef41Sopenharmony_ci if (!isIterable(ret)) { 2911cb0ef41Sopenharmony_ci throw new ERR_INVALID_RETURN_VALUE( 2921cb0ef41Sopenharmony_ci 'Iterable, AsyncIterable or Stream', 'source', ret); 2931cb0ef41Sopenharmony_ci } 2941cb0ef41Sopenharmony_ci } else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) { 2951cb0ef41Sopenharmony_ci ret = stream; 2961cb0ef41Sopenharmony_ci } else { 2971cb0ef41Sopenharmony_ci ret = Duplex.from(stream); 2981cb0ef41Sopenharmony_ci } 2991cb0ef41Sopenharmony_ci } else if (typeof stream === 'function') { 3001cb0ef41Sopenharmony_ci if (isTransformStream(ret)) { 3011cb0ef41Sopenharmony_ci ret = makeAsyncIterable(ret?.readable); 3021cb0ef41Sopenharmony_ci } else { 3031cb0ef41Sopenharmony_ci ret = makeAsyncIterable(ret); 3041cb0ef41Sopenharmony_ci } 3051cb0ef41Sopenharmony_ci ret = stream(ret, { signal }); 3061cb0ef41Sopenharmony_ci 3071cb0ef41Sopenharmony_ci if (reading) { 3081cb0ef41Sopenharmony_ci if (!isIterable(ret, true)) { 3091cb0ef41Sopenharmony_ci throw new ERR_INVALID_RETURN_VALUE( 3101cb0ef41Sopenharmony_ci 'AsyncIterable', `transform[${i - 1}]`, ret); 3111cb0ef41Sopenharmony_ci } 3121cb0ef41Sopenharmony_ci } else { 3131cb0ef41Sopenharmony_ci if (!PassThrough) { 3141cb0ef41Sopenharmony_ci PassThrough = require('internal/streams/passthrough'); 3151cb0ef41Sopenharmony_ci } 3161cb0ef41Sopenharmony_ci 3171cb0ef41Sopenharmony_ci // If the last argument to pipeline is not a stream 3181cb0ef41Sopenharmony_ci // we must create a proxy stream so that pipeline(...) 3191cb0ef41Sopenharmony_ci // always returns a stream which can be further 3201cb0ef41Sopenharmony_ci // composed through `.pipe(stream)`. 3211cb0ef41Sopenharmony_ci 3221cb0ef41Sopenharmony_ci const pt = new PassThrough({ 3231cb0ef41Sopenharmony_ci objectMode: true, 3241cb0ef41Sopenharmony_ci }); 3251cb0ef41Sopenharmony_ci 3261cb0ef41Sopenharmony_ci // Handle Promises/A+ spec, `then` could be a getter that throws on 3271cb0ef41Sopenharmony_ci // second use. 3281cb0ef41Sopenharmony_ci const then = ret?.then; 3291cb0ef41Sopenharmony_ci if (typeof then === 'function') { 3301cb0ef41Sopenharmony_ci finishCount++; 3311cb0ef41Sopenharmony_ci then.call(ret, 3321cb0ef41Sopenharmony_ci (val) => { 3331cb0ef41Sopenharmony_ci value = val; 3341cb0ef41Sopenharmony_ci if (val != null) { 3351cb0ef41Sopenharmony_ci pt.write(val); 3361cb0ef41Sopenharmony_ci } 3371cb0ef41Sopenharmony_ci if (end) { 3381cb0ef41Sopenharmony_ci pt.end(); 3391cb0ef41Sopenharmony_ci } 3401cb0ef41Sopenharmony_ci process.nextTick(finish); 3411cb0ef41Sopenharmony_ci }, (err) => { 3421cb0ef41Sopenharmony_ci pt.destroy(err); 3431cb0ef41Sopenharmony_ci process.nextTick(finish, err); 3441cb0ef41Sopenharmony_ci }, 3451cb0ef41Sopenharmony_ci ); 3461cb0ef41Sopenharmony_ci } else if (isIterable(ret, true)) { 3471cb0ef41Sopenharmony_ci finishCount++; 3481cb0ef41Sopenharmony_ci pumpToNode(ret, pt, finish, { end }); 3491cb0ef41Sopenharmony_ci } else if (isReadableStream(ret) || isTransformStream(ret)) { 3501cb0ef41Sopenharmony_ci const toRead = ret.readable || ret; 3511cb0ef41Sopenharmony_ci finishCount++; 3521cb0ef41Sopenharmony_ci pumpToNode(toRead, pt, finish, { end }); 3531cb0ef41Sopenharmony_ci } else { 3541cb0ef41Sopenharmony_ci throw new ERR_INVALID_RETURN_VALUE( 3551cb0ef41Sopenharmony_ci 'AsyncIterable or Promise', 'destination', ret); 3561cb0ef41Sopenharmony_ci } 3571cb0ef41Sopenharmony_ci 3581cb0ef41Sopenharmony_ci ret = pt; 3591cb0ef41Sopenharmony_ci 3601cb0ef41Sopenharmony_ci const { destroy, cleanup } = destroyer(ret, false, true); 3611cb0ef41Sopenharmony_ci destroys.push(destroy); 3621cb0ef41Sopenharmony_ci if (isLastStream) { 3631cb0ef41Sopenharmony_ci lastStreamCleanup.push(cleanup); 3641cb0ef41Sopenharmony_ci } 3651cb0ef41Sopenharmony_ci } 3661cb0ef41Sopenharmony_ci } else if (isNodeStream(stream)) { 3671cb0ef41Sopenharmony_ci if (isReadableNodeStream(ret)) { 3681cb0ef41Sopenharmony_ci finishCount += 2; 3691cb0ef41Sopenharmony_ci const cleanup = pipe(ret, stream, finish, { end }); 3701cb0ef41Sopenharmony_ci if (isReadable(stream) && isLastStream) { 3711cb0ef41Sopenharmony_ci lastStreamCleanup.push(cleanup); 3721cb0ef41Sopenharmony_ci } 3731cb0ef41Sopenharmony_ci } else if (isTransformStream(ret) || isReadableStream(ret)) { 3741cb0ef41Sopenharmony_ci const toRead = ret.readable || ret; 3751cb0ef41Sopenharmony_ci finishCount++; 3761cb0ef41Sopenharmony_ci pumpToNode(toRead, stream, finish, { end }); 3771cb0ef41Sopenharmony_ci } else if (isIterable(ret)) { 3781cb0ef41Sopenharmony_ci finishCount++; 3791cb0ef41Sopenharmony_ci pumpToNode(ret, stream, finish, { end }); 3801cb0ef41Sopenharmony_ci } else { 3811cb0ef41Sopenharmony_ci throw new ERR_INVALID_ARG_TYPE( 3821cb0ef41Sopenharmony_ci 'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret); 3831cb0ef41Sopenharmony_ci } 3841cb0ef41Sopenharmony_ci ret = stream; 3851cb0ef41Sopenharmony_ci } else if (isWebStream(stream)) { 3861cb0ef41Sopenharmony_ci if (isReadableNodeStream(ret)) { 3871cb0ef41Sopenharmony_ci finishCount++; 3881cb0ef41Sopenharmony_ci pumpToWeb(makeAsyncIterable(ret), stream, finish, { end }); 3891cb0ef41Sopenharmony_ci } else if (isReadableStream(ret) || isIterable(ret)) { 3901cb0ef41Sopenharmony_ci finishCount++; 3911cb0ef41Sopenharmony_ci pumpToWeb(ret, stream, finish, { end }); 3921cb0ef41Sopenharmony_ci } else if (isTransformStream(ret)) { 3931cb0ef41Sopenharmony_ci finishCount++; 3941cb0ef41Sopenharmony_ci pumpToWeb(ret.readable, stream, finish, { end }); 3951cb0ef41Sopenharmony_ci } else { 3961cb0ef41Sopenharmony_ci throw new ERR_INVALID_ARG_TYPE( 3971cb0ef41Sopenharmony_ci 'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret); 3981cb0ef41Sopenharmony_ci } 3991cb0ef41Sopenharmony_ci ret = stream; 4001cb0ef41Sopenharmony_ci } else { 4011cb0ef41Sopenharmony_ci ret = Duplex.from(stream); 4021cb0ef41Sopenharmony_ci } 4031cb0ef41Sopenharmony_ci } 4041cb0ef41Sopenharmony_ci 4051cb0ef41Sopenharmony_ci if (signal?.aborted || outerSignal?.aborted) { 4061cb0ef41Sopenharmony_ci process.nextTick(abort); 4071cb0ef41Sopenharmony_ci } 4081cb0ef41Sopenharmony_ci 4091cb0ef41Sopenharmony_ci return ret; 4101cb0ef41Sopenharmony_ci} 4111cb0ef41Sopenharmony_ci 4121cb0ef41Sopenharmony_cifunction pipe(src, dst, finish, { end }) { 4131cb0ef41Sopenharmony_ci let ended = false; 4141cb0ef41Sopenharmony_ci dst.on('close', () => { 4151cb0ef41Sopenharmony_ci if (!ended) { 4161cb0ef41Sopenharmony_ci // Finish if the destination closes before the source has completed. 4171cb0ef41Sopenharmony_ci finish(new ERR_STREAM_PREMATURE_CLOSE()); 4181cb0ef41Sopenharmony_ci } 4191cb0ef41Sopenharmony_ci }); 4201cb0ef41Sopenharmony_ci 4211cb0ef41Sopenharmony_ci src.pipe(dst, { end: false }); // If end is true we already will have a listener to end dst. 4221cb0ef41Sopenharmony_ci 4231cb0ef41Sopenharmony_ci if (end) { 4241cb0ef41Sopenharmony_ci // Compat. Before node v10.12.0 stdio used to throw an error so 4251cb0ef41Sopenharmony_ci // pipe() did/does not end() stdio destinations. 4261cb0ef41Sopenharmony_ci // Now they allow it but "secretly" don't close the underlying fd. 4271cb0ef41Sopenharmony_ci 4281cb0ef41Sopenharmony_ci function endFn() { 4291cb0ef41Sopenharmony_ci ended = true; 4301cb0ef41Sopenharmony_ci dst.end(); 4311cb0ef41Sopenharmony_ci } 4321cb0ef41Sopenharmony_ci 4331cb0ef41Sopenharmony_ci if (isReadableFinished(src)) { // End the destination if the source has already ended. 4341cb0ef41Sopenharmony_ci process.nextTick(endFn); 4351cb0ef41Sopenharmony_ci } else { 4361cb0ef41Sopenharmony_ci src.once('end', endFn); 4371cb0ef41Sopenharmony_ci } 4381cb0ef41Sopenharmony_ci } else { 4391cb0ef41Sopenharmony_ci finish(); 4401cb0ef41Sopenharmony_ci } 4411cb0ef41Sopenharmony_ci 4421cb0ef41Sopenharmony_ci eos(src, { readable: true, writable: false }, (err) => { 4431cb0ef41Sopenharmony_ci const rState = src._readableState; 4441cb0ef41Sopenharmony_ci if ( 4451cb0ef41Sopenharmony_ci err && 4461cb0ef41Sopenharmony_ci err.code === 'ERR_STREAM_PREMATURE_CLOSE' && 4471cb0ef41Sopenharmony_ci (rState && rState.ended && !rState.errored && !rState.errorEmitted) 4481cb0ef41Sopenharmony_ci ) { 4491cb0ef41Sopenharmony_ci // Some readable streams will emit 'close' before 'end'. However, since 4501cb0ef41Sopenharmony_ci // this is on the readable side 'end' should still be emitted if the 4511cb0ef41Sopenharmony_ci // stream has been ended and no error emitted. This should be allowed in 4521cb0ef41Sopenharmony_ci // favor of backwards compatibility. Since the stream is piped to a 4531cb0ef41Sopenharmony_ci // destination this should not result in any observable difference. 4541cb0ef41Sopenharmony_ci // We don't need to check if this is a writable premature close since 4551cb0ef41Sopenharmony_ci // eos will only fail with premature close on the reading side for 4561cb0ef41Sopenharmony_ci // duplex streams. 4571cb0ef41Sopenharmony_ci src 4581cb0ef41Sopenharmony_ci .once('end', finish) 4591cb0ef41Sopenharmony_ci .once('error', finish); 4601cb0ef41Sopenharmony_ci } else { 4611cb0ef41Sopenharmony_ci finish(err); 4621cb0ef41Sopenharmony_ci } 4631cb0ef41Sopenharmony_ci }); 4641cb0ef41Sopenharmony_ci return eos(dst, { readable: false, writable: true }, finish); 4651cb0ef41Sopenharmony_ci} 4661cb0ef41Sopenharmony_ci 4671cb0ef41Sopenharmony_cimodule.exports = { pipelineImpl, pipeline }; 468