11cb0ef41Sopenharmony_ci'use strict';
21cb0ef41Sopenharmony_ci
31cb0ef41Sopenharmony_ciconst {
41cb0ef41Sopenharmony_ci  PromisePrototypeThen,
51cb0ef41Sopenharmony_ci  PromiseResolve,
61cb0ef41Sopenharmony_ci  SafePromiseAll,
71cb0ef41Sopenharmony_ci  SafePromisePrototypeFinally,
81cb0ef41Sopenharmony_ci  TypedArrayPrototypeGetBuffer,
91cb0ef41Sopenharmony_ci  TypedArrayPrototypeGetByteOffset,
101cb0ef41Sopenharmony_ci  TypedArrayPrototypeGetByteLength,
111cb0ef41Sopenharmony_ci  Uint8Array,
121cb0ef41Sopenharmony_ci} = primordials;
131cb0ef41Sopenharmony_ci
141cb0ef41Sopenharmony_ciconst { TextEncoder } = require('internal/encoding');
151cb0ef41Sopenharmony_ci
161cb0ef41Sopenharmony_ciconst {
171cb0ef41Sopenharmony_ci  ReadableStream,
181cb0ef41Sopenharmony_ci  isReadableStream,
191cb0ef41Sopenharmony_ci} = require('internal/webstreams/readablestream');
201cb0ef41Sopenharmony_ci
211cb0ef41Sopenharmony_ciconst {
221cb0ef41Sopenharmony_ci  WritableStream,
231cb0ef41Sopenharmony_ci  isWritableStream,
241cb0ef41Sopenharmony_ci} = require('internal/webstreams/writablestream');
251cb0ef41Sopenharmony_ci
261cb0ef41Sopenharmony_ciconst {
271cb0ef41Sopenharmony_ci  CountQueuingStrategy,
281cb0ef41Sopenharmony_ci} = require('internal/webstreams/queuingstrategies');
291cb0ef41Sopenharmony_ci
301cb0ef41Sopenharmony_ciconst {
311cb0ef41Sopenharmony_ci  Writable,
321cb0ef41Sopenharmony_ci  Readable,
331cb0ef41Sopenharmony_ci  Duplex,
341cb0ef41Sopenharmony_ci  destroy,
351cb0ef41Sopenharmony_ci} = require('stream');
361cb0ef41Sopenharmony_ci
371cb0ef41Sopenharmony_ciconst {
381cb0ef41Sopenharmony_ci  isDestroyed,
391cb0ef41Sopenharmony_ci  isReadable,
401cb0ef41Sopenharmony_ci  isWritable,
411cb0ef41Sopenharmony_ci  isWritableEnded,
421cb0ef41Sopenharmony_ci} = require('internal/streams/utils');
431cb0ef41Sopenharmony_ci
441cb0ef41Sopenharmony_ciconst {
451cb0ef41Sopenharmony_ci  Buffer,
461cb0ef41Sopenharmony_ci} = require('buffer');
471cb0ef41Sopenharmony_ci
481cb0ef41Sopenharmony_ciconst {
491cb0ef41Sopenharmony_ci  errnoException,
501cb0ef41Sopenharmony_ci  codes: {
511cb0ef41Sopenharmony_ci    ERR_INVALID_ARG_TYPE,
521cb0ef41Sopenharmony_ci    ERR_INVALID_ARG_VALUE,
531cb0ef41Sopenharmony_ci    ERR_INVALID_STATE,
541cb0ef41Sopenharmony_ci    ERR_STREAM_PREMATURE_CLOSE,
551cb0ef41Sopenharmony_ci  },
561cb0ef41Sopenharmony_ci  AbortError,
571cb0ef41Sopenharmony_ci} = require('internal/errors');
581cb0ef41Sopenharmony_ci
591cb0ef41Sopenharmony_ciconst {
601cb0ef41Sopenharmony_ci  createDeferredPromise,
611cb0ef41Sopenharmony_ci  kEmptyObject,
621cb0ef41Sopenharmony_ci  normalizeEncoding,
631cb0ef41Sopenharmony_ci} = require('internal/util');
641cb0ef41Sopenharmony_ci
651cb0ef41Sopenharmony_ciconst {
661cb0ef41Sopenharmony_ci  validateBoolean,
671cb0ef41Sopenharmony_ci  validateFunction,
681cb0ef41Sopenharmony_ci  validateObject,
691cb0ef41Sopenharmony_ci} = require('internal/validators');
701cb0ef41Sopenharmony_ci
711cb0ef41Sopenharmony_ciconst {
721cb0ef41Sopenharmony_ci  WriteWrap,
731cb0ef41Sopenharmony_ci  ShutdownWrap,
741cb0ef41Sopenharmony_ci  kReadBytesOrError,
751cb0ef41Sopenharmony_ci  kLastWriteWasAsync,
761cb0ef41Sopenharmony_ci  streamBaseState,
771cb0ef41Sopenharmony_ci} = internalBinding('stream_wrap');
781cb0ef41Sopenharmony_ci
791cb0ef41Sopenharmony_ciconst finished = require('internal/streams/end-of-stream');
801cb0ef41Sopenharmony_ci
811cb0ef41Sopenharmony_ciconst { UV_EOF } = internalBinding('uv');
821cb0ef41Sopenharmony_ci
831cb0ef41Sopenharmony_ciconst encoder = new TextEncoder();
841cb0ef41Sopenharmony_ci
851cb0ef41Sopenharmony_ci/**
861cb0ef41Sopenharmony_ci * @typedef {import('../../stream').Writable} Writable
871cb0ef41Sopenharmony_ci * @typedef {import('../../stream').Readable} Readable
881cb0ef41Sopenharmony_ci * @typedef {import('./writablestream').WritableStream} WritableStream
891cb0ef41Sopenharmony_ci * @typedef {import('./readablestream').ReadableStream} ReadableStream
901cb0ef41Sopenharmony_ci */
911cb0ef41Sopenharmony_ci
921cb0ef41Sopenharmony_ci/**
931cb0ef41Sopenharmony_ci * @typedef {import('../abort_controller').AbortSignal} AbortSignal
941cb0ef41Sopenharmony_ci */
951cb0ef41Sopenharmony_ci
/**
 * Adapts a Node.js stream.Writable (or any object exposing `write()` and
 * `on()`, such as an http.OutgoingMessage) to a web WritableStream.
 *
 * If the given stream is already destroyed or is no longer writable, a
 * fresh WritableStream on which close() has been called is returned.
 * @param {Writable} streamWritable
 * @returns {WritableStream}
 * @throws {ERR_INVALID_ARG_TYPE} When streamWritable does not expose
 *   `write()` and `on()` functions.
 */
function newWritableStreamFromStreamWritable(streamWritable) {
  // Not using the internal/streams/utils isWritableNodeStream utility
  // here because it will return false if streamWritable is a Duplex
  // whose writable option is false. For a Duplex that is not writable,
  // we want it to pass this check but return a closed WritableStream.
  // We check if the given stream is a stream.Writable or http.OutgoingMessage
  const checkIfWritableOrOutgoingMessage =
    streamWritable &&
    typeof streamWritable?.write === 'function' &&
    typeof streamWritable?.on === 'function';
  if (!checkIfWritableOrOutgoingMessage) {
    throw new ERR_INVALID_ARG_TYPE(
      'streamWritable',
      'stream.Writable',
      streamWritable,
    );
  }

  if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
    const writable = new WritableStream();
    writable.close();
    return writable;
  }

  const highWaterMark = streamWritable.writableHighWaterMark;
  const strategy =
    streamWritable.writableObjectMode ?
      new CountQueuingStrategy({ highWaterMark }) :
      { highWaterMark };

  // Controller of the WritableStream; cleared once it must not be used again.
  let controller;
  // Deferred promise settled by 'drain' (resolve) or by a stream error
  // (reject) while a sink write() is waiting for backpressure to clear.
  let backpressurePromise;
  // Deferred promise returned from the sink close(); settled when
  // `finished` reports that streamWritable is done.
  let closed;

  function onDrain() {
    if (backpressurePromise !== undefined)
      backpressurePromise.resolve();
  }

  // Returns a promise that settles on the next 'drain' event, or rejects
  // when the stream errors first. The shared slot is cleared afterwards.
  function waitForDrain() {
    backpressurePromise = createDeferredPromise();
    return SafePromisePrototypeFinally(
      backpressurePromise.promise, () => {
        backpressurePromise = undefined;
      });
  }

  const cleanup = finished(streamWritable, (error) => {
    if (error?.code === 'ERR_STREAM_PREMATURE_CLOSE') {
      const err = new AbortError(undefined, { cause: error });
      error = err;
    }

    cleanup();
    // This is a protection against non-standard, legacy streams
    // that happen to emit an error event again after finished is called.
    streamWritable.on('error', () => {});
    if (error != null) {
      if (backpressurePromise !== undefined)
        backpressurePromise.reject(error);
      // If closed is not undefined, the error is happening
      // after the WritableStream close has already started.
      // We need to reject it here.
      if (closed !== undefined) {
        closed.reject(error);
        closed = undefined;
      }
      // close() may already have cleared the controller (when the stream
      // had been ended elsewhere); in that case there is nothing to error.
      controller?.error(error);
      controller = undefined;
      return;
    }

    if (closed !== undefined) {
      closed.resolve();
      closed = undefined;
      return;
    }
    // Same guard as above: close() may have cleared the controller.
    controller?.error(new AbortError());
    controller = undefined;
  });

  streamWritable.on('drain', onDrain);

  return new WritableStream({
    start(c) { controller = c; },

    async write(chunk) {
      // If the stream already needs draining before this chunk is handed
      // over, wait for 'drain' first. The chunk must still be written
      // afterwards; skipping the write here would silently drop it.
      if (streamWritable.writableNeedDrain)
        await waitForDrain();

      // write() returning false signals backpressure: hold the sink
      // write() open until the stream has drained.
      if (!streamWritable.write(chunk))
        await waitForDrain();
    },

    abort(reason) {
      destroy(streamWritable, reason);
    },

    close() {
      if (closed === undefined && !isWritableEnded(streamWritable)) {
        closed = createDeferredPromise();
        streamWritable.end();
        return closed.promise;
      }

      controller = undefined;
      return PromiseResolve();
    },
  }, strategy);
}
2041cb0ef41Sopenharmony_ci
/**
 * Adapts a web WritableStream to a Node.js stream.Writable.
 *
 * A writer is obtained from writableStream via getWriter() and every
 * write, writev, final and destroy on the returned stream.Writable is
 * forwarded to that writer.
 * @param {WritableStream} writableStream
 * @param {{
 *   decodeStrings? : boolean,
 *   highWaterMark? : number,
 *   objectMode? : boolean,
 *   signal? : AbortSignal,
 * }} [options]
 * @returns {Writable}
 * @throws {ERR_INVALID_ARG_TYPE} When writableStream is not a
 *   WritableStream. Option validation errors are raised by
 *   validateObject/validateBoolean.
 */
function newStreamWritableFromWritableStream(writableStream, options = kEmptyObject) {
  if (!isWritableStream(writableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      'writableStream',
      'WritableStream',
      writableStream);
  }

  validateObject(options, 'options');
  const {
    highWaterMark,
    decodeStrings = true,
    objectMode = false,
    signal,
  } = options;

  validateBoolean(objectMode, 'options.objectMode');
  validateBoolean(decodeStrings, 'options.decodeStrings');

  const writer = writableStream.getWriter();
  // Set once writer.closed settles (either way); from then on the writer
  // is no longer asked to close or abort.
  let closed = false;

  const writable = new Writable({
    highWaterMark,
    objectMode,
    decodeStrings,
    signal,

    writev(chunks, callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(writable, error));
        }
      }

      PromisePrototypeThen(
        writer.ready,
        () => {
          return PromisePrototypeThen(
            SafePromiseAll(
              chunks,
              (data) => writer.write(data.chunk)),
            // On success the promise fulfils with the array of write
            // results, which is not an error: report plain success.
            () => done(),
            // On failure the rejection reason is a single error value
            // (not an array) and is forwarded to the callback as is.
            done);
        },
        done);
    },

    write(chunk, encoding, callback) {
      if (typeof chunk === 'string' && decodeStrings && !objectMode) {
        const enc = normalizeEncoding(encoding);

        if (enc === 'utf8') {
          chunk = encoder.encode(chunk);
        } else {
          chunk = Buffer.from(chunk, encoding);
          // Re-wrap the Buffer's memory as a plain Uint8Array view.
          chunk = new Uint8Array(
            TypedArrayPrototypeGetBuffer(chunk),
            TypedArrayPrototypeGetByteOffset(chunk),
            TypedArrayPrototypeGetByteLength(chunk),
          );
        }
      }

      function done(error) {
        try {
          callback(error);
        } catch (error) {
          destroy(writable, error);
        }
      }

      PromisePrototypeThen(
        writer.ready,
        () => {
          return PromisePrototypeThen(
            writer.write(chunk),
            done,
            done);
        },
        done);
    },

    destroy(error, callback) {
      // Always reports the original destroy error, regardless of how the
      // writer's abort()/close() promise settles.
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => { throw error; });
        }
      }

      if (!closed) {
        if (error != null) {
          PromisePrototypeThen(
            writer.abort(error),
            done,
            done);
        } else {
          PromisePrototypeThen(
            writer.close(),
            done,
            done);
        }
        return;
      }

      done();
    },

    final(callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(writable, error));
        }
      }

      if (!closed) {
        PromisePrototypeThen(
          writer.close(),
          done,
          done);
      }
    },
  });

  PromisePrototypeThen(
    writer.closed,
    () => {
      // If the WritableStream closes before the stream.Writable has been
      // ended, we signal an error on the stream.Writable.
      closed = true;
      if (!isWritableEnded(writable))
        destroy(writable, new ERR_STREAM_PREMATURE_CLOSE());
    },
    (error) => {
      // If the WritableStream errors before the stream.Writable has been
      // destroyed, signal an error on the stream.Writable.
      closed = true;
      destroy(writable, error);
    });

  return writable;
}
3791cb0ef41Sopenharmony_ci
/**
 * Exposes a Node.js stream.Readable as a web ReadableStream.
 *
 * The source is paused up front and resumed from the ReadableStream's
 * pull(); it is paused again whenever the controller's desiredSize drops
 * to zero or below. Cancelling the ReadableStream destroys the source.
 * @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
 * @param {Readable} streamReadable
 * @param {{
 *  strategy : QueuingStrategy
 * }} [options]
 * @returns {ReadableStream}
 */
function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObject) {
  // The internal/streams/utils isReadableNodeStream utility is deliberately
  // not used: it returns false for a Duplex created with readable: false,
  // and such a Duplex should pass this check and yield a closed
  // ReadableStream instead.
  if (typeof streamReadable?._readableState !== 'object') {
    throw new ERR_INVALID_ARG_TYPE(
      'streamReadable',
      'stream.Readable',
      streamReadable);
  }

  if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
    const cancelledStream = new ReadableStream();
    cancelledStream.cancel();
    return cancelledStream;
  }

  const objectMode = streamReadable.readableObjectMode;
  const highWaterMark = streamReadable.readableHighWaterMark;

  // A caller-supplied strategy wins. Otherwise object mode counts chunks
  // with a CountQueuingStrategy, and byte mode uses a minimal strategy
  // carrying only the highWaterMark (no size algorithm; a
  // ByteLengthQueuingStrategy is unnecessary here).
  let strategy = options?.strategy;
  if (!strategy) {
    strategy = objectMode ?
      new CountQueuingStrategy({ highWaterMark }) :
      { highWaterMark };
  }

  let controller;

  const forwardChunk = (chunk) => {
    // Copy the Buffer to detach it from the pool.
    const payload =
      Buffer.isBuffer(chunk) && !objectMode ? new Uint8Array(chunk) : chunk;
    controller.enqueue(payload);
    if (controller.desiredSize <= 0)
      streamReadable.pause();
  };

  streamReadable.pause();

  const cleanup = finished(streamReadable, (error) => {
    const failure =
      error?.code === 'ERR_STREAM_PREMATURE_CLOSE' ?
        new AbortError(undefined, { cause: error }) :
        error;

    cleanup();
    // Guards against non-standard, legacy streams that emit another
    // error event after finished has already reported.
    streamReadable.on('error', () => {});
    if (failure) {
      controller.error(failure);
      return;
    }
    controller.close();
  });

  streamReadable.on('data', forwardChunk);

  const underlyingSource = {
    start(c) { controller = c; },

    pull() { streamReadable.resume(); },

    cancel(reason) {
      destroy(streamReadable, reason);
    },
  };

  return new ReadableStream(underlyingSource, strategy);
}
4691cb0ef41Sopenharmony_ci
/**
 * Exposes a web ReadableStream as a Node.js stream.Readable.
 *
 * A reader is obtained from readableStream via getReader(); each read()
 * on the returned stream.Readable pulls one chunk from that reader and
 * pushes it, and destroying the stream.Readable cancels the reader unless
 * it has already closed.
 * @param {ReadableStream} readableStream
 * @param {{
 *   highWaterMark? : number,
 *   encoding? : string,
 *   objectMode? : boolean,
 *   signal? : AbortSignal,
 * }} [options]
 * @returns {Readable}
 */
function newStreamReadableFromReadableStream(readableStream, options = kEmptyObject) {
  if (!isReadableStream(readableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      'readableStream',
      'ReadableStream',
      readableStream);
  }

  validateObject(options, 'options');
  const {
    highWaterMark,
    encoding,
    objectMode = false,
    signal,
  } = options;

  const encodingIsUnknown =
    encoding !== undefined && !Buffer.isEncoding(encoding);
  if (encodingIsUnknown)
    throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding');
  validateBoolean(objectMode, 'options.objectMode');

  const reader = readableStream.getReader();
  // Flipped once reader.closed settles, whether fulfilled or rejected.
  let closed = false;

  const readable = new Readable({
    objectMode,
    highWaterMark,
    encoding,
    signal,

    read() {
      PromisePrototypeThen(
        reader.read(),
        (result) => {
          // When done is set the value should always be undefined, so
          // only the end-of-stream marker is pushed.
          if (result.done) {
            readable.push(null);
            return;
          }
          readable.push(result.value);
        },
        (error) => destroy(readable, error));
    },

    destroy(error, callback) {
      // Reports the original destroy error however cancel() settles.
      const finish = () => {
        try {
          callback(error);
        } catch (error) {
          // Rethrown from a next tick: this may run inside a promise
          // reaction, and a throw there would surface as an unhandled
          // rejection rather than as a regular exception.
          process.nextTick(() => { throw error; });
        }
      };

      if (closed) {
        finish();
        return;
      }

      PromisePrototypeThen(
        reader.cancel(error),
        finish,
        finish);
    },
  });

  PromisePrototypeThen(
    reader.closed,
    () => {
      closed = true;
    },
    (error) => {
      closed = true;
      destroy(readable, error);
    });

  return readable;
}
5601cb0ef41Sopenharmony_ci
5611cb0ef41Sopenharmony_ci/**
5621cb0ef41Sopenharmony_ci * @typedef {import('./readablestream').ReadableWritablePair
5631cb0ef41Sopenharmony_ci * } ReadableWritablePair
5641cb0ef41Sopenharmony_ci * @typedef {import('../../stream').Duplex} Duplex
5651cb0ef41Sopenharmony_ci */
5661cb0ef41Sopenharmony_ci
/**
 * Splits a Node.js stream.Duplex into a { readable, writable } pair of
 * web streams. A side of the duplex that is not usable is represented by
 * a fresh stream that is immediately closed (writable) or cancelled
 * (readable).
 * @param {Duplex} duplex
 * @returns {ReadableWritablePair}
 */
function newReadableWritablePairFromDuplex(duplex) {
  // The internal/streams/utils isWritableNodeStream and
  // isReadableNodeStream utilities are deliberately not used: they return
  // false for a duplex created with writable or readable set to false.
  // The readable and writable state is inspected below instead, and a
  // closed WritableStream or ReadableStream is returned as necessary.
  const looksLikeDuplex =
    typeof duplex?._writableState === 'object' &&
    typeof duplex?._readableState === 'object';
  if (!looksLikeDuplex) {
    throw new ERR_INVALID_ARG_TYPE('duplex', 'stream.Duplex', duplex);
  }

  if (isDestroyed(duplex)) {
    const writable = new WritableStream();
    const readable = new ReadableStream();
    writable.close();
    readable.cancel();
    return { readable, writable };
  }

  let writable;
  if (isWritable(duplex)) {
    writable = newWritableStreamFromStreamWritable(duplex);
  } else {
    writable = new WritableStream();
    writable.close();
  }

  let readable;
  if (isReadable(duplex)) {
    readable = newReadableStreamFromStreamReadable(duplex);
  } else {
    readable = new ReadableStream();
    readable.cancel();
  }

  return { writable, readable };
}
6091cb0ef41Sopenharmony_ci
/**
 * Wraps a web streams `{ readable, writable }` pair in a Node.js `Duplex`.
 * Chunks written to the Duplex are forwarded to `pair.writable` through a
 * locked writer, and chunks read from `pair.readable` through a locked
 * reader are pushed onto the Duplex's readable side.
 * @param {ReadableWritablePair} pair
 * @param {{
 *   allowHalfOpen? : boolean,
 *   decodeStrings? : boolean,
 *   encoding? : string,
 *   highWaterMark? : number,
 *   objectMode? : boolean,
 *   signal? : AbortSignal,
 * }} [options]
 * @returns {Duplex}
 * @throws {ERR_INVALID_ARG_TYPE} If `pair.readable` is not a ReadableStream
 *   or `pair.writable` is not a WritableStream.
 * @throws {ERR_INVALID_ARG_VALUE} If `options.encoding` is not a known
 *   Buffer encoding.
 */
function newStreamDuplexFromReadableWritablePair(pair = kEmptyObject, options = kEmptyObject) {
  validateObject(pair, 'pair');
  const {
    readable: readableStream,
    writable: writableStream,
  } = pair;

  if (!isReadableStream(readableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      'pair.readable',
      'ReadableStream',
      readableStream);
  }
  if (!isWritableStream(writableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      'pair.writable',
      'WritableStream',
      writableStream);
  }

  validateObject(options, 'options');
  const {
    allowHalfOpen = false,
    objectMode = false,
    encoding,
    decodeStrings = true,
    highWaterMark,
    signal,
  } = options;

  validateBoolean(objectMode, 'options.objectMode');
  // ERR_INVALID_ARG_VALUE takes (name, value): the property name goes first
  // so the message reports the offending value rather than the name.
  if (encoding !== undefined && !Buffer.isEncoding(encoding))
    throw new ERR_INVALID_ARG_VALUE('options.encoding', encoding);

  const writer = writableStream.getWriter();
  const reader = readableStream.getReader();
  // Flipped by the writer.closed / reader.closed handlers registered below.
  let writableClosed = false;
  let readableClosed = false;

  const duplex = new Duplex({
    allowHalfOpen,
    highWaterMark,
    objectMode,
    encoding,
    decodeStrings,
    signal,

    writev(chunks, callback) {
      // Reports the outcome of the batched write. Called with no argument
      // on success and with the rejection reason on failure.
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(duplex, error));
        }
      }

      PromisePrototypeThen(
        writer.ready,
        () => {
          return PromisePrototypeThen(
            SafePromiseAll(
              chunks,
              (data) => writer.write(data.chunk)),
            // The fulfilled value is an array of per-write results, not an
            // error, so it must not be forwarded to the callback.
            () => done(),
            done);
        },
        done);
    },

    write(chunk, encoding, callback) {
      // Convert string chunks to a plain Uint8Array before handing them to
      // the web stream writer.
      if (typeof chunk === 'string' && decodeStrings && !objectMode) {
        const enc = normalizeEncoding(encoding);

        if (enc === 'utf8') {
          chunk = encoder.encode(chunk);
        } else {
          chunk = Buffer.from(chunk, encoding);
          chunk = new Uint8Array(
            TypedArrayPrototypeGetBuffer(chunk),
            TypedArrayPrototypeGetByteOffset(chunk),
            TypedArrayPrototypeGetByteLength(chunk),
          );
        }
      }

      function done(error) {
        try {
          callback(error);
        } catch (error) {
          destroy(duplex, error);
        }
      }

      PromisePrototypeThen(
        writer.ready,
        () => {
          return PromisePrototypeThen(
            writer.write(chunk),
            done,
            done);
        },
        done);
    },

    final(callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(duplex, error));
        }
      }

      if (writableClosed) {
        // Nothing left to close, but the callback must still run or the
        // Duplex would never be able to finish.
        done();
        return;
      }

      PromisePrototypeThen(
        writer.close(),
        done,
        done);
    },

    read() {
      PromisePrototypeThen(
        reader.read(),
        (chunk) => {
          if (chunk.done) {
            duplex.push(null);
          } else {
            duplex.push(chunk.value);
          }
        },
        (error) => destroy(duplex, error));
    },

    destroy(error, callback) {
      // Always reports the original destroy `error`; the outcomes of
      // abort()/cancel() below are intentionally not forwarded.
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => { throw error; });
        }
      }

      async function closeWriter() {
        if (!writableClosed)
          await writer.abort(error);
      }

      async function closeReader() {
        if (!readableClosed)
          await reader.cancel(error);
      }

      if (!writableClosed || !readableClosed) {
        PromisePrototypeThen(
          SafePromiseAll([
            closeWriter(),
            closeReader(),
          ]),
          done,
          done);
        return;
      }

      done();
    },
  });

  PromisePrototypeThen(
    writer.closed,
    () => {
      writableClosed = true;
      // The web stream closed before the Duplex's writable side was ended.
      if (!isWritableEnded(duplex))
        destroy(duplex, new ERR_STREAM_PREMATURE_CLOSE());
    },
    (error) => {
      writableClosed = true;
      readableClosed = true;
      destroy(duplex, error);
    });

  PromisePrototypeThen(
    reader.closed,
    () => {
      readableClosed = true;
    },
    (error) => {
      writableClosed = true;
      readableClosed = true;
      destroy(duplex, error);
    });

  return duplex;
}
8321cb0ef41Sopenharmony_ci
/**
 * Creates a WritableStream whose chunks are written to a native StreamBase
 * handle via `writeBuffer()`, and whose close operation calls `shutdown()`
 * on the handle. Writes are chained so each one starts only after the
 * previous one has settled.
 * @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
 * @typedef {{}} StreamBase
 * @param {StreamBase} streamBase
 * @param {QueuingStrategy} strategy
 * @returns {WritableStream}
 */
function newWritableStreamFromStreamBase(streamBase, strategy) {
  validateObject(streamBase, 'streamBase');

  // Promise of the most recently scheduled write; used to serialize writes.
  let current;

  function createWriteWrap(controller, promise) {
    const req = new WriteWrap();
    req.handle = streamBase;
    req.oncomplete = onWriteComplete;
    req.async = false;
    req.bytes = 0;
    req.buffer = null;
    req.controller = controller;
    req.promise = promise;
    return req;
  }

  // Completion callback for a write request; `this` is the WriteWrap.
  function onWriteComplete(status) {
    if (status < 0) {
      const error = errnoException(status, 'write', this.error);
      this.promise.reject(error);
      this.controller.error(error);
      return;
    }
    this.promise.resolve();
  }

  function doWrite(chunk, controller) {
    const promise = createDeferredPromise();
    let ret;
    let req;
    try {
      req = createWriteWrap(controller, promise);
      ret = streamBase.writeBuffer(req, chunk);
      if (streamBaseState[kLastWriteWasAsync])
        req.buffer = chunk;
      req.async = !!streamBaseState[kLastWriteWasAsync];
    } catch (error) {
      promise.reject(error);
      // Stop here: `ret` (and possibly `req`) is undefined, so the status
      // handling below would build a bogus errno exception.
      return promise.promise;
    }

    if (ret !== 0)
      promise.reject(errnoException(ret, 'write', req));
    else if (!req.async)
      promise.resolve();

    return promise.promise;
  }

  return new WritableStream({
    write(chunk, controller) {
      current = current !== undefined ?
        PromisePrototypeThen(
          current,
          () => doWrite(chunk, controller),
          (error) => controller.error(error)) :
        doWrite(chunk, controller);
      return current;
    },

    close() {
      const promise = createDeferredPromise();
      const req = new ShutdownWrap();
      req.oncomplete = () => promise.resolve();
      const err = streamBase.shutdown(req);
      if (err === 1) {
        // Shutdown completed synchronously; oncomplete will not be the one
        // to resolve the promise.
        promise.resolve();
      } else if (err < 0) {
        // Synchronous failure: reject instead of leaving the close promise
        // pending forever.
        promise.reject(errnoException(err, 'shutdown'));
      }
      return promise.promise;
    },
  }, strategy);
}
9111cb0ef41Sopenharmony_ci
/**
 * Creates a ReadableStream fed by a native StreamBase handle. The handle's
 * `onread` callback is installed here, so the handle must not already have
 * a consumer. Reading is started on pull and stopped when the stream's
 * desired size drops to zero or below, at EOF, or on error.
 * @param {StreamBase} streamBase
 * @param {QueuingStrategy} strategy
 * @param {{
 *   ondone? : Function,
 * }} [options] `ondone` is invoked when the handle reports EOF and when the
 *   stream is canceled.
 * @returns {ReadableStream}
 * @throws {ERR_INVALID_STATE} If `streamBase.onread` is already a function.
 */
function newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyObject) {
  validateObject(streamBase, 'streamBase');
  validateObject(options, 'options');

  const {
    ondone = () => {},
  } = options;

  if (typeof streamBase.onread === 'function')
    throw new ERR_INVALID_STATE('StreamBase already has a consumer');

  validateFunction(ondone, 'options.ondone');

  // Assigned in the underlying source's start() below.
  let controller;

  streamBase.onread = (arrayBuffer) => {
    const nread = streamBaseState[kReadBytesOrError];

    if (nread === 0)
      return;

    try {
      if (nread === UV_EOF) {
        controller.close();
        streamBase.readStop();
        try {
          ondone();
        } catch (error) {
          controller.error(error);
        }
        return;
      }

      if (nread < 0) {
        // Any other negative value is a read error, not data: error the
        // stream rather than enqueueing a missing buffer.
        controller.error(errnoException(nread, 'read'));
        streamBase.readStop();
        return;
      }

      controller.enqueue(arrayBuffer);

      // Apply backpressure once the queue is full.
      if (controller.desiredSize <= 0)
        streamBase.readStop();
    } catch (error) {
      controller.error(error);
      streamBase.readStop();
    }
  };

  return new ReadableStream({
    start(c) { controller = c; },

    pull() {
      streamBase.readStart();
    },

    cancel() {
      const promise = createDeferredPromise();
      try {
        ondone();
      } catch (error) {
        promise.reject(error);
        return promise.promise;
      }
      const req = new ShutdownWrap();
      req.oncomplete = () => promise.resolve();
      const err = streamBase.shutdown(req);
      if (err === 1) {
        // Shutdown completed synchronously.
        promise.resolve();
      } else if (err < 0) {
        // Synchronous failure: reject instead of leaving the cancel promise
        // pending forever.
        promise.reject(errnoException(err, 'shutdown'));
      }
      return promise.promise;
    },
  }, strategy);
}
9841cb0ef41Sopenharmony_ci
9851cb0ef41Sopenharmony_cimodule.exports = {
9861cb0ef41Sopenharmony_ci  newWritableStreamFromStreamWritable,
9871cb0ef41Sopenharmony_ci  newReadableStreamFromStreamReadable,
9881cb0ef41Sopenharmony_ci  newStreamWritableFromWritableStream,
9891cb0ef41Sopenharmony_ci  newStreamReadableFromReadableStream,
9901cb0ef41Sopenharmony_ci  newReadableWritablePairFromDuplex,
9911cb0ef41Sopenharmony_ci  newStreamDuplexFromReadableWritablePair,
9921cb0ef41Sopenharmony_ci  newWritableStreamFromStreamBase,
9931cb0ef41Sopenharmony_ci  newReadableStreamFromStreamBase,
9941cb0ef41Sopenharmony_ci};
995