// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
211cb0ef41Sopenharmony_ci
221cb0ef41Sopenharmony_ci'use strict';
231cb0ef41Sopenharmony_cirequire('../common');
241cb0ef41Sopenharmony_ciconst assert = require('assert');
251cb0ef41Sopenharmony_ci
261cb0ef41Sopenharmony_ciconst stream = require('stream');
271cb0ef41Sopenharmony_ciconst Readable = stream.Readable;
281cb0ef41Sopenharmony_ciconst Writable = stream.Writable;
291cb0ef41Sopenharmony_ci
// The source produces `totalChunks` chunks of `chunkSize` bytes each.
const totalChunks = 100;
const chunkSize = 99;
const expectTotalData = totalChunks * chunkSize;
// The consuming stages subtract the bytes they take from this counter; the
// remainder is what the final pipe() stage is expected to receive.
let expectEndingData = expectTotalData;

const r = new Readable({ highWaterMark: 1000 });
// Number of chunks still to be produced; decremented by push().
let chunks = totalChunks;

// Delivers the next chunk through one of three timings, selected by the
// divisibility of the remaining chunk count: setImmediate (multiple of 2),
// process.nextTick (odd multiple of 3), or a direct synchronous call.
r._read = function(size) {
  console.log('_read called', chunks);

  if (chunks % 2 === 0) {
    setImmediate(push);
    return;
  }

  if (chunks % 3 === 0) {
    process.nextTick(push);
    return;
  }

  push();
};
461cb0ef41Sopenharmony_ci
// Running total of the bytes handed to r.push().
let totalPushed = 0;

// Pushes the next chunk into `r`: a `chunkSize`-byte buffer filled with 'x'
// while chunks remain, otherwise `null`. `chunks` is decremented on every
// call, including the one that pushes `null`.
function push() {
  const hasRemaining = chunks > 0;
  chunks -= 1;

  const payload = hasRemaining ? Buffer.alloc(chunkSize, 'x') : null;
  if (payload !== null) {
    totalPushed += payload.length;
  }

  console.log('chunks', chunks);
  r.push(payload);
}
561cb0ef41Sopenharmony_ci
// Kick off the chain of stages; each stage schedules the next one.
read100();

// Stage 1: read the first 100 bytes in paused mode, then hand over to the
// 'data'-event stage.
function read100() {
  const byteCount = 100;
  readn(byteCount, onData);
}
631cb0ef41Sopenharmony_ci
// Reads exactly `n` bytes from `r` with r.read(n), retrying on each
// 'readable' event until data is returned, then calls `then()`.
// Also deducts `n` from `expectEndingData`.
function readn(n, then) {
  console.error(`read ${n}`);
  expectEndingData -= n;

  function attempt() {
    const data = r.read(n);
    console.error('c', data);

    if (data) {
      assert.strictEqual(data.length, n);
      // Reading via read() must not have put the stream into flowing mode.
      assert(!r.readableFlowing);
      then();
      return;
    }

    // Not enough buffered yet; try again on the next 'readable'.
    r.once('readable', attempt);
  }

  attempt();
}
791cb0ef41Sopenharmony_ci
// Then we listen to some data events.
// Stage 2: attach a 'data' listener, consume 100 bytes through it, then
// detach the listener, pause the stream and give back any surplus bytes with
// unshift() before scheduling the next stage.
function onData() {
  expectEndingData -= 100;
  console.error('onData');
  let seen = 0;
  r.on('data', function od(c) {
    seen += c.length;
    if (seen < 100)
      return;

    // Seen enough
    r.removeListener('data', od);
    r.pause();
    if (seen > 100) {
      // Oh no, seen too much!
      // Put the extra back.
      const diff = seen - 100;
      // subarray() is the non-deprecated equivalent of Buffer#slice(): it
      // returns a view over the trailing `diff` bytes of the chunk.
      r.unshift(c.subarray(c.length - diff));
      console.error('seen too much', seen, diff);
    }

    // Nothing should be lost in-between.
    setImmediate(pipeLittle);
  });
}
1041cb0ef41Sopenharmony_ci
// Just pipe 200 bytes, then unshift the extra and unpipe.
// Stage 3: pipe `r` into a throwaway Writable until 200 bytes have been
// written, then unpipe, end the writable and return any surplus to `r`.
function pipeLittle() {
  expectEndingData -= 200;
  console.error('pipe a little');
  const w = new Writable();
  let written = 0;
  w.on('finish', () => {
    assert.strictEqual(written, 200);
    setImmediate(read1234);
  });
  w._write = function(chunk, encoding, cb) {
    written += chunk.length;
    if (written < 200) {
      // Still below the target: acknowledge the write on a later tick.
      setImmediate(cb);
      return;
    }

    // Target reached: detach from the source and finish the writable.
    r.unpipe(w);
    w.end();
    cb();
    if (written > 200) {
      // The last chunk overshot; correct the counter (the 'finish' handler
      // asserts exactly 200) and put the surplus bytes back on the source.
      const diff = written - 200;
      written -= diff;
      // subarray() is the non-deprecated equivalent of Buffer#slice().
      r.unshift(chunk.subarray(chunk.length - diff));
    }
  };
  r.pipe(w);
}
1321cb0ef41Sopenharmony_ci
// Now read 1234 more bytes.
// Stage 4: back to paused-mode reads, then move on to the resume/pause
// stage.
function read1234() {
  const byteCount = 1234;
  readn(byteCount, resumePause);
}
1371cb0ef41Sopenharmony_ci
// Stage 5: toggle the stream between resumed and paused several times
// without reading anything, then schedule the final pipe stage.
function resumePause() {
  console.error('resumePause');
  // Don't read anything, just resume and re-pause a whole bunch.
  const toggleCount = 5;
  for (let i = 0; i < toggleCount; i++) {
    r.resume();
    r.pause();
  }
  setImmediate(pipe);
}
1531cb0ef41Sopenharmony_ci
1541cb0ef41Sopenharmony_ci
// Final stage: pipe whatever is left in `r` into a counting Writable and,
// once it finishes, check that the byte totals match the expectations.
function pipe() {
  console.error('pipe the rest');
  const sink = new Writable();
  let received = 0;

  sink.on('finish', () => {
    console.error('written', received, totalPushed);
    // Everything not consumed by the earlier stages must arrive here...
    assert.strictEqual(received, expectEndingData);
    // ...and the source must have produced every chunk.
    assert.strictEqual(totalPushed, expectTotalData);
    console.log('ok');
  });

  // Count the bytes and acknowledge each write synchronously.
  sink._write = function(chunk, encoding, cb) {
    received += chunk.length;
    cb();
  };

  r.pipe(sink);
}
171