'use strict';
// Exercises Readable#pipe() data flow in three independent scenarios: an
// asynchronous source feeding a slow sink, a chain of PassThrough streams that
// is consumed manually through read()/'readable', and a check that pipe() does
// not attach 'drain' listeners while the destination reports no backpressure.
// NOTE(review): common.mustCall() comes from the project test harness; it
// presumably fails the test if the returned function is never invoked.
const common = require('../common');
const assert = require('assert');
const { Readable, Writable, PassThrough } = require('stream');

{
  // Scenario 1: the source answers its first 17 read() calls asynchronously
  // (one object pushed on the next tick) and then pushes a final object
  // followed by EOF synchronously. The sink has a highWaterMark of 0 and
  // completes every write from a setImmediate callback.
  let ticks = 17;

  const rs = new Readable({
    objectMode: true,
    read: () => {
      if (ticks-- > 0)
        return process.nextTick(() => rs.push({}));
      rs.push({});
      rs.push(null);
    }
  });

  const ws = new Writable({
    highWaterMark: 0,
    objectMode: true,
    write: (data, encoding, cb) => setImmediate(cb)
  });

  // Both ends of the pipe are expected to complete.
  rs.on('end', common.mustCall());
  ws.on('finish', common.mustCall());
  rs.pipe(ws);
}

{
  // Scenario 2: a synchronous source pushes 8 objects and then EOF through two
  // chained PassThrough streams (objectMode, highWaterMark 2). The tail of the
  // chain is drained by hand from inside another Readable's read().
  let missing = 8;

  const rs = new Readable({
    objectMode: true,
    read: () => {
      if (missing--) rs.push({});
      else rs.push(null);
    }
  });

  const pt = rs
    .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 }))
    .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 }));

  // `wrapper` is declared below; this handler only runs later, once `pt` has
  // emitted 'end', so the binding is initialized by then.
  pt.on('end', () => {
    wrapper.push(null);
  });

  const wrapper = new Readable({
    objectMode: true,
    read: () => {
      // Pull from the chain on the next tick rather than synchronously.
      process.nextTick(() => {
        let data = pt.read();
        if (data === null) {
          // Nothing buffered yet: wait for one 'readable' event and retry.
          // A null on the retry is dropped instead of being pushed, since
          // pushing null would signal EOF on `wrapper`.
          pt.once('readable', () => {
            data = pt.read();
            if (data !== null) wrapper.push(data);
          });
        } else {
          wrapper.push(data);
        }
      });
    }
  });

  wrapper.resume();
  wrapper.on('end', common.mustCall());
}

{
  // Only register drain if there is backpressure.
  const rs = new Readable({ read() {} });

  const pt = rs
    .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 }));
  assert.strictEqual(pt.listenerCount('drain'), 0);
  pt.on('finish', () => {
    assert.strictEqual(pt.listenerCount('drain'), 0);
  });

  // The listener count is re-checked after every push, including the EOF
  // push, and is expected to stay at zero throughout.
  rs.push('asd');
  assert.strictEqual(pt.listenerCount('drain'), 0);

  process.nextTick(() => {
    rs.push('asd');
    assert.strictEqual(pt.listenerCount('drain'), 0);
    rs.push(null);
    assert.strictEqual(pt.listenerCount('drain'), 0);
  });
}