11cb0ef41Sopenharmony_ci'use strict';
21cb0ef41Sopenharmony_ci
31cb0ef41Sopenharmony_ciconst common = require('../common');
41cb0ef41Sopenharmony_ciconst stream = require('stream');
51cb0ef41Sopenharmony_ciconst {
61cb0ef41Sopenharmony_ci  Readable, Writable, promises,
71cb0ef41Sopenharmony_ci} = stream;
81cb0ef41Sopenharmony_ciconst {
91cb0ef41Sopenharmony_ci  finished, pipeline,
101cb0ef41Sopenharmony_ci} = require('stream/promises');
111cb0ef41Sopenharmony_ciconst fs = require('fs');
121cb0ef41Sopenharmony_ciconst assert = require('assert');
131cb0ef41Sopenharmony_ciconst { promisify } = require('util');
141cb0ef41Sopenharmony_ci
// The promise-based `pipeline` and `finished` must be the very same function
// objects whether they are reached through `stream.promises`, through
// `require('stream/promises')`, or by applying `util.promisify()` to the
// callback-style originals.
{
  const promiseApi = { pipeline, finished };
  const apiNames = Object.keys(promiseApi);

  // `stream.promises.<name>` is the function exported by 'stream/promises'.
  for (const name of apiNames) {
    assert.strictEqual(promises[name], promiseApi[name]);
  }

  // Promisifying the callback version yields that same function.
  for (const name of apiNames) {
    assert.strictEqual(promiseApi[name], promisify(stream[name]));
  }
}
191cb0ef41Sopenharmony_ci
// pipeline success: the promise resolves after the writable has emitted
// 'finish' and every pushed chunk has been written, in order.
{
  const expectedChunks = ['a', 'b', 'c'].map((text) => Buffer.from(text));
  const receivedChunks = [];
  let sawFinish = false;

  // Source with a no-op read(); data is pushed manually below.
  const source = new Readable({
    read() {}
  });

  // Sink that records each chunk it is handed.
  const sink = new Writable({
    write(chunk, encoding, callback) {
      receivedChunks.push(chunk);
      callback();
    }
  });

  sink.on('finish', () => {
    sawFinish = true;
  });

  for (const chunk of expectedChunks) {
    source.push(chunk);
  }
  // Signal end-of-stream so the pipeline can complete.
  source.push(null);

  pipeline(source, sink).then(common.mustCall(() => {
    assert.ok(sawFinish);
    assert.deepStrictEqual(receivedChunks, expectedChunks);
  }));
}
521cb0ef41Sopenharmony_ci
// pipeline error: the promise rejects when the source is destroyed before
// it has signalled end-of-stream.
{
  const source = new Readable({
    read() {}
  });

  // Sink that accepts and discards everything.
  const sink = new Writable({
    write(chunk, encoding, callback) {
      callback();
    }
  });

  source.push('data');
  // Destroy the source on a later tick, after the pipeline has been set up.
  setImmediate(() => {
    source.destroy();
  });

  const pending = pipeline(source, sink);
  pending.catch(common.mustCall((err) => {
    assert.ok(err, 'should have an error');
  }));
}
731cb0ef41Sopenharmony_ci
// finished success: awaiting finished() on a file stream that is being
// consumed resolves only after the stream has emitted 'end'.
{
  const checkFinishedAfterEnd = async () => {
    let sawEnd = false;
    const fileStream = fs.createReadStream(__filename);

    // Put the stream into flowing mode so it reads through to the end.
    fileStream.resume();
    fileStream.on('end', () => {
      sawEnd = true;
    });

    await finished(fileStream);
    assert(sawEnd);
  };

  checkFinishedAfterEnd().then(common.mustCall());
}
901cb0ef41Sopenharmony_ci
// finished error: reading a path that does not exist makes finished()
// reject with an ENOENT error.
{
  const missingFileStream = fs.createReadStream('file-does-not-exist');
  const expectedError = { code: 'ENOENT' };

  assert
    .rejects(finished(missingFileStream), expectedError)
    .then(common.mustCall());
}
991cb0ef41Sopenharmony_ci
// A `cleanup` option that is not a boolean is rejected: finished() throws
// ERR_INVALID_ARG_TYPE from the call itself rather than returning a promise.
{
  const readable = new Readable();
  const invalidOptions = { cleanup: 2 };

  function callWithNonBooleanCleanup() {
    finished(readable, invalidOptions);
  }

  assert.throws(callWithNonBooleanCleanup, { code: 'ERR_INVALID_ARG_TYPE' });
}
1081cb0ef41Sopenharmony_ci
// Counterpart of the invalid-type case: with a real stream and a boolean
// `cleanup` value the call must go through without throwing.
{
  const validOptions = { cleanup: true };
  finished(new Readable(), validOptions);
}
1151cb0ef41Sopenharmony_ci
1161cb0ef41Sopenharmony_ci
// With `cleanup: false` the listeners registered by finished() are left in
// place: one 'end' listener is still attached once the promise has resolved.
{
  const writable = new Writable();
  const endListenerCount = () => writable.listenerCount('end');

  // Nothing is attached before finished() is called.
  assert.strictEqual(endListenerCount(), 0);

  const done = finished(writable, { cleanup: false });
  done.then(common.mustCall(() => {
    assert.strictEqual(endListenerCount(), 1);
  }));

  writable.end();
}
1271cb0ef41Sopenharmony_ci
// With `cleanup: true` the listeners registered by finished() are removed:
// no 'end' listener remains once the promise has resolved.
{
  const writable = new Writable();
  const endListenerCount = () => writable.listenerCount('end');

  // Nothing is attached before finished() is called.
  assert.strictEqual(endListenerCount(), 0);

  const done = finished(writable, { cleanup: true });
  done.then(common.mustCall(() => {
    assert.strictEqual(endListenerCount(), 0);
  }));

  writable.end();
}
1381cb0ef41Sopenharmony_ci
// When `cleanup` is not passed at all, the behaviour matches
// `cleanup: false`: one 'end' listener is still attached once the promise
// has resolved.
{
  const writable = new Writable();
  const endListenerCount = () => writable.listenerCount('end');

  // Nothing is attached before finished() is called.
  assert.strictEqual(endListenerCount(), 0);

  const done = finished(writable);
  done.then(common.mustCall(() => {
    assert.strictEqual(endListenerCount(), 1);
  }));

  writable.end();
}
149