// META: global=window,worker
// META: script=../resources/rs-utils.js
// META: script=../resources/test-utils.js
// META: script=../resources/recording-streams.js
'use strict';

// Shared sentinel so tests can check that the exact same error object is propagated.
const error1 = new Error('error1');

/**
 * Asserts that `iterResult` is a plain iterator-result object.
 *
 * Checks that it is an object whose prototype is `Object.prototype`, that its
 * only own property names are `done` and `value`, and that those two
 * properties equal the expected values.
 *
 * @param {*} iterResult - The value to check.
 * @param {*} value - Expected `iterResult.value`.
 * @param {boolean} done - Expected `iterResult.done`.
 * @param {string} [message] - Optional prefix for every assertion message.
 */
function assert_iter_result(iterResult, value, done, message) {
  const prefix = message === undefined ? '' : `${message} `;
  assert_equals(typeof iterResult, 'object', `${prefix}type is object`);
  assert_equals(Object.getPrototypeOf(iterResult), Object.prototype, `${prefix}[[Prototype]]`);
  assert_array_equals(Object.getOwnPropertyNames(iterResult).sort(), ['done', 'value'], `${prefix}property names`);
  assert_equals(iterResult.value, value, `${prefix}value`);
  assert_equals(iterResult.done, done, `${prefix}done`);
}

test(() => {
  const s = new ReadableStream();
  const it = s.values();
  const proto = Object.getPrototypeOf(it);

  const AsyncIteratorPrototype = Object.getPrototypeOf(Object.getPrototypeOf(async function* () {}).prototype);
  assert_equals(Object.getPrototypeOf(proto), AsyncIteratorPrototype, 'prototype should extend AsyncIteratorPrototype');

  const methods = ['next', 'return'].sort();
  assert_array_equals(Object.getOwnPropertyNames(proto).sort(), methods, 'should have all the correct methods');

  for (const m of methods) {
    const propDesc = Object.getOwnPropertyDescriptor(proto, m);
    assert_true(propDesc.enumerable, 'method should be enumerable');
    assert_true(propDesc.configurable, 'method should be configurable');
    assert_true(propDesc.writable, 'method should be writable');
    assert_equals(typeof it[m], 'function', 'method should be a function');
    assert_equals(it[m].name, m, 'method should have the correct name');
  }

  assert_equals(it.next.length, 0, 'next should have no parameters');
  assert_equals(it.return.length, 1, 'return should have 1 parameter');
  assert_equals(typeof it.throw, 'undefined', 'throw should not exist');
}, 'Async iterator instances should have the correct list of properties');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.enqueue(1);
      c.enqueue(2);
      c.enqueue(3);
      c.close();
    }
  });

  const chunks = [];
  for await (const chunk of s) {
    chunks.push(chunk);
  }
  assert_array_equals(chunks, [1, 2, 3]);
}, 'Async-iterating a push source');

promise_test(async () => {
  let i = 1;
  const s = new ReadableStream({
    pull(c) {
      c.enqueue(i);
      if (i >= 3) {
        c.close();
      }
      i += 1;
    }
  });

  const chunks = [];
  for await (const chunk of s) {
    chunks.push(chunk);
  }
  assert_array_equals(chunks, [1, 2, 3]);
}, 'Async-iterating a pull source');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.enqueue(undefined);
      c.enqueue(undefined);
      c.enqueue(undefined);
      c.close();
    }
  });

  const chunks = [];
  for await (const chunk of s) {
    chunks.push(chunk);
  }
  assert_array_equals(chunks, [undefined, undefined, undefined]);
}, 'Async-iterating a push source with undefined values');

promise_test(async () => {
  let i = 1;
  const s = new ReadableStream({
    pull(c) {
      c.enqueue(undefined);
      if (i >= 3) {
        c.close();
      }
      i += 1;
    }
  });

  const chunks = [];
  for await (const chunk of s) {
    chunks.push(chunk);
  }
  assert_array_equals(chunks, [undefined, undefined, undefined]);
}, 'Async-iterating a pull source with undefined values');

promise_test(async () => {
  let i = 1;
  const s = recordingReadableStream({
    pull(c) {
      c.enqueue(i);
      if (i >= 3) {
        c.close();
      }
      i += 1;
    },
  }, new CountQueuingStrategy({ highWaterMark: 0 }));

  const it = s.values();
  assert_array_equals(s.events, []);

  const read1 = await it.next();
  assert_iter_result(read1, 1, false);
  assert_array_equals(s.events, ['pull']);

  const read2 = await it.next();
  assert_iter_result(read2, 2, false);
  assert_array_equals(s.events, ['pull', 'pull']);

  const read3 = await it.next();
  assert_iter_result(read3, 3, false);
  assert_array_equals(s.events, ['pull', 'pull', 'pull']);

  const read4 = await it.next();
  assert_iter_result(read4, undefined, true);
  assert_array_equals(s.events, ['pull', 'pull', 'pull']);
}, 'Async-iterating a pull source manually');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.error('e');
    },
  });

  try {
    for await (const chunk of s) {}
    assert_unreached();
  } catch (e) {
    assert_equals(e, 'e');
  }
}, 'Async-iterating an errored stream throws');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.close();
    }
  });

  for await (const chunk of s) {
    assert_unreached();
  }
}, 'Async-iterating a closed stream never executes the loop body, but works fine');

promise_test(async () => {
  const s = new ReadableStream();

  const loop = async () => {
    for await (const chunk of s) {
      assert_unreached();
    }
    assert_unreached();
  };

  // loop() is raced against flushAsyncEvents(); reaching either
  // assert_unreached() inside loop() is what would fail the test.
  await Promise.race([
    loop(),
    flushAsyncEvents()
  ]);
}, 'Async-iterating an empty but not closed/errored stream never executes the loop body and stalls the async function');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.enqueue(1);
      c.enqueue(2);
      c.enqueue(3);
      c.close();
    },
  });

  const reader = s.getReader();
  const readResult = await reader.read();
  assert_iter_result(readResult, 1, false);
  reader.releaseLock();

  const chunks = [];
  for await (const chunk of s) {
    chunks.push(chunk);
  }
  assert_array_equals(chunks, [2, 3]);
}, 'Async-iterating a partially consumed stream');

for (const type of ['throw', 'break', 'return']) {
  for (const preventCancel of [false, true]) {
    promise_test(async () => {
      const s = recordingReadableStream({
        start(c) {
          c.enqueue(0);
        }
      });

      // use a separate function for the loop body so return does not stop the test
      const loop = async () => {
        for await (const c of s.values({ preventCancel })) {
          if (type === 'throw') {
            throw new Error();
          } else if (type === 'break') {
            break;
          } else if (type === 'return') {
            return;
          }
        }
      };

      // The 'throw' variant deliberately throws out of the loop body, so the
      // rejection is ignored here; the recorded events below are what is checked.
      try {
        await loop();
      } catch (e) {}

      if (preventCancel) {
        assert_array_equals(s.events, ['pull'], `cancel() should not be called`);
      } else {
        assert_array_equals(s.events, ['pull', 'cancel', undefined], `cancel() should be called`);
      }
    }, `Cancellation behavior when ${type}ing inside loop body; preventCancel = ${preventCancel}`);
  }
}

for (const preventCancel of [false, true]) {
  promise_test(async () => {
    const s = recordingReadableStream({
      start(c) {
        c.enqueue(0);
      }
    });

    const it = s.values({ preventCancel });
    await it.return();

    if (preventCancel) {
      assert_array_equals(s.events, [], `cancel() should not be called`);
    } else {
      assert_array_equals(s.events, ['cancel', undefined], `cancel() should be called`);
    }
  }, `Cancellation behavior when manually calling return(); preventCancel = ${preventCancel}`);
}

promise_test(async t => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      if (timesPulled === 0) {
        c.enqueue(0);
        ++timesPulled;
      } else {
        c.error(error1);
      }
    }
  });

  const it = s[Symbol.asyncIterator]();

  const iterResult1 = await it.next();
  assert_iter_result(iterResult1, 0, false, '1st next()');

  await promise_rejects_exactly(t, error1, it.next(), '2nd next()');
}, 'next() rejects if the stream errors');

promise_test(async () => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      if (timesPulled === 0) {
        c.enqueue(0);
        ++timesPulled;
      } else {
        c.error(error1);
      }
    }
  });

  const it = s[Symbol.asyncIterator]();

  const iterResult = await it.return('return value');
  assert_iter_result(iterResult, 'return value', true);
}, 'return() does not rejects if the stream has not errored yet');

promise_test(async t => {
  const s = new ReadableStream({
    pull(c) {
      // Do not error in start() because doing so would prevent acquiring a reader/async iterator.
      c.error(error1);
    }
  });

  const it = s[Symbol.asyncIterator]();

  await flushAsyncEvents();
  await promise_rejects_exactly(t, error1, it.return('return value'));
}, 'return() rejects if the stream has errored');

promise_test(async t => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      if (timesPulled === 0) {
        c.enqueue(0);
        ++timesPulled;
      } else {
        c.error(error1);
      }
    }
  });

  const it = s[Symbol.asyncIterator]();

  const iterResult1 = await it.next();
  assert_iter_result(iterResult1, 0, false, '1st next()');

  await promise_rejects_exactly(t, error1, it.next(), '2nd next()');

  const iterResult3 = await it.next();
  assert_iter_result(iterResult3, undefined, true, '3rd next()');
}, 'next() that succeeds; next() that reports an error; next()');

promise_test(async () => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      if (timesPulled === 0) {
        c.enqueue(0);
        ++timesPulled;
      } else {
        c.error(error1);
      }
    }
  });

  const it = s[Symbol.asyncIterator]();

  const iterResults = await Promise.allSettled([it.next(), it.next(), it.next()]);

  assert_equals(iterResults[0].status, 'fulfilled', '1st next() promise status');
  assert_iter_result(iterResults[0].value, 0, false, '1st next()');

  assert_equals(iterResults[1].status, 'rejected', '2nd next() promise status');
  assert_equals(iterResults[1].reason, error1, '2nd next() rejection reason');

  assert_equals(iterResults[2].status, 'fulfilled', '3rd next() promise status');
  assert_iter_result(iterResults[2].value, undefined, true, '3rd next()');
}, 'next() that succeeds; next() that reports an error(); next() [no awaiting]');

promise_test(async t => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      if (timesPulled === 0) {
        c.enqueue(0);
        ++timesPulled;
      } else {
        c.error(error1);
      }
    }
  });

  const it = s[Symbol.asyncIterator]();

  const iterResult1 = await it.next();
  assert_iter_result(iterResult1, 0, false, '1st next()');

  await promise_rejects_exactly(t, error1, it.next(), '2nd next()');

  const iterResult3 = await it.return('return value');
  assert_iter_result(iterResult3, 'return value', true, 'return()');
}, 'next() that succeeds; next() that reports an error(); return()');

promise_test(async () => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      if (timesPulled === 0) {
        c.enqueue(0);
        ++timesPulled;
      } else {
        c.error(error1);
      }
    }
  });

  const it = s[Symbol.asyncIterator]();

  const iterResults = await Promise.allSettled([it.next(), it.next(), it.return('return value')]);

  assert_equals(iterResults[0].status, 'fulfilled', '1st next() promise status');
  assert_iter_result(iterResults[0].value, 0, false, '1st next()');

  assert_equals(iterResults[1].status, 'rejected', '2nd next() promise status');
  assert_equals(iterResults[1].reason, error1, '2nd next() rejection reason');

  assert_equals(iterResults[2].status, 'fulfilled', 'return() promise status');
  assert_iter_result(iterResults[2].value, 'return value', true, 'return()');
}, 'next() that succeeds; next() that reports an error(); return() [no awaiting]');

promise_test(async () => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      c.enqueue(timesPulled);
      ++timesPulled;
    }
  });
  const it = s[Symbol.asyncIterator]();

  const iterResult1 = await it.next();
  assert_iter_result(iterResult1, 0, false, 'next()');

  const iterResult2 = await it.return('return value');
  assert_iter_result(iterResult2, 'return value', true, 'return()');

  assert_equals(timesPulled, 2);
}, 'next() that succeeds; return()');

promise_test(async () => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      c.enqueue(timesPulled);
      ++timesPulled;
    }
  });
  const it = s[Symbol.asyncIterator]();

  const iterResults = await Promise.allSettled([it.next(), it.return('return value')]);

  assert_equals(iterResults[0].status, 'fulfilled', 'next() promise status');
  assert_iter_result(iterResults[0].value, 0, false, 'next()');

  assert_equals(iterResults[1].status, 'fulfilled', 'return() promise status');
  assert_iter_result(iterResults[1].value, 'return value', true, 'return()');

  assert_equals(timesPulled, 2);
}, 'next() that succeeds; return() [no awaiting]');

promise_test(async () => {
  const rs = new ReadableStream();
  const it = rs.values();

  const iterResult1 = await it.return('return value');
  assert_iter_result(iterResult1, 'return value', true, 'return()');

  const iterResult2 = await it.next();
  assert_iter_result(iterResult2, undefined, true, 'next()');
}, 'return(); next()');

promise_test(async () => {
  const rs = new ReadableStream();
  const it = rs.values();

  const iterResults = await Promise.allSettled([it.return('return value'), it.next()]);

  assert_equals(iterResults[0].status, 'fulfilled', 'return() promise status');
  assert_iter_result(iterResults[0].value, 'return value', true, 'return()');

  assert_equals(iterResults[1].status, 'fulfilled', 'next() promise status');
  assert_iter_result(iterResults[1].value, undefined, true, 'next()');
}, 'return(); next() [no awaiting]');

promise_test(async () => {
  const rs = new ReadableStream();
  const it = rs.values();

  const iterResult1 = await it.return('return value 1');
  assert_iter_result(iterResult1, 'return value 1', true, '1st return()');

  const iterResult2 = await it.return('return value 2');
  assert_iter_result(iterResult2, 'return value 2', true, '2nd return()');
}, 'return(); return()');

promise_test(async () => {
  const rs = new ReadableStream();
  const it = rs.values();

  const iterResults = await Promise.allSettled([it.return('return value 1'), it.return('return value 2')]);

  assert_equals(iterResults[0].status, 'fulfilled', '1st return() promise status');
  assert_iter_result(iterResults[0].value, 'return value 1', true, '1st return()');

  assert_equals(iterResults[1].status, 'fulfilled', '2nd return() promise status');
  assert_iter_result(iterResults[1].value, 'return value 2', true, '2nd return()');
}, 'return(); return() [no awaiting]');

test(() => {
  const s = new ReadableStream({
    start(c) {
      c.enqueue(0);
      c.close();
    },
  });
  s.values();
  assert_throws_js(TypeError, () => s.values(), 'values() should throw');
}, 'values() throws if there\'s already a lock');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.enqueue(1);
      c.enqueue(2);
      c.enqueue(3);
      c.close();
    }
  });

  const chunks = [];
  for await (const chunk of s) {
    chunks.push(chunk);
  }
  assert_array_equals(chunks, [1, 2, 3]);

  const reader = s.getReader();
  await reader.closed;
}, 'Acquiring a reader after exhaustively async-iterating a stream');

promise_test(async t => {
  let timesPulled = 0;
  const s = new ReadableStream({
    pull(c) {
      if (timesPulled === 0) {
        c.enqueue(0);
        ++timesPulled;
      } else {
        c.error(error1);
      }
    }
  });

  const it = s[Symbol.asyncIterator]({ preventCancel: true });

  const iterResult1 = await it.next();
  assert_iter_result(iterResult1, 0, false, '1st next()');

  await promise_rejects_exactly(t, error1, it.next(), '2nd next()');

  const iterResult2 = await it.return('return value');
  assert_iter_result(iterResult2, 'return value', true, 'return()');

  // i.e. it should not reject with a generic "this stream is locked" TypeError.
  const reader = s.getReader();
  await promise_rejects_exactly(t, error1, reader.closed, 'closed on the new reader should reject with the error');
}, 'Acquiring a reader after return()ing from a stream that errors');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.enqueue(1);
      c.enqueue(2);
      c.enqueue(3);
      c.close();
    },
  });

  // read the first two chunks, then cancel
  const chunks = [];
  for await (const chunk of s) {
    chunks.push(chunk);
    if (chunk >= 2) {
      break;
    }
  }
  assert_array_equals(chunks, [1, 2]);

  const reader = s.getReader();
  await reader.closed;
}, 'Acquiring a reader after partially async-iterating a stream');

promise_test(async () => {
  const s = new ReadableStream({
    start(c) {
      c.enqueue(1);
      c.enqueue(2);
      c.enqueue(3);
      c.close();
    },
  });

  // read the first two chunks, then release lock
  const chunks = [];
  for await (const chunk of s.values({preventCancel: true})) {
    chunks.push(chunk);
    if (chunk >= 2) {
      break;
    }
  }
  assert_array_equals(chunks, [1, 2]);

  const reader = s.getReader();
  const readResult = await reader.read();
  assert_iter_result(readResult, 3, false);
  await reader.closed;
}, 'Acquiring a reader and reading the remaining chunks after partially async-iterating a stream with preventCancel = true');

for (const preventCancel of [false, true]) {
  test(() => {
    const rs = new ReadableStream();
    rs.values({ preventCancel }).return();
    // The test passes if this line doesn't throw.
    rs.getReader();
  }, `return() should unlock the stream synchronously when preventCancel = ${preventCancel}`);
}

promise_test(async () => {
  const rs = new ReadableStream({
    async start(c) {
      c.enqueue('a');
      c.enqueue('b');
      c.enqueue('c');
      await flushAsyncEvents();
      // At this point, the async iterator has a read request in the stream's queue for its pending next() promise.
      // Closing the stream now causes two things to happen *synchronously*:
      //  1. ReadableStreamClose resolves reader.[[closedPromise]] with undefined.
      //  2. ReadableStreamClose calls the read request's close steps, which calls ReadableStreamReaderGenericRelease,
      //     which replaces reader.[[closedPromise]] with a rejected promise.
      c.close();
    }
  });

  const chunks = [];
  for await (const chunk of rs) {
    chunks.push(chunk);
  }
  assert_array_equals(chunks, ['a', 'b', 'c']);
}, 'close() while next() is pending');