'use strict';

const common = require('../common');
const {
  Readable,
} = require('stream');
const assert = require('assert');
const { once } = require('events');

// Tests for `Readable.prototype.forEach()`. Each scenario lives in its own
// block scope so that local names can be reused without clashing.

{
  // forEach works on synchronous streams with a synchronous predicate
  const stream = Readable.from([1, 2, 3]);
  const expected = [1, 2, 3];
  // mustCall(fn, 3) fails the test if an element is skipped; comparing against
  // `expected.shift()` alone would pass vacuously if the callback never ran.
  stream.forEach(common.mustCall((value) => {
    assert.strictEqual(value, expected.shift());
  }, 3)).then(common.mustCall());
}

{
  // forEach works on asynchronous streams
  // The async filter keeps every element; it only makes the source stream
  // deliver its values asynchronously.
  const stream = Readable.from([1, 2, 3]).filter(async () => {
    await Promise.resolve();
    return true;
  });
  const expected = [1, 2, 3];
  stream.forEach(common.mustCall((value) => {
    assert.strictEqual(value, expected.shift());
  }, 3)).then(common.mustCall());
}

{
  // forEach works on asynchronous streams with an asynchronous forEach fn
  const stream = Readable.from([1, 2, 3]).filter(async () => {
    await Promise.resolve();
    return true;
  });
  const expected = [1, 2, 3];
  stream.forEach(common.mustCall(async (value) => {
    await Promise.resolve();
    assert.strictEqual(value, expected.shift());
  }, 3)).then(common.mustCall());
}

{
  // forEach works on an infinite stream
  const ac = new AbortController();
  const { signal } = ac;
  const stream = Readable.from(async function* () {
    while (true) yield 1;
  }(), { signal });
  let i = 0;
  // The generator never ends, so the only way out is the abort triggered on
  // the tenth callback; the promise must then reject with an AbortError.
  assert.rejects(stream.forEach(common.mustCall((x) => {
    i++;
    if (i === 10) ac.abort();
    assert.strictEqual(x, 1);
  }, 10)), { name: 'AbortError' }).then(common.mustCall());
}

{
  // Emitting an error during `forEach`
  const stream = Readable.from([1, 2, 3, 4, 5]);
  assert.rejects(stream.forEach(async (x) => {
    if (x === 3) {
      stream.emit('error', new Error('boom'));
    }
  }), /boom/).then(common.mustCall());
}

{
  // Throwing an error during `forEach` (sync)
  const stream = Readable.from([1, 2, 3, 4, 5]);
  assert.rejects(stream.forEach((x) => {
    if (x === 3) {
      throw new Error('boom');
    }
  }), /boom/).then(common.mustCall());
}

{
  // Throwing an error during `forEach` (async)
  const stream = Readable.from([1, 2, 3, 4, 5]);
  assert.rejects(stream.forEach(async (x) => {
    if (x === 3) {
      return Promise.reject(new Error('boom'));
    }
  }), /boom/).then(common.mustCall());
}

{
  // Concurrency + AbortSignal
  const ac = new AbortController();
  let calls = 0;
  // Every callback parks until its signal aborts, so none of them finishes
  // before `ac.abort()` is called below.
  const forEachPromise =
    Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => {
      calls++;
      await once(signal, 'abort');
    }, { signal: ac.signal, concurrency: 2, highWaterMark: 0 });
  // Consume the promise so that the abort rejection is observed.
  assert.rejects(async () => {
    await forEachPromise;
  }, {
    name: 'AbortError',
  }).then(common.mustCall());

  setImmediate(() => {
    ac.abort();
    // With `concurrency: 2`, exactly two callbacks are expected to have
    // started by now.
    assert.strictEqual(calls, 2);
  });
}

{
  // Error cases
  // A non-function callback is rejected.
  assert.rejects(async () => {
    await Readable.from([1]).forEach(1);
  }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
  // A non-numeric `concurrency` option is rejected.
  assert.rejects(async () => {
    await Readable.from([1]).forEach((x) => x, {
      concurrency: 'Foo'
    });
  }, /ERR_OUT_OF_RANGE/).then(common.mustCall());
  // A non-object `options` argument is rejected.
  assert.rejects(async () => {
    await Readable.from([1]).forEach((x) => x, 1);
  }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
}

{
  // Test result is a Promise
  const promise = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true);
  assert.strictEqual(typeof promise.then, 'function');
}

{
  // Check that map isn't getting called: shadow `map` on the instance with a
  // function that fails the test when invoked.
  const stream = Readable.from([1, 2, 3, 4, 5]);
  Object.defineProperty(stream, 'map', {
    value: common.mustNotCall(),
  });
  // Also require the iteration to run to completion rather than leaving the
  // returned promise floating.
  stream.forEach(() => true).then(common.mustCall());
}