11cb0ef41Sopenharmony_ci'use strict'; 21cb0ef41Sopenharmony_ci 31cb0ef41Sopenharmony_ciconst { pipeline } = require('internal/streams/pipeline'); 41cb0ef41Sopenharmony_ciconst Duplex = require('internal/streams/duplex'); 51cb0ef41Sopenharmony_ciconst { destroyer } = require('internal/streams/destroy'); 61cb0ef41Sopenharmony_ciconst { 71cb0ef41Sopenharmony_ci isNodeStream, 81cb0ef41Sopenharmony_ci isReadable, 91cb0ef41Sopenharmony_ci isWritable, 101cb0ef41Sopenharmony_ci isWebStream, 111cb0ef41Sopenharmony_ci isTransformStream, 121cb0ef41Sopenharmony_ci isWritableStream, 131cb0ef41Sopenharmony_ci isReadableStream, 141cb0ef41Sopenharmony_ci} = require('internal/streams/utils'); 151cb0ef41Sopenharmony_ciconst { 161cb0ef41Sopenharmony_ci AbortError, 171cb0ef41Sopenharmony_ci codes: { 181cb0ef41Sopenharmony_ci ERR_INVALID_ARG_VALUE, 191cb0ef41Sopenharmony_ci ERR_MISSING_ARGS, 201cb0ef41Sopenharmony_ci }, 211cb0ef41Sopenharmony_ci} = require('internal/errors'); 221cb0ef41Sopenharmony_ciconst eos = require('internal/streams/end-of-stream'); 231cb0ef41Sopenharmony_ci 241cb0ef41Sopenharmony_cimodule.exports = function compose(...streams) { 251cb0ef41Sopenharmony_ci if (streams.length === 0) { 261cb0ef41Sopenharmony_ci throw new ERR_MISSING_ARGS('streams'); 271cb0ef41Sopenharmony_ci } 281cb0ef41Sopenharmony_ci 291cb0ef41Sopenharmony_ci if (streams.length === 1) { 301cb0ef41Sopenharmony_ci return Duplex.from(streams[0]); 311cb0ef41Sopenharmony_ci } 321cb0ef41Sopenharmony_ci 331cb0ef41Sopenharmony_ci const orgStreams = [...streams]; 341cb0ef41Sopenharmony_ci 351cb0ef41Sopenharmony_ci if (typeof streams[0] === 'function') { 361cb0ef41Sopenharmony_ci streams[0] = Duplex.from(streams[0]); 371cb0ef41Sopenharmony_ci } 381cb0ef41Sopenharmony_ci 391cb0ef41Sopenharmony_ci if (typeof streams[streams.length - 1] === 'function') { 401cb0ef41Sopenharmony_ci const idx = streams.length - 1; 411cb0ef41Sopenharmony_ci streams[idx] = Duplex.from(streams[idx]); 421cb0ef41Sopenharmony_ci } 431cb0ef41Sopenharmony_ci 441cb0ef41Sopenharmony_ci for (let n = 0; n < streams.length; ++n) { 451cb0ef41Sopenharmony_ci if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) { 461cb0ef41Sopenharmony_ci // TODO(ronag): Add checks for non streams. 471cb0ef41Sopenharmony_ci continue; 481cb0ef41Sopenharmony_ci } 491cb0ef41Sopenharmony_ci if ( 501cb0ef41Sopenharmony_ci n < streams.length - 1 && 511cb0ef41Sopenharmony_ci !( 521cb0ef41Sopenharmony_ci isReadable(streams[n]) || 531cb0ef41Sopenharmony_ci isReadableStream(streams[n]) || 541cb0ef41Sopenharmony_ci isTransformStream(streams[n]) 551cb0ef41Sopenharmony_ci ) 561cb0ef41Sopenharmony_ci ) { 571cb0ef41Sopenharmony_ci throw new ERR_INVALID_ARG_VALUE( 581cb0ef41Sopenharmony_ci `streams[${n}]`, 591cb0ef41Sopenharmony_ci orgStreams[n], 601cb0ef41Sopenharmony_ci 'must be readable', 611cb0ef41Sopenharmony_ci ); 621cb0ef41Sopenharmony_ci } 631cb0ef41Sopenharmony_ci if ( 641cb0ef41Sopenharmony_ci n > 0 && 651cb0ef41Sopenharmony_ci !( 661cb0ef41Sopenharmony_ci isWritable(streams[n]) || 671cb0ef41Sopenharmony_ci isWritableStream(streams[n]) || 681cb0ef41Sopenharmony_ci isTransformStream(streams[n]) 691cb0ef41Sopenharmony_ci ) 701cb0ef41Sopenharmony_ci ) { 711cb0ef41Sopenharmony_ci throw new ERR_INVALID_ARG_VALUE( 721cb0ef41Sopenharmony_ci `streams[${n}]`, 731cb0ef41Sopenharmony_ci orgStreams[n], 741cb0ef41Sopenharmony_ci 'must be writable', 751cb0ef41Sopenharmony_ci ); 761cb0ef41Sopenharmony_ci } 771cb0ef41Sopenharmony_ci } 781cb0ef41Sopenharmony_ci 791cb0ef41Sopenharmony_ci let ondrain; 801cb0ef41Sopenharmony_ci let onfinish; 811cb0ef41Sopenharmony_ci let onreadable; 821cb0ef41Sopenharmony_ci let onclose; 831cb0ef41Sopenharmony_ci let d; 841cb0ef41Sopenharmony_ci 851cb0ef41Sopenharmony_ci function onfinished(err) { 861cb0ef41Sopenharmony_ci const cb = onclose; 871cb0ef41Sopenharmony_ci onclose = null; 881cb0ef41Sopenharmony_ci 891cb0ef41Sopenharmony_ci if (cb) { 901cb0ef41Sopenharmony_ci cb(err); 911cb0ef41Sopenharmony_ci } else if (err) { 921cb0ef41Sopenharmony_ci d.destroy(err); 931cb0ef41Sopenharmony_ci } else if (!readable && !writable) { 941cb0ef41Sopenharmony_ci d.destroy(); 951cb0ef41Sopenharmony_ci } 961cb0ef41Sopenharmony_ci } 971cb0ef41Sopenharmony_ci 981cb0ef41Sopenharmony_ci const head = streams[0]; 991cb0ef41Sopenharmony_ci const tail = pipeline(streams, onfinished); 1001cb0ef41Sopenharmony_ci 1011cb0ef41Sopenharmony_ci const writable = !!( 1021cb0ef41Sopenharmony_ci isWritable(head) || 1031cb0ef41Sopenharmony_ci isWritableStream(head) || 1041cb0ef41Sopenharmony_ci isTransformStream(head) 1051cb0ef41Sopenharmony_ci ); 1061cb0ef41Sopenharmony_ci const readable = !!( 1071cb0ef41Sopenharmony_ci isReadable(tail) || 1081cb0ef41Sopenharmony_ci isReadableStream(tail) || 1091cb0ef41Sopenharmony_ci isTransformStream(tail) 1101cb0ef41Sopenharmony_ci ); 1111cb0ef41Sopenharmony_ci 1121cb0ef41Sopenharmony_ci // TODO(ronag): Avoid double buffering. 1131cb0ef41Sopenharmony_ci // Implement Writable/Readable/Duplex traits. 1141cb0ef41Sopenharmony_ci // See, https://github.com/nodejs/node/pull/33515. 1151cb0ef41Sopenharmony_ci d = new Duplex({ 1161cb0ef41Sopenharmony_ci // TODO (ronag): highWaterMark? 1171cb0ef41Sopenharmony_ci writableObjectMode: !!head?.writableObjectMode, 1181cb0ef41Sopenharmony_ci readableObjectMode: !!tail?.readableObjectMode, 1191cb0ef41Sopenharmony_ci writable, 1201cb0ef41Sopenharmony_ci readable, 1211cb0ef41Sopenharmony_ci }); 1221cb0ef41Sopenharmony_ci 1231cb0ef41Sopenharmony_ci if (writable) { 1241cb0ef41Sopenharmony_ci if (isNodeStream(head)) { 1251cb0ef41Sopenharmony_ci d._write = function(chunk, encoding, callback) { 1261cb0ef41Sopenharmony_ci if (head.write(chunk, encoding)) { 1271cb0ef41Sopenharmony_ci callback(); 1281cb0ef41Sopenharmony_ci } else { 1291cb0ef41Sopenharmony_ci ondrain = callback; 1301cb0ef41Sopenharmony_ci } 1311cb0ef41Sopenharmony_ci }; 1321cb0ef41Sopenharmony_ci 1331cb0ef41Sopenharmony_ci d._final = function(callback) { 1341cb0ef41Sopenharmony_ci head.end(); 1351cb0ef41Sopenharmony_ci onfinish = callback; 1361cb0ef41Sopenharmony_ci }; 1371cb0ef41Sopenharmony_ci 1381cb0ef41Sopenharmony_ci head.on('drain', function() { 1391cb0ef41Sopenharmony_ci if (ondrain) { 1401cb0ef41Sopenharmony_ci const cb = ondrain; 1411cb0ef41Sopenharmony_ci ondrain = null; 1421cb0ef41Sopenharmony_ci cb(); 1431cb0ef41Sopenharmony_ci } 1441cb0ef41Sopenharmony_ci }); 1451cb0ef41Sopenharmony_ci } else if (isWebStream(head)) { 1461cb0ef41Sopenharmony_ci const writable = isTransformStream(head) ? head.writable : head; 1471cb0ef41Sopenharmony_ci const writer = writable.getWriter(); 1481cb0ef41Sopenharmony_ci 1491cb0ef41Sopenharmony_ci d._write = async function(chunk, encoding, callback) { 1501cb0ef41Sopenharmony_ci try { 1511cb0ef41Sopenharmony_ci await writer.ready; 1521cb0ef41Sopenharmony_ci writer.write(chunk).catch(() => {}); 1531cb0ef41Sopenharmony_ci callback(); 1541cb0ef41Sopenharmony_ci } catch (err) { 1551cb0ef41Sopenharmony_ci callback(err); 1561cb0ef41Sopenharmony_ci } 1571cb0ef41Sopenharmony_ci }; 1581cb0ef41Sopenharmony_ci 1591cb0ef41Sopenharmony_ci d._final = async function(callback) { 1601cb0ef41Sopenharmony_ci try { 1611cb0ef41Sopenharmony_ci await writer.ready; 1621cb0ef41Sopenharmony_ci writer.close().catch(() => {}); 1631cb0ef41Sopenharmony_ci onfinish = callback; 1641cb0ef41Sopenharmony_ci } catch (err) { 1651cb0ef41Sopenharmony_ci callback(err); 1661cb0ef41Sopenharmony_ci } 1671cb0ef41Sopenharmony_ci }; 1681cb0ef41Sopenharmony_ci } 1691cb0ef41Sopenharmony_ci 1701cb0ef41Sopenharmony_ci const toRead = isTransformStream(tail) ? tail.readable : tail; 1711cb0ef41Sopenharmony_ci 1721cb0ef41Sopenharmony_ci eos(toRead, () => { 1731cb0ef41Sopenharmony_ci if (onfinish) { 1741cb0ef41Sopenharmony_ci const cb = onfinish; 1751cb0ef41Sopenharmony_ci onfinish = null; 1761cb0ef41Sopenharmony_ci cb(); 1771cb0ef41Sopenharmony_ci } 1781cb0ef41Sopenharmony_ci }); 1791cb0ef41Sopenharmony_ci } 1801cb0ef41Sopenharmony_ci 1811cb0ef41Sopenharmony_ci if (readable) { 1821cb0ef41Sopenharmony_ci if (isNodeStream(tail)) { 1831cb0ef41Sopenharmony_ci tail.on('readable', function() { 1841cb0ef41Sopenharmony_ci if (onreadable) { 1851cb0ef41Sopenharmony_ci const cb = onreadable; 1861cb0ef41Sopenharmony_ci onreadable = null; 1871cb0ef41Sopenharmony_ci cb(); 1881cb0ef41Sopenharmony_ci } 1891cb0ef41Sopenharmony_ci }); 1901cb0ef41Sopenharmony_ci 1911cb0ef41Sopenharmony_ci tail.on('end', function() { 1921cb0ef41Sopenharmony_ci d.push(null); 1931cb0ef41Sopenharmony_ci }); 1941cb0ef41Sopenharmony_ci 1951cb0ef41Sopenharmony_ci d._read = function() { 1961cb0ef41Sopenharmony_ci while (true) { 1971cb0ef41Sopenharmony_ci const buf = tail.read(); 1981cb0ef41Sopenharmony_ci if (buf === null) { 1991cb0ef41Sopenharmony_ci onreadable = d._read; 2001cb0ef41Sopenharmony_ci return; 2011cb0ef41Sopenharmony_ci } 2021cb0ef41Sopenharmony_ci 2031cb0ef41Sopenharmony_ci if (!d.push(buf)) { 2041cb0ef41Sopenharmony_ci return; 2051cb0ef41Sopenharmony_ci } 2061cb0ef41Sopenharmony_ci } 2071cb0ef41Sopenharmony_ci }; 2081cb0ef41Sopenharmony_ci } else if (isWebStream(tail)) { 2091cb0ef41Sopenharmony_ci const readable = isTransformStream(tail) ? tail.readable : tail; 2101cb0ef41Sopenharmony_ci const reader = readable.getReader(); 2111cb0ef41Sopenharmony_ci d._read = async function() { 2121cb0ef41Sopenharmony_ci while (true) { 2131cb0ef41Sopenharmony_ci try { 2141cb0ef41Sopenharmony_ci const { value, done } = await reader.read(); 2151cb0ef41Sopenharmony_ci 2161cb0ef41Sopenharmony_ci if (!d.push(value)) { 2171cb0ef41Sopenharmony_ci return; 2181cb0ef41Sopenharmony_ci } 2191cb0ef41Sopenharmony_ci 2201cb0ef41Sopenharmony_ci if (done) { 2211cb0ef41Sopenharmony_ci d.push(null); 2221cb0ef41Sopenharmony_ci return; 2231cb0ef41Sopenharmony_ci } 2241cb0ef41Sopenharmony_ci } catch { 2251cb0ef41Sopenharmony_ci return; 2261cb0ef41Sopenharmony_ci } 2271cb0ef41Sopenharmony_ci } 2281cb0ef41Sopenharmony_ci }; 2291cb0ef41Sopenharmony_ci } 2301cb0ef41Sopenharmony_ci } 2311cb0ef41Sopenharmony_ci 2321cb0ef41Sopenharmony_ci d._destroy = function(err, callback) { 2331cb0ef41Sopenharmony_ci if (!err && onclose !== null) { 2341cb0ef41Sopenharmony_ci err = new AbortError(); 2351cb0ef41Sopenharmony_ci } 2361cb0ef41Sopenharmony_ci 2371cb0ef41Sopenharmony_ci onreadable = null; 2381cb0ef41Sopenharmony_ci ondrain = null; 2391cb0ef41Sopenharmony_ci onfinish = null; 2401cb0ef41Sopenharmony_ci 2411cb0ef41Sopenharmony_ci if (onclose === null) { 2421cb0ef41Sopenharmony_ci callback(err); 2431cb0ef41Sopenharmony_ci } else { 2441cb0ef41Sopenharmony_ci onclose = callback; 2451cb0ef41Sopenharmony_ci if (isNodeStream(tail)) { 2461cb0ef41Sopenharmony_ci destroyer(tail, err); 2471cb0ef41Sopenharmony_ci } 2481cb0ef41Sopenharmony_ci } 2491cb0ef41Sopenharmony_ci }; 2501cb0ef41Sopenharmony_ci 2511cb0ef41Sopenharmony_ci return d; 2521cb0ef41Sopenharmony_ci}; 253