1'use strict';
2
3const common = require('../common');
4const {
5  Readable,
6} = require('stream');
7const assert = require('assert');
8const { once } = require('events');
9const { setTimeout } = require('timers/promises');
10
11function createDependentPromises(n) {
12  const promiseAndResolveArray = [];
13
14  for (let i = 0; i < n; i++) {
15    let res;
16    const promise = new Promise((resolve) => {
17      if (i === 0) {
18        res = resolve;
19        return;
20      }
21      res = () => promiseAndResolveArray[i - 1][0].then(resolve);
22    });
23
24    promiseAndResolveArray.push([promise, res]);
25  }
26
27  return promiseAndResolveArray;
28}
29
30{
31  // Map works on synchronous streams with a synchronous mapper
32  const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
33  (async () => {
34    assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
35  })().then(common.mustCall());
36}
37
38{
39  // Map works on synchronous streams with an asynchronous mapper
40  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
41    await Promise.resolve();
42    return x + x;
43  });
44  (async () => {
45    assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
46  })().then(common.mustCall());
47}
48
49{
50  // Map works on asynchronous streams with a asynchronous mapper
51  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
52    return x + x;
53  }).map((x) => x + x);
54  (async () => {
55    assert.deepStrictEqual(await stream.toArray(), [4, 8, 12, 16, 20]);
56  })().then(common.mustCall());
57}
58
59{
60  // Map works on an infinite stream
61  const stream = Readable.from(async function* () {
62    while (true) yield 1;
63  }()).map(common.mustCall(async (x) => {
64    return x + x;
65  }, 5));
66  (async () => {
67    let i = 1;
68    for await (const item of stream) {
69      assert.strictEqual(item, 2);
70      if (++i === 5) break;
71    }
72  })().then(common.mustCall());
73}
74
75{
76  // Map works on non-objectMode streams
77  const stream = new Readable({
78    read() {
79      this.push(Uint8Array.from([1]));
80      this.push(Uint8Array.from([2]));
81      this.push(null);
82    }
83  }).map(async ([x]) => {
84    return x + x;
85  }).map((x) => x + x);
86  const result = [4, 8];
87  (async () => {
88    for await (const item of stream) {
89      assert.strictEqual(item, result.shift());
90    }
91  })().then(common.mustCall());
92}
93
94{
95  // Does not care about data events
96  const source = new Readable({
97    read() {
98      this.push(Uint8Array.from([1]));
99      this.push(Uint8Array.from([2]));
100      this.push(null);
101    }
102  });
103  setImmediate(() => stream.emit('data', Uint8Array.from([1])));
104  const stream = source.map(async ([x]) => {
105    return x + x;
106  }).map((x) => x + x);
107  const result = [4, 8];
108  (async () => {
109    for await (const item of stream) {
110      assert.strictEqual(item, result.shift());
111    }
112  })().then(common.mustCall());
113}
114
115{
116  // Emitting an error during `map`
117  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
118    if (x === 3) {
119      stream.emit('error', new Error('boom'));
120    }
121    return x + x;
122  });
123  assert.rejects(
124    stream.map((x) => x + x).toArray(),
125    /boom/,
126  ).then(common.mustCall());
127}
128
129{
130  // Throwing an error during `map` (sync)
131  const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => {
132    if (x === 3) {
133      throw new Error('boom');
134    }
135    return x + x;
136  });
137  assert.rejects(
138    stream.map((x) => x + x).toArray(),
139    /boom/,
140  ).then(common.mustCall());
141}
142
143
144{
145  // Throwing an error during `map` (async)
146  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
147    if (x === 3) {
148      throw new Error('boom');
149    }
150    return x + x;
151  });
152  assert.rejects(
153    stream.map((x) => x + x).toArray(),
154    /boom/,
155  ).then(common.mustCall());
156}
157
158{
159  // Concurrency + AbortSignal
160  const ac = new AbortController();
161  const range = Readable.from([1, 2, 3, 4, 5]);
162  const stream = range.map(common.mustCall(async (_, { signal }) => {
163    await once(signal, 'abort');
164    throw signal.reason;
165  }, 2), { signal: ac.signal, concurrency: 2, highWaterMark: 0 });
166  // pump
167  assert.rejects(async () => {
168    for await (const item of stream) {
169      assert.fail('should not reach here, got ' + item);
170    }
171  }, {
172    name: 'AbortError',
173  }).then(common.mustCall());
174
175  setImmediate(() => {
176    ac.abort();
177  });
178}
179
180{
181  // Concurrency result order
182  const stream = Readable.from([1, 2]).map(async (item, { signal }) => {
183    await setTimeout(10 - item, { signal });
184    return item;
185  }, { concurrency: 2 });
186
187  (async () => {
188    const expected = [1, 2];
189    for await (const item of stream) {
190      assert.strictEqual(item, expected.shift());
191    }
192  })().then(common.mustCall());
193}
194
195
196{
197  // highWaterMark with small concurrency
198  const finishOrder = [];
199
200  const promises = createDependentPromises(4);
201
202  const raw = Readable.from([2, 0, 1, 3]);
203  const stream = raw.map(async (item) => {
204    const [promise, resolve] = promises[item];
205    resolve();
206
207    await promise;
208    finishOrder.push(item);
209    return item;
210  }, { concurrency: 2 });
211
212  (async () => {
213    await stream.toArray();
214
215    assert.deepStrictEqual(finishOrder, [0, 1, 2, 3]);
216  })().then(common.mustCall(), common.mustNotCall());
217}
218
219{
220  // highWaterMark with a lot of items and large concurrency
221  const finishOrder = [];
222
223  const promises = createDependentPromises(20);
224
225  const input = [10, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 11, 12, 13, 18, 15, 16, 17, 14, 19];
226  const raw = Readable.from(input);
227  // Should be
228  // 10, 1, 0, 3, 4, 2 | next: 0
229  // 10, 1, 3, 4, 2, 5 | next: 1
230  // 10, 3, 4, 2, 5, 7 | next: 2
231  // 10, 3, 4, 5, 7, 8 | next: 3
232  // 10, 4, 5, 7, 8, 9 | next: 4
233  // 10, 5, 7, 8, 9, 6 | next: 5
234  // 10, 7, 8, 9, 6, 11 | next: 6
235  // 10, 7, 8, 9, 11, 12 | next: 7
236  // 10, 8, 9, 11, 12, 13 | next: 8
237  // 10, 9, 11, 12, 13, 18 | next: 9
238  // 10, 11, 12, 13, 18, 15 | next: 10
239  // 11, 12, 13, 18, 15, 16 | next: 11
240  // 12, 13, 18, 15, 16, 17 | next: 12
241  // 13, 18, 15, 16, 17, 14 | next: 13
242  // 18, 15, 16, 17, 14, 19 | next: 14
243  // 18, 15, 16, 17, 19 | next: 15
244  // 18, 16, 17, 19 | next: 16
245  // 18, 17, 19 | next: 17
246  // 18, 19 | next: 18
247  // 19 | next: 19
248  //
249
250  const stream = raw.map(async (item) => {
251    const [promise, resolve] = promises[item];
252    resolve();
253
254    await promise;
255    finishOrder.push(item);
256    return item;
257  }, { concurrency: 6 });
258
259  (async () => {
260    const outputOrder = await stream.toArray();
261
262    assert.deepStrictEqual(outputOrder, input);
263    assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
264  })().then(common.mustCall(), common.mustNotCall());
265}
266
267{
268  // Custom highWaterMark with a lot of items and large concurrency
269  const finishOrder = [];
270
271  const promises = createDependentPromises(20);
272
273  const input = [11, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 10, 12, 13, 18, 15, 16, 17, 14, 19];
274  const raw = Readable.from(input);
275  // Should be
276  // 11, 1, 0, 3, 4     | next: 0, buffer: []
277  // 11, 1, 3, 4, 2     | next: 1, buffer: [0]
278  // 11, 3, 4, 2, 5     | next: 2, buffer: [0, 1]
279  // 11, 3, 4, 5, 7     | next: 3, buffer: [0, 1, 2]
280  // 11, 4, 5, 7, 8     | next: 4, buffer: [0, 1, 2, 3]
281  // 11, 5, 7, 8, 9     | next: 5, buffer: [0, 1, 2, 3, 4]
282  // 11, 7, 8, 9, 6     | next: 6, buffer: [0, 1, 2, 3, 4, 5]
283  // 11, 7, 8, 9, 10    | next: 7, buffer: [0, 1, 2, 3, 4, 5, 6] -- buffer full
284  // 11, 8, 9, 10, 12   | next: 8, buffer: [0, 1, 2, 3, 4, 5, 6]
285  // 11, 9, 10, 12, 13  | next: 9, buffer: [0, 1, 2, 3, 4, 5, 6]
286  // 11, 10, 12, 13, 18 | next: 10, buffer: [0, 1, 2, 3, 4, 5, 6]
287  // 11, 12, 13, 18, 15 | next: 11, buffer: [0, 1, 2, 3, 4, 5, 6]
288  // 12, 13, 18, 15, 16 | next: 12, buffer: [] -- all items flushed as 11 is consumed and all the items wait for it
289  // 13, 18, 15, 16, 17 | next: 13, buffer: []
290  // 18, 15, 16, 17, 14 | next: 14, buffer: []
291  // 18, 15, 16, 17, 19 | next: 15, buffer: [14]
292  // 18, 16, 17, 19     | next: 16, buffer: [14, 15]
293  // 18, 17, 19         | next: 17, buffer: [14, 15, 16]
294  // 18, 19             | next: 18, buffer: [14, 15, 16, 17]
295  // 19                 | next: 19, buffer: [] -- all items flushed
296  //
297
298  const stream = raw.map(async (item) => {
299    const [promise, resolve] = promises[item];
300    resolve();
301
302    await promise;
303    finishOrder.push(item);
304    return item;
305  }, { concurrency: 5, highWaterMark: 7 });
306
307  (async () => {
308    const outputOrder = await stream.toArray();
309
310    assert.deepStrictEqual(outputOrder, input);
311    assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
312  })().then(common.mustCall(), common.mustNotCall());
313}
314
315{
316  // Where there is a delay between the first and the next item it should not wait for filled queue
317  // before yielding to the user
318  const promises = createDependentPromises(3);
319
320  const raw = Readable.from([0, 1, 2]);
321
322  const stream = raw
323      .map(async (item) => {
324        if (item !== 0) {
325          await promises[item][0];
326        }
327
328        return item;
329      }, { concurrency: 2 })
330      .map((item) => {
331        // eslint-disable-next-line no-unused-vars
332        for (const [_, resolve] of promises) {
333          resolve();
334        }
335
336        return item;
337      });
338
339  (async () => {
340    await stream.toArray();
341  })().then(common.mustCall(), common.mustNotCall());
342}
343
344{
345  // Error cases
346  assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/);
347  assert.throws(() => Readable.from([1]).map((x) => x, {
348    concurrency: 'Foo'
349  }), /ERR_OUT_OF_RANGE/);
350  assert.throws(() => Readable.from([1]).map((x) => x, {
351    concurrency: -1
352  }), /ERR_OUT_OF_RANGE/);
353  assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
354  assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
355}
356{
357  // Test result is a Readable
358  const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x);
359  assert.strictEqual(stream.readable, true);
360}
361