1'use strict';
2
3const common = require('../common');
4const {
5  Readable,
6} = require('stream');
7const assert = require('assert');
8
/**
 * Reducer shared by the tests below: combines the accumulator with the
 * current chunk using the `+` operator (numeric addition or string
 * concatenation, depending on the operands).
 * @param {number|string} p - Value accumulated so far.
 * @param {number|string} c - Current chunk.
 * @returns {number|string} The result of `p + c`.
 */
function sum(p, c) {
  return p + c;
}
12
{
  // Does the same thing as `(await stream.toArray()).reduce(...)`
  (async () => {
    // Each entry is `[values, reducer, initialValue?]`. The last two entries
    // deliberately omit the initial value.
    const tests = [
      [[], sum, 0],
      [[1], sum, 0],
      [[1, 2, 3, 4, 5], sum, 0],
      [[...Array(100).keys()], sum, 0],
      [['a', 'b', 'c'], sum, ''],
      [[1, 2], sum],
      [[1, 2, 3], (x, y) => y],
    ];
    // The reducer arguments are spread rather than destructured into
    // `(fn, initial)`: passing an explicit `undefined` counts as supplying an
    // initial value, which would leave the "no initial value" path untested.
    for (const [values, ...reduceArgs] of tests) {
      const streamReduce = await Readable.from(values)
                                         .reduce(...reduceArgs);
      const arrayReduce = values.reduce(...reduceArgs);
      assert.deepStrictEqual(streamReduce, arrayReduce);
    }
    // Does the same thing as `(await stream.toArray()).reduce(...)` when the
    // chunks first pass through an asynchronous `map` stage
    for (const [values, ...reduceArgs] of tests) {
      const streamReduce = await Readable.from(values)
                                         .map(async (x) => x)
                                         .reduce(...reduceArgs);
      const arrayReduce = values.reduce(...reduceArgs);
      assert.deepStrictEqual(streamReduce, arrayReduce);
    }
  })().then(common.mustCall());
}
{
  // Works with an async reducer, with or without initial value
  (async () => {
    // With an explicit initial value of 0.
    const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c, 0);
    assert.strictEqual(six, 6);
  })().then(common.mustCall());
  (async () => {
    // Without an initial value.
    const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c);
    assert.strictEqual(six, 6);
  })().then(common.mustCall());
}
{
  // Works lazily: once the reducer throws, `map` must not have been invoked
  // for every chunk of the source.
  assert.rejects(Readable.from([1, 2, 3, 4, 5, 6])
    .map(common.mustCall((x) => {
      return x;
    }, 3)) // Two consumed and one buffered by `map` due to default concurrency
    .reduce(async (p, c) => {
      // The accumulator becomes 1 after the first chunk, so this throws while
      // processing the second chunk.
      if (p === 1) {
        throw new Error('boom');
      }
      return c;
    }, 0)
  , /boom/).then(common.mustCall());
}
67
{
  // Support for AbortSignal: aborting after `reduce()` has been started
  // rejects the returned promise with an AbortError.
  const ac = new AbortController();
  assert.rejects(async () => {
    await Readable.from([1, 2, 3]).reduce(async (p, c) => {
      if (c === 3) {
        await new Promise(() => {}); // Explicitly do not pass signal here
      }
      return Promise.resolve();
    }, 0, { signal: ac.signal });
  }, {
    name: 'AbortError',
  }).then(common.mustCall());
  // Abort synchronously, right after the reduction has been kicked off.
  ac.abort();
}
83
84
{
  // Support for AbortSignal - pre aborted: the signal is already aborted when
  // `reduce()` is called, so the call must reject with an AbortError.
  const stream = Readable.from([1, 2, 3]);
  assert.rejects(async () => {
    await stream.reduce(async (p, c) => {
      if (c === 3) {
        await new Promise(() => {}); // Explicitly do not pass signal here
      }
      return Promise.resolve();
    }, 0, { signal: AbortSignal.abort() });
  }, {
    name: 'AbortError',
  }).then(common.mustCall(() => {
    // The source stream must have been destroyed by the time of rejection.
    assert.strictEqual(stream.destroyed, true);
  }));
}
101
{
  // Support for AbortSignal - deep: the reducer receives a `signal` in its
  // third (options) argument.
  const stream = Readable.from([1, 2, 3]);
  assert.rejects(async () => {
    await stream.reduce(async (p, c, { signal }) => {
      // NOTE(review): the outer signal is already aborted, so the reducer
      // presumably never runs and this expectation is never registered.
      // Confirm against the `reduce()` implementation.
      signal.addEventListener('abort', common.mustCall(), { once: true });
      if (c === 3) {
        await new Promise(() => {}); // Explicitly do not pass signal here
      }
      return Promise.resolve();
    }, 0, { signal: AbortSignal.abort() });
  }, {
    name: 'AbortError',
  }).then(common.mustCall(() => {
    // The source stream must have been destroyed by the time of rejection.
    assert.strictEqual(stream.destroyed, true);
  }));
}
119
{
  // Error cases: a non-function reducer or malformed options must reject.
  // Each assertion is chained to `common.mustCall()` so the test cannot
  // finish before the rejection has actually been verified.
  assert.rejects(
    () => Readable.from([]).reduce(1),
    /TypeError/,
  ).then(common.mustCall());
  assert.rejects(
    () => Readable.from([]).reduce('5'),
    /TypeError/,
  ).then(common.mustCall());
  assert.rejects(
    () => Readable.from([]).reduce((x, y) => x + y, 0, 1),
    /ERR_INVALID_ARG_TYPE/,
  ).then(common.mustCall());
  assert.rejects(
    () => Readable.from([]).reduce((x, y) => x + y, 0, { signal: true }),
    /ERR_INVALID_ARG_TYPE/,
  ).then(common.mustCall());
}
127
{
  // Test result is a Promise: `reduce()` hands back a promise synchronously,
  // even when the reducer itself is a plain synchronous function.
  const result = Readable.from([1, 2, 3, 4, 5]).reduce(sum, 0);
  assert.ok(result instanceof Promise);
}
133