'use strict';

// Project test helpers; this file uses its mustCall / mustNotCall wrappers.
const common = require('../common');
const {
  Readable,
} = require('stream');
const assert = require('assert');
const { once } = require('events');

{
  // forEach works on synchronous streams with a synchronous predicate
  const stream = Readable.from([1, 2, 3]);
  const expected = [1, 2, 3];
  // Wrapping the callback in mustCall(fn, 3) is expected to fail the test
  // unless it runs once per element, so the test cannot pass vacuously when
  // forEach never invokes the callback.
  stream.forEach(common.mustCall((value) => {
    // Values must arrive in source order.
    assert.strictEqual(value, expected.shift());
  }, 3)).then(common.mustCall());
}

{
  // forEach works on asynchronous streams
  // The async pass-through filter makes every chunk arrive asynchronously.
  const stream = Readable.from([1, 2, 3]).filter(async () => {
    await Promise.resolve();
    return true;
  });
  const expected = [1, 2, 3];
  // Wrapping the callback in mustCall(fn, 3) is expected to fail the test
  // unless it runs once per element, so the test cannot pass vacuously when
  // forEach never invokes the callback.
  stream.forEach(common.mustCall((value) => {
    // Values must arrive in source order.
    assert.strictEqual(value, expected.shift());
  }, 3)).then(common.mustCall());
}

{
  // forEach works on asynchronous streams with an asynchronous forEach fn
  // The async pass-through filter makes every chunk arrive asynchronously.
  const stream = Readable.from([1, 2, 3]).filter(async () => {
    await Promise.resolve();
    return true;
  });
  const expected = [1, 2, 3];
  // Wrapping the callback in mustCall(fn, 3) is expected to fail the test
  // unless it runs once per element, so the test cannot pass vacuously when
  // forEach never invokes the callback.
  stream.forEach(common.mustCall(async (value) => {
    await Promise.resolve();
    // Values must arrive in source order even though the callback is async.
    assert.strictEqual(value, expected.shift());
  }, 3)).then(common.mustCall());
}

{
  // forEach works on an infinite stream
  const ac = new AbortController();
  const { signal } = ac;
  // Endless source; the only way out of the iteration is the abort signal
  // handed to Readable.from().
  const stream = Readable.from(async function* () {
    while (true) yield 1;
  }(), { signal });
  let seen = 0;
  // The callback must run exactly 10 times: the tenth call aborts, after
  // which the forEach promise is expected to reject with an AbortError.
  assert.rejects(stream.forEach(common.mustCall((x) => {
    seen++;
    if (seen === 10) ac.abort();
    assert.strictEqual(x, 1);
  }, 10)), { name: 'AbortError' }).then(common.mustCall());
}

{
  // Emitting an error during `forEach`
  const stream = Readable.from([1, 2, 3, 4, 5]);
  // An 'error' event emitted on the source stream while the third element is
  // being processed must reject the forEach promise with that error.
  assert.rejects(stream.forEach(async (x) => {
    if (x === 3) {
      stream.emit('error', new Error('boom'));
    }
  }), /boom/).then(common.mustCall());
}

{
  // Throwing an error during `forEach` (sync)
  const stream = Readable.from([1, 2, 3, 4, 5]);
  // A synchronous throw from the callback must surface as a rejection of the
  // forEach promise.
  assert.rejects(stream.forEach((x) => {
    if (x === 3) {
      throw new Error('boom');
    }
  }), /boom/).then(common.mustCall());
}

{
  // Throwing an error during `forEach` (async)
  const stream = Readable.from([1, 2, 3, 4, 5]);
  // A rejected promise returned from the async callback must reject the
  // forEach promise with the same error.
  assert.rejects(stream.forEach(async (x) => {
    if (x === 3) {
      return Promise.reject(new Error('boom'));
    }
  }), /boom/).then(common.mustCall());
}

{
  // Concurrency + AbortSignal
  const ac = new AbortController();
  let calls = 0;
  // Each callback counts itself and then parks until the per-call signal it
  // receives fires 'abort', so no callback finishes before ac.abort().
  const forEachPromise =
    Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => {
      calls++;
      await once(signal, 'abort');
    }, { signal: ac.signal, concurrency: 2, highWaterMark: 0 });
  // pump
  // The forEach promise must reject with an AbortError once aborted.
  assert.rejects(async () => {
    await forEachPromise;
  }, {
    name: 'AbortError',
  }).then(common.mustCall());

  // By the time this runs, exactly two of the four callbacks (matching
  // `concurrency: 2`) are expected to have started.
  setImmediate(() => {
    ac.abort();
    assert.strictEqual(calls, 2);
  });
}

{
  // Error cases
  // Each call is wrapped in an async function so that any failure, whether
  // thrown or rejected, reaches assert.rejects as a rejection to validate.

  // Non-function callback.
  assert.rejects(async () => {
    await Readable.from([1]).forEach(1);
  }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
  // Non-numeric concurrency option.
  assert.rejects(async () => {
    await Readable.from([1]).forEach((x) => x, {
      concurrency: 'Foo'
    });
  }, /ERR_OUT_OF_RANGE/).then(common.mustCall());
  // Options argument that is not an object.
  assert.rejects(async () => {
    await Readable.from([1]).forEach((x) => x, 1);
  }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}
{
  // Test result is a Promise
  // forEach() returns a thenable rather than a stream, so the local is named
  // for what it actually holds.
  const promise = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true);
  assert.strictEqual(typeof promise.then, 'function');
}
{
  const stream = Readable.from([1, 2, 3, 4, 5]);
  // Shadow `map` on this instance with a function that must never run.
  Object.defineProperty(stream, 'map', {
    value: common.mustNotCall(),
  });
  // Check that map isn't getting called.
  // Observing the promise also verifies that the iteration completes instead
  // of leaving the result floating.
  stream.forEach(() => true).then(common.mustCall());
}
