11cb0ef41Sopenharmony_ci'use strict';
21cb0ef41Sopenharmony_ci
31cb0ef41Sopenharmony_ciconst common = require('../common');
41cb0ef41Sopenharmony_ciconst {
51cb0ef41Sopenharmony_ci  Readable, Transform,
61cb0ef41Sopenharmony_ci} = require('stream');
71cb0ef41Sopenharmony_ciconst assert = require('assert');
81cb0ef41Sopenharmony_ci
91cb0ef41Sopenharmony_ci{
101cb0ef41Sopenharmony_ci  // with async generator
111cb0ef41Sopenharmony_ci  const stream = Readable.from(['a', 'b', 'c', 'd']).compose(async function *(stream) {
121cb0ef41Sopenharmony_ci    let str = '';
131cb0ef41Sopenharmony_ci    for await (const chunk of stream) {
141cb0ef41Sopenharmony_ci      str += chunk;
151cb0ef41Sopenharmony_ci
161cb0ef41Sopenharmony_ci      if (str.length === 2) {
171cb0ef41Sopenharmony_ci        yield str;
181cb0ef41Sopenharmony_ci        str = '';
191cb0ef41Sopenharmony_ci      }
201cb0ef41Sopenharmony_ci    }
211cb0ef41Sopenharmony_ci  });
221cb0ef41Sopenharmony_ci  const result = ['ab', 'cd'];
231cb0ef41Sopenharmony_ci  (async () => {
241cb0ef41Sopenharmony_ci    for await (const item of stream) {
251cb0ef41Sopenharmony_ci      assert.strictEqual(item, result.shift());
261cb0ef41Sopenharmony_ci    }
271cb0ef41Sopenharmony_ci  })().then(common.mustCall());
281cb0ef41Sopenharmony_ci}
291cb0ef41Sopenharmony_ci
301cb0ef41Sopenharmony_ci{
311cb0ef41Sopenharmony_ci  // With Transformer
321cb0ef41Sopenharmony_ci  const stream = Readable.from(['a', 'b', 'c', 'd']).compose(new Transform({
331cb0ef41Sopenharmony_ci    objectMode: true,
341cb0ef41Sopenharmony_ci    transform: common.mustCall((chunk, encoding, callback) => {
351cb0ef41Sopenharmony_ci      callback(null, chunk);
361cb0ef41Sopenharmony_ci    }, 4)
371cb0ef41Sopenharmony_ci  }));
381cb0ef41Sopenharmony_ci  const result = ['a', 'b', 'c', 'd'];
391cb0ef41Sopenharmony_ci  (async () => {
401cb0ef41Sopenharmony_ci    for await (const item of stream) {
411cb0ef41Sopenharmony_ci      assert.strictEqual(item, result.shift());
421cb0ef41Sopenharmony_ci    }
431cb0ef41Sopenharmony_ci  })().then(common.mustCall());
441cb0ef41Sopenharmony_ci}
451cb0ef41Sopenharmony_ci
461cb0ef41Sopenharmony_ci{
471cb0ef41Sopenharmony_ci  // Throwing an error during `compose` (before waiting for data)
481cb0ef41Sopenharmony_ci  const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield
491cb0ef41Sopenharmony_ci
501cb0ef41Sopenharmony_ci    throw new Error('boom');
511cb0ef41Sopenharmony_ci  });
521cb0ef41Sopenharmony_ci
531cb0ef41Sopenharmony_ci  assert.rejects(async () => {
541cb0ef41Sopenharmony_ci    for await (const item of stream) {
551cb0ef41Sopenharmony_ci      assert.fail('should not reach here, got ' + item);
561cb0ef41Sopenharmony_ci    }
571cb0ef41Sopenharmony_ci  }, /boom/).then(common.mustCall());
581cb0ef41Sopenharmony_ci}
591cb0ef41Sopenharmony_ci
601cb0ef41Sopenharmony_ci{
611cb0ef41Sopenharmony_ci  // Throwing an error during `compose` (when waiting for data)
621cb0ef41Sopenharmony_ci  const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) {
631cb0ef41Sopenharmony_ci    for await (const chunk of stream) {
641cb0ef41Sopenharmony_ci      if (chunk === 3) {
651cb0ef41Sopenharmony_ci        throw new Error('boom');
661cb0ef41Sopenharmony_ci      }
671cb0ef41Sopenharmony_ci      yield chunk;
681cb0ef41Sopenharmony_ci    }
691cb0ef41Sopenharmony_ci  });
701cb0ef41Sopenharmony_ci
711cb0ef41Sopenharmony_ci  assert.rejects(
721cb0ef41Sopenharmony_ci    stream.toArray(),
731cb0ef41Sopenharmony_ci    /boom/,
741cb0ef41Sopenharmony_ci  ).then(common.mustCall());
751cb0ef41Sopenharmony_ci}
761cb0ef41Sopenharmony_ci
771cb0ef41Sopenharmony_ci{
781cb0ef41Sopenharmony_ci  // Throwing an error during `compose` (after finishing all readable data)
791cb0ef41Sopenharmony_ci  const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield
801cb0ef41Sopenharmony_ci
811cb0ef41Sopenharmony_ci    // eslint-disable-next-line no-unused-vars,no-empty
821cb0ef41Sopenharmony_ci    for await (const chunk of stream) {
831cb0ef41Sopenharmony_ci    }
841cb0ef41Sopenharmony_ci
851cb0ef41Sopenharmony_ci    throw new Error('boom');
861cb0ef41Sopenharmony_ci  });
871cb0ef41Sopenharmony_ci  assert.rejects(
881cb0ef41Sopenharmony_ci    stream.toArray(),
891cb0ef41Sopenharmony_ci    /boom/,
901cb0ef41Sopenharmony_ci  ).then(common.mustCall());
911cb0ef41Sopenharmony_ci}
921cb0ef41Sopenharmony_ci
931cb0ef41Sopenharmony_ci{
941cb0ef41Sopenharmony_ci  // AbortSignal
951cb0ef41Sopenharmony_ci  const ac = new AbortController();
961cb0ef41Sopenharmony_ci  const stream = Readable.from([1, 2, 3, 4, 5])
971cb0ef41Sopenharmony_ci    .compose(async function *(source) {
981cb0ef41Sopenharmony_ci      // Should not reach here
991cb0ef41Sopenharmony_ci      for await (const chunk of source) {
1001cb0ef41Sopenharmony_ci        yield chunk;
1011cb0ef41Sopenharmony_ci      }
1021cb0ef41Sopenharmony_ci    }, { signal: ac.signal });
1031cb0ef41Sopenharmony_ci
1041cb0ef41Sopenharmony_ci  ac.abort();
1051cb0ef41Sopenharmony_ci
1061cb0ef41Sopenharmony_ci  assert.rejects(async () => {
1071cb0ef41Sopenharmony_ci    for await (const item of stream) {
1081cb0ef41Sopenharmony_ci      assert.fail('should not reach here, got ' + item);
1091cb0ef41Sopenharmony_ci    }
1101cb0ef41Sopenharmony_ci  }, {
1111cb0ef41Sopenharmony_ci    name: 'AbortError',
1121cb0ef41Sopenharmony_ci  }).then(common.mustCall());
1131cb0ef41Sopenharmony_ci}
1141cb0ef41Sopenharmony_ci
1151cb0ef41Sopenharmony_ci{
1161cb0ef41Sopenharmony_ci  assert.throws(
1171cb0ef41Sopenharmony_ci    () => Readable.from(['a']).compose(Readable.from(['b'])),
1181cb0ef41Sopenharmony_ci    { code: 'ERR_INVALID_ARG_VALUE' }
1191cb0ef41Sopenharmony_ci  );
1201cb0ef41Sopenharmony_ci}
1211cb0ef41Sopenharmony_ci
1221cb0ef41Sopenharmony_ci{
1231cb0ef41Sopenharmony_ci  assert.throws(
1241cb0ef41Sopenharmony_ci    () => Readable.from(['a']).compose(),
1251cb0ef41Sopenharmony_ci    { code: 'ERR_INVALID_ARG_TYPE' }
1261cb0ef41Sopenharmony_ci  );
1271cb0ef41Sopenharmony_ci}
128