// Ported from https://github.com/mafintosh/pump with
// permission from the author, Mathias Buus (@mafintosh).
31cb0ef41Sopenharmony_ci
41cb0ef41Sopenharmony_ci'use strict';
51cb0ef41Sopenharmony_ci
61cb0ef41Sopenharmony_ciconst {
71cb0ef41Sopenharmony_ci  ArrayIsArray,
81cb0ef41Sopenharmony_ci  Promise,
91cb0ef41Sopenharmony_ci  SymbolAsyncIterator,
101cb0ef41Sopenharmony_ci  SymbolDispose,
111cb0ef41Sopenharmony_ci} = primordials;
121cb0ef41Sopenharmony_ci
131cb0ef41Sopenharmony_ciconst eos = require('internal/streams/end-of-stream');
141cb0ef41Sopenharmony_ciconst { once } = require('internal/util');
151cb0ef41Sopenharmony_ciconst destroyImpl = require('internal/streams/destroy');
161cb0ef41Sopenharmony_ciconst Duplex = require('internal/streams/duplex');
171cb0ef41Sopenharmony_ciconst {
181cb0ef41Sopenharmony_ci  aggregateTwoErrors,
191cb0ef41Sopenharmony_ci  codes: {
201cb0ef41Sopenharmony_ci    ERR_INVALID_ARG_TYPE,
211cb0ef41Sopenharmony_ci    ERR_INVALID_RETURN_VALUE,
221cb0ef41Sopenharmony_ci    ERR_MISSING_ARGS,
231cb0ef41Sopenharmony_ci    ERR_STREAM_DESTROYED,
241cb0ef41Sopenharmony_ci    ERR_STREAM_PREMATURE_CLOSE,
251cb0ef41Sopenharmony_ci  },
261cb0ef41Sopenharmony_ci  AbortError,
271cb0ef41Sopenharmony_ci} = require('internal/errors');
281cb0ef41Sopenharmony_ci
291cb0ef41Sopenharmony_ciconst {
301cb0ef41Sopenharmony_ci  validateFunction,
311cb0ef41Sopenharmony_ci  validateAbortSignal,
321cb0ef41Sopenharmony_ci} = require('internal/validators');
331cb0ef41Sopenharmony_ci
341cb0ef41Sopenharmony_ciconst {
351cb0ef41Sopenharmony_ci  isIterable,
361cb0ef41Sopenharmony_ci  isReadable,
371cb0ef41Sopenharmony_ci  isReadableNodeStream,
381cb0ef41Sopenharmony_ci  isNodeStream,
391cb0ef41Sopenharmony_ci  isTransformStream,
401cb0ef41Sopenharmony_ci  isWebStream,
411cb0ef41Sopenharmony_ci  isReadableStream,
421cb0ef41Sopenharmony_ci  isReadableFinished,
431cb0ef41Sopenharmony_ci} = require('internal/streams/utils');
441cb0ef41Sopenharmony_ciconst { AbortController } = require('internal/abort_controller');
451cb0ef41Sopenharmony_ci
// Module-level slots that start out undefined and are filled on first use by
// a `require()` call inside the functions below.
// NOTE(review): presumably deferred to avoid load-order/circular-dependency
// problems between the stream modules - confirm before making these eager.
let PassThrough;       // Set inside pipelineImpl() when a proxy stream is needed.
let Readable;          // Set inside fromReadable().
let addAbortListener;  // Set inside pipelineImpl() from the 'events' module.
491cb0ef41Sopenharmony_ci
// Builds the teardown helpers for one stage of a pipeline.
//
// `reading` / `writing` are forwarded to end-of-stream as the `readable` /
// `writable` options. Returns `{ destroy, cleanup }`:
//   - `destroy(err)` destroys `stream` with `err` (or ERR_STREAM_DESTROYED
//     when no error is given) unless the stream is already considered done.
//   - `cleanup` is whatever the end-of-stream call returned.
function destroyer(stream, reading, writing) {
  // True once the stream has closed, has ended without an error, or destroy()
  // has already been invoked.
  let done = false;

  stream.on('close', () => {
    done = true;
  });

  const eosOptions = { readable: reading, writable: writing };
  const cleanup = eos(stream, eosOptions, (err) => {
    // An errored completion leaves the stream eligible for destroy().
    done = !err;
  });

  const destroy = (err) => {
    if (done) {
      return;
    }
    done = true;
    const reason = err || new ERR_STREAM_DESTROYED('pipe');
    destroyImpl.destroyer(stream, reason);
  };

  return { destroy, cleanup };
}
691cb0ef41Sopenharmony_ci
// Removes and returns the trailing callback from `streams`.
//
// Throws (via validateFunction) when the last element is not a function; in
// that case `streams` is left unmodified because validation happens before
// the pop.
function popCallback(streams) {
  // Streams should never be an empty array. It should always contain at least
  // a single stream. Therefore optimize for the average case instead of
  // checking for length === 0 as well.
  // The name below is what appears in the validation error message; it must
  // refer to the `streams` array (it previously said `stream.length`).
  validateFunction(streams[streams.length - 1], 'streams[streams.length - 1]');
  return streams.pop();
}
771cb0ef41Sopenharmony_ci
// Returns something a transform function can iterate over.
//
// Iterables are returned untouched, readable Node.js streams are wrapped with
// fromReadable(), and every other value is rejected with
// ERR_INVALID_ARG_TYPE.
function makeAsyncIterable(val) {
  if (isIterable(val)) {
    return val;
  }

  if (isReadableNodeStream(val)) {
    // Legacy streams are not Iterable.
    return fromReadable(val);
  }

  const accepted = ['Readable', 'Iterable', 'AsyncIterable'];
  throw new ERR_INVALID_ARG_TYPE('val', accepted, val);
}
881cb0ef41Sopenharmony_ci
// Async generator that yields everything produced by applying Readable's
// async-iterator implementation to `val`, so that values which are not
// themselves iterable can still be consumed with `for await`.
async function* fromReadable(val) {
  // Loaded on first use and cached in the module-level `Readable` slot.
  Readable ??= require('internal/streams/readable');

  const iterate = Readable.prototype[SymbolAsyncIterator];
  yield* iterate.call(val);
}
961cb0ef41Sopenharmony_ci
// Writes every chunk of `iterable` into the Node.js `writable`.
//
// Whenever write() returns false (or the writable already reports
// `writableNeedDrain`), iteration pauses until either 'drain' fires or the
// end-of-stream callback runs. When `end` is truthy the writable is ended and
// the function waits for end-of-stream before reporting success.
//
// The outcome is reported exactly once through `finish`: with no argument on
// success, or with the failure (combined with any error recorded by
// end-of-stream when the two differ).
async function pumpToNode(iterable, writable, finish, { end }) {
  // Error handed to `resume` by end-of-stream, if any.
  let failure;
  // Settles the promise of the wait() currently in flight, or null.
  let pending = null;

  const resume = (err) => {
    if (err) {
      failure = err;
    }

    const settle = pending;
    if (!settle) {
      return;
    }
    pending = null;
    settle();
  };

  const wait = () => new Promise((resolve, reject) => {
    // `failure` is re-read when the closure runs, so an error recorded while
    // waiting rejects the promise instead of resolving it.
    const settle = () => {
      if (failure) {
        reject(failure);
      } else {
        resolve();
      }
    };

    if (failure) {
      settle();
    } else {
      pending = settle;
    }
  });

  writable.on('drain', resume);
  const cleanup = eos(writable, { readable: false }, resume);

  try {
    if (writable.writableNeedDrain) {
      await wait();
    }

    for await (const chunk of iterable) {
      const canContinue = writable.write(chunk);
      if (!canContinue) {
        await wait();
      }
    }

    if (end) {
      writable.end();
      await wait();
    }

    finish();
  } catch (err) {
    const reported = failure === err ? err : aggregateTwoErrors(failure, err);
    finish(reported);
  } finally {
    cleanup();
    writable.off('drain', resume);
  }
}
1541cb0ef41Sopenharmony_ci
// Writes every chunk of `readable` into a web `writable`, honouring
// backpressure by awaiting `writer.ready` before each write.
//
// A TransformStream destination is written through its `.writable` side.
// When `end` is truthy the writer is closed after the last chunk. On failure
// the writer is aborted with the error. `finish` receives nothing on success,
// the original error after a successful abort, or the error thrown while
// aborting.
async function pumpToWeb(readable, writable, finish, { end }) {
  const sink = isTransformStream(writable) ? writable.writable : writable;

  // https://streams.spec.whatwg.org/#example-manual-write-with-backpressure
  const writer = sink.getWriter();
  try {
    for await (const chunk of readable) {
      await writer.ready;
      // The rejection of an individual write is intentionally ignored here.
      // NOTE(review): presumably a failed write surfaces through the next
      // `writer.ready` / `writer.close()` instead - confirm against the spec.
      writer.write(chunk).catch(() => {});
    }

    await writer.ready;

    if (end) {
      await writer.close();
    }

    finish();
  } catch (err) {
    try {
      await writer.abort(err);
      finish(err);
    } catch (abortErr) {
      finish(abortErr);
    }
  }
}
1831cb0ef41Sopenharmony_ci
// Callback-style entry point: the last argument is the completion callback,
// everything before it is a pipeline stage. The callback is wrapped with
// `once()` before being handed to pipelineImpl(). Returns whatever
// pipelineImpl() returns.
function pipeline(...streams) {
  const callback = once(popCallback(streams));
  return pipelineImpl(streams, callback);
}
1871cb0ef41Sopenharmony_ci
// Core of `pipeline()`: connects every entry of `streams` to the next one and
// reports the overall outcome through `callback`.
//
// `streams` is either the list of stages or a one-element array wrapping that
// list. A stage may be a function, a Node.js stream, a web stream or an
// iterable; stages matching none of these are converted with Duplex.from().
// `callback` is scheduled as `process.nextTick(callback, error, value)` once
// the last outstanding `finish()` call has arrived.
//
// Options read from `opts`:
//   - `signal`: validated with validateAbortSignal(); when it aborts, the
//     pipeline finishes with an AbortError.
//   - `end`: when exactly `false`, the last stage is not ended.
//
// Throws ERR_MISSING_ARGS for fewer than two stages, and
// ERR_INVALID_RETURN_VALUE / ERR_INVALID_ARG_TYPE when a stage or the value
// feeding it has an unsupported type.
//
// Returns the last stage, or the PassThrough proxy created when the last
// stage is a function.
function pipelineImpl(streams, callback, opts) {
  if (streams.length === 1 && ArrayIsArray(streams[0])) {
    streams = streams[0];
  }

  if (streams.length < 2) {
    throw new ERR_MISSING_ARGS('streams');
  }

  // Internal controller whose signal is handed to function stages; it is
  // aborted as soon as the pipeline starts tearing down.
  const controller = new AbortController();
  const { signal } = controller;
  const outerSignal = opts?.signal;

  // Need to cleanup event listeners if last stream is readable
  // https://github.com/nodejs/node/issues/35452
  const lastStreamCleanup = [];

  validateAbortSignal(outerSignal, 'options.signal');

  function abort() {
    finishImpl(new AbortError());
  }

  addAbortListener ??= require('events').addAbortListener;
  let disposable;
  if (outerSignal) {
    disposable = addAbortListener(outerSignal, abort);
  }

  // First error worth reporting, and the value a promise-returning last
  // stage resolved with.
  let error;
  let value;
  const destroyFns = [];

  // Number of finish() calls still expected before the pipeline is complete.
  let pending = 0;

  function finish(err) {
    pending -= 1;
    finishImpl(err, pending === 0);
  }

  function finishImpl(err, final) {
    // Keep the first error, except that a recorded premature-close error may
    // be replaced by a later one.
    if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
      error = err;
    }

    if (!(error || final)) {
      return;
    }

    while (destroyFns.length) {
      destroyFns.shift()(error);
    }

    disposable?.[SymbolDispose]();
    controller.abort();

    if (final) {
      if (!error) {
        for (let j = 0; j < lastStreamCleanup.length; j++) {
          lastStreamCleanup[j]();
        }
      }
      process.nextTick(callback, error, value);
    }
  }

  let ret;
  for (let i = 0; i < streams.length; i++) {
    const stream = streams[i];
    const reading = i < streams.length - 1;
    const writing = i > 0;
    const end = reading || opts?.end !== false;
    const isLastStream = i === streams.length - 1;

    if (isNodeStream(stream)) {
      if (end) {
        const { destroy, cleanup } = destroyer(stream, reading, writing);
        destroyFns.push(destroy);

        if (isReadable(stream) && isLastStream) {
          lastStreamCleanup.push(cleanup);
        }
      }

      // Catch stream errors that occur after pipe/pump has completed.
      const onError = (err) => {
        if (
          !err ||
          err.name === 'AbortError' ||
          err.code === 'ERR_STREAM_PREMATURE_CLOSE'
        ) {
          return;
        }
        finish(err);
      };
      stream.on('error', onError);
      if (isReadable(stream) && isLastStream) {
        lastStreamCleanup.push(() => {
          stream.removeListener('error', onError);
        });
      }
    }

    if (i === 0) {
      // Source stage.
      if (typeof stream === 'function') {
        ret = stream({ signal });
        if (!isIterable(ret)) {
          throw new ERR_INVALID_RETURN_VALUE(
            'Iterable, AsyncIterable or Stream', 'source', ret);
        }
      } else if (isIterable(stream) || isReadableNodeStream(stream) || isTransformStream(stream)) {
        ret = stream;
      } else {
        ret = Duplex.from(stream);
      }
    } else if (typeof stream === 'function') {
      // Function stage: feed it the previous result as an (async) iterable.
      ret = makeAsyncIterable(isTransformStream(ret) ? ret?.readable : ret);
      ret = stream(ret, { signal });

      if (reading) {
        if (!isIterable(ret, true)) {
          throw new ERR_INVALID_RETURN_VALUE(
            'AsyncIterable', `transform[${i - 1}]`, ret);
        }
      } else {
        PassThrough ??= require('internal/streams/passthrough');

        // If the last argument to pipeline is not a stream
        // we must create a proxy stream so that pipeline(...)
        // always returns a stream which can be further
        // composed through `.pipe(stream)`.

        const pt = new PassThrough({
          objectMode: true,
        });

        // Handle Promises/A+ spec, `then` could be a getter that throws on
        // second use.
        const then = ret?.then;
        if (typeof then === 'function') {
          pending++;
          then.call(
            ret,
            (val) => {
              value = val;
              if (val != null) {
                pt.write(val);
              }
              if (end) {
                pt.end();
              }
              process.nextTick(finish);
            },
            (err) => {
              pt.destroy(err);
              process.nextTick(finish, err);
            },
          );
        } else if (isIterable(ret, true)) {
          pending++;
          pumpToNode(ret, pt, finish, { end });
        } else if (isReadableStream(ret) || isTransformStream(ret)) {
          const toRead = ret.readable || ret;
          pending++;
          pumpToNode(toRead, pt, finish, { end });
        } else {
          throw new ERR_INVALID_RETURN_VALUE(
            'AsyncIterable or Promise', 'destination', ret);
        }

        ret = pt;

        const { destroy, cleanup } = destroyer(ret, false, true);
        destroyFns.push(destroy);
        if (isLastStream) {
          lastStreamCleanup.push(cleanup);
        }
      }
    } else if (isNodeStream(stream)) {
      // Node.js stream stage.
      if (isReadableNodeStream(ret)) {
        // pipe() reports completion twice: once for the source side and once
        // for the destination side.
        pending += 2;
        const cleanup = pipe(ret, stream, finish, { end });
        if (isReadable(stream) && isLastStream) {
          lastStreamCleanup.push(cleanup);
        }
      } else if (isTransformStream(ret) || isReadableStream(ret)) {
        const toRead = ret.readable || ret;
        pending++;
        pumpToNode(toRead, stream, finish, { end });
      } else if (isIterable(ret)) {
        pending++;
        pumpToNode(ret, stream, finish, { end });
      } else {
        throw new ERR_INVALID_ARG_TYPE(
          'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret);
      }
      ret = stream;
    } else if (isWebStream(stream)) {
      // Web stream stage.
      if (isReadableNodeStream(ret)) {
        pending++;
        pumpToWeb(makeAsyncIterable(ret), stream, finish, { end });
      } else if (isReadableStream(ret) || isIterable(ret)) {
        pending++;
        pumpToWeb(ret, stream, finish, { end });
      } else if (isTransformStream(ret)) {
        pending++;
        pumpToWeb(ret.readable, stream, finish, { end });
      } else {
        throw new ERR_INVALID_ARG_TYPE(
          'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret);
      }
      ret = stream;
    } else {
      ret = Duplex.from(stream);
    }
  }

  // A signal that is already aborted at this point still finishes the
  // pipeline with an AbortError, on the next tick.
  if (signal?.aborted || outerSignal?.aborted) {
    process.nextTick(abort);
  }

  return ret;
}
4111cb0ef41Sopenharmony_ci
// Connects a readable Node.js stream `src` to a writable Node.js stream `dst`
// using `src.pipe()`, and reports through `finish` when each side is done.
//
// `finish` is invoked for the source side (from end-of-stream on `src`, or
// immediately when `end` is falsy), for the destination side (from
// end-of-stream on `dst`), and additionally with ERR_STREAM_PREMATURE_CLOSE
// if `dst` emits 'close' before this function has ended it.
//
// Returns the value of the end-of-stream call made for `dst`.
function pipe(src, dst, finish, { end }) {
  // Set once this function has called dst.end().
  let ended = false;

  const endDestination = () => {
    ended = true;
    dst.end();
  };

  dst.on('close', () => {
    if (ended) {
      return;
    }
    // Finish if the destination closes before the source has completed.
    finish(new ERR_STREAM_PREMATURE_CLOSE());
  });

  src.pipe(dst, { end: false }); // If end is true we already will have a listener to end dst.

  if (!end) {
    finish();
  } else if (isReadableFinished(src)) {
    // Compat. Before node v10.12.0 stdio used to throw an error so
    // pipe() did/does not end() stdio destinations.
    // Now they allow it but "secretly" don't close the underlying fd.

    // End the destination if the source has already ended.
    process.nextTick(endDestination);
  } else {
    src.once('end', endDestination);
  }

  eos(src, { readable: true, writable: false }, (err) => {
    const state = src._readableState;
    const closedBeforeEnd =
      err &&
      err.code === 'ERR_STREAM_PREMATURE_CLOSE' &&
      (state && state.ended && !state.errored && !state.errorEmitted);

    if (!closedBeforeEnd) {
      finish(err);
      return;
    }

    // Some readable streams will emit 'close' before 'end'. However, since
    // this is on the readable side 'end' should still be emitted if the
    // stream has been ended and no error emitted. This should be allowed in
    // favor of backwards compatibility. Since the stream is piped to a
    // destination this should not result in any observable difference.
    // We don't need to check if this is a writable premature close since
    // eos will only fail with premature close on the reading side for
    // duplex streams.
    src
      .once('end', finish)
      .once('error', finish);
  });

  return eos(dst, { readable: false, writable: true }, finish);
}
4661cb0ef41Sopenharmony_ci
4671cb0ef41Sopenharmony_cimodule.exports = { pipelineImpl, pipeline };
468