1// META: global=window,worker 2// META: script=../resources/recording-streams.js 3// META: script=../resources/test-utils.js 4'use strict'; 5 6// Tests for the use of pipeTo with AbortSignal. 7// There is some extra complexity to avoid timeouts in environments where abort is not implemented. 8 9const error1 = new Error('error1'); 10error1.name = 'error1'; 11const error2 = new Error('error2'); 12error2.name = 'error2'; 13 14const errorOnPull = { 15 pull(controller) { 16 // This will cause the test to error if pipeTo abort is not implemented. 17 controller.error('failed to abort'); 18 } 19}; 20 21// To stop pull() being called immediately when the stream is created, we need to set highWaterMark to 0. 22const hwm0 = { highWaterMark: 0 }; 23 24for (const invalidSignal of [null, 'AbortSignal', true, -1, Object.create(AbortSignal.prototype)]) { 25 promise_test(t => { 26 const rs = recordingReadableStream(errorOnPull, hwm0); 27 const ws = recordingWritableStream(); 28 return promise_rejects_js(t, TypeError, rs.pipeTo(ws, { signal: invalidSignal }), 'pipeTo should reject') 29 .then(() => { 30 assert_equals(rs.events.length, 0, 'no ReadableStream methods should have been called'); 31 assert_equals(ws.events.length, 0, 'no WritableStream methods should have been called'); 32 }); 33 }, `a signal argument '${invalidSignal}' should cause pipeTo() to reject`); 34} 35 36promise_test(t => { 37 const rs = recordingReadableStream(errorOnPull, hwm0); 38 const ws = new WritableStream(); 39 const abortController = new AbortController(); 40 const signal = abortController.signal; 41 abortController.abort(); 42 return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject') 43 .then(() => Promise.all([ 44 rs.getReader().closed, 45 promise_rejects_dom(t, 'AbortError', ws.getWriter().closed, 'writer.closed should reject') 46 ])) 47 .then(() => { 48 assert_equals(rs.events.length, 2, 'cancel should have been called'); 49 assert_equals(rs.events[0], 'cancel', 'first event should be cancel'); 50 assert_equals(rs.events[1].name, 'AbortError', 'the argument to cancel should be an AbortError'); 51 assert_equals(rs.events[1].constructor.name, 'DOMException', 52 'the argument to cancel should be a DOMException'); 53 }); 54}, 'an aborted signal should cause the writable stream to reject with an AbortError'); 55 56for (const reason of [null, undefined, error1]) { 57 promise_test(async t => { 58 const rs = recordingReadableStream(errorOnPull, hwm0); 59 const ws = new WritableStream(); 60 const abortController = new AbortController(); 61 const signal = abortController.signal; 62 abortController.abort(reason); 63 const pipeToPromise = rs.pipeTo(ws, { signal }); 64 if (reason !== undefined) { 65 await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason'); 66 } else { 67 await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError'); 68 } 69 const error = await pipeToPromise.catch(e => e); 70 await rs.getReader().closed; 71 await promise_rejects_exactly(t, error, ws.getWriter().closed, 'the writable should be errored with the same object'); 72 assert_equals(signal.reason, error, 'signal.reason should be error'), 73 assert_equals(rs.events.length, 2, 'cancel should have been called'); 74 assert_equals(rs.events[0], 'cancel', 'first event should be cancel'); 75 assert_equals(rs.events[1], error, 'the readable should be canceled with the same object'); 76 }, `(reason: '${reason}') all the error objects should be the same object`); 77} 78 79promise_test(t => { 80 const rs = recordingReadableStream(errorOnPull, hwm0); 81 const ws = new WritableStream(); 82 const abortController = new AbortController(); 83 const signal = abortController.signal; 84 abortController.abort(); 85 return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal, preventCancel: true }), 'pipeTo should reject') 86 .then(() => assert_equals(rs.events.length, 0, 'cancel should not be called')); 87}, 'preventCancel should prevent canceling the readable'); 88 89promise_test(t => { 90 const rs = new ReadableStream(errorOnPull, hwm0); 91 const ws = recordingWritableStream(); 92 const abortController = new AbortController(); 93 const signal = abortController.signal; 94 abortController.abort(); 95 return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal, preventAbort: true }), 'pipeTo should reject') 96 .then(() => { 97 assert_equals(ws.events.length, 0, 'writable should not have been aborted'); 98 return ws.getWriter().ready; 99 }); 100}, 'preventAbort should prevent aborting the readable'); 101 102promise_test(t => { 103 const rs = recordingReadableStream(errorOnPull, hwm0); 104 const ws = recordingWritableStream(); 105 const abortController = new AbortController(); 106 const signal = abortController.signal; 107 abortController.abort(); 108 return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal, preventCancel: true, preventAbort: true }), 109 'pipeTo should reject') 110 .then(() => { 111 assert_equals(rs.events.length, 0, 'cancel should not be called'); 112 assert_equals(ws.events.length, 0, 'writable should not have been aborted'); 113 return ws.getWriter().ready; 114 }); 115}, 'preventCancel and preventAbort should prevent canceling the readable and aborting the readable'); 116 117for (const reason of [null, undefined, error1]) { 118 promise_test(async t => { 119 const rs = new ReadableStream({ 120 start(controller) { 121 controller.enqueue('a'); 122 controller.enqueue('b'); 123 controller.close(); 124 } 125 }); 126 const abortController = new AbortController(); 127 const signal = abortController.signal; 128 const ws = recordingWritableStream({ 129 write() { 130 abortController.abort(reason); 131 } 132 }); 133 const pipeToPromise = rs.pipeTo(ws, { signal }); 134 if (reason !== undefined) { 135 await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason'); 136 } else { 137 await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError'); 138 } 139 const error = await pipeToPromise.catch(e => e); 140 assert_equals(signal.reason, error, 'signal.reason should be error'); 141 assert_equals(ws.events.length, 4, 'only chunk "a" should have been written'); 142 assert_array_equals(ws.events.slice(0, 3), ['write', 'a', 'abort'], 'events should match'); 143 assert_equals(ws.events[3], error, 'abort reason should be error'); 144 }, `(reason: '${reason}') abort should prevent further reads`); 145} 146 147for (const reason of [null, undefined, error1]) { 148 promise_test(async t => { 149 let readController; 150 const rs = new ReadableStream({ 151 start(c) { 152 readController = c; 153 c.enqueue('a'); 154 c.enqueue('b'); 155 } 156 }); 157 const abortController = new AbortController(); 158 const signal = abortController.signal; 159 let resolveWrite; 160 const writePromise = new Promise(resolve => { 161 resolveWrite = resolve; 162 }); 163 const ws = recordingWritableStream({ 164 write() { 165 return writePromise; 166 } 167 }, new CountQueuingStrategy({ highWaterMark: Infinity })); 168 const pipeToPromise = rs.pipeTo(ws, { signal }); 169 await delay(0); 170 await abortController.abort(reason); 171 await readController.close(); // Make sure the test terminates when signal is not implemented. 172 await resolveWrite(); 173 if (reason !== undefined) { 174 await promise_rejects_exactly(t, reason, pipeToPromise, 'pipeTo rejects with abort reason'); 175 } else { 176 await promise_rejects_dom(t, 'AbortError', pipeToPromise, 'pipeTo rejects with AbortError'); 177 } 178 const error = await pipeToPromise.catch(e => e); 179 assert_equals(signal.reason, error, 'signal.reason should be error'); 180 assert_equals(ws.events.length, 6, 'chunks "a" and "b" should have been written'); 181 assert_array_equals(ws.events.slice(0, 5), ['write', 'a', 'write', 'b', 'abort'], 'events should match'); 182 assert_equals(ws.events[5], error, 'abort reason should be error'); 183 }, `(reason: '${reason}') all pending writes should complete on abort`); 184} 185 186promise_test(t => { 187 const rs = new ReadableStream({ 188 pull(controller) { 189 controller.error('failed to abort'); 190 }, 191 cancel() { 192 return Promise.reject(error1); 193 } 194 }, hwm0); 195 const ws = new WritableStream(); 196 const abortController = new AbortController(); 197 const signal = abortController.signal; 198 abortController.abort(); 199 return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { signal }), 'pipeTo should reject'); 200}, 'a rejection from underlyingSource.cancel() should be returned by pipeTo()'); 201 202promise_test(t => { 203 const rs = new ReadableStream(errorOnPull, hwm0); 204 const ws = new WritableStream({ 205 abort() { 206 return Promise.reject(error1); 207 } 208 }); 209 const abortController = new AbortController(); 210 const signal = abortController.signal; 211 abortController.abort(); 212 return promise_rejects_exactly(t, error1, rs.pipeTo(ws, { signal }), 'pipeTo should reject'); 213}, 'a rejection from underlyingSink.abort() should be returned by pipeTo()'); 214 215promise_test(t => { 216 const events = []; 217 const rs = new ReadableStream({ 218 pull(controller) { 219 controller.error('failed to abort'); 220 }, 221 cancel() { 222 events.push('cancel'); 223 return Promise.reject(error1); 224 } 225 }, hwm0); 226 const ws = new WritableStream({ 227 abort() { 228 events.push('abort'); 229 return Promise.reject(error2); 230 } 231 }); 232 const abortController = new AbortController(); 233 const signal = abortController.signal; 234 abortController.abort(); 235 return promise_rejects_exactly(t, error2, rs.pipeTo(ws, { signal }), 'pipeTo should reject') 236 .then(() => assert_array_equals(events, ['abort', 'cancel'], 'abort() should be called before cancel()')); 237}, 'a rejection from underlyingSink.abort() should be preferred to one from underlyingSource.cancel()'); 238 239promise_test(t => { 240 const rs = new ReadableStream({ 241 start(controller) { 242 controller.close(); 243 } 244 }); 245 const ws = new WritableStream(); 246 const abortController = new AbortController(); 247 const signal = abortController.signal; 248 abortController.abort(); 249 return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject'); 250}, 'abort signal takes priority over closed readable'); 251 252promise_test(t => { 253 const rs = new ReadableStream({ 254 start(controller) { 255 controller.error(error1); 256 } 257 }); 258 const ws = new WritableStream(); 259 const abortController = new AbortController(); 260 const signal = abortController.signal; 261 abortController.abort(); 262 return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject'); 263}, 'abort signal takes priority over errored readable'); 264 265promise_test(t => { 266 const rs = new ReadableStream({ 267 pull(controller) { 268 controller.error('failed to abort'); 269 } 270 }, hwm0); 271 const ws = new WritableStream(); 272 const abortController = new AbortController(); 273 const signal = abortController.signal; 274 abortController.abort(); 275 const writer = ws.getWriter(); 276 return writer.close().then(() => { 277 writer.releaseLock(); 278 return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject'); 279 }); 280}, 'abort signal takes priority over closed writable'); 281 282promise_test(t => { 283 const rs = new ReadableStream({ 284 pull(controller) { 285 controller.error('failed to abort'); 286 } 287 }, hwm0); 288 const ws = new WritableStream({ 289 start(controller) { 290 controller.error(error1); 291 } 292 }); 293 const abortController = new AbortController(); 294 const signal = abortController.signal; 295 abortController.abort(); 296 return promise_rejects_dom(t, 'AbortError', rs.pipeTo(ws, { signal }), 'pipeTo should reject'); 297}, 'abort signal takes priority over errored writable'); 298 299promise_test(() => { 300 let readController; 301 const rs = new ReadableStream({ 302 start(c) { 303 readController = c; 304 } 305 }); 306 const ws = new WritableStream(); 307 const abortController = new AbortController(); 308 const signal = abortController.signal; 309 const pipeToPromise = rs.pipeTo(ws, { signal, preventClose: true }); 310 readController.close(); 311 return Promise.resolve().then(() => { 312 abortController.abort(); 313 return pipeToPromise; 314 }).then(() => ws.getWriter().write('this should succeed')); 315}, 'abort should do nothing after the readable is closed'); 316 317promise_test(t => { 318 let readController; 319 const rs = new ReadableStream({ 320 start(c) { 321 readController = c; 322 } 323 }); 324 const ws = new WritableStream(); 325 const abortController = new AbortController(); 326 const signal = abortController.signal; 327 const pipeToPromise = rs.pipeTo(ws, { signal, preventAbort: true }); 328 readController.error(error1); 329 return Promise.resolve().then(() => { 330 abortController.abort(); 331 return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject'); 332 }).then(() => ws.getWriter().write('this should succeed')); 333}, 'abort should do nothing after the readable is errored'); 334 335promise_test(t => { 336 let readController; 337 const rs = new ReadableStream({ 338 start(c) { 339 readController = c; 340 } 341 }); 342 let resolveWrite; 343 const writePromise = new Promise(resolve => { 344 resolveWrite = resolve; 345 }); 346 const ws = new WritableStream({ 347 write() { 348 readController.error(error1); 349 return writePromise; 350 } 351 }); 352 const abortController = new AbortController(); 353 const signal = abortController.signal; 354 const pipeToPromise = rs.pipeTo(ws, { signal, preventAbort: true }); 355 readController.enqueue('a'); 356 return delay(0).then(() => { 357 abortController.abort(); 358 resolveWrite(); 359 return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject'); 360 }).then(() => ws.getWriter().write('this should succeed')); 361}, 'abort should do nothing after the readable is errored, even with pending writes'); 362 363promise_test(t => { 364 const rs = recordingReadableStream({ 365 pull(controller) { 366 return delay(0).then(() => controller.close()); 367 } 368 }); 369 let writeController; 370 const ws = new WritableStream({ 371 start(c) { 372 writeController = c; 373 } 374 }); 375 const abortController = new AbortController(); 376 const signal = abortController.signal; 377 const pipeToPromise = rs.pipeTo(ws, { signal, preventCancel: true }); 378 return Promise.resolve().then(() => { 379 writeController.error(error1); 380 return Promise.resolve(); 381 }).then(() => { 382 abortController.abort(); 383 return promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo should reject'); 384 }).then(() => { 385 assert_array_equals(rs.events, ['pull'], 'cancel should not have been called'); 386 }); 387}, 'abort should do nothing after the writable is errored'); 388 389promise_test(async t => { 390 const rs = new ReadableStream({ 391 pull(c) { 392 c.enqueue(new Uint8Array([])); 393 }, 394 type: "bytes", 395 }); 396 const ws = new WritableStream(); 397 const [first, second] = rs.tee(); 398 399 let aborted = false; 400 first.pipeTo(ws, { signal: AbortSignal.abort() }).catch(() => { 401 aborted = true; 402 }); 403 await delay(0); 404 assert_true(!aborted, "pipeTo should not resolve yet"); 405 await second.cancel(); 406 await delay(0); 407 assert_true(aborted, "pipeTo should be aborted now"); 408}, "pipeTo on a teed readable byte stream should only be aborted when both branches are aborted"); 409