11cb0ef41Sopenharmony_ci'use strict'; 21cb0ef41Sopenharmony_ciconst common = require('../common'); 31cb0ef41Sopenharmony_ciconst assert = require('assert'); 41cb0ef41Sopenharmony_ciconst Readable = require('stream').Readable; 51cb0ef41Sopenharmony_ci 61cb0ef41Sopenharmony_ciconst readable = new Readable({ 71cb0ef41Sopenharmony_ci read: () => {} 81cb0ef41Sopenharmony_ci}); 91cb0ef41Sopenharmony_ci 101cb0ef41Sopenharmony_ci// Initialized to false. 111cb0ef41Sopenharmony_ciassert.strictEqual(readable._readableState.needReadable, false); 121cb0ef41Sopenharmony_ci 131cb0ef41Sopenharmony_cireadable.on('readable', common.mustCall(() => { 141cb0ef41Sopenharmony_ci // When the readable event fires, needReadable is reset. 151cb0ef41Sopenharmony_ci assert.strictEqual(readable._readableState.needReadable, false); 161cb0ef41Sopenharmony_ci readable.read(); 171cb0ef41Sopenharmony_ci})); 181cb0ef41Sopenharmony_ci 191cb0ef41Sopenharmony_ci// If a readable listener is attached, then a readable event is needed. 201cb0ef41Sopenharmony_ciassert.strictEqual(readable._readableState.needReadable, true); 211cb0ef41Sopenharmony_ci 221cb0ef41Sopenharmony_cireadable.push('foo'); 231cb0ef41Sopenharmony_cireadable.push(null); 241cb0ef41Sopenharmony_ci 251cb0ef41Sopenharmony_cireadable.on('end', common.mustCall(() => { 261cb0ef41Sopenharmony_ci // No need to emit readable anymore when the stream ends. 271cb0ef41Sopenharmony_ci assert.strictEqual(readable._readableState.needReadable, false); 281cb0ef41Sopenharmony_ci})); 291cb0ef41Sopenharmony_ci 301cb0ef41Sopenharmony_ciconst asyncReadable = new Readable({ 311cb0ef41Sopenharmony_ci read: () => {} 321cb0ef41Sopenharmony_ci}); 331cb0ef41Sopenharmony_ci 341cb0ef41Sopenharmony_ciasyncReadable.on('readable', common.mustCall(() => { 351cb0ef41Sopenharmony_ci if (asyncReadable.read() !== null) { 361cb0ef41Sopenharmony_ci // After each read(), the buffer is empty. 371cb0ef41Sopenharmony_ci // If the stream doesn't end now, 381cb0ef41Sopenharmony_ci // then we need to notify the reader on future changes. 391cb0ef41Sopenharmony_ci assert.strictEqual(asyncReadable._readableState.needReadable, true); 401cb0ef41Sopenharmony_ci } 411cb0ef41Sopenharmony_ci}, 2)); 421cb0ef41Sopenharmony_ci 431cb0ef41Sopenharmony_ciprocess.nextTick(common.mustCall(() => { 441cb0ef41Sopenharmony_ci asyncReadable.push('foooo'); 451cb0ef41Sopenharmony_ci})); 461cb0ef41Sopenharmony_ciprocess.nextTick(common.mustCall(() => { 471cb0ef41Sopenharmony_ci asyncReadable.push('bar'); 481cb0ef41Sopenharmony_ci})); 491cb0ef41Sopenharmony_cisetImmediate(common.mustCall(() => { 501cb0ef41Sopenharmony_ci asyncReadable.push(null); 511cb0ef41Sopenharmony_ci assert.strictEqual(asyncReadable._readableState.needReadable, false); 521cb0ef41Sopenharmony_ci})); 531cb0ef41Sopenharmony_ci 541cb0ef41Sopenharmony_ciconst flowing = new Readable({ 551cb0ef41Sopenharmony_ci read: () => {} 561cb0ef41Sopenharmony_ci}); 571cb0ef41Sopenharmony_ci 581cb0ef41Sopenharmony_ci// Notice this must be above the on('data') call. 591cb0ef41Sopenharmony_ciflowing.push('foooo'); 601cb0ef41Sopenharmony_ciflowing.push('bar'); 611cb0ef41Sopenharmony_ciflowing.push('quo'); 621cb0ef41Sopenharmony_ciprocess.nextTick(common.mustCall(() => { 631cb0ef41Sopenharmony_ci flowing.push(null); 641cb0ef41Sopenharmony_ci})); 651cb0ef41Sopenharmony_ci 661cb0ef41Sopenharmony_ci// When the buffer already has enough data, and the stream is 671cb0ef41Sopenharmony_ci// in flowing mode, there is no need for the readable event. 681cb0ef41Sopenharmony_ciflowing.on('data', common.mustCall(function(data) { 691cb0ef41Sopenharmony_ci assert.strictEqual(flowing._readableState.needReadable, false); 701cb0ef41Sopenharmony_ci}, 3)); 711cb0ef41Sopenharmony_ci 721cb0ef41Sopenharmony_ciconst slowProducer = new Readable({ 731cb0ef41Sopenharmony_ci read: () => {} 741cb0ef41Sopenharmony_ci}); 751cb0ef41Sopenharmony_ci 761cb0ef41Sopenharmony_cislowProducer.on('readable', common.mustCall(() => { 771cb0ef41Sopenharmony_ci const chunk = slowProducer.read(8); 781cb0ef41Sopenharmony_ci const state = slowProducer._readableState; 791cb0ef41Sopenharmony_ci if (chunk === null) { 801cb0ef41Sopenharmony_ci // The buffer doesn't have enough data, and the stream is not need, 811cb0ef41Sopenharmony_ci // we need to notify the reader when data arrives. 821cb0ef41Sopenharmony_ci assert.strictEqual(state.needReadable, true); 831cb0ef41Sopenharmony_ci } else { 841cb0ef41Sopenharmony_ci assert.strictEqual(state.needReadable, false); 851cb0ef41Sopenharmony_ci } 861cb0ef41Sopenharmony_ci}, 4)); 871cb0ef41Sopenharmony_ci 881cb0ef41Sopenharmony_ciprocess.nextTick(common.mustCall(() => { 891cb0ef41Sopenharmony_ci slowProducer.push('foo'); 901cb0ef41Sopenharmony_ci process.nextTick(common.mustCall(() => { 911cb0ef41Sopenharmony_ci slowProducer.push('foo'); 921cb0ef41Sopenharmony_ci process.nextTick(common.mustCall(() => { 931cb0ef41Sopenharmony_ci slowProducer.push('foo'); 941cb0ef41Sopenharmony_ci process.nextTick(common.mustCall(() => { 951cb0ef41Sopenharmony_ci slowProducer.push(null); 961cb0ef41Sopenharmony_ci })); 971cb0ef41Sopenharmony_ci })); 981cb0ef41Sopenharmony_ci })); 991cb0ef41Sopenharmony_ci})); 100