'use strict';

// Tests for Readable.prototype.flatMap: sync and async mappers, mappers that
// return streams, AbortSignal handling, argument validation, and independence
// from an instance-level `map` override.

const common = require('../common');
const fixtures = require('../common/fixtures');
const {
  Readable,
} = require('stream');
const assert = require('assert');
const { setTimeout } = require('timers/promises');
const { createReadStream } = require('fs');

/**
 * Builds a fresh Readable from the array [1, 2, 3, 4, 5].
 * A new stream is created on every call so each test consumes its own source.
 * @returns {Readable}
 */
function oneTo5() {
  return Readable.from([1, 2, 3, 4, 5]);
}

{
  // flatMap works on synchronous streams with a synchronous mapper
  (async () => {
    // One-element arrays: behaves like a plain map.
    assert.deepStrictEqual(
      await oneTo5().flatMap((x) => [x + x]).toArray(),
      [2, 4, 6, 8, 10]
    );
    // Empty arrays: every item is dropped.
    assert.deepStrictEqual(
      await oneTo5().flatMap(() => []).toArray(),
      []
    );
    // Two-element arrays: the results are flattened in source order.
    assert.deepStrictEqual(
      await oneTo5().flatMap((x) => [x, x]).toArray(),
      [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
    );
  })().then(common.mustCall());
}

{
  // flatMap works on sync/async streams with an asynchronous mapper
  (async () => {
    assert.deepStrictEqual(
      await oneTo5().flatMap(async (x) => [x, x]).toArray(),
      [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
    );
    // Same expectation when the source itself went through an async map().
    const asyncOneTo5 = oneTo5().map(async (x) => x);
    assert.deepStrictEqual(
      await asyncOneTo5.flatMap(async (x) => [x, x]).toArray(),
      [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
    );
  })().then(common.mustCall());
}

{
  // flatMap works on a stream where mapping returns a stream
  (async () => {
    const result = await oneTo5().flatMap(async (x) => {
      return Readable.from([x, x]);
    }).toArray();
    assert.deepStrictEqual(result, [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]);
  })().then(common.mustCall());

  // flatMap works on an objectMode stream where mapping returns a stream
  (async () => {
    const result = await oneTo5().flatMap(() => {
      return createReadStream(fixtures.path('x.txt'));
    }).toArray();
    // The resultant stream is in object mode so toArray shouldn't flatten
    assert.strictEqual(result.length, 5);
    // Each of the five entries is the fixture's content as a Buffer chunk.
    assert.deepStrictEqual(
      Buffer.concat(result).toString(),
      'xyz\n'.repeat(5)
    );
  })().then(common.mustCall());
}

{
  // Concurrency + AbortSignal
  const ac = new AbortController();
  // The mapper is wrapped in mustNotCall: it must never be invoked because
  // the signal is aborted (below) before any item is mapped.
  const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => {
    // timers/promises setTimeout takes (delay, value, options); the signal
    // has to be passed in the options argument to be honored.
    await setTimeout(100, undefined, { signal });
  }), { signal: ac.signal, concurrency: 2 });
  // pump
  assert.rejects(async () => {
    for await (const item of stream) {
      // nope
      console.log(item);
    }
  }, {
    name: 'AbortError',
  }).then(common.mustCall());

  // Abort from a microtask, i.e. after the iteration above has been started.
  queueMicrotask(() => {
    ac.abort();
  });
}

{
  // Already aborted AbortSignal
  const stream = oneTo5().flatMap(common.mustNotCall(async (_, { signal }) => {
    // timers/promises setTimeout takes (delay, value, options); the signal
    // has to be passed in the options argument to be honored.
    await setTimeout(100, undefined, { signal });
  }), { signal: AbortSignal.abort() });
  // pump
  assert.rejects(async () => {
    for await (const item of stream) {
      // nope
      console.log(item);
    }
  }, {
    name: 'AbortError',
  }).then(common.mustCall());
}

{
  // Error cases
  // Mapper is not a function.
  assert.throws(() => Readable.from([1]).flatMap(1), /ERR_INVALID_ARG_TYPE/);
  // Concurrency is not a valid number.
  assert.throws(() => Readable.from([1]).flatMap((x) => x, {
    concurrency: 'Foo'
  }), /ERR_OUT_OF_RANGE/);
  // Options is not an object.
  assert.throws(() => Readable.from([1]).flatMap((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
  // Signal is not an AbortSignal.
  assert.throws(() => Readable.from([1]).flatMap((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}

{
  // Test result is a Readable
  const stream = oneTo5().flatMap((x) => x);
  assert.strictEqual(stream.readable, true);
}

{
  // flatMap does not go through an overridden `map` property on the instance
  const stream = oneTo5();
  Object.defineProperty(stream, 'map', {
    value: common.mustNotCall(),
  });
  // Check that map isn't getting called.
  stream.flatMap(() => true);
}