1'use strict'; 2 3const common = require('../common'); 4const assert = require('assert'); 5const { Duplex, Readable, Writable, pipeline, PassThrough } = require('stream'); 6const { ReadableStream, WritableStream } = require('stream/web'); 7const { Blob } = require('buffer'); 8 9{ 10 const d = Duplex.from({ 11 readable: new Readable({ 12 read() { 13 this.push('asd'); 14 this.push(null); 15 } 16 }) 17 }); 18 assert.strictEqual(d.readable, true); 19 assert.strictEqual(d.writable, false); 20 d.once('readable', common.mustCall(function() { 21 assert.strictEqual(d.read().toString(), 'asd'); 22 })); 23 d.once('end', common.mustCall(function() { 24 assert.strictEqual(d.readable, false); 25 })); 26} 27 28{ 29 const d = Duplex.from(new Readable({ 30 read() { 31 this.push('asd'); 32 this.push(null); 33 } 34 })); 35 assert.strictEqual(d.readable, true); 36 assert.strictEqual(d.writable, false); 37 d.once('readable', common.mustCall(function() { 38 assert.strictEqual(d.read().toString(), 'asd'); 39 })); 40 d.once('end', common.mustCall(function() { 41 assert.strictEqual(d.readable, false); 42 })); 43} 44 45{ 46 let ret = ''; 47 const d = Duplex.from(new Writable({ 48 write(chunk, encoding, callback) { 49 ret += chunk; 50 callback(); 51 } 52 })); 53 assert.strictEqual(d.readable, false); 54 assert.strictEqual(d.writable, true); 55 d.end('asd'); 56 d.on('finish', common.mustCall(function() { 57 assert.strictEqual(d.writable, false); 58 assert.strictEqual(ret, 'asd'); 59 })); 60} 61 62{ 63 let ret = ''; 64 const d = Duplex.from({ 65 writable: new Writable({ 66 write(chunk, encoding, callback) { 67 ret += chunk; 68 callback(); 69 } 70 }) 71 }); 72 assert.strictEqual(d.readable, false); 73 assert.strictEqual(d.writable, true); 74 d.end('asd'); 75 d.on('finish', common.mustCall(function() { 76 assert.strictEqual(d.writable, false); 77 assert.strictEqual(ret, 'asd'); 78 })); 79} 80 81{ 82 let ret = ''; 83 const d = Duplex.from({ 84 readable: new Readable({ 85 read() { 86 this.push('asd'); 87 this.push(null); 88 } 89 }), 90 writable: new Writable({ 91 write(chunk, encoding, callback) { 92 ret += chunk; 93 callback(); 94 } 95 }) 96 }); 97 assert.strictEqual(d.readable, true); 98 assert.strictEqual(d.writable, true); 99 d.once('readable', common.mustCall(function() { 100 assert.strictEqual(d.read().toString(), 'asd'); 101 })); 102 d.once('end', common.mustCall(function() { 103 assert.strictEqual(d.readable, false); 104 })); 105 d.end('asd'); 106 d.once('finish', common.mustCall(function() { 107 assert.strictEqual(d.writable, false); 108 assert.strictEqual(ret, 'asd'); 109 })); 110} 111 112{ 113 const d = Duplex.from(Promise.resolve('asd')); 114 assert.strictEqual(d.readable, true); 115 assert.strictEqual(d.writable, false); 116 d.once('readable', common.mustCall(function() { 117 assert.strictEqual(d.read().toString(), 'asd'); 118 })); 119 d.once('end', common.mustCall(function() { 120 assert.strictEqual(d.readable, false); 121 })); 122} 123 124{ 125 // https://github.com/nodejs/node/issues/40497 126 pipeline( 127 ['abc\ndef\nghi'], 128 Duplex.from(async function * (source) { 129 let rest = ''; 130 for await (const chunk of source) { 131 const lines = (rest + chunk.toString()).split('\n'); 132 rest = lines.pop(); 133 for (const line of lines) { 134 yield line; 135 } 136 } 137 yield rest; 138 }), 139 async function * (source) { // eslint-disable-line require-yield 140 let ret = ''; 141 for await (const x of source) { 142 ret += x; 143 } 144 assert.strictEqual(ret, 'abcdefghi'); 145 }, 146 common.mustSucceed(), 147 ); 148} 149 150// Ensure that isDuplexNodeStream was called 151{ 152 const duplex = new Duplex(); 153 assert.strictEqual(Duplex.from(duplex), duplex); 154} 155 156// Ensure that Duplex.from works for blobs 157{ 158 const blob = new Blob(['blob']); 159 const expectedByteLength = blob.size; 160 const duplex = Duplex.from(blob); 161 duplex.on('data', common.mustCall((arrayBuffer) => { 162 assert.strictEqual(arrayBuffer.byteLength, expectedByteLength); 163 })); 164} 165 166// Ensure that given a promise rejection it emits an error 167{ 168 const myErrorMessage = 'myCustomError'; 169 Duplex.from(Promise.reject(myErrorMessage)) 170 .on('error', common.mustCall((error) => { 171 assert.strictEqual(error, myErrorMessage); 172 })); 173} 174 175// Ensure that given a promise rejection on an async function it emits an error 176{ 177 const myErrorMessage = 'myCustomError'; 178 async function asyncFn() { 179 return Promise.reject(myErrorMessage); 180 } 181 182 Duplex.from(asyncFn) 183 .on('error', common.mustCall((error) => { 184 assert.strictEqual(error, myErrorMessage); 185 })); 186} 187 188// Ensure that Duplex.from throws an Invalid return value when function is void 189{ 190 assert.throws(() => Duplex.from(() => {}), { 191 code: 'ERR_INVALID_RETURN_VALUE', 192 }); 193} 194 195// Ensure data if a sub object has a readable stream it's duplexified 196{ 197 const msg = Buffer.from('hello'); 198 const duplex = Duplex.from({ 199 readable: Readable({ 200 read() { 201 this.push(msg); 202 this.push(null); 203 } 204 }) 205 }).on('data', common.mustCall((data) => { 206 assert.strictEqual(data, msg); 207 })); 208 209 assert.strictEqual(duplex.writable, false); 210} 211 212// Ensure data if a sub object has a writable stream it's duplexified 213{ 214 const msg = Buffer.from('hello'); 215 const duplex = Duplex.from({ 216 writable: Writable({ 217 write: common.mustCall((data) => { 218 assert.strictEqual(data, msg); 219 }) 220 }) 221 }); 222 223 duplex.write(msg); 224 assert.strictEqual(duplex.readable, false); 225} 226 227// Ensure data if a sub object has a writable and readable stream it's duplexified 228{ 229 const msg = Buffer.from('hello'); 230 231 const duplex = Duplex.from({ 232 readable: Readable({ 233 read() { 234 this.push(msg); 235 this.push(null); 236 } 237 }), 238 writable: Writable({ 239 write: common.mustCall((data) => { 240 assert.strictEqual(data, msg); 241 }) 242 }) 243 }); 244 245 duplex.pipe(duplex) 246 .on('data', common.mustCall((data) => { 247 assert.strictEqual(data, msg); 248 assert.strictEqual(duplex.readable, true); 249 assert.strictEqual(duplex.writable, true); 250 })) 251 .on('end', common.mustCall()); 252} 253 254// Ensure that given readable stream that throws an error it calls destroy 255{ 256 const myErrorMessage = 'error!'; 257 const duplex = Duplex.from(Readable({ 258 read() { 259 throw new Error(myErrorMessage); 260 } 261 })); 262 duplex.on('error', common.mustCall((msg) => { 263 assert.strictEqual(msg.message, myErrorMessage); 264 })); 265} 266 267// Ensure that given writable stream that throws an error it calls destroy 268{ 269 const myErrorMessage = 'error!'; 270 const duplex = Duplex.from(Writable({ 271 write(chunk, enc, cb) { 272 cb(myErrorMessage); 273 } 274 })); 275 276 duplex.on('error', common.mustCall((msg) => { 277 assert.strictEqual(msg, myErrorMessage); 278 })); 279 280 duplex.write('test'); 281} 282 283{ 284 const through = new PassThrough({ objectMode: true }); 285 286 let res = ''; 287 const d = Readable.from(['foo', 'bar'], { objectMode: true }) 288 .pipe(Duplex.from({ 289 writable: through, 290 readable: through 291 })); 292 293 d.on('data', (data) => { 294 d.pause(); 295 setImmediate(() => { 296 d.resume(); 297 }); 298 res += data; 299 }).on('end', common.mustCall(() => { 300 assert.strictEqual(res, 'foobar'); 301 })).on('close', common.mustCall()); 302} 303 304function makeATestReadableStream(value) { 305 return new ReadableStream({ 306 start(controller) { 307 controller.enqueue(value); 308 controller.close(); 309 } 310 }); 311} 312 313function makeATestWritableStream(writeFunc) { 314 return new WritableStream({ 315 write(chunk) { 316 writeFunc(chunk); 317 } 318 }); 319} 320 321{ 322 const d = Duplex.from({ 323 readable: makeATestReadableStream('foo'), 324 }); 325 assert.strictEqual(d.readable, true); 326 assert.strictEqual(d.writable, false); 327 328 d.on('data', common.mustCall((data) => { 329 assert.strictEqual(data.toString(), 'foo'); 330 })); 331 332 d.on('end', common.mustCall(() => { 333 assert.strictEqual(d.readable, false); 334 })); 335} 336 337{ 338 const d = Duplex.from(makeATestReadableStream('foo')); 339 340 assert.strictEqual(d.readable, true); 341 assert.strictEqual(d.writable, false); 342 343 d.on('data', common.mustCall((data) => { 344 assert.strictEqual(data.toString(), 'foo'); 345 })); 346 347 d.on('end', common.mustCall(() => { 348 assert.strictEqual(d.readable, false); 349 })); 350} 351 352{ 353 let ret = ''; 354 const d = Duplex.from({ 355 writable: makeATestWritableStream((chunk) => ret += chunk), 356 }); 357 358 assert.strictEqual(d.readable, false); 359 assert.strictEqual(d.writable, true); 360 361 d.end('foo'); 362 d.on('finish', common.mustCall(() => { 363 assert.strictEqual(ret, 'foo'); 364 assert.strictEqual(d.writable, false); 365 })); 366} 367 368{ 369 let ret = ''; 370 const d = Duplex.from(makeATestWritableStream((chunk) => ret += chunk)); 371 372 assert.strictEqual(d.readable, false); 373 assert.strictEqual(d.writable, true); 374 375 d.end('foo'); 376 d.on('finish', common.mustCall(() => { 377 assert.strictEqual(ret, 'foo'); 378 assert.strictEqual(d.writable, false); 379 })); 380} 381 382{ 383 let ret = ''; 384 const d = Duplex.from({ 385 readable: makeATestReadableStream('foo'), 386 writable: makeATestWritableStream((chunk) => ret += chunk), 387 }); 388 389 d.end('bar'); 390 391 d.on('data', common.mustCall((data) => { 392 assert.strictEqual(data.toString(), 'foo'); 393 })); 394 395 d.on('end', common.mustCall(() => { 396 assert.strictEqual(d.readable, false); 397 })); 398 399 d.on('finish', common.mustCall(() => { 400 assert.strictEqual(ret, 'bar'); 401 assert.strictEqual(d.writable, false); 402 })); 403} 404