1// Ported from https://github.com/mafintosh/end-of-stream with
2// permission from the author, Mathias Buus (@mafintosh).
3
4'use strict';
5
6const {
7  AbortError,
8  codes,
9} = require('internal/errors');
10const {
11  ERR_INVALID_ARG_TYPE,
12  ERR_STREAM_PREMATURE_CLOSE,
13} = codes;
14const {
15  kEmptyObject,
16  once,
17} = require('internal/util');
18const {
19  validateAbortSignal,
20  validateFunction,
21  validateObject,
22  validateBoolean,
23} = require('internal/validators');
24
25const {
26  Promise,
27  PromisePrototypeThen,
28  SymbolDispose,
29} = primordials;
30
31const {
32  isClosed,
33  isReadable,
34  isReadableNodeStream,
35  isReadableStream,
36  isReadableFinished,
37  isReadableErrored,
38  isWritable,
39  isWritableNodeStream,
40  isWritableStream,
41  isWritableFinished,
42  isWritableErrored,
43  isNodeStream,
44  willEmitClose: _willEmitClose,
45  kIsClosedPromise,
46} = require('internal/streams/utils');
47let addAbortListener;
48
49function isRequest(stream) {
50  return stream.setHeader && typeof stream.abort === 'function';
51}
52
53const nop = () => {};
54
55function eos(stream, options, callback) {
56  if (arguments.length === 2) {
57    callback = options;
58    options = kEmptyObject;
59  } else if (options == null) {
60    options = kEmptyObject;
61  } else {
62    validateObject(options, 'options');
63  }
64  validateFunction(callback, 'callback');
65  validateAbortSignal(options.signal, 'options.signal');
66
67  callback = once(callback);
68
69  if (isReadableStream(stream) || isWritableStream(stream)) {
70    return eosWeb(stream, options, callback);
71  }
72
73  if (!isNodeStream(stream)) {
74    throw new ERR_INVALID_ARG_TYPE('stream', ['ReadableStream', 'WritableStream', 'Stream'], stream);
75  }
76
77  const readable = options.readable ?? isReadableNodeStream(stream);
78  const writable = options.writable ?? isWritableNodeStream(stream);
79
80  const wState = stream._writableState;
81  const rState = stream._readableState;
82
83  const onlegacyfinish = () => {
84    if (!stream.writable) {
85      onfinish();
86    }
87  };
88
89  // TODO (ronag): Improve soft detection to include core modules and
90  // common ecosystem modules that do properly emit 'close' but fail
91  // this generic check.
92  let willEmitClose = (
93    _willEmitClose(stream) &&
94    isReadableNodeStream(stream) === readable &&
95    isWritableNodeStream(stream) === writable
96  );
97
98  let writableFinished = isWritableFinished(stream, false);
99  const onfinish = () => {
100    writableFinished = true;
101    // Stream should not be destroyed here. If it is that
102    // means that user space is doing something differently and
103    // we cannot trust willEmitClose.
104    if (stream.destroyed) {
105      willEmitClose = false;
106    }
107
108    if (willEmitClose && (!stream.readable || readable)) {
109      return;
110    }
111
112    if (!readable || readableFinished) {
113      callback.call(stream);
114    }
115  };
116
117  let readableFinished = isReadableFinished(stream, false);
118  const onend = () => {
119    readableFinished = true;
120    // Stream should not be destroyed here. If it is that
121    // means that user space is doing something differently and
122    // we cannot trust willEmitClose.
123    if (stream.destroyed) {
124      willEmitClose = false;
125    }
126
127    if (willEmitClose && (!stream.writable || writable)) {
128      return;
129    }
130
131    if (!writable || writableFinished) {
132      callback.call(stream);
133    }
134  };
135
136  const onerror = (err) => {
137    callback.call(stream, err);
138  };
139
140  let closed = isClosed(stream);
141
142  const onclose = () => {
143    closed = true;
144
145    const errored = isWritableErrored(stream) || isReadableErrored(stream);
146
147    if (errored && typeof errored !== 'boolean') {
148      return callback.call(stream, errored);
149    }
150
151    if (readable && !readableFinished && isReadableNodeStream(stream, true)) {
152      if (!isReadableFinished(stream, false))
153        return callback.call(stream,
154                             new ERR_STREAM_PREMATURE_CLOSE());
155    }
156    if (writable && !writableFinished) {
157      if (!isWritableFinished(stream, false))
158        return callback.call(stream,
159                             new ERR_STREAM_PREMATURE_CLOSE());
160    }
161
162    callback.call(stream);
163  };
164
165  const onclosed = () => {
166    closed = true;
167
168    const errored = isWritableErrored(stream) || isReadableErrored(stream);
169
170    if (errored && typeof errored !== 'boolean') {
171      return callback.call(stream, errored);
172    }
173
174    callback.call(stream);
175  };
176
177  const onrequest = () => {
178    stream.req.on('finish', onfinish);
179  };
180
181  if (isRequest(stream)) {
182    stream.on('complete', onfinish);
183    if (!willEmitClose) {
184      stream.on('abort', onclose);
185    }
186    if (stream.req) {
187      onrequest();
188    } else {
189      stream.on('request', onrequest);
190    }
191  } else if (writable && !wState) { // legacy streams
192    stream.on('end', onlegacyfinish);
193    stream.on('close', onlegacyfinish);
194  }
195
196  // Not all streams will emit 'close' after 'aborted'.
197  if (!willEmitClose && typeof stream.aborted === 'boolean') {
198    stream.on('aborted', onclose);
199  }
200
201  stream.on('end', onend);
202  stream.on('finish', onfinish);
203  if (options.error !== false) {
204    stream.on('error', onerror);
205  }
206  stream.on('close', onclose);
207
208  if (closed) {
209    process.nextTick(onclose);
210  } else if (wState?.errorEmitted || rState?.errorEmitted) {
211    if (!willEmitClose) {
212      process.nextTick(onclosed);
213    }
214  } else if (
215    !readable &&
216    (!willEmitClose || isReadable(stream)) &&
217    (writableFinished || isWritable(stream) === false)
218  ) {
219    process.nextTick(onclosed);
220  } else if (
221    !writable &&
222    (!willEmitClose || isWritable(stream)) &&
223    (readableFinished || isReadable(stream) === false)
224  ) {
225    process.nextTick(onclosed);
226  } else if ((rState && stream.req && stream.aborted)) {
227    process.nextTick(onclosed);
228  }
229
230  const cleanup = () => {
231    callback = nop;
232    stream.removeListener('aborted', onclose);
233    stream.removeListener('complete', onfinish);
234    stream.removeListener('abort', onclose);
235    stream.removeListener('request', onrequest);
236    if (stream.req) stream.req.removeListener('finish', onfinish);
237    stream.removeListener('end', onlegacyfinish);
238    stream.removeListener('close', onlegacyfinish);
239    stream.removeListener('finish', onfinish);
240    stream.removeListener('end', onend);
241    stream.removeListener('error', onerror);
242    stream.removeListener('close', onclose);
243  };
244
245  if (options.signal && !closed) {
246    const abort = () => {
247      // Keep it because cleanup removes it.
248      const endCallback = callback;
249      cleanup();
250      endCallback.call(
251        stream,
252        new AbortError(undefined, { cause: options.signal.reason }));
253    };
254    if (options.signal.aborted) {
255      process.nextTick(abort);
256    } else {
257      addAbortListener ??= require('events').addAbortListener;
258      const disposable = addAbortListener(options.signal, abort);
259      const originalCallback = callback;
260      callback = once((...args) => {
261        disposable[SymbolDispose]();
262        originalCallback.apply(stream, args);
263      });
264    }
265  }
266
267  return cleanup;
268}
269
270function eosWeb(stream, options, callback) {
271  let isAborted = false;
272  let abort = nop;
273  if (options.signal) {
274    abort = () => {
275      isAborted = true;
276      callback.call(stream, new AbortError(undefined, { cause: options.signal.reason }));
277    };
278    if (options.signal.aborted) {
279      process.nextTick(abort);
280    } else {
281      addAbortListener ??= require('events').addAbortListener;
282      const disposable = addAbortListener(options.signal, abort);
283      const originalCallback = callback;
284      callback = once((...args) => {
285        disposable[SymbolDispose]();
286        originalCallback.apply(stream, args);
287      });
288    }
289  }
290  const resolverFn = (...args) => {
291    if (!isAborted) {
292      process.nextTick(() => callback.apply(stream, args));
293    }
294  };
295  PromisePrototypeThen(
296    stream[kIsClosedPromise].promise,
297    resolverFn,
298    resolverFn,
299  );
300  return nop;
301}
302
303function finished(stream, opts) {
304  let autoCleanup = false;
305  if (opts === null) {
306    opts = kEmptyObject;
307  }
308  if (opts?.cleanup) {
309    validateBoolean(opts.cleanup, 'cleanup');
310    autoCleanup = opts.cleanup;
311  }
312  return new Promise((resolve, reject) => {
313    const cleanup = eos(stream, opts, (err) => {
314      if (autoCleanup) {
315        cleanup();
316      }
317      if (err) {
318        reject(err);
319      } else {
320        resolve();
321      }
322    });
323  });
324}
325
326module.exports = eos;
327module.exports.finished = finished;
328