11cb0ef41Sopenharmony_ci'use strict';
21cb0ef41Sopenharmony_ci
31cb0ef41Sopenharmony_ciconst common = require('../common');
41cb0ef41Sopenharmony_ciconst {
51cb0ef41Sopenharmony_ci  Readable,
61cb0ef41Sopenharmony_ci} = require('stream');
71cb0ef41Sopenharmony_ciconst assert = require('assert');
81cb0ef41Sopenharmony_ciconst { once } = require('events');
91cb0ef41Sopenharmony_ci
{
  // forEach on a synchronously-sourced stream with a synchronous callback:
  // every chunk must reach the callback exactly once, in source order.
  const stream = Readable.from([1, 2, 3]);
  const seen = [];
  (async () => {
    await stream.forEach((value) => {
      seen.push(value);
    });
    // Compare only after the promise has settled so that a forEach which
    // skips chunks (or never invokes the callback at all) fails the test
    // instead of passing vacuously.
    assert.deepStrictEqual(seen, [1, 2, 3]);
  })().then(common.mustCall());
}
181cb0ef41Sopenharmony_ci
{
  // forEach works on asynchronous streams: the source is piped through an
  // async filter that keeps every chunk, so chunks arrive asynchronously.
  const stream = Readable.from([1, 2, 3]).filter(async (x) => {
    await Promise.resolve();
    return true;
  });
  const seen = [];
  (async () => {
    await stream.forEach((value) => {
      seen.push(value);
    });
    // Checked after settlement so that missing callback invocations are
    // detected rather than silently tolerated.
    assert.deepStrictEqual(seen, [1, 2, 3]);
  })().then(common.mustCall());
}
301cb0ef41Sopenharmony_ci
{
  // forEach works on asynchronous streams with an asynchronous forEach fn:
  // both the upstream filter and the callback yield to the microtask queue.
  const stream = Readable.from([1, 2, 3]).filter(async (x) => {
    await Promise.resolve();
    return true;
  });
  const seen = [];
  (async () => {
    await stream.forEach(async (value) => {
      await Promise.resolve();
      seen.push(value);
    });
    // Checked after settlement so that missing callback invocations are
    // detected rather than silently tolerated.
    assert.deepStrictEqual(seen, [1, 2, 3]);
  })().then(common.mustCall());
}
451cb0ef41Sopenharmony_ci
{
  // forEach works on an infinite stream: the source never ends on its own,
  // so the test aborts it from inside the callback on the tenth chunk and
  // expects the forEach promise to reject with an AbortError.
  const controller = new AbortController();
  const endlessOnes = async function* () {
    while (true) {
      yield 1;
    }
  };
  const stream = Readable.from(endlessOnes(), { signal: controller.signal });

  let delivered = 0;
  // The callback must run exactly ten times.
  const onChunk = common.mustCall((chunk) => {
    delivered += 1;
    if (delivered === 10) {
      controller.abort();
    }
    assert.strictEqual(chunk, 1);
  }, 10);

  assert
    .rejects(stream.forEach(onChunk), { name: 'AbortError' })
    .then(common.mustCall());
}
601cb0ef41Sopenharmony_ci
{
  // Emitting an error during `forEach`: an 'error' event raised on the
  // source stream while iterating must reject the forEach promise.
  const source = Readable.from([1, 2, 3, 4, 5]);
  const pending = source.forEach(async (item) => {
    if (item !== 3) {
      return;
    }
    source.emit('error', new Error('boom'));
  });
  assert.rejects(pending, /boom/).then(common.mustCall());
}
701cb0ef41Sopenharmony_ci
{
  // Throwing an error during `forEach` (sync): a synchronous throw from the
  // callback must surface as a rejection of the forEach promise.
  const source = Readable.from([1, 2, 3, 4, 5]);
  const throwOnThree = (item) => {
    if (item === 3) {
      throw new Error('boom');
    }
  };
  const pending = source.forEach(throwOnThree);
  assert.rejects(pending, /boom/).then(common.mustCall());
}
801cb0ef41Sopenharmony_ci
{
  // Throwing an error during `forEach` (async): a callback that resolves to
  // a rejected promise must reject the forEach promise.
  const source = Readable.from([1, 2, 3, 4, 5]);
  const rejectOnThree = async (item) => {
    if (item === 3) {
      return Promise.reject(new Error('boom'));
    }
  };
  const pending = source.forEach(rejectOnThree);
  assert.rejects(pending, /boom/).then(common.mustCall());
}
901cb0ef41Sopenharmony_ci
{
  // Concurrency + AbortSignal: each callback parks until its signal aborts,
  // so with `concurrency: 2` exactly two callbacks should have started by
  // the time the outer controller is aborted.
  const controller = new AbortController();
  const options = {
    signal: controller.signal,
    concurrency: 2,
    highWaterMark: 0,
  };
  let started = 0;
  const blockUntilAborted = async (_, { signal }) => {
    started++;
    await once(signal, 'abort');
  };
  const forEachPromise =
    Readable.from([1, 2, 3, 4]).forEach(blockUntilAborted, options);

  // Consume the promise; it must reject once the controller aborts.
  const awaitForEach = async () => {
    await forEachPromise;
  };
  assert
    .rejects(awaitForEach, { name: 'AbortError' })
    .then(common.mustCall());

  setImmediate(() => {
    controller.abort();
    assert.strictEqual(started, 2);
  });
}
1121cb0ef41Sopenharmony_ci
{
  // Error cases: invalid arguments must reject the forEach promise with the
  // matching error code. Each entry pairs a call with the expected pattern.
  const invalidCalls = [
    // Callback is not a function.
    [() => Readable.from([1]).forEach(1), /ERR_INVALID_ARG_TYPE/],
    // `concurrency` is not a valid number.
    [
      () => Readable.from([1]).forEach((x) => x, { concurrency: 'Foo' }),
      /ERR_OUT_OF_RANGE/,
    ],
    // Options argument is not an object.
    [() => Readable.from([1]).forEach((x) => x, 1), /ERR_INVALID_ARG_TYPE/],
  ];

  for (const [invoke, expected] of invalidCalls) {
    assert.rejects(async () => {
      await invoke();
    }, expected).then(common.mustCall());
  }
}
{
  // Test result is a Promise: forEach hands back a thenable rather than a
  // stream, which is checked by probing for a callable `then`.
  const source = Readable.from([1, 2, 3, 4, 5]);
  const pending = source.forEach((_) => true);
  const thenType = typeof pending.then;
  assert.strictEqual(thenType, 'function');
}
{
  // Check that map isn't getting called: the instance's own `map` property
  // is shadowed with a function that fails the test if it is ever invoked,
  // then forEach is run over the stream.
  const source = Readable.from([1, 2, 3, 4, 5]);
  const trapDescriptor = { value: common.mustNotCall() };
  Object.defineProperty(source, 'map', trapDescriptor);
  source.forEach(() => true);
}
140