1'use strict'; 2 3const { 4 ArrayPrototypePush, 5 ArrayPrototypeShift, 6 FunctionPrototypeBind, 7 FunctionPrototypeCall, 8 ObjectDefineProperties, 9 PromisePrototypeThen, 10 PromiseResolve, 11 PromiseReject, 12 ReflectConstruct, 13 Symbol, 14 SymbolToStringTag, 15} = primordials; 16 17const { 18 codes: { 19 ERR_ILLEGAL_CONSTRUCTOR, 20 ERR_INVALID_ARG_VALUE, 21 ERR_INVALID_ARG_TYPE, 22 ERR_INVALID_STATE, 23 ERR_INVALID_THIS, 24 }, 25} = require('internal/errors'); 26 27const { 28 DOMException, 29} = internalBinding('messaging'); 30 31const { 32 createDeferredPromise, 33 customInspectSymbol: kInspect, 34 kEmptyObject, 35 kEnumerableProperty, 36 SideEffectFreeRegExpPrototypeSymbolReplace, 37} = require('internal/util'); 38 39const { 40 MessageChannel, 41} = require('internal/worker/io'); 42 43const { 44 kDeserialize, 45 kTransfer, 46 kTransferList, 47 makeTransferable, 48} = require('internal/worker/js_transferable'); 49 50const { 51 customInspect, 52 dequeueValue, 53 ensureIsPromise, 54 enqueueValueWithSize, 55 extractHighWaterMark, 56 extractSizeAlgorithm, 57 lazyTransfer, 58 isBrandCheck, 59 isPromisePending, 60 peekQueueValue, 61 resetQueue, 62 setPromiseHandled, 63 nonOpCancel, 64 nonOpStart, 65 nonOpWrite, 66 kType, 67 kState, 68} = require('internal/webstreams/util'); 69 70const { 71 kIsClosedPromise, 72 kControllerErrorFunction, 73} = require('internal/streams/utils'); 74 75const { 76 AbortController, 77} = require('internal/abort_controller'); 78 79const assert = require('internal/assert'); 80 81const kAbort = Symbol('kAbort'); 82const kCloseSentinel = Symbol('kCloseSentinel'); 83const kError = Symbol('kError'); 84const kSkipThrow = Symbol('kSkipThrow'); 85 86let releasedError; 87 88function lazyWritableReleasedError() { 89 if (releasedError) { 90 return releasedError; 91 } 92 const userModuleRegExp = /^ {4}at (?:[^/\\(]+ \()(?!node:(.+):\d+:\d+\)$).*/gm; 93 94 releasedError = new ERR_INVALID_STATE.TypeError('Writer has been released'); 95 // Avoid V8 leak and remove userland stackstrace 96 releasedError.stack = SideEffectFreeRegExpPrototypeSymbolReplace(userModuleRegExp, releasedError.stack, ''); 97 return releasedError; 98} 99 100const getNonWritablePropertyDescriptor = (value) => { 101 return { 102 __proto__: null, 103 configurable: true, 104 value, 105 }; 106}; 107 108/** 109 * @typedef {import('../abort_controller').AbortSignal} AbortSignal 110 * @typedef {import('./queuingstrategies').QueuingStrategy 111 * } QueuingStrategy 112 * @typedef {import('./queuingstrategies').QueuingStrategySize 113 * } QueuingStrategySize 114 */ 115 116/** 117 * @callback UnderlyingSinkStartCallback 118 * @param {WritableStreamDefaultController} controller 119 */ 120 121/** 122 * @callback UnderlyingSinkWriteCallback 123 * @param {any} chunk 124 * @param {WritableStreamDefaultController} controller 125 * @returns {Promise<void>} 126 */ 127 128/** 129 * @callback UnderlyingSinkCloseCallback 130 * @returns {Promise<void>} 131 */ 132 133/** 134 * @callback UnderlyingSinkAbortCallback 135 * @param {any} reason 136 * @returns {Promise<void>} 137 */ 138 139/** 140 * @typedef {{ 141 * start? : UnderlyingSinkStartCallback, 142 * write? : UnderlyingSinkWriteCallback, 143 * close? : UnderlyingSinkCloseCallback, 144 * abort? : UnderlyingSinkAbortCallback, 145 * type? : any, 146 * }} UnderlyingSink 147 */ 148 149class WritableStream { 150 [kType] = 'WritableStream'; 151 152 /** 153 * @param {UnderlyingSink} [sink] 154 * @param {QueuingStrategy} [strategy] 155 */ 156 constructor(sink = null, strategy = kEmptyObject) { 157 const type = sink?.type; 158 if (type !== undefined) 159 throw new ERR_INVALID_ARG_VALUE.RangeError('type', type); 160 161 this[kState] = { 162 close: createDeferredPromise(), 163 closeRequest: { 164 promise: undefined, 165 resolve: undefined, 166 reject: undefined, 167 }, 168 inFlightWriteRequest: { 169 promise: undefined, 170 resolve: undefined, 171 reject: undefined, 172 }, 173 inFlightCloseRequest: { 174 promise: undefined, 175 resolve: undefined, 176 reject: undefined, 177 }, 178 pendingAbortRequest: { 179 abort: { 180 promise: undefined, 181 resolve: undefined, 182 reject: undefined, 183 }, 184 reason: undefined, 185 wasAlreadyErroring: false, 186 }, 187 backpressure: false, 188 controller: undefined, 189 state: 'writable', 190 storedError: undefined, 191 writeRequests: [], 192 writer: undefined, 193 transfer: { 194 readable: undefined, 195 port1: undefined, 196 port2: undefined, 197 promise: undefined, 198 }, 199 }; 200 201 this[kIsClosedPromise] = createDeferredPromise(); 202 this[kControllerErrorFunction] = () => {}; 203 204 const size = extractSizeAlgorithm(strategy?.size); 205 const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1); 206 207 setupWritableStreamDefaultControllerFromSink( 208 this, 209 sink, 210 highWaterMark, 211 size); 212 213 // eslint-disable-next-line no-constructor-return 214 return makeTransferable(this); 215 } 216 217 /** 218 * @readonly 219 * @type {boolean} 220 */ 221 get locked() { 222 if (!isWritableStream(this)) 223 throw new ERR_INVALID_THIS('WritableStream'); 224 return isWritableStreamLocked(this); 225 } 226 227 /** 228 * @param {any} [reason] 229 * @returns {Promise<void>} 230 */ 231 abort(reason = undefined) { 232 if (!isWritableStream(this)) 233 return PromiseReject(new ERR_INVALID_THIS('WritableStream')); 234 if (isWritableStreamLocked(this)) { 235 return PromiseReject( 236 new ERR_INVALID_STATE.TypeError('WritableStream is locked')); 237 } 238 return writableStreamAbort(this, reason); 239 } 240 241 /** 242 * @returns {Promise<void>} 243 */ 244 close() { 245 if (!isWritableStream(this)) 246 return PromiseReject(new ERR_INVALID_THIS('WritableStream')); 247 if (isWritableStreamLocked(this)) { 248 return PromiseReject( 249 new ERR_INVALID_STATE.TypeError('WritableStream is locked')); 250 } 251 if (writableStreamCloseQueuedOrInFlight(this)) { 252 return PromiseReject( 253 new ERR_INVALID_STATE.TypeError('Failure closing WritableStream')); 254 } 255 return writableStreamClose(this); 256 } 257 258 /** 259 * @returns {WritableStreamDefaultWriter} 260 */ 261 getWriter() { 262 if (!isWritableStream(this)) 263 throw new ERR_INVALID_THIS('WritableStream'); 264 // eslint-disable-next-line no-use-before-define 265 return new WritableStreamDefaultWriter(this); 266 } 267 268 [kInspect](depth, options) { 269 return customInspect(depth, options, this[kType], { 270 locked: this.locked, 271 state: this[kState].state, 272 }); 273 } 274 275 [kTransfer]() { 276 if (!isWritableStream(this)) 277 throw new ERR_INVALID_THIS('WritableStream'); 278 if (this.locked) { 279 this[kState].transfer.port1?.close(); 280 this[kState].transfer.port1 = undefined; 281 this[kState].transfer.port2 = undefined; 282 throw new DOMException( 283 'Cannot transfer a locked WritableStream', 284 'DataCloneError'); 285 } 286 287 const { 288 readable, 289 promise, 290 } = lazyTransfer().newCrossRealmReadableStream( 291 this, 292 this[kState].transfer.port1); 293 294 this[kState].transfer.readable = readable; 295 this[kState].transfer.promise = promise; 296 297 setPromiseHandled(this[kState].transfer.promise); 298 299 return { 300 data: { port: this[kState].transfer.port2 }, 301 deserializeInfo: 302 'internal/webstreams/writablestream:TransferredWritableStream', 303 }; 304 } 305 306 [kTransferList]() { 307 const { port1, port2 } = new MessageChannel(); 308 this[kState].transfer.port1 = port1; 309 this[kState].transfer.port2 = port2; 310 return [ port2 ]; 311 } 312 313 [kDeserialize]({ port }) { 314 const transfer = lazyTransfer(); 315 setupWritableStreamDefaultControllerFromSink( 316 this, 317 new transfer.CrossRealmTransformWritableSink(port), 318 1, 319 () => 1); 320 } 321} 322 323ObjectDefineProperties(WritableStream.prototype, { 324 locked: kEnumerableProperty, 325 abort: kEnumerableProperty, 326 close: kEnumerableProperty, 327 getWriter: kEnumerableProperty, 328 [SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStream.name), 329}); 330 331function TransferredWritableStream() { 332 return makeTransferable(ReflectConstruct( 333 function() { 334 this[kType] = 'WritableStream'; 335 this[kState] = { 336 close: createDeferredPromise(), 337 closeRequest: { 338 promise: undefined, 339 resolve: undefined, 340 reject: undefined, 341 }, 342 inFlightWriteRequest: { 343 promise: undefined, 344 resolve: undefined, 345 reject: undefined, 346 }, 347 inFlightCloseRequest: { 348 promise: undefined, 349 resolve: undefined, 350 reject: undefined, 351 }, 352 pendingAbortRequest: { 353 abort: { 354 promise: undefined, 355 resolve: undefined, 356 reject: undefined, 357 }, 358 reason: undefined, 359 wasAlreadyErroring: false, 360 }, 361 backpressure: false, 362 controller: undefined, 363 state: 'writable', 364 storedError: undefined, 365 writeRequests: [], 366 writer: undefined, 367 transfer: { 368 promise: undefined, 369 port1: undefined, 370 port2: undefined, 371 readable: undefined, 372 }, 373 }; 374 this[kIsClosedPromise] = createDeferredPromise(); 375 this[kControllerErrorFunction] = () => {}; 376 }, 377 [], WritableStream)); 378} 379TransferredWritableStream.prototype[kDeserialize] = () => {}; 380 381class WritableStreamDefaultWriter { 382 [kType] = 'WritableStreamDefaultWriter'; 383 384 /** 385 * @param {WritableStream} stream 386 */ 387 constructor(stream) { 388 if (!isWritableStream(stream)) 389 throw new ERR_INVALID_ARG_TYPE('stream', 'WritableStream', stream); 390 this[kState] = { 391 stream: undefined, 392 close: { 393 promise: undefined, 394 resolve: undefined, 395 reject: undefined, 396 }, 397 ready: { 398 promise: undefined, 399 resolve: undefined, 400 reject: undefined, 401 }, 402 }; 403 setupWritableStreamDefaultWriter(this, stream); 404 } 405 406 /** 407 * @readonly 408 * @type {Promise<void>} 409 */ 410 get closed() { 411 if (!isWritableStreamDefaultWriter(this)) 412 return PromiseReject(new ERR_INVALID_THIS('WritableStreamDefaultWriter')); 413 return this[kState].close.promise; 414 } 415 416 /** 417 * @readonly 418 * @type {number} 419 */ 420 get desiredSize() { 421 if (!isWritableStreamDefaultWriter(this)) 422 throw new ERR_INVALID_THIS('WritableStreamDefaultWriter'); 423 if (this[kState].stream === undefined) { 424 throw new ERR_INVALID_STATE.TypeError( 425 'Writer is not bound to a WritableStream'); 426 } 427 return writableStreamDefaultWriterGetDesiredSize(this); 428 } 429 430 /** 431 * @readonly 432 * @type {Promise<void>} 433 */ 434 get ready() { 435 if (!isWritableStreamDefaultWriter(this)) 436 return PromiseReject(new ERR_INVALID_THIS('WritableStreamDefaultWriter')); 437 return this[kState].ready.promise; 438 } 439 440 /** 441 * @param {any} [reason] 442 * @returns {Promise<void>} 443 */ 444 abort(reason = undefined) { 445 if (!isWritableStreamDefaultWriter(this)) 446 return PromiseReject(new ERR_INVALID_THIS('WritableStreamDefaultWriter')); 447 if (this[kState].stream === undefined) { 448 return PromiseReject( 449 new ERR_INVALID_STATE.TypeError( 450 'Writer is not bound to a WritableStream')); 451 } 452 return writableStreamDefaultWriterAbort(this, reason); 453 } 454 455 /** 456 * @returns {Promise<void>} 457 */ 458 close() { 459 if (!isWritableStreamDefaultWriter(this)) 460 return PromiseReject(new ERR_INVALID_THIS('WritableStreamDefaultWriter')); 461 const { 462 stream, 463 } = this[kState]; 464 if (stream === undefined) { 465 return PromiseReject( 466 new ERR_INVALID_STATE.TypeError( 467 'Writer is not bound to a WritableStream')); 468 } 469 if (writableStreamCloseQueuedOrInFlight(stream)) { 470 return PromiseReject( 471 new ERR_INVALID_STATE.TypeError('Failure to close WritableStream')); 472 } 473 return writableStreamDefaultWriterClose(this); 474 } 475 476 releaseLock() { 477 if (!isWritableStreamDefaultWriter(this)) 478 throw new ERR_INVALID_THIS('WritableStreamDefaultWriter'); 479 const { 480 stream, 481 } = this[kState]; 482 if (stream === undefined) 483 return; 484 assert(stream[kState].writer !== undefined); 485 writableStreamDefaultWriterRelease(this); 486 } 487 488 /** 489 * @param {any} [chunk] 490 * @returns {Promise<void>} 491 */ 492 write(chunk = undefined) { 493 if (!isWritableStreamDefaultWriter(this)) 494 return PromiseReject(new ERR_INVALID_THIS('WritableStreamDefaultWriter')); 495 if (this[kState].stream === undefined) { 496 return PromiseReject( 497 new ERR_INVALID_STATE.TypeError( 498 'Writer is not bound to a WritableStream')); 499 } 500 return writableStreamDefaultWriterWrite(this, chunk); 501 } 502 503 [kInspect](depth, options) { 504 return customInspect(depth, options, this[kType], { 505 stream: this[kState].stream, 506 close: this[kState].close.promise, 507 ready: this[kState].ready.promise, 508 desiredSize: this.desiredSize, 509 }); 510 } 511} 512 513ObjectDefineProperties(WritableStreamDefaultWriter.prototype, { 514 closed: kEnumerableProperty, 515 ready: kEnumerableProperty, 516 desiredSize: kEnumerableProperty, 517 abort: kEnumerableProperty, 518 close: kEnumerableProperty, 519 releaseLock: kEnumerableProperty, 520 write: kEnumerableProperty, 521 [SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStreamDefaultWriter.name), 522}); 523 524class WritableStreamDefaultController { 525 [kType] = 'WritableStreamDefaultController'; 526 527 constructor(skipThrowSymbol = undefined) { 528 if (skipThrowSymbol !== kSkipThrow) { 529 throw new ERR_ILLEGAL_CONSTRUCTOR(); 530 } 531 } 532 533 [kAbort](reason) { 534 const result = this[kState].abortAlgorithm(reason); 535 writableStreamDefaultControllerClearAlgorithms(this); 536 return result; 537 } 538 539 [kError]() { 540 resetQueue(this); 541 } 542 543 /** 544 * @type {AbortSignal} 545 */ 546 get signal() { 547 if (!isWritableStreamDefaultController(this)) 548 throw new ERR_INVALID_THIS('WritableStreamDefaultController'); 549 return this[kState].abortController.signal; 550 } 551 552 /** 553 * @param {any} [error] 554 */ 555 error(error = undefined) { 556 if (!isWritableStreamDefaultController(this)) 557 throw new ERR_INVALID_THIS('WritableStreamDefaultController'); 558 if (this[kState].stream[kState].state !== 'writable') 559 return; 560 writableStreamDefaultControllerError(this, error); 561 } 562 563 [kInspect](depth, options) { 564 return customInspect(depth, options, this[kType], { 565 stream: this[kState].stream, 566 }); 567 } 568} 569 570ObjectDefineProperties(WritableStreamDefaultController.prototype, { 571 signal: kEnumerableProperty, 572 error: kEnumerableProperty, 573 [SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStreamDefaultController.name), 574}); 575 576const isWritableStream = 577 isBrandCheck('WritableStream'); 578const isWritableStreamDefaultWriter = 579 isBrandCheck('WritableStreamDefaultWriter'); 580const isWritableStreamDefaultController = 581 isBrandCheck('WritableStreamDefaultController'); 582 583function isWritableStreamLocked(stream) { 584 return stream[kState].writer !== undefined; 585} 586 587function setupWritableStreamDefaultWriter(writer, stream) { 588 if (isWritableStreamLocked(stream)) 589 throw new ERR_INVALID_STATE.TypeError('WritableStream is locked'); 590 writer[kState].stream = stream; 591 stream[kState].writer = writer; 592 switch (stream[kState].state) { 593 case 'writable': 594 if (!writableStreamCloseQueuedOrInFlight(stream) && 595 stream[kState].backpressure) { 596 writer[kState].ready = createDeferredPromise(); 597 } else { 598 writer[kState].ready = { 599 promise: PromiseResolve(), 600 resolve: undefined, 601 reject: undefined, 602 }; 603 } 604 setClosedPromiseToNewPromise(); 605 break; 606 case 'erroring': 607 writer[kState].ready = { 608 promise: PromiseReject(stream[kState].storedError), 609 resolve: undefined, 610 reject: undefined, 611 }; 612 setPromiseHandled(writer[kState].ready.promise); 613 setClosedPromiseToNewPromise(); 614 break; 615 case 'closed': 616 writer[kState].ready = { 617 promise: PromiseResolve(), 618 resolve: undefined, 619 reject: undefined, 620 }; 621 writer[kState].close = { 622 promise: PromiseResolve(), 623 resolve: undefined, 624 reject: undefined, 625 }; 626 break; 627 default: 628 writer[kState].ready = { 629 promise: PromiseReject(stream[kState].storedError), 630 resolve: undefined, 631 reject: undefined, 632 }; 633 writer[kState].close = { 634 promise: PromiseReject(stream[kState].storedError), 635 resolve: undefined, 636 reject: undefined, 637 }; 638 setPromiseHandled(writer[kState].ready.promise); 639 setPromiseHandled(writer[kState].close.promise); 640 } 641 642 function setClosedPromiseToNewPromise() { 643 writer[kState].close = createDeferredPromise(); 644 } 645} 646 647function writableStreamAbort(stream, reason) { 648 const { 649 state, 650 controller, 651 } = stream[kState]; 652 if (state === 'closed' || state === 'errored') 653 return PromiseResolve(); 654 655 controller[kState].abortController.abort(reason); 656 657 if (stream[kState].pendingAbortRequest.abort.promise !== undefined) 658 return stream[kState].pendingAbortRequest.abort.promise; 659 660 assert(state === 'writable' || state === 'erroring'); 661 662 let wasAlreadyErroring = false; 663 if (state === 'erroring') { 664 wasAlreadyErroring = true; 665 reason = undefined; 666 } 667 668 const abort = createDeferredPromise(); 669 670 stream[kState].pendingAbortRequest = { 671 abort, 672 reason, 673 wasAlreadyErroring, 674 }; 675 676 if (!wasAlreadyErroring) 677 writableStreamStartErroring(stream, reason); 678 679 return abort.promise; 680} 681 682function writableStreamClose(stream) { 683 const { 684 state, 685 writer, 686 backpressure, 687 controller, 688 } = stream[kState]; 689 if (state === 'closed' || state === 'errored') { 690 return PromiseReject( 691 new ERR_INVALID_STATE.TypeError('WritableStream is closed')); 692 } 693 assert(state === 'writable' || state === 'erroring'); 694 assert(!writableStreamCloseQueuedOrInFlight(stream)); 695 stream[kState].closeRequest = createDeferredPromise(); 696 const { promise } = stream[kState].closeRequest; 697 if (writer !== undefined && backpressure && state === 'writable') 698 writer[kState].ready.resolve?.(); 699 writableStreamDefaultControllerClose(controller); 700 return promise; 701} 702 703function writableStreamUpdateBackpressure(stream, backpressure) { 704 assert(stream[kState].state === 'writable'); 705 assert(!writableStreamCloseQueuedOrInFlight(stream)); 706 const { 707 writer, 708 } = stream[kState]; 709 if (writer !== undefined && stream[kState].backpressure !== backpressure) { 710 if (backpressure) { 711 writer[kState].ready = createDeferredPromise(); 712 } else { 713 writer[kState].ready.resolve?.(); 714 } 715 } 716 stream[kState].backpressure = backpressure; 717} 718 719function writableStreamStartErroring(stream, reason) { 720 assert(stream[kState].storedError === undefined); 721 assert(stream[kState].state === 'writable'); 722 const { 723 controller, 724 writer, 725 } = stream[kState]; 726 assert(controller !== undefined); 727 stream[kState].state = 'erroring'; 728 stream[kState].storedError = reason; 729 if (writer !== undefined) { 730 writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); 731 } 732 if (!writableStreamHasOperationMarkedInFlight(stream) && 733 controller[kState].started) { 734 writableStreamFinishErroring(stream); 735 } 736} 737 738function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) { 739 assert(stream[kState].state === 'errored'); 740 if (stream[kState].closeRequest.promise !== undefined) { 741 assert(stream[kState].inFlightCloseRequest.promise === undefined); 742 stream[kState].closeRequest.reject?.(stream[kState].storedError); 743 stream[kState].closeRequest = { 744 promise: undefined, 745 reject: undefined, 746 resolve: undefined, 747 }; 748 } 749 750 stream[kIsClosedPromise].reject(stream[kState]?.storedError); 751 setPromiseHandled(stream[kIsClosedPromise].promise); 752 753 const { 754 writer, 755 } = stream[kState]; 756 if (writer !== undefined) { 757 writer[kState].close.reject?.(stream[kState].storedError); 758 setPromiseHandled(writer[kState].close.promise); 759 } 760} 761 762function writableStreamMarkFirstWriteRequestInFlight(stream) { 763 assert(stream[kState].inFlightWriteRequest.promise === undefined); 764 assert(stream[kState].writeRequests.length); 765 const writeRequest = ArrayPrototypeShift(stream[kState].writeRequests); 766 stream[kState].inFlightWriteRequest = writeRequest; 767} 768 769function writableStreamMarkCloseRequestInFlight(stream) { 770 assert(stream[kState].inFlightWriteRequest.promise === undefined); 771 assert(stream[kState].closeRequest.promise !== undefined); 772 stream[kState].inFlightCloseRequest = stream[kState].closeRequest; 773 stream[kState].closeRequest = { 774 promise: undefined, 775 resolve: undefined, 776 reject: undefined, 777 }; 778} 779 780function writableStreamHasOperationMarkedInFlight(stream) { 781 const { 782 inFlightWriteRequest, 783 inFlightCloseRequest, 784 } = stream[kState]; 785 if (inFlightWriteRequest.promise === undefined && 786 inFlightCloseRequest.promise === undefined) { 787 return false; 788 } 789 return true; 790} 791 792function writableStreamFinishInFlightWriteWithError(stream, error) { 793 assert(stream[kState].inFlightWriteRequest.promise !== undefined); 794 stream[kState].inFlightWriteRequest.reject?.(error); 795 stream[kState].inFlightWriteRequest = { 796 promise: undefined, 797 resolve: undefined, 798 reject: undefined, 799 }; 800 assert(stream[kState].state === 'writable' || 801 stream[kState].state === 'erroring'); 802 writableStreamDealWithRejection(stream, error); 803} 804 805function writableStreamFinishInFlightWrite(stream) { 806 assert(stream[kState].inFlightWriteRequest.promise !== undefined); 807 stream[kState].inFlightWriteRequest.resolve?.(); 808 stream[kState].inFlightWriteRequest = { 809 promise: undefined, 810 resolve: undefined, 811 reject: undefined, 812 }; 813} 814 815function writableStreamFinishInFlightCloseWithError(stream, error) { 816 assert(stream[kState].inFlightCloseRequest.promise !== undefined); 817 stream[kState].inFlightCloseRequest.reject?.(error); 818 stream[kState].inFlightCloseRequest = { 819 promise: undefined, 820 resolve: undefined, 821 reject: undefined, 822 }; 823 assert(stream[kState].state === 'writable' || 824 stream[kState].state === 'erroring'); 825 if (stream[kState].pendingAbortRequest.abort.promise !== undefined) { 826 stream[kState].pendingAbortRequest.abort.reject?.(error); 827 stream[kState].pendingAbortRequest = { 828 abort: { 829 promise: undefined, 830 resolve: undefined, 831 reject: undefined, 832 }, 833 reason: undefined, 834 wasAlreadyErroring: false, 835 }; 836 } 837 writableStreamDealWithRejection(stream, error); 838} 839 840function writableStreamFinishInFlightClose(stream) { 841 assert(stream[kState].inFlightCloseRequest.promise !== undefined); 842 stream[kState].inFlightCloseRequest.resolve?.(); 843 stream[kState].inFlightCloseRequest = { 844 promise: undefined, 845 resolve: undefined, 846 reject: undefined, 847 }; 848 if (stream[kState].state === 'erroring') { 849 stream[kState].storedError = undefined; 850 if (stream[kState].pendingAbortRequest.abort.promise !== undefined) { 851 stream[kState].pendingAbortRequest.abort.resolve?.(); 852 stream[kState].pendingAbortRequest = { 853 abort: { 854 promise: undefined, 855 resolve: undefined, 856 reject: undefined, 857 }, 858 reason: undefined, 859 wasAlreadyErroring: false, 860 }; 861 } 862 } 863 stream[kState].state = 'closed'; 864 if (stream[kState].writer !== undefined) 865 stream[kState].writer[kState].close.resolve?.(); 866 stream[kIsClosedPromise].resolve?.(); 867 assert(stream[kState].pendingAbortRequest.abort.promise === undefined); 868 assert(stream[kState].storedError === undefined); 869} 870 871function writableStreamFinishErroring(stream) { 872 assert(stream[kState].state === 'erroring'); 873 assert(!writableStreamHasOperationMarkedInFlight(stream)); 874 stream[kState].state = 'errored'; 875 stream[kState].controller[kError](); 876 const storedError = stream[kState].storedError; 877 for (let n = 0; n < stream[kState].writeRequests.length; n++) 878 stream[kState].writeRequests[n].reject?.(storedError); 879 stream[kState].writeRequests = []; 880 881 if (stream[kState].pendingAbortRequest.abort.promise === undefined) { 882 writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); 883 return; 884 } 885 886 const abortRequest = stream[kState].pendingAbortRequest; 887 stream[kState].pendingAbortRequest = { 888 abort: { 889 promise: undefined, 890 resolve: undefined, 891 reject: undefined, 892 }, 893 reason: undefined, 894 wasAlreadyErroring: false, 895 }; 896 if (abortRequest.wasAlreadyErroring) { 897 abortRequest.abort.reject?.(storedError); 898 writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); 899 return; 900 } 901 PromisePrototypeThen( 902 ensureIsPromise( 903 stream[kState].controller[kAbort], 904 stream[kState].controller, 905 abortRequest.reason), 906 () => { 907 abortRequest.abort.resolve?.(); 908 writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); 909 }, 910 (error) => { 911 abortRequest.abort.reject?.(error); 912 writableStreamRejectCloseAndClosedPromiseIfNeeded(stream); 913 }); 914} 915 916function writableStreamDealWithRejection(stream, error) { 917 const { 918 state, 919 } = stream[kState]; 920 if (state === 'writable') { 921 writableStreamStartErroring(stream, error); 922 return; 923 } 924 925 assert(state === 'erroring'); 926 writableStreamFinishErroring(stream); 927} 928 929function writableStreamCloseQueuedOrInFlight(stream) { 930 if (stream[kState].closeRequest.promise === undefined && 931 stream[kState].inFlightCloseRequest.promise === undefined) { 932 return false; 933 } 934 return true; 935} 936 937function writableStreamAddWriteRequest(stream) { 938 assert(isWritableStreamLocked(stream)); 939 assert(stream[kState].state === 'writable'); 940 const { 941 promise, 942 resolve, 943 reject, 944 } = createDeferredPromise(); 945 ArrayPrototypePush( 946 stream[kState].writeRequests, 947 { 948 promise, 949 resolve, 950 reject, 951 }); 952 return promise; 953} 954 955function writableStreamDefaultWriterWrite(writer, chunk) { 956 const { 957 stream, 958 } = writer[kState]; 959 assert(stream !== undefined); 960 const { 961 controller, 962 } = stream[kState]; 963 const chunkSize = writableStreamDefaultControllerGetChunkSize( 964 controller, 965 chunk); 966 if (stream !== writer[kState].stream) { 967 return PromiseReject( 968 new ERR_INVALID_STATE.TypeError('Mismatched WritableStreams')); 969 } 970 const { 971 state, 972 } = stream[kState]; 973 974 if (state === 'errored') 975 return PromiseReject(stream[kState].storedError); 976 977 if (writableStreamCloseQueuedOrInFlight(stream) || state === 'closed') { 978 return PromiseReject( 979 new ERR_INVALID_STATE.TypeError('WritableStream is closed')); 980 } 981 982 if (state === 'erroring') 983 return PromiseReject(stream[kState].storedError); 984 985 assert(state === 'writable'); 986 987 const promise = writableStreamAddWriteRequest(stream); 988 writableStreamDefaultControllerWrite(controller, chunk, chunkSize); 989 return promise; 990} 991 992function writableStreamDefaultWriterRelease(writer) { 993 const { 994 stream, 995 } = writer[kState]; 996 assert(stream !== undefined); 997 assert(stream[kState].writer === writer); 998 const releasedStateError = lazyWritableReleasedError(); 999 writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedStateError); 1000 writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedStateError); 1001 stream[kState].writer = undefined; 1002 writer[kState].stream = undefined; 1003} 1004 1005function writableStreamDefaultWriterGetDesiredSize(writer) { 1006 const { 1007 stream, 1008 } = writer[kState]; 1009 switch (stream[kState].state) { 1010 case 'errored': 1011 // Fall through 1012 case 'erroring': 1013 return null; 1014 case 'closed': 1015 return 0; 1016 } 1017 return writableStreamDefaultControllerGetDesiredSize( 1018 stream[kState].controller); 1019} 1020 1021function writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error) { 1022 if (isPromisePending(writer[kState].ready.promise)) { 1023 writer[kState].ready.reject?.(error); 1024 } else { 1025 writer[kState].ready = { 1026 promise: PromiseReject(error), 1027 resolve: undefined, 1028 reject: undefined, 1029 }; 1030 } 1031 setPromiseHandled(writer[kState].ready.promise); 1032} 1033 1034function writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, error) { 1035 if (isPromisePending(writer[kState].close.promise)) { 1036 writer[kState].close.reject?.(error); 1037 } else { 1038 writer[kState].close = { 1039 promise: PromiseReject(error), 1040 resolve: undefined, 1041 reject: undefined, 1042 }; 1043 } 1044 setPromiseHandled(writer[kState].close.promise); 1045} 1046 1047function writableStreamDefaultWriterCloseWithErrorPropagation(writer) { 1048 const { 1049 stream, 1050 } = writer[kState]; 1051 assert(stream !== undefined); 1052 const { 1053 state, 1054 } = stream[kState]; 1055 if (writableStreamCloseQueuedOrInFlight(stream) || state === 'closed') 1056 return PromiseResolve(); 1057 1058 if (state === 'errored') 1059 return PromiseReject(stream[kState].storedError); 1060 1061 assert(state === 'writable' || state === 'erroring'); 1062 1063 return writableStreamDefaultWriterClose(writer); 1064} 1065 1066function writableStreamDefaultWriterClose(writer) { 1067 const { 1068 stream, 1069 } = writer[kState]; 1070 assert(stream !== undefined); 1071 return writableStreamClose(stream); 1072} 1073 1074function writableStreamDefaultWriterAbort(writer, reason) { 1075 const { 1076 stream, 1077 } = writer[kState]; 1078 assert(stream !== undefined); 1079 return writableStreamAbort(stream, reason); 1080} 1081 1082function writableStreamDefaultControllerWrite(controller, chunk, chunkSize) { 1083 try { 1084 enqueueValueWithSize(controller, chunk, chunkSize); 1085 } catch (error) { 1086 writableStreamDefaultControllerErrorIfNeeded(controller, error); 1087 return; 1088 } 1089 const { 1090 stream, 1091 } = controller[kState]; 1092 if (!writableStreamCloseQueuedOrInFlight(stream) && 1093 stream[kState].state === 'writable') { 1094 writableStreamUpdateBackpressure( 1095 stream, 1096 writableStreamDefaultControllerGetBackpressure(controller)); 1097 } 1098 writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); 1099} 1100 1101function writableStreamDefaultControllerProcessWrite(controller, chunk) { 1102 const { 1103 stream, 1104 writeAlgorithm, 1105 } = controller[kState]; 1106 writableStreamMarkFirstWriteRequestInFlight(stream); 1107 1108 PromisePrototypeThen( 1109 ensureIsPromise(writeAlgorithm, controller, chunk, controller), 1110 () => { 1111 writableStreamFinishInFlightWrite(stream); 1112 const { 1113 state, 1114 } = stream[kState]; 1115 assert(state === 'writable' || state === 'erroring'); 1116 dequeueValue(controller); 1117 if (!writableStreamCloseQueuedOrInFlight(stream) && 1118 state === 'writable') { 1119 writableStreamUpdateBackpressure( 1120 stream, 1121 writableStreamDefaultControllerGetBackpressure(controller)); 1122 } 1123 writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); 1124 }, 1125 (error) => { 1126 if (stream[kState].state === 'writable') 1127 writableStreamDefaultControllerClearAlgorithms(controller); 1128 writableStreamFinishInFlightWriteWithError(stream, error); 1129 }); 1130 1131} 1132 1133function writableStreamDefaultControllerProcessClose(controller) { 1134 const { 1135 closeAlgorithm, 1136 queue, 1137 stream, 1138 } = controller[kState]; 1139 writableStreamMarkCloseRequestInFlight(stream); 1140 dequeueValue(controller); 1141 assert(!queue.length); 1142 const sinkClosePromise = ensureIsPromise(closeAlgorithm, controller); 1143 writableStreamDefaultControllerClearAlgorithms(controller); 1144 PromisePrototypeThen( 1145 sinkClosePromise, 1146 () => writableStreamFinishInFlightClose(stream), 1147 (error) => writableStreamFinishInFlightCloseWithError(stream, error)); 1148} 1149 1150function writableStreamDefaultControllerGetDesiredSize(controller) { 1151 const { 1152 highWaterMark, 1153 queueTotalSize, 1154 } = controller[kState]; 1155 return highWaterMark - queueTotalSize; 1156} 1157 1158function writableStreamDefaultControllerGetChunkSize(controller, chunk) { 1159 try { 1160 return FunctionPrototypeCall( 1161 controller[kState].sizeAlgorithm, 1162 undefined, 1163 chunk); 1164 } catch (error) { 1165 writableStreamDefaultControllerErrorIfNeeded(controller, error); 1166 return 1; 1167 } 1168} 1169 1170function writableStreamDefaultControllerErrorIfNeeded(controller, error) { 1171 const { 1172 stream, 1173 } = controller[kState]; 1174 if (stream[kState].state === 'writable') 1175 writableStreamDefaultControllerError(controller, error); 1176} 1177 1178function writableStreamDefaultControllerError(controller, error) { 1179 const { 1180 stream, 1181 } = controller[kState]; 1182 assert(stream[kState].state === 'writable'); 1183 writableStreamDefaultControllerClearAlgorithms(controller); 1184 writableStreamStartErroring(stream, error); 1185} 1186 1187function writableStreamDefaultControllerClose(controller) { 1188 enqueueValueWithSize(controller, kCloseSentinel, 0); 1189 writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); 1190} 1191 1192function writableStreamDefaultControllerClearAlgorithms(controller) { 1193 controller[kState].writeAlgorithm = undefined; 1194 controller[kState].closeAlgorithm = undefined; 1195 controller[kState].abortAlgorithm = undefined; 1196 controller[kState].sizeAlgorithm = undefined; 1197} 1198 1199function writableStreamDefaultControllerGetBackpressure(controller) { 1200 return writableStreamDefaultControllerGetDesiredSize(controller) <= 0; 1201} 1202 1203function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller) { 1204 const { 1205 queue, 1206 started, 1207 stream, 1208 } = controller[kState]; 1209 if (!started || stream[kState].inFlightWriteRequest.promise !== undefined) 1210 return; 1211 1212 if (stream[kState].state === 'erroring') { 1213 writableStreamFinishErroring(stream); 1214 return; 1215 } 1216 1217 if (!queue.length) 1218 return; 1219 1220 const value = peekQueueValue(controller); 1221 if (value === kCloseSentinel) 1222 writableStreamDefaultControllerProcessClose(controller); 1223 else 1224 writableStreamDefaultControllerProcessWrite(controller, value); 1225} 1226 1227function setupWritableStreamDefaultControllerFromSink( 1228 stream, 1229 sink, 1230 highWaterMark, 1231 sizeAlgorithm) { 1232 const controller = new WritableStreamDefaultController(kSkipThrow); 1233 const start = sink?.start; 1234 const write = sink?.write; 1235 const close = sink?.close; 1236 const abort = sink?.abort; 1237 const startAlgorithm = start ? 1238 FunctionPrototypeBind(start, sink, controller) : 1239 nonOpStart; 1240 const writeAlgorithm = write ? 1241 FunctionPrototypeBind(write, sink) : 1242 nonOpWrite; 1243 const closeAlgorithm = close ? 1244 FunctionPrototypeBind(close, sink) : nonOpCancel; 1245 const abortAlgorithm = abort ? 1246 FunctionPrototypeBind(abort, sink) : nonOpCancel; 1247 setupWritableStreamDefaultController( 1248 stream, 1249 controller, 1250 startAlgorithm, 1251 writeAlgorithm, 1252 closeAlgorithm, 1253 abortAlgorithm, 1254 highWaterMark, 1255 sizeAlgorithm); 1256} 1257 1258function setupWritableStreamDefaultController( 1259 stream, 1260 controller, 1261 startAlgorithm, 1262 writeAlgorithm, 1263 closeAlgorithm, 1264 abortAlgorithm, 1265 highWaterMark, 1266 sizeAlgorithm) { 1267 assert(isWritableStream(stream)); 1268 assert(stream[kState].controller === undefined); 1269 controller[kState] = { 1270 abortAlgorithm, 1271 closeAlgorithm, 1272 highWaterMark, 1273 queue: [], 1274 queueTotalSize: 0, 1275 abortController: new AbortController(), 1276 sizeAlgorithm, 1277 started: false, 1278 stream, 1279 writeAlgorithm, 1280 }; 1281 stream[kState].controller = controller; 1282 stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller); 1283 1284 writableStreamUpdateBackpressure( 1285 stream, 1286 writableStreamDefaultControllerGetBackpressure(controller)); 1287 1288 const startResult = startAlgorithm(); 1289 1290 PromisePrototypeThen( 1291 PromiseResolve(startResult), 1292 () => { 1293 assert(stream[kState].state === 'writable' || 1294 stream[kState].state === 'erroring'); 1295 controller[kState].started = true; 1296 writableStreamDefaultControllerAdvanceQueueIfNeeded(controller); 1297 }, 1298 (error) => { 1299 assert(stream[kState].state === 'writable' || 1300 stream[kState].state === 'erroring'); 1301 controller[kState].started = true; 1302 writableStreamDealWithRejection(stream, error); 1303 }); 1304} 1305 1306module.exports = { 1307 WritableStream, 1308 WritableStreamDefaultWriter, 1309 WritableStreamDefaultController, 1310 TransferredWritableStream, 1311 1312 // Exported Brand Checks 1313 isWritableStream, 1314 isWritableStreamDefaultController, 1315 isWritableStreamDefaultWriter, 1316 1317 isWritableStreamLocked, 1318 setupWritableStreamDefaultWriter, 1319 writableStreamAbort, 1320 writableStreamClose, 1321 writableStreamUpdateBackpressure, 1322 writableStreamStartErroring, 1323 writableStreamRejectCloseAndClosedPromiseIfNeeded, 1324 writableStreamMarkFirstWriteRequestInFlight, 1325 writableStreamMarkCloseRequestInFlight, 1326 writableStreamHasOperationMarkedInFlight, 1327 writableStreamFinishInFlightWriteWithError, 1328 writableStreamFinishInFlightWrite, 1329 writableStreamFinishInFlightCloseWithError, 1330 writableStreamFinishInFlightClose, 1331 writableStreamFinishErroring, 1332 writableStreamDealWithRejection, 1333 writableStreamCloseQueuedOrInFlight, 1334 writableStreamAddWriteRequest, 1335 writableStreamDefaultWriterWrite, 1336 writableStreamDefaultWriterRelease, 1337 writableStreamDefaultWriterGetDesiredSize, 1338 writableStreamDefaultWriterEnsureReadyPromiseRejected, 1339 writableStreamDefaultWriterEnsureClosedPromiseRejected, 1340 writableStreamDefaultWriterCloseWithErrorPropagation, 1341 writableStreamDefaultWriterClose, 1342 writableStreamDefaultWriterAbort, 1343 writableStreamDefaultControllerWrite, 1344 writableStreamDefaultControllerProcessWrite, 1345 writableStreamDefaultControllerProcessClose, 1346 writableStreamDefaultControllerGetDesiredSize, 1347 writableStreamDefaultControllerGetChunkSize, 1348 writableStreamDefaultControllerErrorIfNeeded, 1349 writableStreamDefaultControllerError, 1350 writableStreamDefaultControllerClose, 1351 writableStreamDefaultControllerClearAlgorithms, 1352 writableStreamDefaultControllerGetBackpressure, 1353 writableStreamDefaultControllerAdvanceQueueIfNeeded, 1354 setupWritableStreamDefaultControllerFromSink, 1355 setupWritableStreamDefaultController, 1356}; 1357