1'use strict';
2
3const {
4  ArrayPrototypePush,
5  ArrayPrototypeShift,
6  FunctionPrototypeBind,
7  FunctionPrototypeCall,
8  ObjectDefineProperties,
9  PromisePrototypeThen,
10  PromiseResolve,
11  PromiseReject,
12  ReflectConstruct,
13  Symbol,
14  SymbolToStringTag,
15} = primordials;
16
17const {
18  codes: {
19    ERR_ILLEGAL_CONSTRUCTOR,
20    ERR_INVALID_ARG_VALUE,
21    ERR_INVALID_ARG_TYPE,
22    ERR_INVALID_STATE,
23    ERR_INVALID_THIS,
24  },
25} = require('internal/errors');
26
27const {
28  DOMException,
29} = internalBinding('messaging');
30
31const {
32  createDeferredPromise,
33  customInspectSymbol: kInspect,
34  kEmptyObject,
35  kEnumerableProperty,
36  SideEffectFreeRegExpPrototypeSymbolReplace,
37} = require('internal/util');
38
39const {
40  MessageChannel,
41} = require('internal/worker/io');
42
43const {
44  kDeserialize,
45  kTransfer,
46  kTransferList,
47  makeTransferable,
48} = require('internal/worker/js_transferable');
49
50const {
51  customInspect,
52  dequeueValue,
53  ensureIsPromise,
54  enqueueValueWithSize,
55  extractHighWaterMark,
56  extractSizeAlgorithm,
57  lazyTransfer,
58  isBrandCheck,
59  isPromisePending,
60  peekQueueValue,
61  resetQueue,
62  setPromiseHandled,
63  nonOpCancel,
64  nonOpStart,
65  nonOpWrite,
66  kType,
67  kState,
68} = require('internal/webstreams/util');
69
70const {
71  kIsClosedPromise,
72  kControllerErrorFunction,
73} = require('internal/streams/utils');
74
75const {
76  AbortController,
77} = require('internal/abort_controller');
78
79const assert = require('internal/assert');
80
81const kAbort = Symbol('kAbort');
82const kCloseSentinel = Symbol('kCloseSentinel');
83const kError = Symbol('kError');
84const kSkipThrow = Symbol('kSkipThrow');
85
86let releasedError;
87
88function lazyWritableReleasedError() {
89  if (releasedError) {
90    return releasedError;
91  }
92  const userModuleRegExp = /^ {4}at (?:[^/\\(]+ \()(?!node:(.+):\d+:\d+\)$).*/gm;
93
94  releasedError = new ERR_INVALID_STATE.TypeError('Writer has been released');
95  // Avoid V8 leak and remove userland stackstrace
96  releasedError.stack = SideEffectFreeRegExpPrototypeSymbolReplace(userModuleRegExp, releasedError.stack, '');
97  return releasedError;
98}
99
100const getNonWritablePropertyDescriptor = (value) => {
101  return {
102    __proto__: null,
103    configurable: true,
104    value,
105  };
106};
107
108/**
109 * @typedef {import('../abort_controller').AbortSignal} AbortSignal
110 * @typedef {import('./queuingstrategies').QueuingStrategy
111 * } QueuingStrategy
112 * @typedef {import('./queuingstrategies').QueuingStrategySize
113 * } QueuingStrategySize
114 */
115
116/**
117 * @callback UnderlyingSinkStartCallback
118 * @param {WritableStreamDefaultController} controller
119 */
120
121/**
122 * @callback UnderlyingSinkWriteCallback
123 * @param {any} chunk
124 * @param {WritableStreamDefaultController} controller
125 * @returns {Promise<void>}
126 */
127
128/**
129 * @callback UnderlyingSinkCloseCallback
130 * @returns {Promise<void>}
131 */
132
133/**
134 * @callback UnderlyingSinkAbortCallback
135 * @param {any} reason
136 * @returns {Promise<void>}
137 */
138
139/**
140 * @typedef {{
141 *   start? : UnderlyingSinkStartCallback,
142 *   write? : UnderlyingSinkWriteCallback,
143 *   close? : UnderlyingSinkCloseCallback,
144 *   abort? : UnderlyingSinkAbortCallback,
145 *   type? : any,
146 * }} UnderlyingSink
147 */
148
149class WritableStream {
150  [kType] = 'WritableStream';
151
152  /**
153   * @param {UnderlyingSink} [sink]
154   * @param {QueuingStrategy} [strategy]
155   */
156  constructor(sink = null, strategy = kEmptyObject) {
157    const type = sink?.type;
158    if (type !== undefined)
159      throw new ERR_INVALID_ARG_VALUE.RangeError('type', type);
160
161    this[kState] = {
162      close: createDeferredPromise(),
163      closeRequest: {
164        promise: undefined,
165        resolve: undefined,
166        reject: undefined,
167      },
168      inFlightWriteRequest: {
169        promise: undefined,
170        resolve: undefined,
171        reject: undefined,
172      },
173      inFlightCloseRequest: {
174        promise: undefined,
175        resolve: undefined,
176        reject: undefined,
177      },
178      pendingAbortRequest: {
179        abort: {
180          promise: undefined,
181          resolve: undefined,
182          reject: undefined,
183        },
184        reason: undefined,
185        wasAlreadyErroring: false,
186      },
187      backpressure: false,
188      controller: undefined,
189      state: 'writable',
190      storedError: undefined,
191      writeRequests: [],
192      writer: undefined,
193      transfer: {
194        readable: undefined,
195        port1: undefined,
196        port2: undefined,
197        promise: undefined,
198      },
199    };
200
201    this[kIsClosedPromise] = createDeferredPromise();
202    this[kControllerErrorFunction] = () => {};
203
204    const size = extractSizeAlgorithm(strategy?.size);
205    const highWaterMark = extractHighWaterMark(strategy?.highWaterMark, 1);
206
207    setupWritableStreamDefaultControllerFromSink(
208      this,
209      sink,
210      highWaterMark,
211      size);
212
213    // eslint-disable-next-line no-constructor-return
214    return makeTransferable(this);
215  }
216
217  /**
218   * @readonly
219   * @type {boolean}
220   */
221  get locked() {
222    if (!isWritableStream(this))
223      throw new ERR_INVALID_THIS('WritableStream');
224    return isWritableStreamLocked(this);
225  }
226
227  /**
228   * @param {any} [reason]
229   * @returns {Promise<void>}
230   */
231  abort(reason = undefined) {
232    if (!isWritableStream(this))
233      return PromiseReject(new ERR_INVALID_THIS('WritableStream'));
234    if (isWritableStreamLocked(this)) {
235      return PromiseReject(
236        new ERR_INVALID_STATE.TypeError('WritableStream is locked'));
237    }
238    return writableStreamAbort(this, reason);
239  }
240
241  /**
242   * @returns {Promise<void>}
243   */
244  close() {
245    if (!isWritableStream(this))
246      return PromiseReject(new ERR_INVALID_THIS('WritableStream'));
247    if (isWritableStreamLocked(this)) {
248      return PromiseReject(
249        new ERR_INVALID_STATE.TypeError('WritableStream is locked'));
250    }
251    if (writableStreamCloseQueuedOrInFlight(this)) {
252      return PromiseReject(
253        new ERR_INVALID_STATE.TypeError('Failure closing WritableStream'));
254    }
255    return writableStreamClose(this);
256  }
257
258  /**
259   * @returns {WritableStreamDefaultWriter}
260   */
261  getWriter() {
262    if (!isWritableStream(this))
263      throw new ERR_INVALID_THIS('WritableStream');
264    // eslint-disable-next-line no-use-before-define
265    return new WritableStreamDefaultWriter(this);
266  }
267
268  [kInspect](depth, options) {
269    return customInspect(depth, options, this[kType], {
270      locked: this.locked,
271      state: this[kState].state,
272    });
273  }
274
275  [kTransfer]() {
276    if (!isWritableStream(this))
277      throw new ERR_INVALID_THIS('WritableStream');
278    if (this.locked) {
279      this[kState].transfer.port1?.close();
280      this[kState].transfer.port1 = undefined;
281      this[kState].transfer.port2 = undefined;
282      throw new DOMException(
283        'Cannot transfer a locked WritableStream',
284        'DataCloneError');
285    }
286
287    const {
288      readable,
289      promise,
290    } = lazyTransfer().newCrossRealmReadableStream(
291      this,
292      this[kState].transfer.port1);
293
294    this[kState].transfer.readable = readable;
295    this[kState].transfer.promise = promise;
296
297    setPromiseHandled(this[kState].transfer.promise);
298
299    return {
300      data: { port: this[kState].transfer.port2 },
301      deserializeInfo:
302        'internal/webstreams/writablestream:TransferredWritableStream',
303    };
304  }
305
306  [kTransferList]() {
307    const { port1, port2 } = new MessageChannel();
308    this[kState].transfer.port1 = port1;
309    this[kState].transfer.port2 = port2;
310    return [ port2 ];
311  }
312
313  [kDeserialize]({ port }) {
314    const transfer = lazyTransfer();
315    setupWritableStreamDefaultControllerFromSink(
316      this,
317      new transfer.CrossRealmTransformWritableSink(port),
318      1,
319      () => 1);
320  }
321}
322
323ObjectDefineProperties(WritableStream.prototype, {
324  locked: kEnumerableProperty,
325  abort: kEnumerableProperty,
326  close: kEnumerableProperty,
327  getWriter: kEnumerableProperty,
328  [SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStream.name),
329});
330
331function TransferredWritableStream() {
332  return makeTransferable(ReflectConstruct(
333    function() {
334      this[kType] = 'WritableStream';
335      this[kState] = {
336        close: createDeferredPromise(),
337        closeRequest: {
338          promise: undefined,
339          resolve: undefined,
340          reject: undefined,
341        },
342        inFlightWriteRequest: {
343          promise: undefined,
344          resolve: undefined,
345          reject: undefined,
346        },
347        inFlightCloseRequest: {
348          promise: undefined,
349          resolve: undefined,
350          reject: undefined,
351        },
352        pendingAbortRequest: {
353          abort: {
354            promise: undefined,
355            resolve: undefined,
356            reject: undefined,
357          },
358          reason: undefined,
359          wasAlreadyErroring: false,
360        },
361        backpressure: false,
362        controller: undefined,
363        state: 'writable',
364        storedError: undefined,
365        writeRequests: [],
366        writer: undefined,
367        transfer: {
368          promise: undefined,
369          port1: undefined,
370          port2: undefined,
371          readable: undefined,
372        },
373      };
374      this[kIsClosedPromise] = createDeferredPromise();
375      this[kControllerErrorFunction] = () => {};
376    },
377    [], WritableStream));
378}
379TransferredWritableStream.prototype[kDeserialize] = () => {};
380
381class WritableStreamDefaultWriter {
382  [kType] = 'WritableStreamDefaultWriter';
383
384  /**
385   * @param {WritableStream} stream
386   */
387  constructor(stream) {
388    if (!isWritableStream(stream))
389      throw new ERR_INVALID_ARG_TYPE('stream', 'WritableStream', stream);
390    this[kState] = {
391      stream: undefined,
392      close: {
393        promise: undefined,
394        resolve: undefined,
395        reject: undefined,
396      },
397      ready: {
398        promise: undefined,
399        resolve: undefined,
400        reject: undefined,
401      },
402    };
403    setupWritableStreamDefaultWriter(this, stream);
404  }
405
406  /**
407   * @readonly
408   * @type {Promise<void>}
409   */
410  get closed() {
411    if (!isWritableStreamDefaultWriter(this))
412      return PromiseReject(new ERR_INVALID_THIS('WritableStreamDefaultWriter'));
413    return this[kState].close.promise;
414  }
415
416  /**
417   * @readonly
418   * @type {number}
419   */
420  get desiredSize() {
421    if (!isWritableStreamDefaultWriter(this))
422      throw new ERR_INVALID_THIS('WritableStreamDefaultWriter');
423    if (this[kState].stream === undefined) {
424      throw new ERR_INVALID_STATE.TypeError(
425        'Writer is not bound to a WritableStream');
426    }
427    return writableStreamDefaultWriterGetDesiredSize(this);
428  }
429
430  /**
431   * @readonly
432   * @type {Promise<void>}
433   */
434  get ready() {
435    if (!isWritableStreamDefaultWriter(this))
436      return PromiseReject(new ERR_INVALID_THIS('WritableStreamDefaultWriter'));
437    return this[kState].ready.promise;
438  }
439
440  /**
441   * @param {any} [reason]
442   * @returns {Promise<void>}
443   */
444  abort(reason = undefined) {
445    if (!isWritableStreamDefaultWriter(this))
446      return PromiseReject(new ERR_INVALID_THIS('WritableStreamDefaultWriter'));
447    if (this[kState].stream === undefined) {
448      return PromiseReject(
449        new ERR_INVALID_STATE.TypeError(
450          'Writer is not bound to a WritableStream'));
451    }
452    return writableStreamDefaultWriterAbort(this, reason);
453  }
454
455  /**
456   * @returns {Promise<void>}
457   */
458  close() {
459    if (!isWritableStreamDefaultWriter(this))
460      return PromiseReject(new ERR_INVALID_THIS('WritableStreamDefaultWriter'));
461    const {
462      stream,
463    } = this[kState];
464    if (stream === undefined) {
465      return PromiseReject(
466        new ERR_INVALID_STATE.TypeError(
467          'Writer is not bound to a WritableStream'));
468    }
469    if (writableStreamCloseQueuedOrInFlight(stream)) {
470      return PromiseReject(
471        new ERR_INVALID_STATE.TypeError('Failure to close WritableStream'));
472    }
473    return writableStreamDefaultWriterClose(this);
474  }
475
476  releaseLock() {
477    if (!isWritableStreamDefaultWriter(this))
478      throw new ERR_INVALID_THIS('WritableStreamDefaultWriter');
479    const {
480      stream,
481    } = this[kState];
482    if (stream === undefined)
483      return;
484    assert(stream[kState].writer !== undefined);
485    writableStreamDefaultWriterRelease(this);
486  }
487
488  /**
489   * @param {any} [chunk]
490   * @returns {Promise<void>}
491   */
492  write(chunk = undefined) {
493    if (!isWritableStreamDefaultWriter(this))
494      return PromiseReject(new ERR_INVALID_THIS('WritableStreamDefaultWriter'));
495    if (this[kState].stream === undefined) {
496      return PromiseReject(
497        new ERR_INVALID_STATE.TypeError(
498          'Writer is not bound to a WritableStream'));
499    }
500    return writableStreamDefaultWriterWrite(this, chunk);
501  }
502
503  [kInspect](depth, options) {
504    return customInspect(depth, options, this[kType], {
505      stream: this[kState].stream,
506      close: this[kState].close.promise,
507      ready: this[kState].ready.promise,
508      desiredSize: this.desiredSize,
509    });
510  }
511}
512
513ObjectDefineProperties(WritableStreamDefaultWriter.prototype, {
514  closed: kEnumerableProperty,
515  ready: kEnumerableProperty,
516  desiredSize: kEnumerableProperty,
517  abort: kEnumerableProperty,
518  close: kEnumerableProperty,
519  releaseLock: kEnumerableProperty,
520  write: kEnumerableProperty,
521  [SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStreamDefaultWriter.name),
522});
523
524class WritableStreamDefaultController {
525  [kType] = 'WritableStreamDefaultController';
526
527  constructor(skipThrowSymbol = undefined) {
528    if (skipThrowSymbol !== kSkipThrow) {
529      throw new ERR_ILLEGAL_CONSTRUCTOR();
530    }
531  }
532
533  [kAbort](reason) {
534    const result = this[kState].abortAlgorithm(reason);
535    writableStreamDefaultControllerClearAlgorithms(this);
536    return result;
537  }
538
539  [kError]() {
540    resetQueue(this);
541  }
542
543  /**
544   * @type {AbortSignal}
545   */
546  get signal() {
547    if (!isWritableStreamDefaultController(this))
548      throw new ERR_INVALID_THIS('WritableStreamDefaultController');
549    return this[kState].abortController.signal;
550  }
551
552  /**
553   * @param {any} [error]
554   */
555  error(error = undefined) {
556    if (!isWritableStreamDefaultController(this))
557      throw new ERR_INVALID_THIS('WritableStreamDefaultController');
558    if (this[kState].stream[kState].state !== 'writable')
559      return;
560    writableStreamDefaultControllerError(this, error);
561  }
562
563  [kInspect](depth, options) {
564    return customInspect(depth, options, this[kType], {
565      stream: this[kState].stream,
566    });
567  }
568}
569
570ObjectDefineProperties(WritableStreamDefaultController.prototype, {
571  signal: kEnumerableProperty,
572  error: kEnumerableProperty,
573  [SymbolToStringTag]: getNonWritablePropertyDescriptor(WritableStreamDefaultController.name),
574});
575
576const isWritableStream =
577  isBrandCheck('WritableStream');
578const isWritableStreamDefaultWriter =
579  isBrandCheck('WritableStreamDefaultWriter');
580const isWritableStreamDefaultController =
581  isBrandCheck('WritableStreamDefaultController');
582
583function isWritableStreamLocked(stream) {
584  return stream[kState].writer !== undefined;
585}
586
587function setupWritableStreamDefaultWriter(writer, stream) {
588  if (isWritableStreamLocked(stream))
589    throw new ERR_INVALID_STATE.TypeError('WritableStream is locked');
590  writer[kState].stream = stream;
591  stream[kState].writer = writer;
592  switch (stream[kState].state) {
593    case 'writable':
594      if (!writableStreamCloseQueuedOrInFlight(stream) &&
595          stream[kState].backpressure) {
596        writer[kState].ready = createDeferredPromise();
597      } else {
598        writer[kState].ready = {
599          promise: PromiseResolve(),
600          resolve: undefined,
601          reject: undefined,
602        };
603      }
604      setClosedPromiseToNewPromise();
605      break;
606    case 'erroring':
607      writer[kState].ready = {
608        promise: PromiseReject(stream[kState].storedError),
609        resolve: undefined,
610        reject: undefined,
611      };
612      setPromiseHandled(writer[kState].ready.promise);
613      setClosedPromiseToNewPromise();
614      break;
615    case 'closed':
616      writer[kState].ready = {
617        promise: PromiseResolve(),
618        resolve: undefined,
619        reject: undefined,
620      };
621      writer[kState].close = {
622        promise: PromiseResolve(),
623        resolve: undefined,
624        reject: undefined,
625      };
626      break;
627    default:
628      writer[kState].ready = {
629        promise: PromiseReject(stream[kState].storedError),
630        resolve: undefined,
631        reject: undefined,
632      };
633      writer[kState].close = {
634        promise: PromiseReject(stream[kState].storedError),
635        resolve: undefined,
636        reject: undefined,
637      };
638      setPromiseHandled(writer[kState].ready.promise);
639      setPromiseHandled(writer[kState].close.promise);
640  }
641
642  function setClosedPromiseToNewPromise() {
643    writer[kState].close = createDeferredPromise();
644  }
645}
646
647function writableStreamAbort(stream, reason) {
648  const {
649    state,
650    controller,
651  } = stream[kState];
652  if (state === 'closed' || state === 'errored')
653    return PromiseResolve();
654
655  controller[kState].abortController.abort(reason);
656
657  if (stream[kState].pendingAbortRequest.abort.promise !== undefined)
658    return stream[kState].pendingAbortRequest.abort.promise;
659
660  assert(state === 'writable' || state === 'erroring');
661
662  let wasAlreadyErroring = false;
663  if (state === 'erroring') {
664    wasAlreadyErroring = true;
665    reason = undefined;
666  }
667
668  const abort = createDeferredPromise();
669
670  stream[kState].pendingAbortRequest = {
671    abort,
672    reason,
673    wasAlreadyErroring,
674  };
675
676  if (!wasAlreadyErroring)
677    writableStreamStartErroring(stream, reason);
678
679  return abort.promise;
680}
681
682function writableStreamClose(stream) {
683  const {
684    state,
685    writer,
686    backpressure,
687    controller,
688  } = stream[kState];
689  if (state === 'closed' || state === 'errored') {
690    return PromiseReject(
691      new ERR_INVALID_STATE.TypeError('WritableStream is closed'));
692  }
693  assert(state === 'writable' || state === 'erroring');
694  assert(!writableStreamCloseQueuedOrInFlight(stream));
695  stream[kState].closeRequest = createDeferredPromise();
696  const { promise } = stream[kState].closeRequest;
697  if (writer !== undefined && backpressure && state === 'writable')
698    writer[kState].ready.resolve?.();
699  writableStreamDefaultControllerClose(controller);
700  return promise;
701}
702
703function writableStreamUpdateBackpressure(stream, backpressure) {
704  assert(stream[kState].state === 'writable');
705  assert(!writableStreamCloseQueuedOrInFlight(stream));
706  const {
707    writer,
708  } = stream[kState];
709  if (writer !== undefined && stream[kState].backpressure !== backpressure) {
710    if (backpressure) {
711      writer[kState].ready = createDeferredPromise();
712    } else {
713      writer[kState].ready.resolve?.();
714    }
715  }
716  stream[kState].backpressure = backpressure;
717}
718
719function writableStreamStartErroring(stream, reason) {
720  assert(stream[kState].storedError === undefined);
721  assert(stream[kState].state === 'writable');
722  const {
723    controller,
724    writer,
725  } = stream[kState];
726  assert(controller !== undefined);
727  stream[kState].state = 'erroring';
728  stream[kState].storedError = reason;
729  if (writer !== undefined) {
730    writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
731  }
732  if (!writableStreamHasOperationMarkedInFlight(stream) &&
733      controller[kState].started) {
734    writableStreamFinishErroring(stream);
735  }
736}
737
738function writableStreamRejectCloseAndClosedPromiseIfNeeded(stream) {
739  assert(stream[kState].state === 'errored');
740  if (stream[kState].closeRequest.promise !== undefined) {
741    assert(stream[kState].inFlightCloseRequest.promise === undefined);
742    stream[kState].closeRequest.reject?.(stream[kState].storedError);
743    stream[kState].closeRequest = {
744      promise: undefined,
745      reject: undefined,
746      resolve: undefined,
747    };
748  }
749
750  stream[kIsClosedPromise].reject(stream[kState]?.storedError);
751  setPromiseHandled(stream[kIsClosedPromise].promise);
752
753  const {
754    writer,
755  } = stream[kState];
756  if (writer !== undefined) {
757    writer[kState].close.reject?.(stream[kState].storedError);
758    setPromiseHandled(writer[kState].close.promise);
759  }
760}
761
762function writableStreamMarkFirstWriteRequestInFlight(stream) {
763  assert(stream[kState].inFlightWriteRequest.promise === undefined);
764  assert(stream[kState].writeRequests.length);
765  const writeRequest = ArrayPrototypeShift(stream[kState].writeRequests);
766  stream[kState].inFlightWriteRequest = writeRequest;
767}
768
769function writableStreamMarkCloseRequestInFlight(stream) {
770  assert(stream[kState].inFlightWriteRequest.promise === undefined);
771  assert(stream[kState].closeRequest.promise !== undefined);
772  stream[kState].inFlightCloseRequest = stream[kState].closeRequest;
773  stream[kState].closeRequest = {
774    promise: undefined,
775    resolve: undefined,
776    reject: undefined,
777  };
778}
779
780function writableStreamHasOperationMarkedInFlight(stream) {
781  const {
782    inFlightWriteRequest,
783    inFlightCloseRequest,
784  } = stream[kState];
785  if (inFlightWriteRequest.promise === undefined &&
786      inFlightCloseRequest.promise === undefined) {
787    return false;
788  }
789  return true;
790}
791
792function writableStreamFinishInFlightWriteWithError(stream, error) {
793  assert(stream[kState].inFlightWriteRequest.promise !== undefined);
794  stream[kState].inFlightWriteRequest.reject?.(error);
795  stream[kState].inFlightWriteRequest = {
796    promise: undefined,
797    resolve: undefined,
798    reject: undefined,
799  };
800  assert(stream[kState].state === 'writable' ||
801         stream[kState].state === 'erroring');
802  writableStreamDealWithRejection(stream, error);
803}
804
805function writableStreamFinishInFlightWrite(stream) {
806  assert(stream[kState].inFlightWriteRequest.promise !== undefined);
807  stream[kState].inFlightWriteRequest.resolve?.();
808  stream[kState].inFlightWriteRequest = {
809    promise: undefined,
810    resolve: undefined,
811    reject: undefined,
812  };
813}
814
815function writableStreamFinishInFlightCloseWithError(stream, error) {
816  assert(stream[kState].inFlightCloseRequest.promise !== undefined);
817  stream[kState].inFlightCloseRequest.reject?.(error);
818  stream[kState].inFlightCloseRequest = {
819    promise: undefined,
820    resolve: undefined,
821    reject: undefined,
822  };
823  assert(stream[kState].state === 'writable' ||
824         stream[kState].state === 'erroring');
825  if (stream[kState].pendingAbortRequest.abort.promise !== undefined) {
826    stream[kState].pendingAbortRequest.abort.reject?.(error);
827    stream[kState].pendingAbortRequest = {
828      abort: {
829        promise: undefined,
830        resolve: undefined,
831        reject: undefined,
832      },
833      reason: undefined,
834      wasAlreadyErroring: false,
835    };
836  }
837  writableStreamDealWithRejection(stream, error);
838}
839
840function writableStreamFinishInFlightClose(stream) {
841  assert(stream[kState].inFlightCloseRequest.promise !== undefined);
842  stream[kState].inFlightCloseRequest.resolve?.();
843  stream[kState].inFlightCloseRequest = {
844    promise: undefined,
845    resolve: undefined,
846    reject: undefined,
847  };
848  if (stream[kState].state === 'erroring') {
849    stream[kState].storedError = undefined;
850    if (stream[kState].pendingAbortRequest.abort.promise !== undefined) {
851      stream[kState].pendingAbortRequest.abort.resolve?.();
852      stream[kState].pendingAbortRequest = {
853        abort: {
854          promise: undefined,
855          resolve: undefined,
856          reject: undefined,
857        },
858        reason: undefined,
859        wasAlreadyErroring: false,
860      };
861    }
862  }
863  stream[kState].state = 'closed';
864  if (stream[kState].writer !== undefined)
865    stream[kState].writer[kState].close.resolve?.();
866  stream[kIsClosedPromise].resolve?.();
867  assert(stream[kState].pendingAbortRequest.abort.promise === undefined);
868  assert(stream[kState].storedError === undefined);
869}
870
871function writableStreamFinishErroring(stream) {
872  assert(stream[kState].state === 'erroring');
873  assert(!writableStreamHasOperationMarkedInFlight(stream));
874  stream[kState].state = 'errored';
875  stream[kState].controller[kError]();
876  const storedError = stream[kState].storedError;
877  for (let n = 0; n < stream[kState].writeRequests.length; n++)
878    stream[kState].writeRequests[n].reject?.(storedError);
879  stream[kState].writeRequests = [];
880
881  if (stream[kState].pendingAbortRequest.abort.promise === undefined) {
882    writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
883    return;
884  }
885
886  const abortRequest = stream[kState].pendingAbortRequest;
887  stream[kState].pendingAbortRequest = {
888    abort: {
889      promise: undefined,
890      resolve: undefined,
891      reject: undefined,
892    },
893    reason: undefined,
894    wasAlreadyErroring: false,
895  };
896  if (abortRequest.wasAlreadyErroring) {
897    abortRequest.abort.reject?.(storedError);
898    writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
899    return;
900  }
901  PromisePrototypeThen(
902    ensureIsPromise(
903      stream[kState].controller[kAbort],
904      stream[kState].controller,
905      abortRequest.reason),
906    () => {
907      abortRequest.abort.resolve?.();
908      writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
909    },
910    (error) => {
911      abortRequest.abort.reject?.(error);
912      writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
913    });
914}
915
916function writableStreamDealWithRejection(stream, error) {
917  const {
918    state,
919  } = stream[kState];
920  if (state === 'writable') {
921    writableStreamStartErroring(stream, error);
922    return;
923  }
924
925  assert(state === 'erroring');
926  writableStreamFinishErroring(stream);
927}
928
929function writableStreamCloseQueuedOrInFlight(stream) {
930  if (stream[kState].closeRequest.promise === undefined &&
931      stream[kState].inFlightCloseRequest.promise === undefined) {
932    return false;
933  }
934  return true;
935}
936
937function writableStreamAddWriteRequest(stream) {
938  assert(isWritableStreamLocked(stream));
939  assert(stream[kState].state === 'writable');
940  const {
941    promise,
942    resolve,
943    reject,
944  } = createDeferredPromise();
945  ArrayPrototypePush(
946    stream[kState].writeRequests,
947    {
948      promise,
949      resolve,
950      reject,
951    });
952  return promise;
953}
954
955function writableStreamDefaultWriterWrite(writer, chunk) {
956  const {
957    stream,
958  } = writer[kState];
959  assert(stream !== undefined);
960  const {
961    controller,
962  } = stream[kState];
963  const chunkSize = writableStreamDefaultControllerGetChunkSize(
964    controller,
965    chunk);
966  if (stream !== writer[kState].stream) {
967    return PromiseReject(
968      new ERR_INVALID_STATE.TypeError('Mismatched WritableStreams'));
969  }
970  const {
971    state,
972  } = stream[kState];
973
974  if (state === 'errored')
975    return PromiseReject(stream[kState].storedError);
976
977  if (writableStreamCloseQueuedOrInFlight(stream) || state === 'closed') {
978    return PromiseReject(
979      new ERR_INVALID_STATE.TypeError('WritableStream is closed'));
980  }
981
982  if (state === 'erroring')
983    return PromiseReject(stream[kState].storedError);
984
985  assert(state === 'writable');
986
987  const promise = writableStreamAddWriteRequest(stream);
988  writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
989  return promise;
990}
991
992function writableStreamDefaultWriterRelease(writer) {
993  const {
994    stream,
995  } = writer[kState];
996  assert(stream !== undefined);
997  assert(stream[kState].writer === writer);
998  const releasedStateError = lazyWritableReleasedError();
999  writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, releasedStateError);
1000  writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, releasedStateError);
1001  stream[kState].writer = undefined;
1002  writer[kState].stream = undefined;
1003}
1004
1005function writableStreamDefaultWriterGetDesiredSize(writer) {
1006  const {
1007    stream,
1008  } = writer[kState];
1009  switch (stream[kState].state) {
1010    case 'errored':
1011      // Fall through
1012    case 'erroring':
1013      return null;
1014    case 'closed':
1015      return 0;
1016  }
1017  return writableStreamDefaultControllerGetDesiredSize(
1018    stream[kState].controller);
1019}
1020
1021function writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, error) {
1022  if (isPromisePending(writer[kState].ready.promise)) {
1023    writer[kState].ready.reject?.(error);
1024  } else {
1025    writer[kState].ready = {
1026      promise: PromiseReject(error),
1027      resolve: undefined,
1028      reject: undefined,
1029    };
1030  }
1031  setPromiseHandled(writer[kState].ready.promise);
1032}
1033
1034function writableStreamDefaultWriterEnsureClosedPromiseRejected(writer, error) {
1035  if (isPromisePending(writer[kState].close.promise)) {
1036    writer[kState].close.reject?.(error);
1037  } else {
1038    writer[kState].close = {
1039      promise: PromiseReject(error),
1040      resolve: undefined,
1041      reject: undefined,
1042    };
1043  }
1044  setPromiseHandled(writer[kState].close.promise);
1045}
1046
1047function writableStreamDefaultWriterCloseWithErrorPropagation(writer) {
1048  const {
1049    stream,
1050  } = writer[kState];
1051  assert(stream !== undefined);
1052  const {
1053    state,
1054  } = stream[kState];
1055  if (writableStreamCloseQueuedOrInFlight(stream) || state === 'closed')
1056    return PromiseResolve();
1057
1058  if (state === 'errored')
1059    return PromiseReject(stream[kState].storedError);
1060
1061  assert(state === 'writable' || state === 'erroring');
1062
1063  return writableStreamDefaultWriterClose(writer);
1064}
1065
1066function writableStreamDefaultWriterClose(writer) {
1067  const {
1068    stream,
1069  } = writer[kState];
1070  assert(stream !== undefined);
1071  return writableStreamClose(stream);
1072}
1073
1074function writableStreamDefaultWriterAbort(writer, reason) {
1075  const {
1076    stream,
1077  } = writer[kState];
1078  assert(stream !== undefined);
1079  return writableStreamAbort(stream, reason);
1080}
1081
1082function writableStreamDefaultControllerWrite(controller, chunk, chunkSize) {
1083  try {
1084    enqueueValueWithSize(controller, chunk, chunkSize);
1085  } catch (error) {
1086    writableStreamDefaultControllerErrorIfNeeded(controller, error);
1087    return;
1088  }
1089  const {
1090    stream,
1091  } = controller[kState];
1092  if (!writableStreamCloseQueuedOrInFlight(stream) &&
1093      stream[kState].state === 'writable') {
1094    writableStreamUpdateBackpressure(
1095      stream,
1096      writableStreamDefaultControllerGetBackpressure(controller));
1097  }
1098  writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
1099}
1100
1101function writableStreamDefaultControllerProcessWrite(controller, chunk) {
1102  const {
1103    stream,
1104    writeAlgorithm,
1105  } = controller[kState];
1106  writableStreamMarkFirstWriteRequestInFlight(stream);
1107
1108  PromisePrototypeThen(
1109    ensureIsPromise(writeAlgorithm, controller, chunk, controller),
1110    () => {
1111      writableStreamFinishInFlightWrite(stream);
1112      const {
1113        state,
1114      } = stream[kState];
1115      assert(state === 'writable' || state === 'erroring');
1116      dequeueValue(controller);
1117      if (!writableStreamCloseQueuedOrInFlight(stream) &&
1118          state === 'writable') {
1119        writableStreamUpdateBackpressure(
1120          stream,
1121          writableStreamDefaultControllerGetBackpressure(controller));
1122      }
1123      writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
1124    },
1125    (error) => {
1126      if (stream[kState].state === 'writable')
1127        writableStreamDefaultControllerClearAlgorithms(controller);
1128      writableStreamFinishInFlightWriteWithError(stream, error);
1129    });
1130
1131}
1132
1133function writableStreamDefaultControllerProcessClose(controller) {
1134  const {
1135    closeAlgorithm,
1136    queue,
1137    stream,
1138  } = controller[kState];
1139  writableStreamMarkCloseRequestInFlight(stream);
1140  dequeueValue(controller);
1141  assert(!queue.length);
1142  const sinkClosePromise = ensureIsPromise(closeAlgorithm, controller);
1143  writableStreamDefaultControllerClearAlgorithms(controller);
1144  PromisePrototypeThen(
1145    sinkClosePromise,
1146    () => writableStreamFinishInFlightClose(stream),
1147    (error) => writableStreamFinishInFlightCloseWithError(stream, error));
1148}
1149
1150function writableStreamDefaultControllerGetDesiredSize(controller) {
1151  const {
1152    highWaterMark,
1153    queueTotalSize,
1154  } = controller[kState];
1155  return highWaterMark - queueTotalSize;
1156}
1157
1158function writableStreamDefaultControllerGetChunkSize(controller, chunk) {
1159  try {
1160    return FunctionPrototypeCall(
1161      controller[kState].sizeAlgorithm,
1162      undefined,
1163      chunk);
1164  } catch (error) {
1165    writableStreamDefaultControllerErrorIfNeeded(controller, error);
1166    return 1;
1167  }
1168}
1169
1170function writableStreamDefaultControllerErrorIfNeeded(controller, error) {
1171  const {
1172    stream,
1173  } = controller[kState];
1174  if (stream[kState].state === 'writable')
1175    writableStreamDefaultControllerError(controller, error);
1176}
1177
1178function writableStreamDefaultControllerError(controller, error) {
1179  const {
1180    stream,
1181  } = controller[kState];
1182  assert(stream[kState].state === 'writable');
1183  writableStreamDefaultControllerClearAlgorithms(controller);
1184  writableStreamStartErroring(stream, error);
1185}
1186
1187function writableStreamDefaultControllerClose(controller) {
1188  enqueueValueWithSize(controller, kCloseSentinel, 0);
1189  writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
1190}
1191
1192function writableStreamDefaultControllerClearAlgorithms(controller) {
1193  controller[kState].writeAlgorithm = undefined;
1194  controller[kState].closeAlgorithm = undefined;
1195  controller[kState].abortAlgorithm = undefined;
1196  controller[kState].sizeAlgorithm = undefined;
1197}
1198
1199function writableStreamDefaultControllerGetBackpressure(controller) {
1200  return writableStreamDefaultControllerGetDesiredSize(controller) <= 0;
1201}
1202
1203function writableStreamDefaultControllerAdvanceQueueIfNeeded(controller) {
1204  const {
1205    queue,
1206    started,
1207    stream,
1208  } = controller[kState];
1209  if (!started || stream[kState].inFlightWriteRequest.promise !== undefined)
1210    return;
1211
1212  if (stream[kState].state === 'erroring') {
1213    writableStreamFinishErroring(stream);
1214    return;
1215  }
1216
1217  if (!queue.length)
1218    return;
1219
1220  const value = peekQueueValue(controller);
1221  if (value === kCloseSentinel)
1222    writableStreamDefaultControllerProcessClose(controller);
1223  else
1224    writableStreamDefaultControllerProcessWrite(controller, value);
1225}
1226
1227function setupWritableStreamDefaultControllerFromSink(
1228  stream,
1229  sink,
1230  highWaterMark,
1231  sizeAlgorithm) {
1232  const controller = new WritableStreamDefaultController(kSkipThrow);
1233  const start = sink?.start;
1234  const write = sink?.write;
1235  const close = sink?.close;
1236  const abort = sink?.abort;
1237  const startAlgorithm = start ?
1238    FunctionPrototypeBind(start, sink, controller) :
1239    nonOpStart;
1240  const writeAlgorithm = write ?
1241    FunctionPrototypeBind(write, sink) :
1242    nonOpWrite;
1243  const closeAlgorithm = close ?
1244    FunctionPrototypeBind(close, sink) : nonOpCancel;
1245  const abortAlgorithm = abort ?
1246    FunctionPrototypeBind(abort, sink) : nonOpCancel;
1247  setupWritableStreamDefaultController(
1248    stream,
1249    controller,
1250    startAlgorithm,
1251    writeAlgorithm,
1252    closeAlgorithm,
1253    abortAlgorithm,
1254    highWaterMark,
1255    sizeAlgorithm);
1256}
1257
1258function setupWritableStreamDefaultController(
1259  stream,
1260  controller,
1261  startAlgorithm,
1262  writeAlgorithm,
1263  closeAlgorithm,
1264  abortAlgorithm,
1265  highWaterMark,
1266  sizeAlgorithm) {
1267  assert(isWritableStream(stream));
1268  assert(stream[kState].controller === undefined);
1269  controller[kState] = {
1270    abortAlgorithm,
1271    closeAlgorithm,
1272    highWaterMark,
1273    queue: [],
1274    queueTotalSize: 0,
1275    abortController: new AbortController(),
1276    sizeAlgorithm,
1277    started: false,
1278    stream,
1279    writeAlgorithm,
1280  };
1281  stream[kState].controller = controller;
1282  stream[kControllerErrorFunction] = FunctionPrototypeBind(controller.error, controller);
1283
1284  writableStreamUpdateBackpressure(
1285    stream,
1286    writableStreamDefaultControllerGetBackpressure(controller));
1287
1288  const startResult = startAlgorithm();
1289
1290  PromisePrototypeThen(
1291    PromiseResolve(startResult),
1292    () => {
1293      assert(stream[kState].state === 'writable' ||
1294             stream[kState].state === 'erroring');
1295      controller[kState].started = true;
1296      writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
1297    },
1298    (error) => {
1299      assert(stream[kState].state === 'writable' ||
1300             stream[kState].state === 'erroring');
1301      controller[kState].started = true;
1302      writableStreamDealWithRejection(stream, error);
1303    });
1304}
1305
1306module.exports = {
1307  WritableStream,
1308  WritableStreamDefaultWriter,
1309  WritableStreamDefaultController,
1310  TransferredWritableStream,
1311
1312  // Exported Brand Checks
1313  isWritableStream,
1314  isWritableStreamDefaultController,
1315  isWritableStreamDefaultWriter,
1316
1317  isWritableStreamLocked,
1318  setupWritableStreamDefaultWriter,
1319  writableStreamAbort,
1320  writableStreamClose,
1321  writableStreamUpdateBackpressure,
1322  writableStreamStartErroring,
1323  writableStreamRejectCloseAndClosedPromiseIfNeeded,
1324  writableStreamMarkFirstWriteRequestInFlight,
1325  writableStreamMarkCloseRequestInFlight,
1326  writableStreamHasOperationMarkedInFlight,
1327  writableStreamFinishInFlightWriteWithError,
1328  writableStreamFinishInFlightWrite,
1329  writableStreamFinishInFlightCloseWithError,
1330  writableStreamFinishInFlightClose,
1331  writableStreamFinishErroring,
1332  writableStreamDealWithRejection,
1333  writableStreamCloseQueuedOrInFlight,
1334  writableStreamAddWriteRequest,
1335  writableStreamDefaultWriterWrite,
1336  writableStreamDefaultWriterRelease,
1337  writableStreamDefaultWriterGetDesiredSize,
1338  writableStreamDefaultWriterEnsureReadyPromiseRejected,
1339  writableStreamDefaultWriterEnsureClosedPromiseRejected,
1340  writableStreamDefaultWriterCloseWithErrorPropagation,
1341  writableStreamDefaultWriterClose,
1342  writableStreamDefaultWriterAbort,
1343  writableStreamDefaultControllerWrite,
1344  writableStreamDefaultControllerProcessWrite,
1345  writableStreamDefaultControllerProcessClose,
1346  writableStreamDefaultControllerGetDesiredSize,
1347  writableStreamDefaultControllerGetChunkSize,
1348  writableStreamDefaultControllerErrorIfNeeded,
1349  writableStreamDefaultControllerError,
1350  writableStreamDefaultControllerClose,
1351  writableStreamDefaultControllerClearAlgorithms,
1352  writableStreamDefaultControllerGetBackpressure,
1353  writableStreamDefaultControllerAdvanceQueueIfNeeded,
1354  setupWritableStreamDefaultControllerFromSink,
1355  setupWritableStreamDefaultController,
1356};
1357