1'use strict';
2
3const { pipeline } = require('internal/streams/pipeline');
4const Duplex = require('internal/streams/duplex');
5const { destroyer } = require('internal/streams/destroy');
6const {
7  isNodeStream,
8  isReadable,
9  isWritable,
10  isWebStream,
11  isTransformStream,
12  isWritableStream,
13  isReadableStream,
14} = require('internal/streams/utils');
15const {
16  AbortError,
17  codes: {
18    ERR_INVALID_ARG_VALUE,
19    ERR_MISSING_ARGS,
20  },
21} = require('internal/errors');
22const eos = require('internal/streams/end-of-stream');
23
24module.exports = function compose(...streams) {
25  if (streams.length === 0) {
26    throw new ERR_MISSING_ARGS('streams');
27  }
28
29  if (streams.length === 1) {
30    return Duplex.from(streams[0]);
31  }
32
33  const orgStreams = [...streams];
34
35  if (typeof streams[0] === 'function') {
36    streams[0] = Duplex.from(streams[0]);
37  }
38
39  if (typeof streams[streams.length - 1] === 'function') {
40    const idx = streams.length - 1;
41    streams[idx] = Duplex.from(streams[idx]);
42  }
43
44  for (let n = 0; n < streams.length; ++n) {
45    if (!isNodeStream(streams[n]) && !isWebStream(streams[n])) {
46      // TODO(ronag): Add checks for non streams.
47      continue;
48    }
49    if (
50      n < streams.length - 1 &&
51      !(
52        isReadable(streams[n]) ||
53        isReadableStream(streams[n]) ||
54        isTransformStream(streams[n])
55      )
56    ) {
57      throw new ERR_INVALID_ARG_VALUE(
58        `streams[${n}]`,
59        orgStreams[n],
60        'must be readable',
61      );
62    }
63    if (
64      n > 0 &&
65      !(
66        isWritable(streams[n]) ||
67        isWritableStream(streams[n]) ||
68        isTransformStream(streams[n])
69      )
70    ) {
71      throw new ERR_INVALID_ARG_VALUE(
72        `streams[${n}]`,
73        orgStreams[n],
74        'must be writable',
75      );
76    }
77  }
78
79  let ondrain;
80  let onfinish;
81  let onreadable;
82  let onclose;
83  let d;
84
85  function onfinished(err) {
86    const cb = onclose;
87    onclose = null;
88
89    if (cb) {
90      cb(err);
91    } else if (err) {
92      d.destroy(err);
93    } else if (!readable && !writable) {
94      d.destroy();
95    }
96  }
97
98  const head = streams[0];
99  const tail = pipeline(streams, onfinished);
100
101  const writable = !!(
102    isWritable(head) ||
103    isWritableStream(head) ||
104    isTransformStream(head)
105  );
106  const readable = !!(
107    isReadable(tail) ||
108    isReadableStream(tail) ||
109    isTransformStream(tail)
110  );
111
112  // TODO(ronag): Avoid double buffering.
113  // Implement Writable/Readable/Duplex traits.
114  // See, https://github.com/nodejs/node/pull/33515.
115  d = new Duplex({
116    // TODO (ronag): highWaterMark?
117    writableObjectMode: !!head?.writableObjectMode,
118    readableObjectMode: !!tail?.readableObjectMode,
119    writable,
120    readable,
121  });
122
123  if (writable) {
124    if (isNodeStream(head)) {
125      d._write = function(chunk, encoding, callback) {
126        if (head.write(chunk, encoding)) {
127          callback();
128        } else {
129          ondrain = callback;
130        }
131      };
132
133      d._final = function(callback) {
134        head.end();
135        onfinish = callback;
136      };
137
138      head.on('drain', function() {
139        if (ondrain) {
140          const cb = ondrain;
141          ondrain = null;
142          cb();
143        }
144      });
145    } else if (isWebStream(head)) {
146      const writable = isTransformStream(head) ? head.writable : head;
147      const writer = writable.getWriter();
148
149      d._write = async function(chunk, encoding, callback) {
150        try {
151          await writer.ready;
152          writer.write(chunk).catch(() => {});
153          callback();
154        } catch (err) {
155          callback(err);
156        }
157      };
158
159      d._final = async function(callback) {
160        try {
161          await writer.ready;
162          writer.close().catch(() => {});
163          onfinish = callback;
164        } catch (err) {
165          callback(err);
166        }
167      };
168    }
169
170    const toRead = isTransformStream(tail) ? tail.readable : tail;
171
172    eos(toRead, () => {
173      if (onfinish) {
174        const cb = onfinish;
175        onfinish = null;
176        cb();
177      }
178    });
179  }
180
181  if (readable) {
182    if (isNodeStream(tail)) {
183      tail.on('readable', function() {
184        if (onreadable) {
185          const cb = onreadable;
186          onreadable = null;
187          cb();
188        }
189      });
190
191      tail.on('end', function() {
192        d.push(null);
193      });
194
195      d._read = function() {
196        while (true) {
197          const buf = tail.read();
198          if (buf === null) {
199            onreadable = d._read;
200            return;
201          }
202
203          if (!d.push(buf)) {
204            return;
205          }
206        }
207      };
208    } else if (isWebStream(tail)) {
209      const readable = isTransformStream(tail) ? tail.readable : tail;
210      const reader = readable.getReader();
211      d._read = async function() {
212        while (true) {
213          try {
214            const { value, done } = await reader.read();
215
216            if (!d.push(value)) {
217              return;
218            }
219
220            if (done) {
221              d.push(null);
222              return;
223            }
224          } catch {
225            return;
226          }
227        }
228      };
229    }
230  }
231
232  d._destroy = function(err, callback) {
233    if (!err && onclose !== null) {
234      err = new AbortError();
235    }
236
237    onreadable = null;
238    ondrain = null;
239    onfinish = null;
240
241    if (onclose === null) {
242      callback(err);
243    } else {
244      onclose = callback;
245      if (isNodeStream(tail)) {
246        destroyer(tail, err);
247      }
248    }
249  };
250
251  return d;
252};
253