'use strict';
const common = require('../common');
const stream = require('stream');
const assert = require('assert');

// Pipes a single Readable into three Writables and checks how many
// destinations the reader is recorded as waiting on at each step, by
// inspecting `reader._readableState.awaitDrainWriters.size`.
//
// This is very similar to test-stream-pipe-cleanup-pause.js.

const reader = new stream.Readable();
const writer1 = new stream.Writable();
const writer2 = new stream.Writable();
const writer3 = new stream.Writable();

// 560000 is chosen here because it is larger than the (default) highWaterMark
// and will cause `.write()` to return false
// See: https://github.com/nodejs/node/issues/5820
const buffer = Buffer.allocUnsafe(560000);

// No-op: data is supplied manually through `reader.push()` below.
reader._read = () => {};

// A "fast" consumer: announces the chunk, then acknowledges it on the next
// tick. A regular function is used (not an arrow) because `this` is used to
// emit on the writer.
writer1._write = common.mustCall(function(chunk, encoding, cb) {
  this.emit('chunk-received');
  process.nextTick(cb);
}, 1);

writer1.once('chunk-received', () => {
  const waiting = reader._readableState.awaitDrainWriters.size;
  assert.strictEqual(
    waiting,
    0,
    `awaitDrain initial value should be 0, actual is ${waiting}`
  );
  setImmediate(() => {
    // This one should *not* get through to writer1 because writer2 is not
    // "done" processing.
    reader.push(buffer);
  });
});

// A "slow" consumer:
writer2._write = common.mustCall((chunk, encoding, cb) => {
  const waiting = reader._readableState.awaitDrainWriters.size;
  assert.strictEqual(
    waiting,
    1,
    `awaitDrain should be 1 after first push, actual is ${waiting}`
  );
  // Not calling cb here to "simulate" slow stream.
  // This should be called exactly once, since the first .write() call
  // will return false.
}, 1);

// A second "slow" consumer; expected to see one more pending writer than
// writer2 did.
writer3._write = common.mustCall((chunk, encoding, cb) => {
  const waiting = reader._readableState.awaitDrainWriters.size;
  assert.strictEqual(
    waiting,
    2,
    `awaitDrain should be 2 after second push, actual is ${waiting}`
  );
  // Not calling cb here to "simulate" slow stream.
  // This should be called exactly once, since the first .write() call
  // will return false.
}, 1);

// Attach all three destinations before any data is pushed, then push one
// oversized chunk to start the flow.
reader.pipe(writer1);
reader.pipe(writer2);
reader.pipe(writer3);
reader.push(buffer);