1'use strict'; 2 3const common = require('../common'); 4const { 5 Readable, Transform, 6} = require('stream'); 7const assert = require('assert'); 8 9{ 10 // with async generator 11 const stream = Readable.from(['a', 'b', 'c', 'd']).compose(async function *(stream) { 12 let str = ''; 13 for await (const chunk of stream) { 14 str += chunk; 15 16 if (str.length === 2) { 17 yield str; 18 str = ''; 19 } 20 } 21 }); 22 const result = ['ab', 'cd']; 23 (async () => { 24 for await (const item of stream) { 25 assert.strictEqual(item, result.shift()); 26 } 27 })().then(common.mustCall()); 28} 29 30{ 31 // With Transformer 32 const stream = Readable.from(['a', 'b', 'c', 'd']).compose(new Transform({ 33 objectMode: true, 34 transform: common.mustCall((chunk, encoding, callback) => { 35 callback(null, chunk); 36 }, 4) 37 })); 38 const result = ['a', 'b', 'c', 'd']; 39 (async () => { 40 for await (const item of stream) { 41 assert.strictEqual(item, result.shift()); 42 } 43 })().then(common.mustCall()); 44} 45 46{ 47 // Throwing an error during `compose` (before waiting for data) 48 const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield 49 50 throw new Error('boom'); 51 }); 52 53 assert.rejects(async () => { 54 for await (const item of stream) { 55 assert.fail('should not reach here, got ' + item); 56 } 57 }, /boom/).then(common.mustCall()); 58} 59 60{ 61 // Throwing an error during `compose` (when waiting for data) 62 const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { 63 for await (const chunk of stream) { 64 if (chunk === 3) { 65 throw new Error('boom'); 66 } 67 yield chunk; 68 } 69 }); 70 71 assert.rejects( 72 stream.toArray(), 73 /boom/, 74 ).then(common.mustCall()); 75} 76 77{ 78 // Throwing an error during `compose` (after finishing all readable data) 79 const stream = Readable.from([1, 2, 3, 4, 5]).compose(async function *(stream) { // eslint-disable-line require-yield 80 81 // eslint-disable-next-line no-unused-vars,no-empty 82 for await (const chunk of stream) { 83 } 84 85 throw new Error('boom'); 86 }); 87 assert.rejects( 88 stream.toArray(), 89 /boom/, 90 ).then(common.mustCall()); 91} 92 93{ 94 // AbortSignal 95 const ac = new AbortController(); 96 const stream = Readable.from([1, 2, 3, 4, 5]) 97 .compose(async function *(source) { 98 // Should not reach here 99 for await (const chunk of source) { 100 yield chunk; 101 } 102 }, { signal: ac.signal }); 103 104 ac.abort(); 105 106 assert.rejects(async () => { 107 for await (const item of stream) { 108 assert.fail('should not reach here, got ' + item); 109 } 110 }, { 111 name: 'AbortError', 112 }).then(common.mustCall()); 113} 114 115{ 116 assert.throws( 117 () => Readable.from(['a']).compose(Readable.from(['b'])), 118 { code: 'ERR_INVALID_ARG_VALUE' } 119 ); 120} 121 122{ 123 assert.throws( 124 () => Readable.from(['a']).compose(), 125 { code: 'ERR_INVALID_ARG_TYPE' } 126 ); 127} 128