1'use strict'; 2 3const common = require('../common'); 4const { 5 Readable, 6} = require('stream'); 7const assert = require('assert'); 8 9function sum(p, c) { 10 return p + c; 11} 12 13{ 14 // Does the same thing as `(await stream.toArray()).reduce(...)` 15 (async () => { 16 const tests = [ 17 [[], sum, 0], 18 [[1], sum, 0], 19 [[1, 2, 3, 4, 5], sum, 0], 20 [[...Array(100).keys()], sum, 0], 21 [['a', 'b', 'c'], sum, ''], 22 [[1, 2], sum], 23 [[1, 2, 3], (x, y) => y], 24 ]; 25 for (const [values, fn, initial] of tests) { 26 const streamReduce = await Readable.from(values) 27 .reduce(fn, initial); 28 const arrayReduce = values.reduce(fn, initial); 29 assert.deepStrictEqual(streamReduce, arrayReduce); 30 } 31 // Does the same thing as `(await stream.toArray()).reduce(...)` with an 32 // asynchronous reducer 33 for (const [values, fn, initial] of tests) { 34 const streamReduce = await Readable.from(values) 35 .map(async (x) => x) 36 .reduce(fn, initial); 37 const arrayReduce = values.reduce(fn, initial); 38 assert.deepStrictEqual(streamReduce, arrayReduce); 39 } 40 })().then(common.mustCall()); 41} 42{ 43 // Works with an async reducer, with or without initial value 44 (async () => { 45 const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c, 0); 46 assert.strictEqual(six, 6); 47 })().then(common.mustCall()); 48 (async () => { 49 const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c); 50 assert.strictEqual(six, 6); 51 })().then(common.mustCall()); 52} 53{ 54 // Works lazily 55 assert.rejects(Readable.from([1, 2, 3, 4, 5, 6]) 56 .map(common.mustCall((x) => { 57 return x; 58 }, 3)) // Two consumed and one buffered by `map` due to default concurrency 59 .reduce(async (p, c) => { 60 if (p === 1) { 61 throw new Error('boom'); 62 } 63 return c; 64 }, 0) 65 , /boom/).then(common.mustCall()); 66} 67 68{ 69 // Support for AbortSignal 70 const ac = new AbortController(); 71 assert.rejects(async () => { 72 await Readable.from([1, 2, 3]).reduce(async (p, c) => { 73 if (c === 3) { 74 await new Promise(() => {}); // Explicitly do not pass signal here 75 } 76 return Promise.resolve(); 77 }, 0, { signal: ac.signal }); 78 }, { 79 name: 'AbortError', 80 }).then(common.mustCall()); 81 ac.abort(); 82} 83 84 85{ 86 // Support for AbortSignal - pre aborted 87 const stream = Readable.from([1, 2, 3]); 88 assert.rejects(async () => { 89 await stream.reduce(async (p, c) => { 90 if (c === 3) { 91 await new Promise(() => {}); // Explicitly do not pass signal here 92 } 93 return Promise.resolve(); 94 }, 0, { signal: AbortSignal.abort() }); 95 }, { 96 name: 'AbortError', 97 }).then(common.mustCall(() => { 98 assert.strictEqual(stream.destroyed, true); 99 })); 100} 101 102{ 103 // Support for AbortSignal - deep 104 const stream = Readable.from([1, 2, 3]); 105 assert.rejects(async () => { 106 await stream.reduce(async (p, c, { signal }) => { 107 signal.addEventListener('abort', common.mustCall(), { once: true }); 108 if (c === 3) { 109 await new Promise(() => {}); // Explicitly do not pass signal here 110 } 111 return Promise.resolve(); 112 }, 0, { signal: AbortSignal.abort() }); 113 }, { 114 name: 'AbortError', 115 }).then(common.mustCall(() => { 116 assert.strictEqual(stream.destroyed, true); 117 })); 118} 119 120{ 121 // Error cases 122 assert.rejects(() => Readable.from([]).reduce(1), /TypeError/); 123 assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/); 124 assert.rejects(() => Readable.from([]).reduce((x, y) => x + y, 0, 1), /ERR_INVALID_ARG_TYPE/); 125 assert.rejects(() => Readable.from([]).reduce((x, y) => x + y, 0, { signal: true }), /ERR_INVALID_ARG_TYPE/); 126} 127 128{ 129 // Test result is a Promise 130 const result = Readable.from([1, 2, 3, 4, 5]).reduce(sum, 0); 131 assert.ok(result instanceof Promise); 132} 133