// Ported from https://github.com/mafintosh/end-of-stream with
// permission from the author, Mathias Buus (@mafintosh).

'use strict';

const {
  AbortError,
  codes,
} = require('internal/errors');
const {
  ERR_INVALID_ARG_TYPE,
  ERR_STREAM_PREMATURE_CLOSE,
} = codes;
const {
  kEmptyObject,
  once,
} = require('internal/util');
const {
  validateAbortSignal,
  validateFunction,
  validateObject,
  validateBoolean,
} = require('internal/validators');

const {
  Promise,
  PromisePrototypeThen,
  SymbolDispose,
} = primordials;

const {
  isClosed,
  isReadable,
  isReadableNodeStream,
  isReadableStream,
  isReadableFinished,
  isReadableErrored,
  isWritable,
  isWritableNodeStream,
  isWritableStream,
  isWritableFinished,
  isWritableErrored,
  isNodeStream,
  willEmitClose: _willEmitClose,
  kIsClosedPromise,
} = require('internal/streams/utils');
// Loaded lazily from 'events' the first time a (non-aborted) signal is used.
let addAbortListener;

/**
 * Duck-type check for a request-like object: it has a `setHeader` member and
 * an `abort` method.
 * @param {object} stream
 * @returns {*} Truthy when both members are present. Note that the raw `&&`
 *   result is returned, so a missing `setHeader` yields that falsy value
 *   rather than `false`.
 */
function isRequest(stream) {
  return stream.setHeader && typeof stream.abort === 'function';
}

const nop = () => {};

/**
 * Invokes `callback` when `stream` has ended, finished, errored or closed.
 *
 * May be called as `eos(stream, callback)` or `eos(stream, options, callback)`.
 * The callback is invoked with `this` set to `stream` and with an error as its
 * first argument when one is being reported.
 * @param {object} stream A Node.js stream, or a web ReadableStream /
 *   WritableStream (the latter are delegated to `eosWeb`).
 * @param {object} [options]
 * @param {boolean} [options.readable] Whether to wait for the readable side;
 *   defaults to `isReadableNodeStream(stream)`.
 * @param {boolean} [options.writable] Whether to wait for the writable side;
 *   defaults to `isWritableNodeStream(stream)`.
 * @param {boolean} [options.error] When exactly `false`, no 'error' listener
 *   is installed.
 * @param {AbortSignal} [options.signal] Aborting it reports an `AbortError`
 *   whose `cause` is `signal.reason`.
 * @param {Function} callback
 * @returns {Function} A function that removes the listeners installed here
 *   and silences the callback (a no-op for web streams).
 * @throws {ERR_INVALID_ARG_TYPE} When `stream` is neither a web stream nor a
 *   Node.js stream. Argument validators may also throw.
 */
function eos(stream, options, callback) {
  // `arguments.length` (rather than a default parameter) distinguishes the
  // two-argument form from an explicit `undefined` options value.
  if (arguments.length === 2) {
    callback = options;
    options = kEmptyObject;
  } else if (options == null) {
    options = kEmptyObject;
  } else {
    validateObject(options, 'options');
  }
  validateFunction(callback, 'callback');
  validateAbortSignal(options.signal, 'options.signal');

  callback = once(callback);

  if (isReadableStream(stream) || isWritableStream(stream)) {
    return eosWeb(stream, options, callback);
  }

  if (!isNodeStream(stream)) {
    throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
  }

  const readable = options.readable ?? isReadableNodeStream(stream);
  const writable = options.writable ?? isWritableNodeStream(stream);

  const wState = stream._writableState;
  const rState = stream._readableState;

  // Used for streams without `_writableState`: 'end'/'close' count as finish
  // once the stream no longer reports itself as writable.
  const onlegacyfinish = () => {
    if (!stream.writable) {
      onfinish();
    }
  };

  // TODO (ronag): Improve soft detection to include core modules and
  // common ecosystem modules that do properly emit 'close' but fail
  // this generic check.
  let willEmitClose = (
    _willEmitClose(stream) &&
    isReadableNodeStream(stream) === readable &&
    isWritableNodeStream(stream) === writable
  );

  let writableFinished = isWritableFinished(stream, false);
  const onfinish = () => {
    writableFinished = true;
    // Stream should not be destroyed here. If it is that
    // means that user space is doing something differently and
    // we cannot trust willEmitClose.
    if (stream.destroyed) {
      willEmitClose = false;
    }

    // Defer to the 'close' handler when one is expected.
    if (willEmitClose && (!stream.readable || readable)) {
      return;
    }

    if (!readable || readableFinished) {
      callback.call(stream);
    }
  };

  let readableFinished = isReadableFinished(stream, false);
  const onend = () => {
    readableFinished = true;
    // Stream should not be destroyed here. If it is that
    // means that user space is doing something differently and
    // we cannot trust willEmitClose.
    if (stream.destroyed) {
      willEmitClose = false;
    }

    // Defer to the 'close' handler when one is expected.
    if (willEmitClose && (!stream.writable || writable)) {
      return;
    }

    if (!writable || writableFinished) {
      callback.call(stream);
    }
  };

  const onerror = (err) => {
    callback.call(stream, err);
  };

  let closed = isClosed(stream);

  // 'close' handler: reports a stored error, a premature close, or success.
  const onclose = () => {
    closed = true;

    const errored = isWritableErrored(stream) || isReadableErrored(stream);

    // Only forward `errored` when it is an actual value, not a bare flag.
    if (errored && typeof errored !== 'boolean') {
      return callback.call(stream, errored);
    }

    if (readable && !readableFinished && isReadableNodeStream(stream, true)) {
      if (!isReadableFinished(stream, false))
        return callback.call(stream,
                             new ERR_STREAM_PREMATURE_CLOSE());
    }
    if (writable && !writableFinished) {
      if (!isWritableFinished(stream, false))
        return callback.call(stream,
                             new ERR_STREAM_PREMATURE_CLOSE());
    }

    callback.call(stream);
  };

  // Like `onclose`, but without the premature-close checks; scheduled below
  // for streams that are already in a terminal state.
  const onclosed = () => {
    closed = true;

    const errored = isWritableErrored(stream) || isReadableErrored(stream);

    if (errored && typeof errored !== 'boolean') {
      return callback.call(stream, errored);
    }

    callback.call(stream);
  };

  const onrequest = () => {
    stream.req.on('finish', onfinish);
  };

  if (isRequest(stream)) {
    stream.on('complete', onfinish);
    if (!willEmitClose) {
      stream.on('abort', onclose);
    }
    if (stream.req) {
      onrequest();
    } else {
      stream.on('request', onrequest);
    }
  } else if (writable && !wState) { // legacy streams
    stream.on('end', onlegacyfinish);
    stream.on('close', onlegacyfinish);
  }

  // Not all streams will emit 'close' after 'aborted'.
  if (!willEmitClose && typeof stream.aborted === 'boolean') {
    stream.on('aborted', onclose);
  }

  stream.on('end', onend);
  stream.on('finish', onfinish);
  if (options.error !== false) {
    stream.on('error', onerror);
  }
  stream.on('close', onclose);

  // The stream may already be in a terminal state, in which case no further
  // event will arrive; report on the next tick so the callback is never
  // invoked synchronously from this function.
  if (closed) {
    process.nextTick(onclose);
  } else if (wState?.errorEmitted || rState?.errorEmitted) {
    if (!willEmitClose) {
      process.nextTick(onclosed);
    }
  } else if (
    !readable &&
    (!willEmitClose || isReadable(stream)) &&
    (writableFinished || isWritable(stream) === false)
  ) {
    process.nextTick(onclosed);
  } else if (
    !writable &&
    (!willEmitClose || isWritable(stream)) &&
    (readableFinished || isReadable(stream) === false)
  ) {
    process.nextTick(onclosed);
  } else if ((rState && stream.req && stream.aborted)) {
    process.nextTick(onclosed);
  }

  // Detaches every listener this function may have installed and replaces
  // the callback with a no-op so late events are ignored.
  const cleanup = () => {
    callback = nop;
    stream.removeListener('aborted', onclose);
    stream.removeListener('complete', onfinish);
    stream.removeListener('abort', onclose);
    stream.removeListener('request', onrequest);
    if (stream.req) stream.req.removeListener('finish', onfinish);
    stream.removeListener('end', onlegacyfinish);
    stream.removeListener('close', onlegacyfinish);
    stream.removeListener('finish', onfinish);
    stream.removeListener('end', onend);
    stream.removeListener('error', onerror);
    stream.removeListener('close', onclose);
  };

  if (options.signal && !closed) {
    const abort = () => {
      // Keep it because cleanup removes it.
      const endCallback = callback;
      cleanup();
      endCallback.call(
        stream,
        new AbortError(undefined, { cause: options.signal.reason }));
    };
    if (options.signal.aborted) {
      process.nextTick(abort);
    } else {
      addAbortListener ??= require('events').addAbortListener;
      const disposable = addAbortListener(options.signal, abort);
      const originalCallback = callback;
      // Wrap the callback so the abort listener is disposed once the stream
      // completes by any route.
      callback = once((...args) => {
        disposable[SymbolDispose]();
        originalCallback.apply(stream, args);
      });
    }
  }

  return cleanup;
}

/**
 * Web-stream counterpart of `eos`: invokes `callback` once the promise stored
 * under `stream[kIsClosedPromise].promise` settles, or with an `AbortError`
 * when `options.signal` aborts first.
 * @param {object} stream A web ReadableStream or WritableStream.
 * @param {object} options Only `options.signal` is read here.
 * @param {Function} callback Called with `this` set to `stream`.
 * @returns {Function} Always the shared no-op; there is nothing to clean up.
 */
function eosWeb(stream, options, callback) {
  let isAborted = false;
  let abort = nop;
  if (options.signal) {
    abort = () => {
      isAborted = true;
      callback.call(stream, new AbortError(undefined, { cause: options.signal.reason }));
    };
    if (options.signal.aborted) {
      process.nextTick(abort);
    } else {
      addAbortListener ??= require('events').addAbortListener;
      const disposable = addAbortListener(options.signal, abort);
      const originalCallback = callback;
      // Dispose the abort listener as soon as the callback runs.
      callback = once((...args) => {
        disposable[SymbolDispose]();
        originalCallback.apply(stream, args);
      });
    }
  }
  // Used for both fulfilment and rejection; suppressed after an abort so the
  // callback only ever sees the AbortError.
  const resolverFn = (...args) => {
    if (!isAborted) {
      process.nextTick(() => callback.apply(stream, args));
    }
  };
  PromisePrototypeThen(
    stream[kIsClosedPromise].promise,
    resolverFn,
    resolverFn,
  );
  return nop;
}

/**
 * Promise-returning wrapper around `eos`.
 * @param {object} stream
 * @param {object} [opts] Forwarded to `eos`. A truthy `opts.cleanup` is
 *   validated as a boolean and makes the listeners installed by `eos` get
 *   removed before the promise settles.
 * @returns {Promise<void>} Resolves when `eos` reports completion without an
 *   error, rejects with the reported error otherwise.
 */
function finished(stream, opts) {
  let autoCleanup = false;
  if (opts === null) {
    opts = kEmptyObject;
  }
  if (opts?.cleanup) {
    validateBoolean(opts.cleanup, 'cleanup');
    autoCleanup = opts.cleanup;
  }
  return new Promise((resolve, reject) => {
    const cleanup = eos(stream, opts, (err) => {
      if (autoCleanup) {
        cleanup();
      }
      if (err) {
        reject(err);
      } else {
        resolve();
      }
    });
  });
}

module.exports = eos;
module.exports.finished = finished;