1'use strict';
2
3const common = require('../common');
4const fixtures = require('../common/fixtures');
5const {
6  Readable,
7} = require('stream');
8const assert = require('assert');
9const { setTimeout } = require('timers/promises');
10const { createReadStream } = require('fs');
11
/**
 * Builds a new Readable that yields the integers 1 through 5 in order.
 * Every call returns a separate stream, so each test case can consume its
 * own copy.
 * @returns {Readable}
 */
function oneTo5() {
  const values = [1, 2, 3, 4, 5];
  return Readable.from(values);
}
15
{
  // flatMap works on synchronous streams with a synchronous mapper: each
  // array returned by the mapper is flattened into the output, and an empty
  // array contributes nothing.
  const checkSyncMapper = async () => {
    const doubled = await oneTo5().flatMap((x) => [x + x]).toArray();
    assert.deepStrictEqual(doubled, [2, 4, 6, 8, 10]);

    const nothing = await oneTo5().flatMap(() => []).toArray();
    assert.deepStrictEqual(nothing, []);

    const repeated = await oneTo5().flatMap((x) => [x, x]).toArray();
    assert.deepStrictEqual(repeated, [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]);
  };

  checkSyncMapper().then(common.mustCall());
}
33
34
{
  // flatMap works on sync/async streams with an asynchronous mapper
  const expected = [1, 1, 2, 2, 3, 3, 4, 4, 5, 5];
  const duplicate = async (x) => [x, x];

  const checkAsyncMapper = async () => {
    // Source that yields its values directly.
    const fromSyncSource = await oneTo5().flatMap(duplicate).toArray();
    assert.deepStrictEqual(fromSyncSource, expected);

    // Source whose values first pass through an async map() stage.
    const asyncOneTo5 = oneTo5().map(async (x) => x);
    const fromAsyncSource = await asyncOneTo5.flatMap(duplicate).toArray();
    assert.deepStrictEqual(fromAsyncSource, expected);
  };

  checkAsyncMapper().then(common.mustCall());
}
{
  // flatMap works on a stream where mapping returns a stream
  const checkReadableResult = async () => {
    const toPairStream = async (x) => Readable.from([x, x]);
    const flattened = await oneTo5().flatMap(toPairStream).toArray();
    assert.deepStrictEqual(flattened, [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]);
  };
  checkReadableResult().then(common.mustCall());

  // flatMap works on an objectMode stream where mapping returns a stream
  const checkFileStreamResult = async () => {
    const chunks = await oneTo5()
      .flatMap(() => createReadStream(fixtures.path('x.txt')))
      .toArray();
    // The resultant stream is in object mode so toArray shouldn't flatten
    assert.strictEqual(chunks.length, 5);
    assert.deepStrictEqual(
      Buffer.concat(chunks).toString(),
      'xyz\n'.repeat(5)
    );
  };
  checkFileStreamResult().then(common.mustCall());
}
72
{
  // Concurrency + AbortSignal: the consumer is started first, then the abort
  // is queued as a microtask; iteration must reject with an AbortError.
  const controller = new AbortController();
  // NOTE(review): the async function is handed to common.mustNotCall as its
  // argument - confirm whether that helper invokes it or only uses it as a
  // failure message.
  const mapper = common.mustNotCall(async (_, { signal }) => {
    await setTimeout(100, { signal });
  });
  const options = { signal: controller.signal, concurrency: 2 };
  const stream = oneTo5().flatMap(mapper, options);

  const consume = async () => {
    for await (const item of stream) {
      // nope
      console.log(item);
    }
  };

  // pump
  assert.rejects(consume, { name: 'AbortError' }).then(common.mustCall());

  queueMicrotask(() => {
    controller.abort();
  });
}
93
{
  // Already aborted AbortSignal: iterating the resulting stream must reject
  // with an AbortError.
  // NOTE(review): the async function is handed to common.mustNotCall as its
  // argument - confirm whether that helper invokes it or only uses it as a
  // failure message.
  const mapper = common.mustNotCall(async (_, { signal }) => {
    await setTimeout(100, { signal });
  });
  const stream = oneTo5().flatMap(mapper, { signal: AbortSignal.abort() });

  const consume = async () => {
    for await (const item of stream) {
      // nope
      console.log(item);
    }
  };

  // pump
  assert.rejects(consume, { name: 'AbortError' }).then(common.mustCall());
}
109
{
  // Error cases: each entry pairs an invalid flatMap() call with the pattern
  // the thrown error is expected to match.
  const identity = (x) => x;
  const invalidCalls = [
    // Mapper is not a function.
    [() => Readable.from([1]).flatMap(1), /ERR_INVALID_ARG_TYPE/],
    // Non-numeric concurrency.
    [
      () => Readable.from([1]).flatMap(identity, { concurrency: 'Foo' }),
      /ERR_OUT_OF_RANGE/,
    ],
    // Options is not an object.
    [() => Readable.from([1]).flatMap(identity, 1), /ERR_INVALID_ARG_TYPE/],
    // Signal is not an AbortSignal.
    [
      () => Readable.from([1]).flatMap(identity, { signal: true }),
      /ERR_INVALID_ARG_TYPE/,
    ],
  ];

  for (const [invalidCall, expectedError] of invalidCalls) {
    assert.throws(invalidCall, expectedError);
  }
}
{
  // Test result is a Readable: the stream returned by flatMap() reports
  // itself as readable.
  const { readable } = oneTo5().flatMap((x) => x);
  assert.strictEqual(readable, true);
}
{
  // Shadow `map` on the instance with a function that must never run, then
  // call flatMap() on that same instance.
  const stream = oneTo5();
  const trap = { value: common.mustNotCall() };
  Object.defineProperty(stream, 'map', trap);

  // Check that map isn't getting called.
  stream.flatMap(() => true);
}
132