// META: global=window,worker
// META: script=../resources/test-utils.js
// META: script=../resources/rs-utils.js
// META: script=../resources/recording-streams.js
'use strict';

// Tests for how ReadableStream.prototype.pipeTo() reacts to backpressure signalled by the destination WritableStream.
// The helpers used below (recordingReadableStream, recordingWritableStream, flushAsyncEvents, readableStreamToArray)
// are expected to come from the scripts listed in the META lines above; promise_test and the assert_* functions are
// expected to come from the test harness.

// Shared sentinel error, so that tests can check a rejection carries this exact object.
const error1 = new Error('error1!');
error1.name = 'error1';

// The destination has a highWaterMark of 0, so it never asks for chunks. The pipe must not read anything from the
// source; after the destination is errored, the pipe rejects and both queued chunks are still readable.
promise_test(t => {

  const rs = recordingReadableStream({
    start(controller) {
      controller.enqueue('a');
      controller.enqueue('b');
      controller.close();
    }
  });

  const ws = recordingWritableStream(undefined, new CountQueuingStrategy({ highWaterMark: 0 }));

  // preventCancel keeps the source usable after the pipe fails, so its chunks can be read back below.
  const pipePromise = rs.pipeTo(ws, { preventCancel: true });

  // Wait and make sure it doesn't do any reading.
  return flushAsyncEvents().then(() => {
    ws.controller.error(error1);
  })
  .then(() => promise_rejects_exactly(t, error1, pipePromise, 'pipeTo must reject with the same error'))
  .then(() => {
    assert_array_equals(rs.eventsWithoutPulls, []);
    assert_array_equals(ws.events, []);
  })
  .then(() => readableStreamToArray(rs))
  .then(chunksNotPreviouslyRead => {
    assert_array_equals(chunksNotPreviouslyRead, ['a', 'b']);
  });

}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks');

// The destination starts out full (one write is pending, desiredSize is 0). Once that write is allowed to finish,
// the pipe must deliver the source's chunk and close the destination.
promise_test(() => {

  const rs = recordingReadableStream({
    start(controller) {
      controller.enqueue('b');
      controller.close();
    }
  });

  let resolveWritePromise;
  const ws = recordingWritableStream({
    write() {
      if (!resolveWritePromise) {
        // first write
        return new Promise(resolve => {
          resolveWritePromise = resolve;
        });
      }
      return undefined;
    }
  });

  const writer = ws.getWriter();
  const firstWritePromise = writer.write('a');
  assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
  writer.releaseLock();

  // firstWritePromise won't settle until we call resolveWritePromise.

  const pipePromise = rs.pipeTo(ws);

  return flushAsyncEvents().then(() => resolveWritePromise())
      .then(() => Promise.all([firstWritePromise, pipePromise]))
      .then(() => {
        assert_array_equals(rs.eventsWithoutPulls, []);
        assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']);
      });

}, 'Piping from a non-empty ReadableStream into a WritableStream that does not desire chunks, but then does');

// Same as the previous test, except the source is empty when the pipe starts: the chunk is enqueued, the pending
// write is released and the source is closed only after pipeTo() has been called.
promise_test(() => {

  const rs = recordingReadableStream();

  let resolveWritePromise;
  const ws = recordingWritableStream({
    write() {
      if (!resolveWritePromise) {
        // first write
        return new Promise(resolve => {
          resolveWritePromise = resolve;
        });
      }
      return undefined;
    }
  });

  const writer = ws.getWriter();
  writer.write('a');

  return flushAsyncEvents().then(() => {
    assert_array_equals(ws.events, ['write', 'a']);
    assert_equals(writer.desiredSize, 0, 'after writing the writer\'s desiredSize must be 0');
    writer.releaseLock();

    const pipePromise = rs.pipeTo(ws);

    rs.controller.enqueue('b');
    resolveWritePromise();
    rs.controller.close();

    return pipePromise.then(() => {
      assert_array_equals(rs.eventsWithoutPulls, []);
      assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'close']);
    });
  });

}, 'Piping from an empty ReadableStream into a WritableStream that does not desire chunks, but then the readable ' +
   'stream becomes non-empty and the writable stream starts desiring chunks');

// The destination has room for three chunks and one write is stuck in flight (desiredSize is 2). The pipe must keep
// pulling from the source until the destination's highWaterMark is reached, without waiting for the first write.
promise_test(() => {
  const unreadChunks = ['b', 'c', 'd'];

  // highWaterMark 0 on the source: chunks are only produced by pull().
  const rs = recordingReadableStream({
    pull(controller) {
      controller.enqueue(unreadChunks.shift());
      if (unreadChunks.length === 0) {
        controller.close();
      }
    }
  }, new CountQueuingStrategy({ highWaterMark: 0 }));

  let resolveWritePromise;
  const ws = recordingWritableStream({
    write() {
      if (!resolveWritePromise) {
        // first write
        return new Promise(resolve => {
          resolveWritePromise = resolve;
        });
      }
      return undefined;
    }
  }, new CountQueuingStrategy({ highWaterMark: 3 }));

  const writer = ws.getWriter();
  const firstWritePromise = writer.write('a');
  assert_equals(writer.desiredSize, 2, 'after writing the writer\'s desiredSize must be 2');
  writer.releaseLock();

  // firstWritePromise won't settle until we call resolveWritePromise.

  const pipePromise = rs.pipeTo(ws);

  return flushAsyncEvents().then(() => {
    assert_array_equals(ws.events, ['write', 'a']);
    assert_equals(unreadChunks.length, 1, 'chunks should continue to be enqueued until the HWM is reached');
  }).then(() => resolveWritePromise())
      .then(() => Promise.all([firstWritePromise, pipePromise]))
      .then(() => {
        assert_array_equals(rs.events, ['pull', 'pull', 'pull']);
        assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close']);
      });

}, 'Piping from a ReadableStream to a WritableStream that desires more chunks before finishing with previous ones');

// Sequences numbered "steps" so that a test can interleave actions from several places in a fixed order.
// waiters[n] is the promise for step n and wakers[n] is the function that resolves it.
class StepTracker {
  constructor() {
    this.waiters = [];
    this.wakers = [];
  }

  // Returns promise which resolves when step `n` is reached. Also schedules step n + 1 to happen shortly after the
  // promise is resolved.
  //
  // Calling this with n = 0 starts the sequence. A step is only woken if somebody has already asked to wait for it
  // by the time the previous step has finished flushing, so a gap in the step numbers stops the sequence.
  waitThenAdvance(n) {
    if (this.waiters[n] === undefined) {
      this.waiters[n] = new Promise(resolve => {
        this.wakers[n] = resolve;
      });
      // Deliberately not returned: this chain only exists to wake the next step.
      this.waiters[n]
          .then(() => flushAsyncEvents())
          .then(() => {
            if (this.wakers[n + 1] !== undefined) {
              this.wakers[n + 1]();
            }
          });
    }
    if (n === 0) {
      this.wakers[0]();
    }
    return this.waiters[n];
  }
}

// Step choreography for this test: the source enqueues 'a', 'b', 'c', 'd' at steps 1, 3, 5 and 7 and closes at step
// 11; the first write() to the destination finishes at step 9 and every later write() waits for step 12. The
// even-numbered steps check what has been written so far and the source's desiredSize.
promise_test(() => {
  const steps = new StepTracker();
  const desiredSizes = [];
  const rs = recordingReadableStream({
    start(controller) {
      steps.waitThenAdvance(1).then(() => enqueue('a'));
      steps.waitThenAdvance(3).then(() => enqueue('b'));
      steps.waitThenAdvance(5).then(() => enqueue('c'));
      steps.waitThenAdvance(7).then(() => enqueue('d'));
      steps.waitThenAdvance(11).then(() => controller.close());

      // Enqueues the chunk and records the source's desiredSize right after doing so.
      function enqueue(chunk) {
        controller.enqueue(chunk);
        desiredSizes.push(controller.desiredSize);
      }
    }
  });

  const chunksFinishedWriting = [];
  const writableStartPromise = Promise.resolve();
  let writeCalled = false;
  const ws = recordingWritableStream({
    start() {
      return writableStartPromise;
    },
    write(chunk) {
      const waitForStep = writeCalled ? 12 : 9;
      writeCalled = true;
      return steps.waitThenAdvance(waitForStep).then(() => {
        chunksFinishedWriting.push(chunk);
      });
    }
  });

  return writableStartPromise.then(() => {
    const pipePromise = rs.pipeTo(ws);
    steps.waitThenAdvance(0);

    return Promise.all([
      steps.waitThenAdvance(2).then(() => {
        assert_array_equals(chunksFinishedWriting, [], 'at step 2, zero chunks must have finished writing');
        assert_array_equals(ws.events, ['write', 'a'], 'at step 2, one chunk must have been written');

        // When 'a' (the very first chunk) was enqueued, it was immediately used to fulfill the outstanding read request
        // promise, leaving the queue empty.
        assert_array_equals(desiredSizes, [1],
                            'at step 2, the desiredSize at the last enqueue (step 1) must have been 1');
        assert_equals(rs.controller.desiredSize, 1, 'at step 2, the current desiredSize must be 1');
      }),

      steps.waitThenAdvance(4).then(() => {
        assert_array_equals(chunksFinishedWriting, [], 'at step 4, zero chunks must have finished writing');
        assert_array_equals(ws.events, ['write', 'a'], 'at step 4, one chunk must have been written');

        // When 'b' was enqueued at step 3, the queue was also empty, since immediately after enqueuing 'a' at
        // step 1, it was dequeued in order to fulfill the read() call that was made at step 0. Thus the queue
        // had size 1 (thus desiredSize of 0).
        assert_array_equals(desiredSizes, [1, 0],
                            'at step 4, the desiredSize at the last enqueue (step 3) must have been 0');
        assert_equals(rs.controller.desiredSize, 0, 'at step 4, the current desiredSize must be 0');
      }),

      steps.waitThenAdvance(6).then(() => {
        assert_array_equals(chunksFinishedWriting, [], 'at step 6, zero chunks must have finished writing');
        assert_array_equals(ws.events, ['write', 'a'], 'at step 6, one chunk must have been written');

        // When 'c' was enqueued at step 5, the queue was not empty; it had 'b' in it, since 'b' will not be read until
        // the first write completes at step 9. Thus, the queue size is 2 after enqueuing 'c', giving a desiredSize of
        // -1.
        assert_array_equals(desiredSizes, [1, 0, -1],
                            'at step 6, the desiredSize at the last enqueue (step 5) must have been -1');
        assert_equals(rs.controller.desiredSize, -1, 'at step 6, the current desiredSize must be -1');
      }),

      steps.waitThenAdvance(8).then(() => {
        assert_array_equals(chunksFinishedWriting, [], 'at step 8, zero chunks must have finished writing');
        assert_array_equals(ws.events, ['write', 'a'], 'at step 8, one chunk must have been written');

        // When 'd' was enqueued at step 7, the situation is the same as before, leading to a queue containing 'b', 'c',
        // and 'd'.
        assert_array_equals(desiredSizes, [1, 0, -1, -2],
                            'at step 8, the desiredSize at the last enqueue (step 7) must have been -2');
        assert_equals(rs.controller.desiredSize, -2, 'at step 8, the current desiredSize must be -2');
      }),

      steps.waitThenAdvance(10).then(() => {
        assert_array_equals(chunksFinishedWriting, ['a'], 'at step 10, one chunk must have finished writing');
        assert_array_equals(ws.events, ['write', 'a', 'write', 'b'],
                            'at step 10, two chunks must have been written');

        assert_equals(rs.controller.desiredSize, -1, 'at step 10, the current desiredSize must be -1');
      }),

      pipePromise.then(() => {
        assert_array_equals(desiredSizes, [1, 0, -1, -2], 'backpressure must have been exerted at the source');
        assert_array_equals(chunksFinishedWriting, ['a', 'b', 'c', 'd'], 'all chunks finished writing');

        assert_array_equals(rs.eventsWithoutPulls, [], 'nothing unexpected should happen to the ReadableStream');
        assert_array_equals(ws.events, ['write', 'a', 'write', 'b', 'write', 'c', 'write', 'd', 'close'],
                            'all chunks were written (and the WritableStream closed)');
      })
    ]);
  });
}, 'Piping to a WritableStream that does not consume the writes fast enough exerts backpressure on the ReadableStream');