11cb0ef41Sopenharmony_ci'use strict';
21cb0ef41Sopenharmony_ciconst common = require('../common');
31cb0ef41Sopenharmony_ciconst stream = require('stream');
41cb0ef41Sopenharmony_ciconst assert = require('assert');
51cb0ef41Sopenharmony_ci
// Sink under test: a Writable with a very low highWaterMark (5), so that
// write() starts signalling back-pressure almost immediately.
//
// While `isCurrentlyBufferingWrites` is true the sink does not acknowledge
// the chunks it receives; it parks each chunk together with its completion
// callback in `queue`. Once the flag is cleared, every write is acknowledged
// right away.
let isCurrentlyBufferingWrites = true;
const queue = [];

const writable = new stream.Writable({
  highWaterMark: 5,
  write(chunk, encoding, cb) {
    if (!isCurrentlyBufferingWrites) {
      cb();
      return;
    }
    queue.push({ chunk, cb });
  }
});
221cb0ef41Sopenharmony_ci
// Source under test: a Readable whose read() is a no-op, because all of its
// data is supplied explicitly through readable.push().
const readable = new stream.Readable({ read: () => {} });

// Connect the source to the sink.
readable.pipe(writable);
281cb0ef41Sopenharmony_ci
// Handler for the second 'pause'. awaitDrainWriters must still reference the
// writable even though the readable was resumed manually in between. From
// here on the sink acknowledges writes immediately, and every callback that
// was parked in `queue` is invoked so the pending chunks can be flushed.
function onSecondPause() {
  const { awaitDrainWriters } = readable._readableState;
  assert.strictEqual(
    awaitDrainWriters,
    writable,
    '.resume() should not reset the awaitDrainWriters, but instead got ' +
    `${awaitDrainWriters}`
  );

  // Stop buffering before draining the queue, so that no new entry is added
  // to `queue` while it is being walked.
  isCurrentlyBufferingWrites = false;
  queue.forEach((pending) => pending.cb());
}

// Handler for the first 'pause'. The readable is expected to be waiting for
// the writable to drain, i.e. awaitDrainWriters references the writable.
function onFirstPause() {
  const { awaitDrainWriters } = readable._readableState;
  assert.strictEqual(
    awaitDrainWriters,
    writable,
    'Expected awaitDrainWriters to be a Writable but instead got ' +
    `${awaitDrainWriters}`
  );

  // Resume manually on the next tick. The sink is still buffering at that
  // point, so the readable is expected to pause a second time.
  process.nextTick(common.mustCall(() => {
    readable.resume();
  }));

  readable.once('pause', common.mustCall(onSecondPause));
}

readable.once('pause', common.mustCall(onFirstPause));
601cb0ef41Sopenharmony_ci
// Feed three 100-byte chunks, then end the stream:
//   1st chunk: fills the writable past its highWaterMark, first 'pause'.
//   2nd chunk: second 'pause'.
//   3rd chunk: should get through to the writable.
for (let sent = 0; sent < 3; sent++)
  readable.push(Buffer.alloc(100));
readable.push(null);
651cb0ef41Sopenharmony_ci
// Once the writable has finished, the readable must no longer be waiting on
// any writer: awaitDrainWriters has to be back to null.
writable.on('finish', common.mustCall(() => {
  const { awaitDrainWriters } = readable._readableState;
  const failureMessage = `awaitDrainWriters should be reset to null
    after all chunks are written but instead got
    ${awaitDrainWriters}`;

  assert.strictEqual(awaitDrainWriters, null, failureMessage);
  // Reaching this point means all chunks were written.
}));
76