1'use strict';
2
3const common = require('../common');
4const stream = require('stream');
5const {
6  Readable, Writable, promises,
7} = stream;
8const {
9  finished, pipeline,
10} = require('stream/promises');
11const fs = require('fs');
12const assert = require('assert');
13const { promisify } = require('util');
14
15assert.strictEqual(promises.pipeline, pipeline);
16assert.strictEqual(promises.finished, finished);
17assert.strictEqual(pipeline, promisify(stream.pipeline));
18assert.strictEqual(finished, promisify(stream.finished));
19
20// pipeline success
21{
22  let finished = false;
23  const processed = [];
24  const expected = [Buffer.from('a'), Buffer.from('b'), Buffer.from('c')];
25
26  const read = new Readable({
27    read() {
28    }
29  });
30
31  const write = new Writable({
32    write(data, enc, cb) {
33      processed.push(data);
34      cb();
35    }
36  });
37
38  write.on('finish', () => {
39    finished = true;
40  });
41
42  for (let i = 0; i < expected.length; i++) {
43    read.push(expected[i]);
44  }
45  read.push(null);
46
47  pipeline(read, write).then(common.mustCall((value) => {
48    assert.ok(finished);
49    assert.deepStrictEqual(processed, expected);
50  }));
51}
52
53// pipeline error
54{
55  const read = new Readable({
56    read() {
57    }
58  });
59
60  const write = new Writable({
61    write(data, enc, cb) {
62      cb();
63    }
64  });
65
66  read.push('data');
67  setImmediate(() => read.destroy());
68
69  pipeline(read, write).catch(common.mustCall((err) => {
70    assert.ok(err, 'should have an error');
71  }));
72}
73
74// finished success
75{
76  async function run() {
77    const rs = fs.createReadStream(__filename);
78
79    let ended = false;
80    rs.resume();
81    rs.on('end', () => {
82      ended = true;
83    });
84    await finished(rs);
85    assert(ended);
86  }
87
88  run().then(common.mustCall());
89}
90
91// finished error
92{
93  const rs = fs.createReadStream('file-does-not-exist');
94
95  assert.rejects(finished(rs), {
96    code: 'ENOENT'
97  }).then(common.mustCall());
98}
99
100{
101  const streamObj = new Readable();
102  assert.throws(() => {
103    // Passing cleanup option not as boolean
104    // should throw error
105    finished(streamObj, { cleanup: 2 });
106  }, { code: 'ERR_INVALID_ARG_TYPE' });
107}
108
109// Below code should not throw any errors as the
110// streamObj is `Stream` and cleanup is boolean
111{
112  const streamObj = new Readable();
113  finished(streamObj, { cleanup: true });
114}
115
116
117// Cleanup function should not be called when cleanup is set to false
118// listenerCount should be 1 after calling finish
119{
120  const streamObj = new Writable();
121  assert.strictEqual(streamObj.listenerCount('end'), 0);
122  finished(streamObj, { cleanup: false }).then(common.mustCall(() => {
123    assert.strictEqual(streamObj.listenerCount('end'), 1);
124  }));
125  streamObj.end();
126}
127
128// Cleanup function should be called when cleanup is set to true
129// listenerCount should be 0 after calling finish
130{
131  const streamObj = new Writable();
132  assert.strictEqual(streamObj.listenerCount('end'), 0);
133  finished(streamObj, { cleanup: true }).then(common.mustCall(() => {
134    assert.strictEqual(streamObj.listenerCount('end'), 0);
135  }));
136  streamObj.end();
137}
138
139// Cleanup function should not be called when cleanup has not been set
140// listenerCount should be 1 after calling finish
141{
142  const streamObj = new Writable();
143  assert.strictEqual(streamObj.listenerCount('end'), 0);
144  finished(streamObj).then(common.mustCall(() => {
145    assert.strictEqual(streamObj.listenerCount('end'), 1);
146  }));
147  streamObj.end();
148}
149