11cb0ef41Sopenharmony_ci// META: global=window,worker
21cb0ef41Sopenharmony_ci// META: script=../resources/test-utils.js
31cb0ef41Sopenharmony_ci// META: script=../resources/rs-utils.js
41cb0ef41Sopenharmony_ci// META: script=../resources/recording-streams.js
51cb0ef41Sopenharmony_ci'use strict';
61cb0ef41Sopenharmony_ci
// Shared error instance: the tests below check that this exact object is what pipeTo() rejects with.
const error1 = Object.assign(new Error('error1!'), { name: 'error1' });
91cb0ef41Sopenharmony_ci
promise_test(async t => {

  // The source is fully populated and closed from within start().
  const source = recordingReadableStream({
    start(controller) {
      for (const chunk of ['a', 'b']) {
        controller.enqueue(chunk);
      }
      controller.close();
    }
  });

  // Destination created with a highWaterMark of 0 (the "does not desire chunks" stream from the test title).
  const sink = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));

  const piping = source.pipeTo(sink, { preventCancel: true });

  // Wait and make sure it doesn't do any reading.
  await flushAsyncEvents();
  sink.controller.error(error1);

  await promise_rejects_exactly(t, error1, piping, 'pipeTo must reject with the same error');

  // Neither side should have recorded any activity beyond pulls.
  assert_array_equals(source.eventsWithoutPulls, []);
  assert_array_equals(sink.events, []);

  // The pipe was started with preventCancel, and both chunks are expected to still be readable afterwards.
  const chunksNotPreviouslyRead = await readableStreamToArray(source);
  assert_array_equals(chunksNotPreviouslyRead, ['a', 'b']);

}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks');
391cb0ef41Sopenharmony_ci
promise_test(async () => {

  const source = recordingReadableStream({
    start(controller) {
      controller.enqueue('b');
      controller.close();
    }
  });

  // Set by the first write() call; calling it lets that first write finish.
  let resolveWritePromise;
  const sink = recordingWritableStream({
    write() {
      // Any write after the first completes immediately.
      if (resolveWritePromise) {
        return undefined;
      }
      // first write: stays pending until the test resolves it by hand.
      return new Promise(resolve => {
        resolveWritePromise = resolve;
      });
    }
  });

  // Put one chunk into the sink before piping starts, then hand the lock back so pipeTo() can take it.
  const writer = sink.getWriter();
  const firstWritePromise = writer.write('a');
  assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
  writer.releaseLock();

  // firstWritePromise won't settle until we call resolveWritePromise.

  const piping = source.pipeTo(sink);

  await flushAsyncEvents();
  resolveWritePromise();
  await Promise.all([firstWritePromise, piping]);

  assert_array_equals(source.eventsWithoutPulls, []);
  assert_array_equals(sink.events, ['write', 'a', 'write', 'b', 'close']);

}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks, but then does');
791cb0ef41Sopenharmony_ci
promise_test(async () => {

  // No underlying source is supplied: the test feeds this stream through source.controller later on.
  const source = recordingReadableStream();

  // Set by the first write() call; calling it lets that first write finish.
  let resolveWritePromise;
  const sink = recordingWritableStream({
    write() {
      // Any write after the first completes immediately.
      if (resolveWritePromise) {
        return undefined;
      }
      // first write: stays pending until the test resolves it by hand.
      return new Promise(resolve => {
        resolveWritePromise = resolve;
      });
    }
  });

  const writer = sink.getWriter();
  writer.write('a');

  await flushAsyncEvents();

  // By now the sink must have been handed 'a', which is still in flight.
  assert_array_equals(sink.events, ['write', 'a']);
  assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
  writer.releaseLock();

  const piping = source.pipeTo(sink);

  // These three calls run back to back, with no waiting in between: the source gets a chunk, the pending write is
  // released, and the source is closed.
  source.controller.enqueue('b');
  resolveWritePromise();
  source.controller.close();

  await piping;

  assert_array_equals(source.eventsWithoutPulls, []);
  assert_array_equals(sink.events, ['write', 'a', 'write', 'b', 'close']);

}, 'Piping from an empty ReadableStream into a WritableStream that does not desire chunks, but then the readable ' +
   'stream becomes non-empty and the writable stream starts desiring chunks');
1191cb0ef41Sopenharmony_ci
promise_test(async () => {
  // Chunks the source has not produced yet; each pull() takes one off the front.
  const unreadChunks = ['b', 'c', 'd'];

  const source = recordingReadableStream({
    pull(controller) {
      const nextChunk = unreadChunks.shift();
      controller.enqueue(nextChunk);
      if (unreadChunks.length > 0) {
        return;
      }
      // That was the last chunk.
      controller.close();
    }
  }, new CountQueuingStrategy({ highWaterMark: 0 }));

  // Set by the first write() call; calling it lets that first write finish.
  let resolveWritePromise;
  const sink = recordingWritableStream({
    write() {
      // Any write after the first completes immediately.
      if (resolveWritePromise) {
        return undefined;
      }
      // first write: stays pending until the test resolves it by hand.
      return new Promise(resolve => {
        resolveWritePromise = resolve;
      });
    }
  }, new CountQueuingStrategy({ highWaterMark: 3 }));

  // One chunk is written before piping starts; with a highWaterMark of 3 that must leave a desiredSize of 2.
  const writer = sink.getWriter();
  const firstWritePromise = writer.write('a');
  assert_equals(writer.desiredSize, 2, 'after writing the writer\'s desiredSize must be 2');
  writer.releaseLock();

  // firstWritePromise won't settle until we call resolveWritePromise.

  const piping = source.pipeTo(sink);

  await flushAsyncEvents();

  // The first write is still pending, yet two more chunks must already have been taken from the source.
  assert_array_equals(sink.events, ['write', 'a']);
  assert_equals(unreadChunks.length, 1, 'chunks should continue to be enqueued until the HWM is reached');

  resolveWritePromise();
  await Promise.all([firstWritePromise, piping]);

  assert_array_equals(source.events, ['pull', 'pull', 'pull']);
  assert_array_equals(sink.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close']);

}, 'Piping from a ReadableStream to a WritableStream that desires more chunks before finishing with previous ones');
1651cb0ef41Sopenharmony_ci
// Sequences numbered steps. Each step is a promise; once step n has resolved and flushAsyncEvents() has settled,
// step n + 1 is resolved, provided somebody has already asked for it. Asking for step 0 starts the sequence.
class StepTracker {
  constructor() {
    // waiters[n] is the promise for step n; wakers[n] is the function that resolves it.
    this.waiters = [];
    this.wakers = [];
  }

  // Returns promise which resolves when step `n` is reached. Also schedules step n + 1 to happen shortly after the
  // promise is resolved. Repeated calls with the same `n` return the same promise.
  waitThenAdvance(n) {
    if (this.waiters[n] === undefined) {
      this.waiters[n] = new Promise(resolve => {
        this.wakers[n] = resolve;
      });
      this.waiters[n]
          .then(() => flushAsyncEvents())
          .then(() => {
            // Only a step that has been requested by this point has a waker; otherwise the sequence stops here.
            if (this.wakers[n + 1] !== undefined) {
              this.wakers[n + 1]();
            }
          });
    }
    // Step 0 has no predecessor to wake it, so requesting it resolves it right away. Strict equality keeps values
    // that merely coerce to 0 (false, '', ...) from being treated as step 0.
    if (n === 0) {
      this.wakers[0]();
    }
    return this.waiters[n];
  }
}
1931cb0ef41Sopenharmony_ci
promise_test(async () => {
  const steps = new StepTracker();
  // controller.desiredSize as observed immediately after each enqueue, in enqueue order.
  const desiredSizes = [];
  const source = recordingReadableStream({
    start(controller) {
      const enqueueAndRecord = chunk => {
        controller.enqueue(chunk);
        desiredSizes.push(controller.desiredSize);
      };

      // Chunks are produced at steps 1, 3, 5 and 7; the stream is closed at step 11.
      for (const [step, chunk] of [[1, 'a'], [3, 'b'], [5, 'c'], [7, 'd']]) {
        steps.waitThenAdvance(step).then(() => enqueueAndRecord(chunk));
      }
      steps.waitThenAdvance(11).then(() => controller.close());
    }
  });

  const chunksFinishedWriting = [];
  const writableStartPromise = Promise.resolve();
  let writeCalled = false;
  const sink = recordingWritableStream({
    start() {
      return writableStartPromise;
    },
    write(chunk) {
      // The first write waits for step 9; every later write waits for step 12.
      let waitForStep = 9;
      if (writeCalled) {
        waitForStep = 12;
      }
      writeCalled = true;
      return steps.waitThenAdvance(waitForStep).then(() => {
        chunksFinishedWriting.push(chunk);
      });
    }
  });

  await writableStartPromise;

  const pipePromise = source.pipeTo(sink);
  steps.waitThenAdvance(0);

  // All checkpoints below are registered synchronously, in step order, before any step past 0 can fire.
  const atStep2 = steps.waitThenAdvance(2).then(() => {
    assert_array_equals(chunksFinishedWriting, [], 'at step 2, zero chunks must have finished writing');
    assert_array_equals(sink.events, ['write', 'a'], 'at step 2, one chunk must have been written');

    // When 'a' (the very first chunk) was enqueued, it was immediately used to fulfill the outstanding read request
    // promise, leaving the queue empty.
    assert_array_equals(desiredSizes, [1],
                        'at step 2, the desiredSize at the last enqueue (step 1) must have been 1');
    assert_equals(source.controller.desiredSize, 1, 'at step 2, the current desiredSize must be 1');
  });

  const atStep4 = steps.waitThenAdvance(4).then(() => {
    assert_array_equals(chunksFinishedWriting, [], 'at step 4, zero chunks must have finished writing');
    assert_array_equals(sink.events, ['write', 'a'], 'at step 4, one chunk must have been written');

    // When 'b' was enqueued at step 3, the queue was also empty, since immediately after enqueuing 'a' at
    // step 1, it was dequeued in order to fulfill the read() call that was made at step 0. Thus the queue
    // had size 1 (thus desiredSize of 0).
    assert_array_equals(desiredSizes, [1, 0],
                        'at step 4, the desiredSize at the last enqueue (step 3) must have been 0');
    assert_equals(source.controller.desiredSize, 0, 'at step 4, the current desiredSize must be 0');
  });

  const atStep6 = steps.waitThenAdvance(6).then(() => {
    assert_array_equals(chunksFinishedWriting, [], 'at step 6, zero chunks must have finished writing');
    assert_array_equals(sink.events, ['write', 'a'], 'at step 6, one chunk must have been written');

    // When 'c' was enqueued at step 5, the queue was not empty; it had 'b' in it, since 'b' will not be read until
    // the first write completes at step 9. Thus, the queue size is 2 after enqueuing 'c', giving a desiredSize of
    // -1.
    assert_array_equals(desiredSizes, [1, 0, -1],
                        'at step 6, the desiredSize at the last enqueue (step 5) must have been -1');
    assert_equals(source.controller.desiredSize, -1, 'at step 6, the current desiredSize must be -1');
  });

  const atStep8 = steps.waitThenAdvance(8).then(() => {
    assert_array_equals(chunksFinishedWriting, [], 'at step 8, zero chunks must have finished writing');
    assert_array_equals(sink.events, ['write', 'a'], 'at step 8, one chunk must have been written');

    // When 'd' was enqueued at step 7, the situation is the same as before, leading to a queue containing 'b', 'c',
    // and 'd'.
    assert_array_equals(desiredSizes, [1, 0, -1, -2],
                        'at step 8, the desiredSize at the last enqueue (step 7) must have been -2');
    assert_equals(source.controller.desiredSize, -2, 'at step 8, the current desiredSize must be -2');
  });

  const atStep10 = steps.waitThenAdvance(10).then(() => {
    assert_array_equals(chunksFinishedWriting, ['a'], 'at step 10, one chunk must have finished writing');
    assert_array_equals(sink.events, ['write', 'a', 'write', 'b'],
                        'at step 10, two chunks must have been written');

    assert_equals(source.controller.desiredSize, -1, 'at step 10, the current desiredSize must be -1');
  });

  const afterPipe = pipePromise.then(() => {
    assert_array_equals(desiredSizes, [1, 0, -1, -2], 'backpressure must have been exerted at the source');
    assert_array_equals(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks finished writing');

    assert_array_equals(source.eventsWithoutPulls, [], 'nothing unexpected should happen to the ReadableStream');
    assert_array_equals(sink.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close'],
                        'all chunks were written (and the WritableStream closed)');
  });

  await Promise.all([atStep2, atStep4, atStep6, atStep8, atStep10, afterPipe]);
}, 'Piping to a WritableStream that does not consume the writes fast enough exerts backpressure on the ReadableStream');
298