1// META: global=window,worker
2// META: script=../resources/recording-streams.js
3// META: script=../resources/test-utils.js
4'use strict';
5
6// Tests for the use of pipeTo with AbortSignal.
7// There is some extra complexity to avoid timeouts in environments where abort is not implemented.
8
9const error1 = new Error('error1');
10error1.name = 'error1';
11const error2 = new Error('error2');
12error2.name = 'error2';
13
14const errorOnPull = {
15  pull(controller) {
16    // This will cause the test to error if pipeTo abort is not implemented.
17    controller.error('failed to abort');
18  }
19};
20
21// To stop pull() being called immediately when the stream is created, we need to set highWaterMark to 0.
22const hwm0 = { highWaterMark: 0 };
23
24for (const invalidSignal of [null, 'AbortSignal', true, -1, Object.create(AbortSignal.prototype)]) {
25  promise_test(t => {
26    const rs = recordingReadableStream(errorOnPull, hwm0);
27    const ws = recordingWritableStream();
28    return promise_rejects_js(t, TypeError, rs.pipeTo(ws, { signal: invalidSignal }), 'pipeTo should reject')
29        .then(() => {
30          assert_equals(rs.events.length, 0, 'no ReadableStream methods should have been called');
31          assert_equals(ws.events.length, 0, 'no WritableStream methods should have been called');
32        });
33  }, `a signal argument '${invalidSignal}' should cause pipeTo() to reject`);
34}
35
36promise_test(t => {
37  const rs = recordingReadableStream(errorOnPull, hwm0);
38  const ws = new WritableStream();
39  const abortController = new AbortController();
40  const signal = abortController.signal;
41  abortController.abort();
42  return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject')
43      .then(() => Promise.all([
44        rs.getReader().closed,
45        promise_rejects_dom(t, 'AbortError', ws.getWriter().closed, 'writer.closed should reject')
46      ]))
47      .then(() => {
48        assert_equals(rs.events.length, 2, 'cancel should have been called');
49        assert_equals(rs.events[0], 'cancel', 'first event should be cancel');
50        assert_equals(rs.events[1].name, 'AbortError', 'the argument to cancel should be an AbortError');
51        assert_equals(rs.events[1].constructor.name, 'DOMException',
52                      'the argument to cancel should be a DOMException');
53      });
54}, 'an aborted signal should cause the writable stream to reject with an AbortError');
55
56for (const reason of [null, undefined, error1]) {
57  promise_test(async t => {
58    const rs = recordingReadableStream(errorOnPull, hwm0);
59    const ws = new WritableStream();
60    const abortController = new AbortController();
61    const signal = abortController.signal;
62    abortController.abort(reason);
63    const pipeToPromise = rs.pipeTo(ws, { signal });
64    if (reason !== undefined) {
65      await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason');
66    } else {
67      await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError');
68    }
69    const error = await pipeToPromise.catch(e => e);
70    await rs.getReader().closed;
71    await promise_rejects_exactly(t, error, ws.getWriter().closed, 'the writable should be errored with the same object');
72    assert_equals(signal.reason, error, 'signal.reason should be error'),
73    assert_equals(rs.events.length, 2, 'cancel should have been called');
74    assert_equals(rs.events[0], 'cancel', 'first event should be cancel');
75    assert_equals(rs.events[1], error, 'the readable should be canceled with the same object');
76  }, `(reason: '${reason}') all the error objects should be the same object`);
77}
78
79promise_test(t => {
80  const rs = recordingReadableStream(errorOnPull, hwm0);
81  const ws = new WritableStream();
82  const abortController = new AbortController();
83  const signal = abortController.signal;
84  abortController.abort();
85  return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal, preventCancel: true }), 'pipeTo should reject')
86      .then(() => assert_equals(rs.events.length, 0, 'cancel should not be called'));
87}, 'preventCancel should prevent canceling the readable');
88
89promise_test(t => {
90  const rs = new ReadableStream(errorOnPull, hwm0);
91  const ws = recordingWritableStream();
92  const abortController = new AbortController();
93  const signal = abortController.signal;
94  abortController.abort();
95  return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal, preventAbort: true }), 'pipeTo should reject')
96      .then(() => {
97        assert_equals(ws.events.length, 0, 'writable should not have been aborted');
98        return ws.getWriter().ready;
99      });
100}, 'preventAbort should prevent aborting the readable');
101
102promise_test(t => {
103  const rs = recordingReadableStream(errorOnPull, hwm0);
104  const ws = recordingWritableStream();
105  const abortController = new AbortController();
106  const signal = abortController.signal;
107  abortController.abort();
108  return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal, preventCancel: true, preventAbort: true }),
109                         'pipeTo should reject')
110    .then(() => {
111      assert_equals(rs.events.length, 0, 'cancel should not be called');
112      assert_equals(ws.events.length, 0, 'writable should not have been aborted');
113      return ws.getWriter().ready;
114    });
115}, 'preventCancel and preventAbort should prevent canceling the readable and aborting the readable');
116
117for (const reason of [null, undefined, error1]) {
118  promise_test(async t => {
119    const rs = new ReadableStream({
120      start(controller) {
121        controller.enqueue('a');
122        controller.enqueue('b');
123        controller.close();
124      }
125    });
126    const abortController = new AbortController();
127    const signal = abortController.signal;
128    const ws = recordingWritableStream({
129      write() {
130        abortController.abort(reason);
131      }
132    });
133    const pipeToPromise = rs.pipeTo(ws, { signal });
134    if (reason !== undefined) {
135      await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason');
136    } else {
137      await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError');
138    }
139    const error = await pipeToPromise.catch(e => e);
140    assert_equals(signal.reason, error, 'signal.reason should be error');
141    assert_equals(ws.events.length, 4, 'only chunk "a" should have been written');
142    assert_array_equals(ws.events.slice(0, 3), ['write', 'a', 'abort'], 'events should match');
143    assert_equals(ws.events[3], error, 'abort reason should be error');
144  }, `(reason: '${reason}') abort should prevent further reads`);
145}
146
147for (const reason of [null, undefined, error1]) {
148  promise_test(async t => {
149    let readController;
150    const rs = new ReadableStream({
151      start(c) {
152        readController = c;
153        c.enqueue('a');
154        c.enqueue('b');
155      }
156    });
157    const abortController = new AbortController();
158    const signal = abortController.signal;
159    let resolveWrite;
160    const writePromise = new Promise(resolve => {
161      resolveWrite = resolve;
162    });
163    const ws = recordingWritableStream({
164      write() {
165        return writePromise;
166      }
167    }, new CountQueuingStrategy({ highWaterMark: Infinity }));
168    const pipeToPromise = rs.pipeTo(ws, { signal });
169    await delay(0);
170    await abortController.abort(reason);
171    await readController.close(); // Make sure the test terminates when signal is not implemented.
172    await resolveWrite();
173    if (reason !== undefined) {
174      await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason');
175    } else {
176      await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError');
177    }
178    const error = await pipeToPromise.catch(e => e);
179    assert_equals(signal.reason, error, 'signal.reason should be error');
180    assert_equals(ws.events.length, 6, 'chunks "a" and "b" should have been written');
181    assert_array_equals(ws.events.slice(0, 5), ['write', 'a', 'write', 'b', 'abort'], 'events should match');
182    assert_equals(ws.events[5], error, 'abort reason should be error');
183  }, `(reason: '${reason}') all pending writes should complete on abort`);
184}
185
186promise_test(t => {
187  const rs = new ReadableStream({
188    pull(controller) {
189      controller.error('failed to abort');
190    },
191    cancel() {
192      return Promise.reject(error1);
193    }
194  }, hwm0);
195  const ws = new WritableStream();
196  const abortController = new AbortController();
197  const signal = abortController.signal;
198  abortController.abort();
199  return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { signal }), 'pipeTo should reject');
200}, 'a rejection from underlyingSource.cancel() should be returned by pipeTo()');
201
202promise_test(t => {
203  const rs = new ReadableStream(errorOnPull, hwm0);
204  const ws = new WritableStream({
205    abort() {
206      return Promise.reject(error1);
207    }
208  });
209  const abortController = new AbortController();
210  const signal = abortController.signal;
211  abortController.abort();
212  return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { signal }), 'pipeTo should reject');
213}, 'a rejection from underlyingSink.abort() should be returned by pipeTo()');
214
215promise_test(t => {
216  const events = [];
217  const rs = new ReadableStream({
218    pull(controller) {
219      controller.error('failed to abort');
220    },
221    cancel() {
222      events.push('cancel');
223      return Promise.reject(error1);
224    }
225  }, hwm0);
226  const ws = new WritableStream({
227    abort() {
228      events.push('abort');
229      return Promise.reject(error2);
230    }
231  });
232  const abortController = new AbortController();
233  const signal = abortController.signal;
234  abortController.abort();
235  return promise_rejects_exactly(t, error2, rs.pipeTo(ws, { signal }), 'pipeTo should reject')
236      .then(() => assert_array_equals(events, ['abort', 'cancel'], 'abort() should be called before cancel()'));
237}, 'a rejection from underlyingSink.abort() should be preferred to one from underlyingSource.cancel()');
238
239promise_test(t => {
240  const rs = new ReadableStream({
241    start(controller) {
242      controller.close();
243    }
244  });
245  const ws = new WritableStream();
246  const abortController = new AbortController();
247  const signal = abortController.signal;
248  abortController.abort();
249  return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject');
250}, 'abort signal takes priority over closed readable');
251
252promise_test(t => {
253  const rs = new ReadableStream({
254    start(controller) {
255      controller.error(error1);
256    }
257  });
258  const ws = new WritableStream();
259  const abortController = new AbortController();
260  const signal = abortController.signal;
261  abortController.abort();
262  return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject');
263}, 'abort signal takes priority over errored readable');
264
265promise_test(t => {
266  const rs = new ReadableStream({
267    pull(controller) {
268      controller.error('failed to abort');
269    }
270  }, hwm0);
271  const ws = new WritableStream();
272  const abortController = new AbortController();
273  const signal = abortController.signal;
274  abortController.abort();
275  const writer = ws.getWriter();
276  return writer.close().then(() => {
277    writer.releaseLock();
278    return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject');
279  });
280}, 'abort signal takes priority over closed writable');
281
282promise_test(t => {
283  const rs = new ReadableStream({
284    pull(controller) {
285      controller.error('failed to abort');
286    }
287  }, hwm0);
288  const ws = new WritableStream({
289    start(controller) {
290      controller.error(error1);
291    }
292  });
293  const abortController = new AbortController();
294  const signal = abortController.signal;
295  abortController.abort();
296  return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject');
297}, 'abort signal takes priority over errored writable');
298
299promise_test(() => {
300  let readController;
301  const rs = new ReadableStream({
302    start(c) {
303      readController = c;
304    }
305  });
306  const ws = new WritableStream();
307  const abortController = new AbortController();
308  const signal = abortController.signal;
309  const pipeToPromise = rs.pipeTo(ws, { signal, preventClose: true });
310  readController.close();
311  return Promise.resolve().then(() => {
312    abortController.abort();
313    return pipeToPromise;
314  }).then(() => ws.getWriter().write('this should succeed'));
315}, 'abort should do nothing after the readable is closed');
316
317promise_test(t => {
318  let readController;
319  const rs = new ReadableStream({
320    start(c) {
321      readController = c;
322    }
323  });
324  const ws = new WritableStream();
325  const abortController = new AbortController();
326  const signal = abortController.signal;
327  const pipeToPromise = rs.pipeTo(ws, { signal, preventAbort: true });
328  readController.error(error1);
329  return Promise.resolve().then(() => {
330    abortController.abort();
331    return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject');
332  }).then(() => ws.getWriter().write('this should succeed'));
333}, 'abort should do nothing after the readable is errored');
334
335promise_test(t => {
336  let readController;
337  const rs = new ReadableStream({
338    start(c) {
339      readController = c;
340    }
341  });
342  let resolveWrite;
343  const writePromise = new Promise(resolve => {
344    resolveWrite = resolve;
345  });
346  const ws = new WritableStream({
347    write() {
348      readController.error(error1);
349      return writePromise;
350    }
351  });
352  const abortController = new AbortController();
353  const signal = abortController.signal;
354  const pipeToPromise = rs.pipeTo(ws, { signal, preventAbort: true });
355  readController.enqueue('a');
356  return delay(0).then(() => {
357    abortController.abort();
358    resolveWrite();
359    return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject');
360  }).then(() => ws.getWriter().write('this should succeed'));
361}, 'abort should do nothing after the readable is errored, even with pending writes');
362
363promise_test(t => {
364  const rs = recordingReadableStream({
365    pull(controller) {
366      return delay(0).then(() => controller.close());
367    }
368  });
369  let writeController;
370  const ws = new WritableStream({
371    start(c) {
372      writeController = c;
373    }
374  });
375  const abortController = new AbortController();
376  const signal = abortController.signal;
377  const pipeToPromise = rs.pipeTo(ws, { signal, preventCancel: true });
378  return Promise.resolve().then(() => {
379    writeController.error(error1);
380    return Promise.resolve();
381  }).then(() => {
382    abortController.abort();
383    return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject');
384  }).then(() => {
385    assert_array_equals(rs.events, ['pull'], 'cancel should not have been called');
386  });
387}, 'abort should do nothing after the writable is errored');
388
389promise_test(async t => {
390  const rs = new ReadableStream({
391    pull(c) {
392      c.enqueue(new Uint8Array([]));
393    },
394    type: "bytes",
395  });
396  const ws = new WritableStream();
397  const [first, second] = rs.tee();
398
399  let aborted = false;
400  first.pipeTo(ws, { signal: AbortSignal.abort() }).catch(() => {
401    aborted = true;
402  });
403  await delay(0);
404  assert_true(!aborted, "pipeTo should not resolve yet");
405  await second.cancel();
406  await delay(0);
407  assert_true(aborted, "pipeTo should be aborted now");
408}, "pipeTo on a teed readable byte stream should only be aborted when both branches are aborted");
409