11cb0ef41Sopenharmony_ci'use strict';
21cb0ef41Sopenharmony_ci
31cb0ef41Sopenharmony_ciconst { pipeline } = require('internal/streams/pipeline');
41cb0ef41Sopenharmony_ciconst Duplex = require('internal/streams/duplex');
51cb0ef41Sopenharmony_ciconst { destroyer } = require('internal/streams/destroy');
61cb0ef41Sopenharmony_ciconst {
71cb0ef41Sopenharmony_ci  isNodeStream,
81cb0ef41Sopenharmony_ci  isReadable,
91cb0ef41Sopenharmony_ci  isWritable,
101cb0ef41Sopenharmony_ci  isWebStream,
111cb0ef41Sopenharmony_ci  isTransformStream,
121cb0ef41Sopenharmony_ci  isWritableStream,
131cb0ef41Sopenharmony_ci  isReadableStream,
141cb0ef41Sopenharmony_ci} = require('internal/streams/utils');
151cb0ef41Sopenharmony_ciconst {
161cb0ef41Sopenharmony_ci  AbortError,
171cb0ef41Sopenharmony_ci  codes: {
181cb0ef41Sopenharmony_ci    ERR_INVALID_ARG_VALUE,
191cb0ef41Sopenharmony_ci    ERR_MISSING_ARGS,
201cb0ef41Sopenharmony_ci  },
211cb0ef41Sopenharmony_ci} = require('internal/errors');
221cb0ef41Sopenharmony_ciconst eos = require('internal/streams/end-of-stream');
231cb0ef41Sopenharmony_ci
241cb0ef41Sopenharmony_cimodule.exports = function compose(...streams) {
251cb0ef41Sopenharmony_ci  if (streams.length === 0) {
261cb0ef41Sopenharmony_ci    throw new ERR_MISSING_ARGS('streams');
271cb0ef41Sopenharmony_ci  }
281cb0ef41Sopenharmony_ci
291cb0ef41Sopenharmony_ci  if (streams.length === 1) {
301cb0ef41Sopenharmony_ci    return Duplex.from(streams[0]);
311cb0ef41Sopenharmony_ci  }
321cb0ef41Sopenharmony_ci
331cb0ef41Sopenharmony_ci  const orgStreams = [...streams];
341cb0ef41Sopenharmony_ci
351cb0ef41Sopenharmony_ci  if (typeof streams[0] === 'function') {
361cb0ef41Sopenharmony_ci    streams[0] = Duplex.from(streams[0]);
371cb0ef41Sopenharmony_ci  }
381cb0ef41Sopenharmony_ci
391cb0ef41Sopenharmony_ci  if (typeof streams[streams.length - 1] === 'function') {
401cb0ef41Sopenharmony_ci    const idx = streams.length - 1;
411cb0ef41Sopenharmony_ci    streams[idx] = Duplex.from(streams[idx]);
421cb0ef41Sopenharmony_ci  }
431cb0ef41Sopenharmony_ci
441cb0ef41Sopenharmony_ci  for (let n = 0; n < streams.length; ++n) {
451cb0ef41Sopenharmony_ci    if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) {
461cb0ef41Sopenharmony_ci      // TODO(ronag): Add checks for non streams.
471cb0ef41Sopenharmony_ci      continue;
481cb0ef41Sopenharmony_ci    }
491cb0ef41Sopenharmony_ci    if (
501cb0ef41Sopenharmony_ci      n < streams.length - 1 &&
511cb0ef41Sopenharmony_ci      !(
521cb0ef41Sopenharmony_ci        isReadable(streams[n]) ||
531cb0ef41Sopenharmony_ci        isReadableStream(streams[n]) ||
541cb0ef41Sopenharmony_ci        isTransformStream(streams[n])
551cb0ef41Sopenharmony_ci      )
561cb0ef41Sopenharmony_ci    ) {
571cb0ef41Sopenharmony_ci      throw new ERR_INVALID_ARG_VALUE(
581cb0ef41Sopenharmony_ci        `streams[${n}]`,
591cb0ef41Sopenharmony_ci        orgStreams[n],
601cb0ef41Sopenharmony_ci        'must be readable',
611cb0ef41Sopenharmony_ci      );
621cb0ef41Sopenharmony_ci    }
631cb0ef41Sopenharmony_ci    if (
641cb0ef41Sopenharmony_ci      n > 0 &&
651cb0ef41Sopenharmony_ci      !(
661cb0ef41Sopenharmony_ci        isWritable(streams[n]) ||
671cb0ef41Sopenharmony_ci        isWritableStream(streams[n]) ||
681cb0ef41Sopenharmony_ci        isTransformStream(streams[n])
691cb0ef41Sopenharmony_ci      )
701cb0ef41Sopenharmony_ci    ) {
711cb0ef41Sopenharmony_ci      throw new ERR_INVALID_ARG_VALUE(
721cb0ef41Sopenharmony_ci        `streams[${n}]`,
731cb0ef41Sopenharmony_ci        orgStreams[n],
741cb0ef41Sopenharmony_ci        'must be writable',
751cb0ef41Sopenharmony_ci      );
761cb0ef41Sopenharmony_ci    }
771cb0ef41Sopenharmony_ci  }
781cb0ef41Sopenharmony_ci
791cb0ef41Sopenharmony_ci  let ondrain;
801cb0ef41Sopenharmony_ci  let onfinish;
811cb0ef41Sopenharmony_ci  let onreadable;
821cb0ef41Sopenharmony_ci  let onclose;
831cb0ef41Sopenharmony_ci  let d;
841cb0ef41Sopenharmony_ci
851cb0ef41Sopenharmony_ci  function onfinished(err) {
861cb0ef41Sopenharmony_ci    const cb = onclose;
871cb0ef41Sopenharmony_ci    onclose = null;
881cb0ef41Sopenharmony_ci
891cb0ef41Sopenharmony_ci    if (cb) {
901cb0ef41Sopenharmony_ci      cb(err);
911cb0ef41Sopenharmony_ci    } else if (err) {
921cb0ef41Sopenharmony_ci      d.destroy(err);
931cb0ef41Sopenharmony_ci    } else if (!readable && !writable) {
941cb0ef41Sopenharmony_ci      d.destroy();
951cb0ef41Sopenharmony_ci    }
961cb0ef41Sopenharmony_ci  }
971cb0ef41Sopenharmony_ci
981cb0ef41Sopenharmony_ci  const head = streams[0];
991cb0ef41Sopenharmony_ci  const tail = pipeline(streams, onfinished);
1001cb0ef41Sopenharmony_ci
1011cb0ef41Sopenharmony_ci  const writable = !!(
1021cb0ef41Sopenharmony_ci    isWritable(head) ||
1031cb0ef41Sopenharmony_ci    isWritableStream(head) ||
1041cb0ef41Sopenharmony_ci    isTransformStream(head)
1051cb0ef41Sopenharmony_ci  );
1061cb0ef41Sopenharmony_ci  const readable = !!(
1071cb0ef41Sopenharmony_ci    isReadable(tail) ||
1081cb0ef41Sopenharmony_ci    isReadableStream(tail) ||
1091cb0ef41Sopenharmony_ci    isTransformStream(tail)
1101cb0ef41Sopenharmony_ci  );
1111cb0ef41Sopenharmony_ci
1121cb0ef41Sopenharmony_ci  // TODO(ronag): Avoid double buffering.
1131cb0ef41Sopenharmony_ci  // Implement Writable/Readable/Duplex traits.
1141cb0ef41Sopenharmony_ci  // See, https://github.com/nodejs/node/pull/33515.
1151cb0ef41Sopenharmony_ci  d = new Duplex({
1161cb0ef41Sopenharmony_ci    // TODO (ronag): highWaterMark?
1171cb0ef41Sopenharmony_ci    writableObjectMode: !!head?.writableObjectMode,
1181cb0ef41Sopenharmony_ci    readableObjectMode: !!tail?.readableObjectMode,
1191cb0ef41Sopenharmony_ci    writable,
1201cb0ef41Sopenharmony_ci    readable,
1211cb0ef41Sopenharmony_ci  });
1221cb0ef41Sopenharmony_ci
1231cb0ef41Sopenharmony_ci  if (writable) {
1241cb0ef41Sopenharmony_ci    if (isNodeStream(head)) {
1251cb0ef41Sopenharmony_ci      d._write = function(chunk, encoding, callback) {
1261cb0ef41Sopenharmony_ci        if (head.write(chunk, encoding)) {
1271cb0ef41Sopenharmony_ci          callback();
1281cb0ef41Sopenharmony_ci        } else {
1291cb0ef41Sopenharmony_ci          ondrain = callback;
1301cb0ef41Sopenharmony_ci        }
1311cb0ef41Sopenharmony_ci      };
1321cb0ef41Sopenharmony_ci
1331cb0ef41Sopenharmony_ci      d._final = function(callback) {
1341cb0ef41Sopenharmony_ci        head.end();
1351cb0ef41Sopenharmony_ci        onfinish = callback;
1361cb0ef41Sopenharmony_ci      };
1371cb0ef41Sopenharmony_ci
1381cb0ef41Sopenharmony_ci      head.on('drain', function() {
1391cb0ef41Sopenharmony_ci        if (ondrain) {
1401cb0ef41Sopenharmony_ci          const cb = ondrain;
1411cb0ef41Sopenharmony_ci          ondrain = null;
1421cb0ef41Sopenharmony_ci          cb();
1431cb0ef41Sopenharmony_ci        }
1441cb0ef41Sopenharmony_ci      });
1451cb0ef41Sopenharmony_ci    } else if (isWebStream(head)) {
1461cb0ef41Sopenharmony_ci      const writable = isTransformStream(head) ? head.writable : head;
1471cb0ef41Sopenharmony_ci      const writer = writable.getWriter();
1481cb0ef41Sopenharmony_ci
1491cb0ef41Sopenharmony_ci      d._write = async function(chunk, encoding, callback) {
1501cb0ef41Sopenharmony_ci        try {
1511cb0ef41Sopenharmony_ci          await writer.ready;
1521cb0ef41Sopenharmony_ci          writer.write(chunk).catch(() => {});
1531cb0ef41Sopenharmony_ci          callback();
1541cb0ef41Sopenharmony_ci        } catch (err) {
1551cb0ef41Sopenharmony_ci          callback(err);
1561cb0ef41Sopenharmony_ci        }
1571cb0ef41Sopenharmony_ci      };
1581cb0ef41Sopenharmony_ci
1591cb0ef41Sopenharmony_ci      d._final = async function(callback) {
1601cb0ef41Sopenharmony_ci        try {
1611cb0ef41Sopenharmony_ci          await writer.ready;
1621cb0ef41Sopenharmony_ci          writer.close().catch(() => {});
1631cb0ef41Sopenharmony_ci          onfinish = callback;
1641cb0ef41Sopenharmony_ci        } catch (err) {
1651cb0ef41Sopenharmony_ci          callback(err);
1661cb0ef41Sopenharmony_ci        }
1671cb0ef41Sopenharmony_ci      };
1681cb0ef41Sopenharmony_ci    }
1691cb0ef41Sopenharmony_ci
1701cb0ef41Sopenharmony_ci    const toRead = isTransformStream(tail) ? tail.readable : tail;
1711cb0ef41Sopenharmony_ci
1721cb0ef41Sopenharmony_ci    eos(toRead, () => {
1731cb0ef41Sopenharmony_ci      if (onfinish) {
1741cb0ef41Sopenharmony_ci        const cb = onfinish;
1751cb0ef41Sopenharmony_ci        onfinish = null;
1761cb0ef41Sopenharmony_ci        cb();
1771cb0ef41Sopenharmony_ci      }
1781cb0ef41Sopenharmony_ci    });
1791cb0ef41Sopenharmony_ci  }
1801cb0ef41Sopenharmony_ci
1811cb0ef41Sopenharmony_ci  if (readable) {
1821cb0ef41Sopenharmony_ci    if (isNodeStream(tail)) {
1831cb0ef41Sopenharmony_ci      tail.on('readable', function() {
1841cb0ef41Sopenharmony_ci        if (onreadable) {
1851cb0ef41Sopenharmony_ci          const cb = onreadable;
1861cb0ef41Sopenharmony_ci          onreadable = null;
1871cb0ef41Sopenharmony_ci          cb();
1881cb0ef41Sopenharmony_ci        }
1891cb0ef41Sopenharmony_ci      });
1901cb0ef41Sopenharmony_ci
1911cb0ef41Sopenharmony_ci      tail.on('end', function() {
1921cb0ef41Sopenharmony_ci        d.push(null);
1931cb0ef41Sopenharmony_ci      });
1941cb0ef41Sopenharmony_ci
1951cb0ef41Sopenharmony_ci      d._read = function() {
1961cb0ef41Sopenharmony_ci        while (true) {
1971cb0ef41Sopenharmony_ci          const buf = tail.read();
1981cb0ef41Sopenharmony_ci          if (buf === null) {
1991cb0ef41Sopenharmony_ci            onreadable = d._read;
2001cb0ef41Sopenharmony_ci            return;
2011cb0ef41Sopenharmony_ci          }
2021cb0ef41Sopenharmony_ci
2031cb0ef41Sopenharmony_ci          if (!d.push(buf)) {
2041cb0ef41Sopenharmony_ci            return;
2051cb0ef41Sopenharmony_ci          }
2061cb0ef41Sopenharmony_ci        }
2071cb0ef41Sopenharmony_ci      };
2081cb0ef41Sopenharmony_ci    } else if (isWebStream(tail)) {
2091cb0ef41Sopenharmony_ci      const readable = isTransformStream(tail) ? tail.readable : tail;
2101cb0ef41Sopenharmony_ci      const reader = readable.getReader();
2111cb0ef41Sopenharmony_ci      d._read = async function() {
2121cb0ef41Sopenharmony_ci        while (true) {
2131cb0ef41Sopenharmony_ci          try {
2141cb0ef41Sopenharmony_ci            const { value, done } = await reader.read();
2151cb0ef41Sopenharmony_ci
2161cb0ef41Sopenharmony_ci            if (!d.push(value)) {
2171cb0ef41Sopenharmony_ci              return;
2181cb0ef41Sopenharmony_ci            }
2191cb0ef41Sopenharmony_ci
2201cb0ef41Sopenharmony_ci            if (done) {
2211cb0ef41Sopenharmony_ci              d.push(null);
2221cb0ef41Sopenharmony_ci              return;
2231cb0ef41Sopenharmony_ci            }
2241cb0ef41Sopenharmony_ci          } catch {
2251cb0ef41Sopenharmony_ci            return;
2261cb0ef41Sopenharmony_ci          }
2271cb0ef41Sopenharmony_ci        }
2281cb0ef41Sopenharmony_ci      };
2291cb0ef41Sopenharmony_ci    }
2301cb0ef41Sopenharmony_ci  }
2311cb0ef41Sopenharmony_ci
2321cb0ef41Sopenharmony_ci  d._destroy = function(err, callback) {
2331cb0ef41Sopenharmony_ci    if (!err && onclose !== null) {
2341cb0ef41Sopenharmony_ci      err = new AbortError();
2351cb0ef41Sopenharmony_ci    }
2361cb0ef41Sopenharmony_ci
2371cb0ef41Sopenharmony_ci    onreadable = null;
2381cb0ef41Sopenharmony_ci    ondrain = null;
2391cb0ef41Sopenharmony_ci    onfinish = null;
2401cb0ef41Sopenharmony_ci
2411cb0ef41Sopenharmony_ci    if (onclose === null) {
2421cb0ef41Sopenharmony_ci      callback(err);
2431cb0ef41Sopenharmony_ci    } else {
2441cb0ef41Sopenharmony_ci      onclose = callback;
2451cb0ef41Sopenharmony_ci      if (isNodeStream(tail)) {
2461cb0ef41Sopenharmony_ci        destroyer(tail, err);
2471cb0ef41Sopenharmony_ci      }
2481cb0ef41Sopenharmony_ci    }
2491cb0ef41Sopenharmony_ci  };
2501cb0ef41Sopenharmony_ci
2511cb0ef41Sopenharmony_ci  return d;
2521cb0ef41Sopenharmony_ci};
253