1'use strict'; 2 3const common = require('../common'); 4const { 5 Readable, 6} = require('stream'); 7const assert = require('assert'); 8const { once } = require('events'); 9const { setTimeout } = require('timers/promises'); 10 11function createDependentPromises(n) { 12 const promiseAndResolveArray = []; 13 14 for (let i = 0; i < n; i++) { 15 let res; 16 const promise = new Promise((resolve) => { 17 if (i === 0) { 18 res = resolve; 19 return; 20 } 21 res = () => promiseAndResolveArray[i - 1][0].then(resolve); 22 }); 23 24 promiseAndResolveArray.push([promise, res]); 25 } 26 27 return promiseAndResolveArray; 28} 29 30{ 31 // Map works on synchronous streams with a synchronous mapper 32 const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x); 33 (async () => { 34 assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]); 35 })().then(common.mustCall()); 36} 37 38{ 39 // Map works on synchronous streams with an asynchronous mapper 40 const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { 41 await Promise.resolve(); 42 return x + x; 43 }); 44 (async () => { 45 assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]); 46 })().then(common.mustCall()); 47} 48 49{ 50 // Map works on asynchronous streams with a asynchronous mapper 51 const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { 52 return x + x; 53 }).map((x) => x + x); 54 (async () => { 55 assert.deepStrictEqual(await stream.toArray(), [4, 8, 12, 16, 20]); 56 })().then(common.mustCall()); 57} 58 59{ 60 // Map works on an infinite stream 61 const stream = Readable.from(async function* () { 62 while (true) yield 1; 63 }()).map(common.mustCall(async (x) => { 64 return x + x; 65 }, 5)); 66 (async () => { 67 let i = 1; 68 for await (const item of stream) { 69 assert.strictEqual(item, 2); 70 if (++i === 5) break; 71 } 72 })().then(common.mustCall()); 73} 74 75{ 76 // Map works on non-objectMode streams 77 const stream = new Readable({ 78 read() { 79 this.push(Uint8Array.from([1])); 80 this.push(Uint8Array.from([2])); 81 this.push(null); 82 } 83 }).map(async ([x]) => { 84 return x + x; 85 }).map((x) => x + x); 86 const result = [4, 8]; 87 (async () => { 88 for await (const item of stream) { 89 assert.strictEqual(item, result.shift()); 90 } 91 })().then(common.mustCall()); 92} 93 94{ 95 // Does not care about data events 96 const source = new Readable({ 97 read() { 98 this.push(Uint8Array.from([1])); 99 this.push(Uint8Array.from([2])); 100 this.push(null); 101 } 102 }); 103 setImmediate(() => stream.emit('data', Uint8Array.from([1]))); 104 const stream = source.map(async ([x]) => { 105 return x + x; 106 }).map((x) => x + x); 107 const result = [4, 8]; 108 (async () => { 109 for await (const item of stream) { 110 assert.strictEqual(item, result.shift()); 111 } 112 })().then(common.mustCall()); 113} 114 115{ 116 // Emitting an error during `map` 117 const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { 118 if (x === 3) { 119 stream.emit('error', new Error('boom')); 120 } 121 return x + x; 122 }); 123 assert.rejects( 124 stream.map((x) => x + x).toArray(), 125 /boom/, 126 ).then(common.mustCall()); 127} 128 129{ 130 // Throwing an error during `map` (sync) 131 const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => { 132 if (x === 3) { 133 throw new Error('boom'); 134 } 135 return x + x; 136 }); 137 assert.rejects( 138 stream.map((x) => x + x).toArray(), 139 /boom/, 140 ).then(common.mustCall()); 141} 142 143 144{ 145 // Throwing an error during `map` (async) 146 const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { 147 if (x === 3) { 148 throw new Error('boom'); 149 } 150 return x + x; 151 }); 152 assert.rejects( 153 stream.map((x) => x + x).toArray(), 154 /boom/, 155 ).then(common.mustCall()); 156} 157 158{ 159 // Concurrency + AbortSignal 160 const ac = new AbortController(); 161 const range = Readable.from([1, 2, 3, 4, 5]); 162 const stream = range.map(common.mustCall(async (_, { signal }) => { 163 await once(signal, 'abort'); 164 throw signal.reason; 165 }, 2), { signal: ac.signal, concurrency: 2, highWaterMark: 0 }); 166 // pump 167 assert.rejects(async () => { 168 for await (const item of stream) { 169 assert.fail('should not reach here, got ' + item); 170 } 171 }, { 172 name: 'AbortError', 173 }).then(common.mustCall()); 174 175 setImmediate(() => { 176 ac.abort(); 177 }); 178} 179 180{ 181 // Concurrency result order 182 const stream = Readable.from([1, 2]).map(async (item, { signal }) => { 183 await setTimeout(10 - item, { signal }); 184 return item; 185 }, { concurrency: 2 }); 186 187 (async () => { 188 const expected = [1, 2]; 189 for await (const item of stream) { 190 assert.strictEqual(item, expected.shift()); 191 } 192 })().then(common.mustCall()); 193} 194 195 196{ 197 // highWaterMark with small concurrency 198 const finishOrder = []; 199 200 const promises = createDependentPromises(4); 201 202 const raw = Readable.from([2, 0, 1, 3]); 203 const stream = raw.map(async (item) => { 204 const [promise, resolve] = promises[item]; 205 resolve(); 206 207 await promise; 208 finishOrder.push(item); 209 return item; 210 }, { concurrency: 2 }); 211 212 (async () => { 213 await stream.toArray(); 214 215 assert.deepStrictEqual(finishOrder, [0, 1, 2, 3]); 216 })().then(common.mustCall(), common.mustNotCall()); 217} 218 219{ 220 // highWaterMark with a lot of items and large concurrency 221 const finishOrder = []; 222 223 const promises = createDependentPromises(20); 224 225 const input = [10, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 11, 12, 13, 18, 15, 16, 17, 14, 19]; 226 const raw = Readable.from(input); 227 // Should be 228 // 10, 1, 0, 3, 4, 2 | next: 0 229 // 10, 1, 3, 4, 2, 5 | next: 1 230 // 10, 3, 4, 2, 5, 7 | next: 2 231 // 10, 3, 4, 5, 7, 8 | next: 3 232 // 10, 4, 5, 7, 8, 9 | next: 4 233 // 10, 5, 7, 8, 9, 6 | next: 5 234 // 10, 7, 8, 9, 6, 11 | next: 6 235 // 10, 7, 8, 9, 11, 12 | next: 7 236 // 10, 8, 9, 11, 12, 13 | next: 8 237 // 10, 9, 11, 12, 13, 18 | next: 9 238 // 10, 11, 12, 13, 18, 15 | next: 10 239 // 11, 12, 13, 18, 15, 16 | next: 11 240 // 12, 13, 18, 15, 16, 17 | next: 12 241 // 13, 18, 15, 16, 17, 14 | next: 13 242 // 18, 15, 16, 17, 14, 19 | next: 14 243 // 18, 15, 16, 17, 19 | next: 15 244 // 18, 16, 17, 19 | next: 16 245 // 18, 17, 19 | next: 17 246 // 18, 19 | next: 18 247 // 19 | next: 19 248 // 249 250 const stream = raw.map(async (item) => { 251 const [promise, resolve] = promises[item]; 252 resolve(); 253 254 await promise; 255 finishOrder.push(item); 256 return item; 257 }, { concurrency: 6 }); 258 259 (async () => { 260 const outputOrder = await stream.toArray(); 261 262 assert.deepStrictEqual(outputOrder, input); 263 assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]); 264 })().then(common.mustCall(), common.mustNotCall()); 265} 266 267{ 268 // Custom highWaterMark with a lot of items and large concurrency 269 const finishOrder = []; 270 271 const promises = createDependentPromises(20); 272 273 const input = [11, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 10, 12, 13, 18, 15, 16, 17, 14, 19]; 274 const raw = Readable.from(input); 275 // Should be 276 // 11, 1, 0, 3, 4 | next: 0, buffer: [] 277 // 11, 1, 3, 4, 2 | next: 1, buffer: [0] 278 // 11, 3, 4, 2, 5 | next: 2, buffer: [0, 1] 279 // 11, 3, 4, 5, 7 | next: 3, buffer: [0, 1, 2] 280 // 11, 4, 5, 7, 8 | next: 4, buffer: [0, 1, 2, 3] 281 // 11, 5, 7, 8, 9 | next: 5, buffer: [0, 1, 2, 3, 4] 282 // 11, 7, 8, 9, 6 | next: 6, buffer: [0, 1, 2, 3, 4, 5] 283 // 11, 7, 8, 9, 10 | next: 7, buffer: [0, 1, 2, 3, 4, 5, 6] -- buffer full 284 // 11, 8, 9, 10, 12 | next: 8, buffer: [0, 1, 2, 3, 4, 5, 6] 285 // 11, 9, 10, 12, 13 | next: 9, buffer: [0, 1, 2, 3, 4, 5, 6] 286 // 11, 10, 12, 13, 18 | next: 10, buffer: [0, 1, 2, 3, 4, 5, 6] 287 // 11, 12, 13, 18, 15 | next: 11, buffer: [0, 1, 2, 3, 4, 5, 6] 288 // 12, 13, 18, 15, 16 | next: 12, buffer: [] -- all items flushed as 11 is consumed and all the items wait for it 289 // 13, 18, 15, 16, 17 | next: 13, buffer: [] 290 // 18, 15, 16, 17, 14 | next: 14, buffer: [] 291 // 18, 15, 16, 17, 19 | next: 15, buffer: [14] 292 // 18, 16, 17, 19 | next: 16, buffer: [14, 15] 293 // 18, 17, 19 | next: 17, buffer: [14, 15, 16] 294 // 18, 19 | next: 18, buffer: [14, 15, 16, 17] 295 // 19 | next: 19, buffer: [] -- all items flushed 296 // 297 298 const stream = raw.map(async (item) => { 299 const [promise, resolve] = promises[item]; 300 resolve(); 301 302 await promise; 303 finishOrder.push(item); 304 return item; 305 }, { concurrency: 5, highWaterMark: 7 }); 306 307 (async () => { 308 const outputOrder = await stream.toArray(); 309 310 assert.deepStrictEqual(outputOrder, input); 311 assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]); 312 })().then(common.mustCall(), common.mustNotCall()); 313} 314 315{ 316 // Where there is a delay between the first and the next item it should not wait for filled queue 317 // before yielding to the user 318 const promises = createDependentPromises(3); 319 320 const raw = Readable.from([0, 1, 2]); 321 322 const stream = raw 323 .map(async (item) => { 324 if (item !== 0) { 325 await promises[item][0]; 326 } 327 328 return item; 329 }, { concurrency: 2 }) 330 .map((item) => { 331 // eslint-disable-next-line no-unused-vars 332 for (const [_, resolve] of promises) { 333 resolve(); 334 } 335 336 return item; 337 }); 338 339 (async () => { 340 await stream.toArray(); 341 })().then(common.mustCall(), common.mustNotCall()); 342} 343 344{ 345 // Error cases 346 assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/); 347 assert.throws(() => Readable.from([1]).map((x) => x, { 348 concurrency: 'Foo' 349 }), /ERR_OUT_OF_RANGE/); 350 assert.throws(() => Readable.from([1]).map((x) => x, { 351 concurrency: -1 352 }), /ERR_OUT_OF_RANGE/); 353 assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/); 354 assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/); 355} 356{ 357 // Test result is a Readable 358 const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x); 359 assert.strictEqual(stream.readable, true); 360} 361