1// META: global=window,worker
2// META: script=../resources/rs-utils.js
3// META: script=../resources/test-utils.js
4// META: script=../resources/recording-streams.js
5'use strict';
6
// Shared error instance: tests pass it to controller.error() and then check that
// rejections carry this exact object.
const error1 = new Error('error1');
8
/**
 * Asserts that `iterResult` is a plain iterator-result object: an object whose
 * prototype is Object.prototype, whose only own properties are `done` and
 * `value`, and whose `value` / `done` equal the expected ones.
 *
 * @param {*} iterResult - The object to check.
 * @param {*} value - Expected `value` property.
 * @param {boolean} done - Expected `done` property.
 * @param {string} [message] - Optional label prepended (followed by a space) to every assertion description.
 */
function assert_iter_result(iterResult, value, done, message) {
  // Builds the description for one assertion, with the optional caller label in front.
  const describe = (what) => (message === undefined ? what : `${message} ${what}`);

  assert_equals(typeof iterResult, 'object', describe('type is object'));
  assert_equals(Object.getPrototypeOf(iterResult), Object.prototype, describe('[[Prototype]]'));

  const ownNames = Object.getOwnPropertyNames(iterResult).sort();
  assert_array_equals(ownNames, ['done', 'value'], describe('property names'));

  assert_equals(iterResult.value, value, describe('value'));
  assert_equals(iterResult.done, done, describe('done'));
}
17
// Checks the shape of the object returned by values(): its prototype chain, the own
// methods on its prototype, and each method's property attributes, name and length.
test(() => {
  const stream = new ReadableStream();
  const iterator = stream.values();
  const iteratorProto = Object.getPrototypeOf(iterator);

  // Reach the async iterator prototype by walking up from an async generator function.
  const asyncGeneratorFunctionProto = Object.getPrototypeOf(async function* () {});
  const AsyncIteratorPrototype = Object.getPrototypeOf(asyncGeneratorFunctionProto.prototype);
  assert_equals(Object.getPrototypeOf(iteratorProto), AsyncIteratorPrototype, 'prototype should extend AsyncIteratorPrototype');

  const expectedMethods = ['next', 'return'].sort();
  const actualMethods = Object.getOwnPropertyNames(iteratorProto).sort();
  assert_array_equals(actualMethods, expectedMethods, 'should have all the correct methods');

  expectedMethods.forEach((methodName) => {
    const descriptor = Object.getOwnPropertyDescriptor(iteratorProto, methodName);
    assert_true(descriptor.enumerable, 'method should be enumerable');
    assert_true(descriptor.configurable, 'method should be configurable');
    assert_true(descriptor.writable, 'method should be writable');

    const method = iterator[methodName];
    assert_equals(typeof method, 'function', 'method should be a function');
    assert_equals(method.name, methodName, 'method should have the correct name');
  });

  assert_equals(iterator.next.length, 0, 'next should have no parameters');
  assert_equals(iterator.return.length, 1, 'return should have 1 parameter');
  assert_equals(typeof iterator.throw, 'undefined', 'throw should not exist');
}, 'Async iterator instances should have the correct list of properties');
42
// A stream whose start() enqueues 1, 2, 3 and then closes should yield exactly those chunks to for-await.
promise_test(async () => {
  const stream = new ReadableStream({
    start(controller) {
      for (const value of [1, 2, 3]) {
        controller.enqueue(value);
      }
      controller.close();
    },
  });

  const received = [];
  for await (const value of stream) {
    received.push(value);
  }
  assert_array_equals(received, [1, 2, 3]);
}, 'Async-iterating a push source');
59
// A stream that produces one chunk per pull() (closing after the third) should yield 1, 2, 3 to for-await.
promise_test(async () => {
  let nextValue = 1;
  const stream = new ReadableStream({
    pull(controller) {
      controller.enqueue(nextValue);
      const isLastChunk = nextValue >= 3;
      if (isLastChunk) {
        controller.close();
      }
      nextValue += 1;
    },
  });

  const received = [];
  for await (const value of stream) {
    received.push(value);
  }
  assert_array_equals(received, [1, 2, 3]);
}, 'Async-iterating a pull source');
78
// Chunks whose value is `undefined` must still be delivered; they are not confused with end-of-stream.
promise_test(async () => {
  const stream = new ReadableStream({
    start(controller) {
      for (let count = 0; count < 3; ++count) {
        controller.enqueue(undefined);
      }
      controller.close();
    },
  });

  const received = [];
  for await (const value of stream) {
    received.push(value);
  }
  assert_array_equals(received, [undefined, undefined, undefined]);
}, 'Async-iterating a push source with undefined values');
95
// Same as the pull-source test, but every chunk is `undefined`; three chunks are expected before close.
promise_test(async () => {
  let pullNumber = 1;
  const stream = new ReadableStream({
    pull(controller) {
      controller.enqueue(undefined);
      const isLastChunk = pullNumber >= 3;
      if (isLastChunk) {
        controller.close();
      }
      pullNumber += 1;
    },
  });

  const received = [];
  for await (const value of stream) {
    received.push(value);
  }
  assert_array_equals(received, [undefined, undefined, undefined]);
}, 'Async-iterating a pull source with undefined values');
114
// Drives the iterator by hand with next() and checks, after every step, both the
// iterator result and the events recorded by the recording stream. With a
// highWaterMark of 0, each successful next() is expected to add exactly one 'pull'.
promise_test(async () => {
  let nextValue = 1;
  const stream = recordingReadableStream({
    pull(controller) {
      controller.enqueue(nextValue);
      if (nextValue >= 3) {
        controller.close();
      }
      nextValue += 1;
    },
  }, new CountQueuingStrategy({ highWaterMark: 0 }));

  const iterator = stream.values();
  const expectedEvents = [];
  // Creating the iterator alone must not have triggered a pull.
  assert_array_equals(stream.events, expectedEvents);

  for (const expectedValue of [1, 2, 3]) {
    const result = await iterator.next();
    assert_iter_result(result, expectedValue, false);
    expectedEvents.push('pull');
    assert_array_equals(stream.events, expectedEvents);
  }

  // The stream closed during the third pull, so the final next() reports done without pulling again.
  const finalResult = await iterator.next();
  assert_iter_result(finalResult, undefined, true);
  assert_array_equals(stream.events, expectedEvents);
}, 'Async-iterating a pull source manually');
146
// for-await over a stream errored in start() must throw the stream's error value ('e').
promise_test(async () => {
  const stream = new ReadableStream({
    start(controller) {
      controller.error('e');
    },
  });

  try {
    for await (const unusedChunk of stream) {}
    // Reaching this line means the loop finished without throwing.
    assert_unreached();
  } catch (reason) {
    assert_equals(reason, 'e');
  }
}, 'Async-iterating an errored stream throws');
161
// A stream closed in start() has no chunks: the loop body must never run and the loop must complete normally.
promise_test(async () => {
  const stream = new ReadableStream({
    start(controller) {
      controller.close();
    },
  });

  for await (const unusedChunk of stream) {
    assert_unreached();
  }
}, 'Async-iterating a closed stream never executes the loop body, but works fine');
173
// A stream that never enqueues, closes or errors must leave the for-await loop pending:
// neither the loop body nor the code after the loop may run. The test races the loop
// against flushAsyncEvents() and passes as long as no assert_unreached() fires.
promise_test(async () => {
  const stream = new ReadableStream();

  const iterateForever = async () => {
    for await (const unusedChunk of stream) {
      assert_unreached();
    }
    assert_unreached();
  };

  const stalledLoop = iterateForever();
  const flushed = flushAsyncEvents();
  await Promise.race([stalledLoop, flushed]);
}, 'Async-iterating an empty but not closed/errored stream never executes the loop body and stalls the async function');
189
// Reads the first chunk through a regular reader, releases the lock, and then checks that
// async iteration picks up with the remaining chunks.
promise_test(async () => {
  const stream = new ReadableStream({
    start(controller) {
      for (const value of [1, 2, 3]) {
        controller.enqueue(value);
      }
      controller.close();
    },
  });

  const firstReader = stream.getReader();
  const firstRead = await firstReader.read();
  assert_iter_result(firstRead, 1, false);
  firstReader.releaseLock();

  const remaining = [];
  for await (const value of stream) {
    remaining.push(value);
  }
  assert_array_equals(remaining, [2, 3]);
}, 'Async-iterating a partially consumed stream');
211
// For each way of leaving a for-await loop early (throw / break / return) and each
// preventCancel setting, checks which events the recording stream observed.
for (const type of ['throw', 'break', 'return']) {
  for (const preventCancel of [false, true]) {
    promise_test(async () => {
      const stream = recordingReadableStream({
        start(controller) {
          controller.enqueue(0);
        },
      });

      // The loop lives in its own function so that `return` exits only that function, not the test.
      const exitLoopEarly = async () => {
        for await (const unusedChunk of stream.values({ preventCancel })) {
          if (type === 'throw') {
            throw new Error();
          }
          if (type === 'break') {
            break;
          }
          if (type === 'return') {
            return;
          }
        }
      };

      try {
        await exitLoopEarly();
      } catch (ignored) {
        // Deliberately ignored: the 'throw' variant makes the helper reject.
      }

      const expectedEvents = preventCancel ? ['pull'] : ['pull', 'cancel', undefined];
      const description = preventCancel ? `cancel() should not be called` : `cancel() should be called`;
      assert_array_equals(stream.events, expectedEvents, description);
    }, `Cancellation behavior when ${type}ing inside loop body; preventCancel = ${preventCancel}`);
  }
}
246
// Calling return() directly on a fresh iterator: with preventCancel the stream records
// no events, otherwise it records a cancel with an undefined reason.
for (const preventCancel of [false, true]) {
  promise_test(async () => {
    const stream = recordingReadableStream({
      start(controller) {
        controller.enqueue(0);
      },
    });

    const iterator = stream.values({ preventCancel });
    await iterator.return();

    const expectedEvents = preventCancel ? [] : ['cancel', undefined];
    const description = preventCancel ? `cancel() should not be called` : `cancel() should be called`;
    assert_array_equals(stream.events, expectedEvents, description);
  }, `Cancellation behavior when manually calling return(); preventCancel = ${preventCancel}`);
}
265
// The source enqueues one chunk on its first pull and errors on the next one:
// the first next() fulfills with the chunk, the second rejects with error1.
promise_test(async (t) => {
  let timesPulled = 0;
  const stream = new ReadableStream({
    pull(controller) {
      if (timesPulled !== 0) {
        controller.error(error1);
        return;
      }
      controller.enqueue(0);
      ++timesPulled;
    },
  });

  const iterator = stream[Symbol.asyncIterator]();

  const firstResult = await iterator.next();
  assert_iter_result(firstResult, 0, false, '1st next()');

  await promise_rejects_exactly(t, error1, iterator.next(), '2nd next()');
}, 'next() rejects if the stream errors');
286
// return() called before the stream has had a chance to error fulfills with
// { value: <argument>, done: true }.
promise_test(async () => {
  let timesPulled = 0;
  const stream = new ReadableStream({
    pull(controller) {
      if (timesPulled !== 0) {
        controller.error(error1);
        return;
      }
      controller.enqueue(0);
      ++timesPulled;
    },
  });

  const iterator = stream[Symbol.asyncIterator]();

  const returnResult = await iterator.return('return value');
  assert_iter_result(returnResult, 'return value', true);
}, 'return() does not rejects if the stream has not errored yet');
305
// When the stream has already errored by the time return() is called, return() is
// expected to reject with the stream's error.
promise_test(async t => {
  const s = new ReadableStream({
    pull(c) {
      // Do not error in start() because doing so would prevent acquiring a reader/async iterator.
      c.error(error1);
    }
  });

  const it = s[Symbol.asyncIterator]();

  // Let pending async work run first so that pull() has errored the stream before return() is called.
  await flushAsyncEvents();
  await promise_rejects_exactly(t, error1, it.return('return value'));
}, 'return() rejects if the stream has errored');
320
// Sequence: next() yields a chunk, next() rejects with error1, and a further next()
// fulfills with { value: undefined, done: true } rather than rejecting again.
promise_test(async (t) => {
  let timesPulled = 0;
  const stream = new ReadableStream({
    pull(controller) {
      if (timesPulled !== 0) {
        controller.error(error1);
        return;
      }
      controller.enqueue(0);
      ++timesPulled;
    },
  });

  const iterator = stream[Symbol.asyncIterator]();

  const firstResult = await iterator.next();
  assert_iter_result(firstResult, 0, false, '1st next()');

  await promise_rejects_exactly(t, error1, iterator.next(), '2nd next()');

  const thirdResult = await iterator.next();
  assert_iter_result(thirdResult, undefined, true, '3rd next()');
}, 'next() that succeeds; next() that reports an error; next()');
344
// Same sequence as the awaited variant, but all three next() calls are issued
// back-to-back and their outcomes are inspected via Promise.allSettled.
promise_test(async () => {
  let timesPulled = 0;
  const stream = new ReadableStream({
    pull(controller) {
      if (timesPulled !== 0) {
        controller.error(error1);
        return;
      }
      controller.enqueue(0);
      ++timesPulled;
    },
  });

  const iterator = stream[Symbol.asyncIterator]();

  const [first, second, third] = await Promise.allSettled([iterator.next(), iterator.next(), iterator.next()]);

  assert_equals(first.status, 'fulfilled', '1st next() promise status');
  assert_iter_result(first.value, 0, false, '1st next()');

  assert_equals(second.status, 'rejected', '2nd next() promise status');
  assert_equals(second.reason, error1, '2nd next() rejection reason');

  assert_equals(third.status, 'fulfilled', '3rd next() promise status');
  assert_iter_result(third.value, undefined, true, '3rd next()');
}, 'next() that succeeds; next() that reports an error(); next() [no awaiting]');
371
// Sequence: next() yields a chunk, next() rejects with error1, then return() still
// fulfills with { value: 'return value', done: true }.
promise_test(async (t) => {
  let timesPulled = 0;
  const stream = new ReadableStream({
    pull(controller) {
      if (timesPulled !== 0) {
        controller.error(error1);
        return;
      }
      controller.enqueue(0);
      ++timesPulled;
    },
  });

  const iterator = stream[Symbol.asyncIterator]();

  const firstResult = await iterator.next();
  assert_iter_result(firstResult, 0, false, '1st next()');

  await promise_rejects_exactly(t, error1, iterator.next(), '2nd next()');

  const returnResult = await iterator.return('return value');
  assert_iter_result(returnResult, 'return value', true, 'return()');
}, 'next() that succeeds; next() that reports an error(); return()');
395
// Same as the awaited next/next/return sequence, but the three calls are issued
// back-to-back and their outcomes are inspected via Promise.allSettled.
promise_test(async () => {
  let timesPulled = 0;
  const stream = new ReadableStream({
    pull(controller) {
      if (timesPulled !== 0) {
        controller.error(error1);
        return;
      }
      controller.enqueue(0);
      ++timesPulled;
    },
  });

  const iterator = stream[Symbol.asyncIterator]();

  const [first, second, returned] = await Promise.allSettled([
    iterator.next(),
    iterator.next(),
    iterator.return('return value'),
  ]);

  assert_equals(first.status, 'fulfilled', '1st next() promise status');
  assert_iter_result(first.value, 0, false, '1st next()');

  assert_equals(second.status, 'rejected', '2nd next() promise status');
  assert_equals(second.reason, error1, '2nd next() rejection reason');

  assert_equals(returned.status, 'fulfilled', 'return() promise status');
  assert_iter_result(returned.value, 'return value', true, 'return()');
}, 'next() that succeeds; next() that reports an error(); return() [no awaiting]');
422
// One successful next() followed by return(); also pins the number of pull() calls
// observed by the end of the test to 2.
promise_test(async () => {
  let timesPulled = 0;
  const stream = new ReadableStream({
    pull(controller) {
      controller.enqueue(timesPulled);
      timesPulled += 1;
    },
  });
  const iterator = stream[Symbol.asyncIterator]();

  const nextResult = await iterator.next();
  assert_iter_result(nextResult, 0, false, 'next()');

  const returnResult = await iterator.return('return value');
  assert_iter_result(returnResult, 'return value', true, 'return()');

  assert_equals(timesPulled, 2);
}, 'next() that succeeds; return()');
441
// next() and return() issued back-to-back without awaiting in between; both must
// fulfill, and pull() must have been called twice by the end of the test.
promise_test(async () => {
  let timesPulled = 0;
  const stream = new ReadableStream({
    pull(controller) {
      controller.enqueue(timesPulled);
      timesPulled += 1;
    },
  });
  const iterator = stream[Symbol.asyncIterator]();

  const [nextOutcome, returnOutcome] = await Promise.allSettled([iterator.next(), iterator.return('return value')]);

  assert_equals(nextOutcome.status, 'fulfilled', 'next() promise status');
  assert_iter_result(nextOutcome.value, 0, false, 'next()');

  assert_equals(returnOutcome.status, 'fulfilled', 'return() promise status');
  assert_iter_result(returnOutcome.value, 'return value', true, 'return()');

  assert_equals(timesPulled, 2);
}, 'next() that succeeds; return() [no awaiting]');
462
// After return() has completed, a later next() fulfills with { value: undefined, done: true }.
promise_test(async () => {
  const stream = new ReadableStream();
  const iterator = stream.values();

  const returnResult = await iterator.return('return value');
  assert_iter_result(returnResult, 'return value', true, 'return()');

  const nextResult = await iterator.next();
  assert_iter_result(nextResult, undefined, true, 'next()');
}, 'return(); next()');
473
// return() immediately followed by next(), without awaiting in between; both must fulfill.
promise_test(async () => {
  const stream = new ReadableStream();
  const iterator = stream.values();

  const [returnOutcome, nextOutcome] = await Promise.allSettled([iterator.return('return value'), iterator.next()]);

  assert_equals(returnOutcome.status, 'fulfilled', 'return() promise status');
  assert_iter_result(returnOutcome.value, 'return value', true, 'return()');

  assert_equals(nextOutcome.status, 'fulfilled', 'next() promise status');
  assert_iter_result(nextOutcome.value, undefined, true, 'next()');
}, 'return(); next() [no awaiting]');
486
// Calling return() twice in sequence: each call fulfills with done: true and echoes
// back the value passed to that particular call.
promise_test(async () => {
  const rs = new ReadableStream();
  const it = rs.values();

  const iterResult1 = await it.return('return value 1');
  assert_iter_result(iterResult1, 'return value 1', true, '1st return()');

  // Labelled '2nd return()' so that a failure here is attributed to the second call.
  const iterResult2 = await it.return('return value 2');
  assert_iter_result(iterResult2, 'return value 2', true, '2nd return()');
}, 'return(); return()');
497
// Two return() calls issued back-to-back without awaiting in between: both promises
// must fulfill, each echoing back the value passed to its own call.
promise_test(async () => {
  const rs = new ReadableStream();
  const it = rs.values();

  const iterResults = await Promise.allSettled([it.return('return value 1'), it.return('return value 2')]);

  assert_equals(iterResults[0].status, 'fulfilled', '1st return() promise status');
  assert_iter_result(iterResults[0].value, 'return value 1', true, '1st return()');

  // Labelled '2nd return()' so that a failure here is attributed to the second call.
  assert_equals(iterResults[1].status, 'fulfilled', '2nd return() promise status');
  assert_iter_result(iterResults[1].value, 'return value 2', true, '2nd return()');
}, 'return(); return() [no awaiting]');
510
// A second values() call on a stream that already handed out an iterator must throw a TypeError.
test(() => {
  const stream = new ReadableStream({
    start(controller) {
      controller.enqueue(0);
      controller.close();
    },
  });

  // The first call succeeds; its iterator is intentionally not used further.
  stream.values();

  const callValuesAgain = () => stream.values();
  assert_throws_js(TypeError, callValuesAgain, 'values() should throw');
}, 'values() throws if there\'s already a lock');
521
// After a for-await loop has consumed every chunk, getReader() must succeed and the
// new reader's `closed` promise must fulfill.
promise_test(async () => {
  const stream = new ReadableStream({
    start(controller) {
      for (const value of [1, 2, 3]) {
        controller.enqueue(value);
      }
      controller.close();
    },
  });

  const received = [];
  for await (const value of stream) {
    received.push(value);
  }
  assert_array_equals(received, [1, 2, 3]);

  const readerAfterIteration = stream.getReader();
  await readerAfterIteration.closed;
}, 'Acquiring a reader after exhaustively async-iterating a stream');
541
// After next() has rejected with the stream's error and return() has completed,
// getReader() must work and the new reader's `closed` must reject with error1.
promise_test(async (t) => {
  let timesPulled = 0;
  const stream = new ReadableStream({
    pull(controller) {
      if (timesPulled !== 0) {
        controller.error(error1);
        return;
      }
      controller.enqueue(0);
      ++timesPulled;
    },
  });

  const iterator = stream[Symbol.asyncIterator]({ preventCancel: true });

  const firstResult = await iterator.next();
  assert_iter_result(firstResult, 0, false, '1st next()');

  await promise_rejects_exactly(t, error1, iterator.next(), '2nd next()');

  const returnResult = await iterator.return('return value');
  assert_iter_result(returnResult, 'return value', true, 'return()');

  // The rejection must be error1 itself, not a generic "this stream is locked" TypeError.
  const readerAfterReturn = stream.getReader();
  await promise_rejects_exactly(t, error1, readerAfterReturn.closed, 'closed on the new reader should reject with the error');
}, 'Acquiring a reader after return()ing from a stream that errors');
569
// Breaking out of the loop after two chunks (with default options) must leave the stream in a
// state where getReader() succeeds and the new reader's `closed` promise fulfills.
promise_test(async () => {
  const stream = new ReadableStream({
    start(controller) {
      for (const value of [1, 2, 3]) {
        controller.enqueue(value);
      }
      controller.close();
    },
  });

  // Consume the first two chunks, then leave the loop early.
  const received = [];
  for await (const value of stream) {
    received.push(value);
    if (value >= 2) {
      break;
    }
  }
  assert_array_equals(received, [1, 2]);

  const readerAfterBreak = stream.getReader();
  await readerAfterBreak.closed;
}, 'Acquiring a reader after partially async-iterating a stream');
593
// With preventCancel: true, breaking out of the loop after two chunks must only release the
// lock: a new reader can still read the third chunk and then observes the stream closing.
promise_test(async () => {
  const stream = new ReadableStream({
    start(controller) {
      for (const value of [1, 2, 3]) {
        controller.enqueue(value);
      }
      controller.close();
    },
  });

  // Consume the first two chunks, then leave the loop early without cancelling.
  const received = [];
  for await (const value of stream.values({preventCancel: true})) {
    received.push(value);
    if (value >= 2) {
      break;
    }
  }
  assert_array_equals(received, [1, 2]);

  const readerAfterBreak = stream.getReader();
  const lastRead = await readerAfterBreak.read();
  assert_iter_result(lastRead, 3, false);
  await readerAfterBreak.closed;
}, 'Acquiring a reader and reading the remaining chunks after partially async-iterating a stream with preventCancel = true');
619
// return() must release the stream's lock before it returns, regardless of preventCancel:
// getReader() is called in the same synchronous turn, without awaiting return()'s promise.
for (const preventCancel of [false, true]) {
  test(() => {
    const stream = new ReadableStream();
    const iterator = stream.values({ preventCancel });
    iterator.return();
    // Passing means getReader() did not throw.
    stream.getReader();
  }, `return() should unlock the stream synchronously when preventCancel = ${preventCancel}`);
}
628
// Closes the stream from start() after a flushAsyncEvents() delay, i.e. while the
// iterator is expected to be waiting on a next() call; iteration must still yield
// all three chunks and finish normally.
promise_test(async () => {
  const stream = new ReadableStream({
    async start(controller) {
      for (const value of ['a', 'b', 'c']) {
        controller.enqueue(value);
      }
      await flushAsyncEvents();
      // By now the async iterator has a read request queued on the stream for its pending next() promise.
      // Closing the stream makes two things happen *synchronously*:
      //  1. ReadableStreamClose resolves reader.[[closedPromise]] with undefined.
      //  2. ReadableStreamClose runs the read request's close steps, which call ReadableStreamReaderGenericRelease,
      //     which in turn replaces reader.[[closedPromise]] with a rejected promise.
      controller.close();
    },
  });

  const received = [];
  for await (const value of stream) {
    received.push(value);
  }
  assert_array_equals(received, ['a', 'b', 'c']);
}, 'close() while next() is pending');
651