'use strict';
const common = require('../common');
const stream = require('stream');
const assert = require('assert');

// Checks the readable's `_readableState.awaitDrainWriters` bookkeeping while
// piping into a writable that is not consuming chunks:
//   - on the first 'pause' it must reference the writable,
//   - after a manual resume() and a second 'pause' it must still reference
//     the writable,
//   - once the writable has finished it must be null.

// A consumer stream with a very low highWaterMark, which starts in a state
// where it buffers the chunks it receives rather than indicating that they
// have been consumed.
const writable = new stream.Writable({
  highWaterMark: 5
});

// While true, _write() holds on to each chunk's callback instead of calling it.
let isCurrentlyBufferingWrites = true;
// Held-back writes, as { chunk, cb } pairs, in arrival order.
const queue = [];

writable._write = (chunk, encoding, cb) => {
  if (isCurrentlyBufferingWrites)
    queue.push({ chunk, cb });
  else
    cb();
};

// The source stream; read() is a no-op because data is push()ed manually below.
const readable = new stream.Readable({
  read() {}
});

readable.pipe(writable);

readable.once('pause', common.mustCall(() => {
  assert.strictEqual(
    readable._readableState.awaitDrainWriters,
    writable,
    'Expected awaitDrainWriters to be a Writable but instead got ' +
    `${readable._readableState.awaitDrainWriters}`
  );
  // First pause, resume manually. Chunks are still being buffered by the
  // writable, so the readable is expected to pause again while still waiting
  // on the same writable.

  process.nextTick(common.mustCall(() => {
    readable.resume();
  }));

  readable.once('pause', common.mustCall(() => {
    assert.strictEqual(
      readable._readableState.awaitDrainWriters,
      writable,
      '.resume() should not reset the awaitDrainWriters, but instead got ' +
      `${readable._readableState.awaitDrainWriters}`
    );
    // Second pause, handle all chunks from now on. Once all callbacks that
    // are currently queued up are handled, awaitDrainWriters should be reset
    // to null and all chunks that are pending on the readable side should be
    // flushed.
    isCurrentlyBufferingWrites = false;
    for (const queued of queue)
      queued.cb();
  }));
}));

readable.push(Buffer.alloc(100)); // Fill the writable HWM, first 'pause'.
readable.push(Buffer.alloc(100)); // Second 'pause'.
readable.push(Buffer.alloc(100)); // Should get through to the writable.
readable.push(null);

writable.on('finish', common.mustCall(() => {
  assert.strictEqual(
    readable._readableState.awaitDrainWriters,
    null,
    `awaitDrainWriters should be reset to null
    after all chunks are written but instead got
    ${readable._readableState.awaitDrainWriters}`
  );
  // Everything okay, all chunks were written.
}));