1'use strict'; 2 3const { pipeline } = require('internal/streams/pipeline'); 4const Duplex = require('internal/streams/duplex'); 5const { destroyer } = require('internal/streams/destroy'); 6const { 7 isNodeStream, 8 isReadable, 9 isWritable, 10 isWebStream, 11 isTransformStream, 12 isWritableStream, 13 isReadableStream, 14} = require('internal/streams/utils'); 15const { 16 AbortError, 17 codes: { 18 ERR_INVALID_ARG_VALUE, 19 ERR_MISSING_ARGS, 20 }, 21} = require('internal/errors'); 22const eos = require('internal/streams/end-of-stream'); 23 24module.exports = function compose(...streams) { 25 if (streams.length === 0) { 26 throw new ERR_MISSING_ARGS('streams'); 27 } 28 29 if (streams.length === 1) { 30 return Duplex.from(streams[0]); 31 } 32 33 const orgStreams = [...streams]; 34 35 if (typeof streams[0] === 'function') { 36 streams[0] = Duplex.from(streams[0]); 37 } 38 39 if (typeof streams[streams.length - 1] === 'function') { 40 const idx = streams.length - 1; 41 streams[idx] = Duplex.from(streams[idx]); 42 } 43 44 for (let n = 0; n < streams.length; ++n) { 45 if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) { 46 // TODO(ronag): Add checks for non streams. 47 continue; 48 } 49 if ( 50 n < streams.length - 1 && 51 !( 52 isReadable(streams[n]) || 53 isReadableStream(streams[n]) || 54 isTransformStream(streams[n]) 55 ) 56 ) { 57 throw new ERR_INVALID_ARG_VALUE( 58 `streams[${n}]`, 59 orgStreams[n], 60 'must be readable', 61 ); 62 } 63 if ( 64 n > 0 && 65 !( 66 isWritable(streams[n]) || 67 isWritableStream(streams[n]) || 68 isTransformStream(streams[n]) 69 ) 70 ) { 71 throw new ERR_INVALID_ARG_VALUE( 72 `streams[${n}]`, 73 orgStreams[n], 74 'must be writable', 75 ); 76 } 77 } 78 79 let ondrain; 80 let onfinish; 81 let onreadable; 82 let onclose; 83 let d; 84 85 function onfinished(err) { 86 const cb = onclose; 87 onclose = null; 88 89 if (cb) { 90 cb(err); 91 } else if (err) { 92 d.destroy(err); 93 } else if (!readable && !writable) { 94 d.destroy(); 95 } 96 } 97 98 const head = streams[0]; 99 const tail = pipeline(streams, onfinished); 100 101 const writable = !!( 102 isWritable(head) || 103 isWritableStream(head) || 104 isTransformStream(head) 105 ); 106 const readable = !!( 107 isReadable(tail) || 108 isReadableStream(tail) || 109 isTransformStream(tail) 110 ); 111 112 // TODO(ronag): Avoid double buffering. 113 // Implement Writable/Readable/Duplex traits. 114 // See, https://github.com/nodejs/node/pull/33515. 115 d = new Duplex({ 116 // TODO (ronag): highWaterMark? 117 writableObjectMode: !!head?.writableObjectMode, 118 readableObjectMode: !!tail?.readableObjectMode, 119 writable, 120 readable, 121 }); 122 123 if (writable) { 124 if (isNodeStream(head)) { 125 d._write = function(chunk, encoding, callback) { 126 if (head.write(chunk, encoding)) { 127 callback(); 128 } else { 129 ondrain = callback; 130 } 131 }; 132 133 d._final = function(callback) { 134 head.end(); 135 onfinish = callback; 136 }; 137 138 head.on('drain', function() { 139 if (ondrain) { 140 const cb = ondrain; 141 ondrain = null; 142 cb(); 143 } 144 }); 145 } else if (isWebStream(head)) { 146 const writable = isTransformStream(head) ? head.writable : head; 147 const writer = writable.getWriter(); 148 149 d._write = async function(chunk, encoding, callback) { 150 try { 151 await writer.ready; 152 writer.write(chunk).catch(() => {}); 153 callback(); 154 } catch (err) { 155 callback(err); 156 } 157 }; 158 159 d._final = async function(callback) { 160 try { 161 await writer.ready; 162 writer.close().catch(() => {}); 163 onfinish = callback; 164 } catch (err) { 165 callback(err); 166 } 167 }; 168 } 169 170 const toRead = isTransformStream(tail) ? tail.readable : tail; 171 172 eos(toRead, () => { 173 if (onfinish) { 174 const cb = onfinish; 175 onfinish = null; 176 cb(); 177 } 178 }); 179 } 180 181 if (readable) { 182 if (isNodeStream(tail)) { 183 tail.on('readable', function() { 184 if (onreadable) { 185 const cb = onreadable; 186 onreadable = null; 187 cb(); 188 } 189 }); 190 191 tail.on('end', function() { 192 d.push(null); 193 }); 194 195 d._read = function() { 196 while (true) { 197 const buf = tail.read(); 198 if (buf === null) { 199 onreadable = d._read; 200 return; 201 } 202 203 if (!d.push(buf)) { 204 return; 205 } 206 } 207 }; 208 } else if (isWebStream(tail)) { 209 const readable = isTransformStream(tail) ? tail.readable : tail; 210 const reader = readable.getReader(); 211 d._read = async function() { 212 while (true) { 213 try { 214 const { value, done } = await reader.read(); 215 216 if (!d.push(value)) { 217 return; 218 } 219 220 if (done) { 221 d.push(null); 222 return; 223 } 224 } catch { 225 return; 226 } 227 } 228 }; 229 } 230 } 231 232 d._destroy = function(err, callback) { 233 if (!err && onclose !== null) { 234 err = new AbortError(); 235 } 236 237 onreadable = null; 238 ondrain = null; 239 onfinish = null; 240 241 if (onclose === null) { 242 callback(err); 243 } else { 244 onclose = callback; 245 if (isNodeStream(tail)) { 246 destroyer(tail, err); 247 } 248 } 249 }; 250 251 return d; 252}; 253