1'use strict';
2
3const common = require('../common');
4const assert = require('assert');
5const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream');
6const { ReadableStream, WritableStream } = require('stream/web');
7const { Blob } = require('buffer');
8
// Duplex.from() given an object that only supplies a `readable` side:
// the result is readable, not writable, and yields the source's data.
{
  const source = new Readable({
    read() {
      this.push('asd');
      this.push(null);
    },
  });
  const duplex = Duplex.from({ readable: source });

  assert.strictEqual(duplex.readable, true);
  assert.strictEqual(duplex.writable, false);

  duplex.once('readable', common.mustCall(() => {
    assert.strictEqual(duplex.read().toString(), 'asd');
  }));
  duplex.once('end', common.mustCall(() => {
    assert.strictEqual(duplex.readable, false);
  }));
}
27
// Duplex.from() given a bare Readable: same expectations as when the
// Readable is supplied through the `readable` option.
{
  const source = new Readable({
    read() {
      this.push('asd');
      this.push(null);
    },
  });
  const duplex = Duplex.from(source);

  assert.strictEqual(duplex.readable, true);
  assert.strictEqual(duplex.writable, false);

  duplex.once('readable', common.mustCall(() => {
    assert.strictEqual(duplex.read().toString(), 'asd');
  }));
  duplex.once('end', common.mustCall(() => {
    assert.strictEqual(duplex.readable, false);
  }));
}
44
// Duplex.from() given a bare Writable: the result is writable only, and
// whatever is written to it reaches the wrapped stream.
{
  let received = '';
  const sink = new Writable({
    write(chunk, encoding, callback) {
      received += chunk;
      callback();
    },
  });
  const duplex = Duplex.from(sink);

  assert.strictEqual(duplex.readable, false);
  assert.strictEqual(duplex.writable, true);

  duplex.end('asd');
  duplex.on('finish', common.mustCall(() => {
    assert.strictEqual(duplex.writable, false);
    assert.strictEqual(received, 'asd');
  }));
}
61
// Duplex.from() given an object that only supplies a `writable` side.
{
  let received = '';
  const sink = new Writable({
    write(chunk, encoding, callback) {
      received += chunk;
      callback();
    },
  });
  const duplex = Duplex.from({ writable: sink });

  assert.strictEqual(duplex.readable, false);
  assert.strictEqual(duplex.writable, true);

  duplex.end('asd');
  duplex.on('finish', common.mustCall(() => {
    assert.strictEqual(duplex.writable, false);
    assert.strictEqual(received, 'asd');
  }));
}
80
// Duplex.from() given both a `readable` and a `writable`: reads come from
// the Readable, writes go to the Writable, and each side ends on its own.
{
  let received = '';
  const source = new Readable({
    read() {
      this.push('asd');
      this.push(null);
    },
  });
  const sink = new Writable({
    write(chunk, encoding, callback) {
      received += chunk;
      callback();
    },
  });
  const duplex = Duplex.from({ readable: source, writable: sink });

  assert.strictEqual(duplex.readable, true);
  assert.strictEqual(duplex.writable, true);

  // Readable side.
  duplex.once('readable', common.mustCall(() => {
    assert.strictEqual(duplex.read().toString(), 'asd');
  }));
  duplex.once('end', common.mustCall(() => {
    assert.strictEqual(duplex.readable, false);
  }));

  // Writable side.
  duplex.end('asd');
  duplex.once('finish', common.mustCall(() => {
    assert.strictEqual(duplex.writable, false);
    assert.strictEqual(received, 'asd');
  }));
}
111
// Duplex.from() given a fulfilled promise: the fulfillment value is what the
// readable-only result produces.
{
  const fulfilled = Promise.resolve('asd');
  const duplex = Duplex.from(fulfilled);

  assert.strictEqual(duplex.readable, true);
  assert.strictEqual(duplex.writable, false);

  duplex.once('readable', common.mustCall(() => {
    assert.strictEqual(duplex.read().toString(), 'asd');
  }));
  duplex.once('end', common.mustCall(() => {
    assert.strictEqual(duplex.readable, false);
  }));
}
123
{
  // https://github.com/nodejs/node/issues/40497
  // An async generator wrapped by Duplex.from() is used as the middle stage
  // of a pipeline. It re-chunks the text at '\n' boundaries and drops the
  // separators, hence the expected 'abcdefghi'.
  const lineSplitter = Duplex.from(async function * (source) {
    let pending = '';
    for await (const chunk of source) {
      const pieces = `${pending}${chunk.toString()}`.split('\n');
      // The last piece may be an unfinished line; hold it back.
      pending = pieces.pop();
      for (const piece of pieces) {
        yield piece;
      }
    }
    yield pending;
  });

  // Final stage: consumes everything and checks the concatenated output.
  const verifyOutput = async function * (source) { // eslint-disable-line require-yield
    let collected = '';
    for await (const part of source) {
      collected += part;
    }
    assert.strictEqual(collected, 'abcdefghi');
  };

  pipeline(
    ['abc\ndef\nghi'],
    lineSplitter,
    verifyOutput,
    common.mustSucceed(),
  );
}
149
// Passing an existing Duplex must hand back that very same instance
// (intended to exercise the isDuplexNodeStream check).
{
  const existing = new Duplex();
  const result = Duplex.from(existing);
  assert.strictEqual(result, existing);
}
155
// Duplex.from() accepts a Blob: the single 'data' chunk must have the same
// byte length as the Blob itself.
{
  const source = new Blob(['blob']);
  const { size: expectedByteLength } = source;

  const duplex = Duplex.from(source);
  duplex.on('data', common.mustCall((arrayBuffer) => {
    assert.strictEqual(arrayBuffer.byteLength, expectedByteLength);
  }));
}
165
// A rejected promise must surface as an 'error' event carrying the
// rejection reason unchanged.
{
  const reason = 'myCustomError';
  const duplex = Duplex.from(Promise.reject(reason));

  duplex.on('error', common.mustCall((error) => {
    assert.strictEqual(error, reason);
  }));
}
174
// An async function whose promise rejects must also surface the rejection
// reason through an 'error' event.
{
  const reason = 'myCustomError';
  const rejectingFn = async () => {
    return Promise.reject(reason);
  };

  const duplex = Duplex.from(rejectingFn);
  duplex.on('error', common.mustCall((error) => {
    assert.strictEqual(error, reason);
  }));
}
187
// A function that returns nothing is rejected synchronously with
// ERR_INVALID_RETURN_VALUE.
{
  const returnsNothing = () => {};

  assert.throws(
    () => Duplex.from(returnsNothing),
    { code: 'ERR_INVALID_RETURN_VALUE' },
  );
}
194
// An object carrying only a `readable` stream is duplexified: the pushed
// buffer arrives as the 'data' chunk and the result is not writable.
{
  const payload = Buffer.from('hello');
  const source = Readable({
    read() {
      this.push(payload);
      this.push(null);
    },
  });

  const duplex = Duplex.from({ readable: source });
  duplex.on('data', common.mustCall((data) => {
    assert.strictEqual(data, payload);
  }));

  assert.strictEqual(duplex.writable, false);
}
211
// An object carrying only a `writable` stream is duplexified: a written
// buffer reaches the wrapped write() and the result is not readable.
{
  const payload = Buffer.from('hello');
  const onWrite = common.mustCall((data) => {
    assert.strictEqual(data, payload);
  });

  const duplex = Duplex.from({
    writable: Writable({ write: onWrite }),
  });

  duplex.write(payload);
  assert.strictEqual(duplex.readable, false);
}
226
// An object carrying both a `readable` and a `writable` stream is
// duplexified: piping the duplex into itself feeds the readable side's
// buffer to the writable side.
{
  const payload = Buffer.from('hello');

  const source = Readable({
    read() {
      this.push(payload);
      this.push(null);
    },
  });
  const sink = Writable({
    write: common.mustCall((data) => {
      assert.strictEqual(data, payload);
    }),
  });
  const duplex = Duplex.from({ readable: source, writable: sink });

  // pipe() returns its destination, which here is the duplex itself.
  const piped = duplex.pipe(duplex);
  piped.on('data', common.mustCall((data) => {
    assert.strictEqual(data, payload);
    assert.strictEqual(duplex.readable, true);
    assert.strictEqual(duplex.writable, true);
  }));
  piped.on('end', common.mustCall());
}
253
// An exception thrown by the wrapped Readable's read() must surface as an
// 'error' event on the duplex.
{
  const expectedMessage = 'error!';
  const source = Readable({
    read() {
      throw new Error(expectedMessage);
    },
  });

  const duplex = Duplex.from(source);
  duplex.on('error', common.mustCall((err) => {
    assert.strictEqual(err.message, expectedMessage);
  }));
}
266
// An error handed to the wrapped Writable's write callback must surface as
// an 'error' event on the duplex, carrying the same value.
{
  const expectedError = 'error!';
  const sink = Writable({
    write(chunk, enc, cb) {
      cb(expectedError);
    },
  });

  const duplex = Duplex.from(sink);
  duplex.on('error', common.mustCall((err) => {
    assert.strictEqual(err, expectedError);
  }));

  duplex.write('test');
}
282
// One PassThrough serves as both sides of the duplex. The consumer pauses
// on every chunk and resumes on the next immediate; all data must still
// arrive, followed by 'end' and 'close'.
{
  const through = new PassThrough({ objectMode: true });
  let collected = '';

  const source = Readable.from(['foo', 'bar'], { objectMode: true });
  const wrapped = Duplex.from({
    writable: through,
    readable: through,
  });
  const d = source.pipe(wrapped);

  d.on('data', (data) => {
    d.pause();
    setImmediate(() => {
      d.resume();
    });
    collected += data;
  });
  d.on('end', common.mustCall(() => {
    assert.strictEqual(collected, 'foobar');
  }));
  d.on('close', common.mustCall());
}
303
// Builds a web ReadableStream that enqueues `value` as its only chunk and
// then closes, both during start().
function makeATestReadableStream(value) {
  const underlyingSource = {
    start(ctrl) {
      ctrl.enqueue(value);
      ctrl.close();
    },
  };
  return new ReadableStream(underlyingSource);
}
312
// Builds a web WritableStream whose sink hands every written chunk to
// `writeFunc`.
//
// The callback's result is returned from the sink's write() so that an
// asynchronous `writeFunc` is awaited before the chunk counts as written,
// and a rejection errors the stream instead of going unhandled. Callbacks
// that return a plain (non-thenable) value are unaffected.
function makeATestWritableStream(writeFunc) {
  return new WritableStream({
    write(chunk) {
      return writeFunc(chunk);
    },
  });
}
320
// Duplex.from() given a web ReadableStream through the `readable` option.
{
  const webReadable = makeATestReadableStream('foo');
  const duplex = Duplex.from({ readable: webReadable });

  assert.strictEqual(duplex.readable, true);
  assert.strictEqual(duplex.writable, false);

  duplex.on('data', common.mustCall((data) => {
    assert.strictEqual(data.toString(), 'foo');
  }));
  duplex.on('end', common.mustCall(() => {
    assert.strictEqual(duplex.readable, false);
  }));
}
336
// Duplex.from() given a web ReadableStream directly.
{
  const webReadable = makeATestReadableStream('foo');
  const duplex = Duplex.from(webReadable);

  assert.strictEqual(duplex.readable, true);
  assert.strictEqual(duplex.writable, false);

  duplex.on('data', common.mustCall((data) => {
    assert.strictEqual(data.toString(), 'foo');
  }));
  duplex.on('end', common.mustCall(() => {
    assert.strictEqual(duplex.readable, false);
  }));
}
351
// Duplex.from() given a web WritableStream through the `writable` option.
{
  let received = '';
  const webWritable = makeATestWritableStream((chunk) => {
    received += chunk;
  });
  const duplex = Duplex.from({ writable: webWritable });

  assert.strictEqual(duplex.readable, false);
  assert.strictEqual(duplex.writable, true);

  duplex.end('foo');
  duplex.on('finish', common.mustCall(() => {
    assert.strictEqual(received, 'foo');
    assert.strictEqual(duplex.writable, false);
  }));
}
367
// Duplex.from() given a web WritableStream directly.
{
  let received = '';
  const webWritable = makeATestWritableStream((chunk) => {
    received += chunk;
  });
  const duplex = Duplex.from(webWritable);

  assert.strictEqual(duplex.readable, false);
  assert.strictEqual(duplex.writable, true);

  duplex.end('foo');
  duplex.on('finish', common.mustCall(() => {
    assert.strictEqual(received, 'foo');
    assert.strictEqual(duplex.writable, false);
  }));
}
381
// Duplex.from() given a web ReadableStream and a web WritableStream
// together: 'foo' is read from one side while 'bar' is written to the other.
{
  let received = '';
  const webReadable = makeATestReadableStream('foo');
  const webWritable = makeATestWritableStream((chunk) => {
    received += chunk;
  });
  const duplex = Duplex.from({
    readable: webReadable,
    writable: webWritable,
  });

  duplex.end('bar');

  // Readable side.
  duplex.on('data', common.mustCall((data) => {
    assert.strictEqual(data.toString(), 'foo');
  }));
  duplex.on('end', common.mustCall(() => {
    assert.strictEqual(duplex.readable, false);
  }));

  // Writable side.
  duplex.on('finish', common.mustCall(() => {
    assert.strictEqual(received, 'bar');
    assert.strictEqual(duplex.writable, false);
  }));
}
404