1// Copyright Joyent, Inc. and other Node contributors. 2// 3// Permission is hereby granted, free of charge, to any person obtaining a 4// copy of this software and associated documentation files (the 5// "Software"), to deal in the Software without restriction, including 6// without limitation the rights to use, copy, modify, merge, publish, 7// distribute, sublicense, and/or sell copies of the Software, and to permit 8// persons to whom the Software is furnished to do so, subject to the 9// following conditions: 10// 11// The above copyright notice and this permission notice shall be included 12// in all copies or substantial portions of the Software. 13// 14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20// USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22'use strict'; 23 24const common = require('../common'); 25const { Readable: R, Writable: W } = require('stream'); 26const assert = require('assert'); 27 28const EE = require('events').EventEmitter; 29 30class TestReader extends R { 31 constructor(n) { 32 super(); 33 this._buffer = Buffer.alloc(n || 100, 'x'); 34 this._pos = 0; 35 this._bufs = 10; 36 } 37 38 _read(n) { 39 const max = this._buffer.length - this._pos; 40 n = Math.max(n, 0); 41 const toRead = Math.min(n, max); 42 if (toRead === 0) { 43 // Simulate the read buffer filling up with some more bytes some time 44 // in the future. 45 setTimeout(() => { 46 this._pos = 0; 47 this._bufs -= 1; 48 if (this._bufs <= 0) { 49 // read them all! 50 if (!this.ended) 51 this.push(null); 52 } else { 53 // now we have more. 54 // kinda cheating by calling _read, but whatever, 55 // it's just fake anyway. 56 this._read(n); 57 } 58 }, 10); 59 return; 60 } 61 62 const ret = this._buffer.slice(this._pos, this._pos + toRead); 63 this._pos += toRead; 64 this.push(ret); 65 } 66} 67 68class TestWriter extends EE { 69 constructor() { 70 super(); 71 this.received = []; 72 this.flush = false; 73 } 74 75 write(c) { 76 this.received.push(c.toString()); 77 this.emit('write', c); 78 return true; 79 } 80 81 end(c) { 82 if (c) this.write(c); 83 this.emit('end', this.received); 84 } 85} 86 87{ 88 // Test basic functionality 89 const r = new TestReader(20); 90 91 const reads = []; 92 const expect = [ 'x', 93 'xx', 94 'xxx', 95 'xxxx', 96 'xxxxx', 97 'xxxxxxxxx', 98 'xxxxxxxxxx', 99 'xxxxxxxxxxxx', 100 'xxxxxxxxxxxxx', 101 'xxxxxxxxxxxxxxx', 102 'xxxxxxxxxxxxxxxxx', 103 'xxxxxxxxxxxxxxxxxxx', 104 'xxxxxxxxxxxxxxxxxxxxx', 105 'xxxxxxxxxxxxxxxxxxxxxxx', 106 'xxxxxxxxxxxxxxxxxxxxxxxxx', 107 'xxxxxxxxxxxxxxxxxxxxx' ]; 108 109 r.on('end', common.mustCall(function() { 110 assert.deepStrictEqual(reads, expect); 111 })); 112 113 let readSize = 1; 114 function flow() { 115 let res; 116 while (null !== (res = r.read(readSize++))) { 117 reads.push(res.toString()); 118 } 119 r.once('readable', flow); 120 } 121 122 flow(); 123} 124 125{ 126 // Verify pipe 127 const r = new TestReader(5); 128 129 const expect = [ 'xxxxx', 130 'xxxxx', 131 'xxxxx', 132 'xxxxx', 133 'xxxxx', 134 'xxxxx', 135 'xxxxx', 136 'xxxxx', 137 'xxxxx', 138 'xxxxx' ]; 139 140 const w = new TestWriter(); 141 142 w.on('end', common.mustCall(function(received) { 143 assert.deepStrictEqual(received, expect); 144 })); 145 146 r.pipe(w); 147} 148 149 150[1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(function(SPLIT) { 151 // Verify unpipe 152 const r = new TestReader(5); 153 154 // Unpipe after 3 writes, then write to another stream instead. 155 let expect = [ 'xxxxx', 156 'xxxxx', 157 'xxxxx', 158 'xxxxx', 159 'xxxxx', 160 'xxxxx', 161 'xxxxx', 162 'xxxxx', 163 'xxxxx', 164 'xxxxx' ]; 165 expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ]; 166 167 const w = [ new TestWriter(), new TestWriter() ]; 168 169 let writes = SPLIT; 170 w[0].on('write', function() { 171 if (--writes === 0) { 172 r.unpipe(); 173 assert.deepStrictEqual(r._readableState.pipes, []); 174 w[0].end(); 175 r.pipe(w[1]); 176 assert.deepStrictEqual(r._readableState.pipes, [w[1]]); 177 } 178 }); 179 180 let ended = 0; 181 182 w[0].on('end', common.mustCall(function(results) { 183 ended++; 184 assert.strictEqual(ended, 1); 185 assert.deepStrictEqual(results, expect[0]); 186 })); 187 188 w[1].on('end', common.mustCall(function(results) { 189 ended++; 190 assert.strictEqual(ended, 2); 191 assert.deepStrictEqual(results, expect[1]); 192 })); 193 194 r.pipe(w[0]); 195}); 196 197 198{ 199 // Verify both writers get the same data when piping to destinations 200 const r = new TestReader(5); 201 const w = [ new TestWriter(), new TestWriter() ]; 202 203 const expect = [ 'xxxxx', 204 'xxxxx', 205 'xxxxx', 206 'xxxxx', 207 'xxxxx', 208 'xxxxx', 209 'xxxxx', 210 'xxxxx', 211 'xxxxx', 212 'xxxxx' ]; 213 214 w[0].on('end', common.mustCall(function(received) { 215 assert.deepStrictEqual(received, expect); 216 })); 217 w[1].on('end', common.mustCall(function(received) { 218 assert.deepStrictEqual(received, expect); 219 })); 220 221 r.pipe(w[0]); 222 r.pipe(w[1]); 223} 224 225 226[1, 2, 3, 4, 5, 6, 7, 8, 9].forEach(function(SPLIT) { 227 // Verify multi-unpipe 228 const r = new TestReader(5); 229 230 // Unpipe after 3 writes, then write to another stream instead. 231 let expect = [ 'xxxxx', 232 'xxxxx', 233 'xxxxx', 234 'xxxxx', 235 'xxxxx', 236 'xxxxx', 237 'xxxxx', 238 'xxxxx', 239 'xxxxx', 240 'xxxxx' ]; 241 expect = [ expect.slice(0, SPLIT), expect.slice(SPLIT) ]; 242 243 const w = [ new TestWriter(), new TestWriter(), new TestWriter() ]; 244 245 let writes = SPLIT; 246 w[0].on('write', function() { 247 if (--writes === 0) { 248 r.unpipe(); 249 w[0].end(); 250 r.pipe(w[1]); 251 } 252 }); 253 254 let ended = 0; 255 256 w[0].on('end', common.mustCall(function(results) { 257 ended++; 258 assert.strictEqual(ended, 1); 259 assert.deepStrictEqual(results, expect[0]); 260 })); 261 262 w[1].on('end', common.mustCall(function(results) { 263 ended++; 264 assert.strictEqual(ended, 2); 265 assert.deepStrictEqual(results, expect[1]); 266 })); 267 268 r.pipe(w[0]); 269 r.pipe(w[2]); 270}); 271 272{ 273 // Verify that back pressure is respected 274 const r = new R({ objectMode: true }); 275 r._read = common.mustNotCall(); 276 let counter = 0; 277 r.push(['one']); 278 r.push(['two']); 279 r.push(['three']); 280 r.push(['four']); 281 r.push(null); 282 283 const w1 = new R(); 284 w1.write = function(chunk) { 285 assert.strictEqual(chunk[0], 'one'); 286 w1.emit('close'); 287 process.nextTick(function() { 288 r.pipe(w2); 289 r.pipe(w3); 290 }); 291 }; 292 w1.end = common.mustNotCall(); 293 294 r.pipe(w1); 295 296 const expected = ['two', 'two', 'three', 'three', 'four', 'four']; 297 298 const w2 = new R(); 299 w2.write = function(chunk) { 300 assert.strictEqual(chunk[0], expected.shift()); 301 assert.strictEqual(counter, 0); 302 303 counter++; 304 305 if (chunk[0] === 'four') { 306 return true; 307 } 308 309 setTimeout(function() { 310 counter--; 311 w2.emit('drain'); 312 }, 10); 313 314 return false; 315 }; 316 w2.end = common.mustCall(); 317 318 const w3 = new R(); 319 w3.write = function(chunk) { 320 assert.strictEqual(chunk[0], expected.shift()); 321 assert.strictEqual(counter, 1); 322 323 counter++; 324 325 if (chunk[0] === 'four') { 326 return true; 327 } 328 329 setTimeout(function() { 330 counter--; 331 w3.emit('drain'); 332 }, 50); 333 334 return false; 335 }; 336 w3.end = common.mustCall(function() { 337 assert.strictEqual(counter, 2); 338 assert.strictEqual(expected.length, 0); 339 }); 340} 341 342{ 343 // Verify read(0) behavior for ended streams 344 const r = new R(); 345 let written = false; 346 let ended = false; 347 r._read = common.mustNotCall(); 348 349 r.push(Buffer.from('foo')); 350 r.push(null); 351 352 const v = r.read(0); 353 354 assert.strictEqual(v, null); 355 356 const w = new R(); 357 w.write = function(buffer) { 358 written = true; 359 assert.strictEqual(ended, false); 360 assert.strictEqual(buffer.toString(), 'foo'); 361 }; 362 363 w.end = common.mustCall(function() { 364 ended = true; 365 assert.strictEqual(written, true); 366 }); 367 368 r.pipe(w); 369} 370 371{ 372 // Verify synchronous _read ending 373 const r = new R(); 374 let called = false; 375 r._read = function(n) { 376 r.push(null); 377 }; 378 379 r.once('end', function() { 380 // Verify that this is called before the next tick 381 called = true; 382 }); 383 384 r.read(); 385 386 process.nextTick(function() { 387 assert.strictEqual(called, true); 388 }); 389} 390 391{ 392 // Verify that adding readable listeners trigger data flow 393 const r = new R({ highWaterMark: 5 }); 394 let onReadable = false; 395 let readCalled = 0; 396 397 r._read = function(n) { 398 if (readCalled++ === 2) 399 r.push(null); 400 else 401 r.push(Buffer.from('asdf')); 402 }; 403 404 r.on('readable', function() { 405 onReadable = true; 406 r.read(); 407 }); 408 409 r.on('end', common.mustCall(function() { 410 assert.strictEqual(readCalled, 3); 411 assert.ok(onReadable); 412 })); 413} 414 415{ 416 // Verify that streams are chainable 417 const r = new R(); 418 r._read = common.mustCall(); 419 const r2 = r.setEncoding('utf8').pause().resume().pause(); 420 assert.strictEqual(r, r2); 421} 422 423{ 424 // Verify readableEncoding property 425 assert(Object.hasOwn(R.prototype, 'readableEncoding')); 426 427 const r = new R({ encoding: 'utf8' }); 428 assert.strictEqual(r.readableEncoding, 'utf8'); 429} 430 431{ 432 // Verify readableObjectMode property 433 assert(Object.hasOwn(R.prototype, 'readableObjectMode')); 434 435 const r = new R({ objectMode: true }); 436 assert.strictEqual(r.readableObjectMode, true); 437} 438 439{ 440 // Verify writableObjectMode property 441 assert(Object.hasOwn(W.prototype, 'writableObjectMode')); 442 443 const w = new W({ objectMode: true }); 444 assert.strictEqual(w.writableObjectMode, true); 445} 446