11cb0ef41Sopenharmony_ci'use strict'; 21cb0ef41Sopenharmony_ci 31cb0ef41Sopenharmony_ciconst { 41cb0ef41Sopenharmony_ci PromisePrototypeThen, 51cb0ef41Sopenharmony_ci PromiseResolve, 61cb0ef41Sopenharmony_ci SafePromiseAll, 71cb0ef41Sopenharmony_ci SafePromisePrototypeFinally, 81cb0ef41Sopenharmony_ci TypedArrayPrototypeGetBuffer, 91cb0ef41Sopenharmony_ci TypedArrayPrototypeGetByteOffset, 101cb0ef41Sopenharmony_ci TypedArrayPrototypeGetByteLength, 111cb0ef41Sopenharmony_ci Uint8Array, 121cb0ef41Sopenharmony_ci} = primordials; 131cb0ef41Sopenharmony_ci 141cb0ef41Sopenharmony_ciconst { TextEncoder } = require('internal/encoding'); 151cb0ef41Sopenharmony_ci 161cb0ef41Sopenharmony_ciconst { 171cb0ef41Sopenharmony_ci ReadableStream, 181cb0ef41Sopenharmony_ci isReadableStream, 191cb0ef41Sopenharmony_ci} = require('internal/webstreams/readablestream'); 201cb0ef41Sopenharmony_ci 211cb0ef41Sopenharmony_ciconst { 221cb0ef41Sopenharmony_ci WritableStream, 231cb0ef41Sopenharmony_ci isWritableStream, 241cb0ef41Sopenharmony_ci} = require('internal/webstreams/writablestream'); 251cb0ef41Sopenharmony_ci 261cb0ef41Sopenharmony_ciconst { 271cb0ef41Sopenharmony_ci CountQueuingStrategy, 281cb0ef41Sopenharmony_ci} = require('internal/webstreams/queuingstrategies'); 291cb0ef41Sopenharmony_ci 301cb0ef41Sopenharmony_ciconst { 311cb0ef41Sopenharmony_ci Writable, 321cb0ef41Sopenharmony_ci Readable, 331cb0ef41Sopenharmony_ci Duplex, 341cb0ef41Sopenharmony_ci destroy, 351cb0ef41Sopenharmony_ci} = require('stream'); 361cb0ef41Sopenharmony_ci 371cb0ef41Sopenharmony_ciconst { 381cb0ef41Sopenharmony_ci isDestroyed, 391cb0ef41Sopenharmony_ci isReadable, 401cb0ef41Sopenharmony_ci isWritable, 411cb0ef41Sopenharmony_ci isWritableEnded, 421cb0ef41Sopenharmony_ci} = require('internal/streams/utils'); 431cb0ef41Sopenharmony_ci 441cb0ef41Sopenharmony_ciconst { 451cb0ef41Sopenharmony_ci Buffer, 461cb0ef41Sopenharmony_ci} = require('buffer'); 471cb0ef41Sopenharmony_ci 481cb0ef41Sopenharmony_ciconst { 
491cb0ef41Sopenharmony_ci errnoException, 501cb0ef41Sopenharmony_ci codes: { 511cb0ef41Sopenharmony_ci ERR_INVALID_ARG_TYPE, 521cb0ef41Sopenharmony_ci ERR_INVALID_ARG_VALUE, 531cb0ef41Sopenharmony_ci ERR_INVALID_STATE, 541cb0ef41Sopenharmony_ci ERR_STREAM_PREMATURE_CLOSE, 551cb0ef41Sopenharmony_ci }, 561cb0ef41Sopenharmony_ci AbortError, 571cb0ef41Sopenharmony_ci} = require('internal/errors'); 581cb0ef41Sopenharmony_ci 591cb0ef41Sopenharmony_ciconst { 601cb0ef41Sopenharmony_ci createDeferredPromise, 611cb0ef41Sopenharmony_ci kEmptyObject, 621cb0ef41Sopenharmony_ci normalizeEncoding, 631cb0ef41Sopenharmony_ci} = require('internal/util'); 641cb0ef41Sopenharmony_ci 651cb0ef41Sopenharmony_ciconst { 661cb0ef41Sopenharmony_ci validateBoolean, 671cb0ef41Sopenharmony_ci validateFunction, 681cb0ef41Sopenharmony_ci validateObject, 691cb0ef41Sopenharmony_ci} = require('internal/validators'); 701cb0ef41Sopenharmony_ci 711cb0ef41Sopenharmony_ciconst { 721cb0ef41Sopenharmony_ci WriteWrap, 731cb0ef41Sopenharmony_ci ShutdownWrap, 741cb0ef41Sopenharmony_ci kReadBytesOrError, 751cb0ef41Sopenharmony_ci kLastWriteWasAsync, 761cb0ef41Sopenharmony_ci streamBaseState, 771cb0ef41Sopenharmony_ci} = internalBinding('stream_wrap'); 781cb0ef41Sopenharmony_ci 791cb0ef41Sopenharmony_ciconst finished = require('internal/streams/end-of-stream'); 801cb0ef41Sopenharmony_ci 811cb0ef41Sopenharmony_ciconst { UV_EOF } = internalBinding('uv'); 821cb0ef41Sopenharmony_ci 831cb0ef41Sopenharmony_ciconst encoder = new TextEncoder(); 841cb0ef41Sopenharmony_ci 851cb0ef41Sopenharmony_ci/** 861cb0ef41Sopenharmony_ci * @typedef {import('../../stream').Writable} Writable 871cb0ef41Sopenharmony_ci * @typedef {import('../../stream').Readable} Readable 881cb0ef41Sopenharmony_ci * @typedef {import('./writablestream').WritableStream} WritableStream 891cb0ef41Sopenharmony_ci * @typedef {import('./readablestream').ReadableStream} ReadableStream 901cb0ef41Sopenharmony_ci */ 911cb0ef41Sopenharmony_ci 
/**
 * @typedef {import('../abort_controller').AbortSignal} AbortSignal
 */

/**
 * Adapts a Node.js stream.Writable (or any object exposing `write()` and
 * `on()` functions, e.g. an http.OutgoingMessage) to a web WritableStream.
 *
 * When the given stream is already destroyed or is not writable, a new
 * WritableStream on which `close()` has already been called is returned.
 * Otherwise every chunk written to the WritableStream is forwarded to
 * `streamWritable.write()`, and the write only settles once the underlying
 * stream has emitted 'drain' whenever it signalled backpressure.
 * @param {Writable} streamWritable
 * @returns {WritableStream}
 * @throws {ERR_INVALID_ARG_TYPE} When streamWritable is falsy or does not
 *   have `write` and `on` functions.
 */
function newWritableStreamFromStreamWritable(streamWritable) {
  // Not using the internal/streams/utils isWritableNodeStream utility
  // here because it will return false if streamWritable is a Duplex
  // whose writable option is false. For a Duplex that is not writable,
  // we want it to pass this check but return a closed WritableStream.
  // We check if the given stream is a stream.Writable or http.OutgoingMessage
  const checkIfWritableOrOutgoingMessage =
    streamWritable &&
    typeof streamWritable?.write === 'function' &&
    typeof streamWritable?.on === 'function';
  if (!checkIfWritableOrOutgoingMessage) {
    throw new ERR_INVALID_ARG_TYPE(
      'streamWritable',
      'stream.Writable',
      streamWritable,
    );
  }

  if (isDestroyed(streamWritable) || !isWritable(streamWritable)) {
    const writable = new WritableStream();
    writable.close();
    return writable;
  }

  const highWaterMark = streamWritable.writableHighWaterMark;
  const strategy =
    streamWritable.writableObjectMode ?
      new CountQueuingStrategy({ highWaterMark }) :
      { highWaterMark };

  // Set by start(); cleared once the WritableStream is errored or closed.
  let controller;
  // Deferred promise that is pending while a write waits for 'drain'.
  let backpressurePromise;
  // Deferred promise that is pending while close() waits for the
  // underlying stream to finish.
  let closed;

  function onDrain() {
    if (backpressurePromise !== undefined)
      backpressurePromise.resolve();
  }

  const cleanup = finished(streamWritable, (error) => {
    // A premature close is surfaced to the web stream as an AbortError
    // that keeps the original error as its cause.
    if (error?.code === 'ERR_STREAM_PREMATURE_CLOSE') {
      const err = new AbortError(undefined, { cause: error });
      error = err;
    }

    cleanup();
    // This is a protection against non-standard, legacy streams
    // that happen to emit an error event again after finished is called.
    streamWritable.on('error', () => {});
    if (error != null) {
      if (backpressurePromise !== undefined)
        backpressurePromise.reject(error);
      // If closed is not undefined, the error is happening
      // after the WritableStream close has already started.
      // We need to reject it here.
      if (closed !== undefined) {
        closed.reject(error);
        closed = undefined;
      }
      controller.error(error);
      controller = undefined;
      return;
    }

    if (closed !== undefined) {
      closed.resolve();
      closed = undefined;
      return;
    }
    controller.error(new AbortError());
    controller = undefined;
  });

  streamWritable.on('drain', onDrain);

  return new WritableStream({
    start(c) { controller = c; },

    async write(chunk) {
      // Sample the drain state before writing, but always hand the chunk
      // to the underlying stream. Short-circuiting on writableNeedDrain
      // (`needDrain || !write(chunk)`) would skip the write() call and
      // then resolve after 'drain', silently dropping the chunk.
      const needDrain = streamWritable.writableNeedDrain;
      const accepted = streamWritable.write(chunk);
      if (needDrain || !accepted) {
        backpressurePromise = createDeferredPromise();
        return SafePromisePrototypeFinally(
          backpressurePromise.promise, () => {
            backpressurePromise = undefined;
          });
      }
    },

    abort(reason) {
      destroy(streamWritable, reason);
    },

    close() {
      if (closed === undefined && !isWritableEnded(streamWritable)) {
        // Resolved or rejected by the finished() callback above.
        closed = createDeferredPromise();
        streamWritable.end();
        return closed.promise;
      }

      controller = undefined;
      return PromiseResolve();
    },
  }, strategy);
}
/**
 * Adapts a web WritableStream to a Node.js stream.Writable.
 *
 * A writer is acquired on the WritableStream (locking it) and every
 * chunk written to the returned Writable is forwarded through it once
 * `writer.ready` has settled. If the WritableStream closes before the
 * Writable has been ended, or errors, the Writable is destroyed.
 * @param {WritableStream} writableStream
 * @param {{
 *   decodeStrings? : boolean,
 *   highWaterMark? : number,
 *   objectMode? : boolean,
 *   signal? : AbortSignal,
 * }} [options]
 * @returns {Writable}
 * @throws {ERR_INVALID_ARG_TYPE} When writableStream is not a
 *   WritableStream.
 */
function newStreamWritableFromWritableStream(writableStream, options = kEmptyObject) {
  if (!isWritableStream(writableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      'writableStream',
      'WritableStream',
      writableStream);
  }

  validateObject(options, 'options');
  const {
    highWaterMark,
    decodeStrings = true,
    objectMode = false,
    signal,
  } = options;

  validateBoolean(objectMode, 'options.objectMode');
  validateBoolean(decodeStrings, 'options.decodeStrings');

  const writer = writableStream.getWriter();
  // Becomes true once writer.closed has settled (fulfilled or rejected).
  let closed = false;

  const writable = new Writable({
    highWaterMark,
    objectMode,
    decodeStrings,
    signal,

    writev(chunks, callback) {
      // `error` is either undefined (success) or the rejection reason of
      // writer.ready / one of the writes. It is never the array of write
      // results, so it must not be treated as an array here: doing so
      // throws inside the promise reaction and the callback is never
      // invoked.
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(writable, error));
        }
      }

      PromisePrototypeThen(
        writer.ready,
        () => {
          return PromisePrototypeThen(
            SafePromiseAll(
              chunks,
              (data) => writer.write(data.chunk)),
            // The fulfillment value is the list of write() results, not
            // an error, so report success without an argument.
            () => done(),
            done);
        },
        done);
    },

    write(chunk, encoding, callback) {
      if (typeof chunk === 'string' && decodeStrings && !objectMode) {
        const enc = normalizeEncoding(encoding);

        if (enc === 'utf8') {
          chunk = encoder.encode(chunk);
        } else {
          chunk = Buffer.from(chunk, encoding);
          // Re-wrap the Buffer's memory as a plain Uint8Array view.
          chunk = new Uint8Array(
            TypedArrayPrototypeGetBuffer(chunk),
            TypedArrayPrototypeGetByteOffset(chunk),
            TypedArrayPrototypeGetByteLength(chunk),
          );
        }
      }

      function done(error) {
        try {
          callback(error);
        } catch (error) {
          destroy(writable, error);
        }
      }

      PromisePrototypeThen(
        writer.ready,
        () => {
          return PromisePrototypeThen(
            writer.write(chunk),
            done,
            done);
        },
        done);
    },

    destroy(error, callback) {
      // Always reports the error passed to destroy(), regardless of how
      // the abort()/close() promise settles.
      function done() {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => { throw error; });
        }
      }

      if (!closed) {
        if (error != null) {
          PromisePrototypeThen(
            writer.abort(error),
            done,
            done);
        } else {
          PromisePrototypeThen(
            writer.close(),
            done,
            done);
        }
        return;
      }

      done();
    },

    final(callback) {
      function done(error) {
        try {
          callback(error);
        } catch (error) {
          // In a next tick because this is happening within
          // a promise context, and if there are any errors
          // thrown we don't want those to cause an unhandled
          // rejection. Let's just escape the promise and
          // handle it separately.
          process.nextTick(() => destroy(writable, error));
        }
      }

      if (!closed) {
        PromisePrototypeThen(
          writer.close(),
          done,
          done);
      }
    },
  });

  PromisePrototypeThen(
    writer.closed,
    () => {
      // If the WritableStream closes before the stream.Writable has been
      // ended, we signal an error on the stream.Writable.
      closed = true;
      if (!isWritableEnded(writable))
        destroy(writable, new ERR_STREAM_PREMATURE_CLOSE());
    },
    (error) => {
      // If the WritableStream errors before the stream.Writable has been
      // destroyed, signal an error on the stream.Writable.
      closed = true;
      destroy(writable, error);
    });

  return writable;
}
/**
 * Adapts a Node.js stream.Readable to a web ReadableStream.
 *
 * The source is paused up front, resumed whenever the ReadableStream
 * pulls, and paused again as soon as the controller's desiredSize is
 * zero or negative. Canceling the ReadableStream destroys the source.
 * @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy
 * @param {Readable} streamReadable
 * @param {{
 *   strategy : QueuingStrategy
 * }} [options]
 * @returns {ReadableStream}
 */
function newReadableStreamFromStreamReadable(streamReadable, options = kEmptyObject) {
  // The internal/streams/utils isReadableNodeStream helper is avoided on
  // purpose: it reports false for a Duplex created with readable: false,
  // and such a Duplex must pass this check and yield a closed
  // ReadableStream instead.
  if (typeof streamReadable?._readableState !== 'object') {
    throw new ERR_INVALID_ARG_TYPE(
      'streamReadable',
      'stream.Readable',
      streamReadable);
  }

  if (isDestroyed(streamReadable) || !isReadable(streamReadable)) {
    const canceledStream = new ReadableStream();
    canceledStream.cancel();
    return canceledStream;
  }

  const isObjectMode = streamReadable.readableObjectMode;
  const sourceHighWaterMark = streamReadable.readableHighWaterMark;

  // A caller-supplied strategy always wins. Without one, object mode falls
  // back to a CountQueuingStrategy, and byte mode to a minimal strategy
  // carrying only the highWaterMark (no size algorithm; a
  // ByteLengthQueuingStrategy is unnecessary here).
  let queuingStrategy = options?.strategy;
  if (!queuingStrategy) {
    queuingStrategy = isObjectMode ?
      new CountQueuingStrategy({ highWaterMark: sourceHighWaterMark }) :
      { highWaterMark: sourceHighWaterMark };
  }

  let streamController;

  const forwardChunk = (chunk) => {
    // Copy the Buffer to detach it from the pool.
    const payload = (!isObjectMode && Buffer.isBuffer(chunk)) ?
      new Uint8Array(chunk) :
      chunk;
    streamController.enqueue(payload);
    if (streamController.desiredSize <= 0) {
      streamReadable.pause();
    }
  };

  streamReadable.pause();

  const detach = finished(streamReadable, (error) => {
    // A premature close is reported as an AbortError wrapping the cause.
    const failure = error?.code === 'ERR_STREAM_PREMATURE_CLOSE' ?
      new AbortError(undefined, { cause: error }) :
      error;

    detach();
    // Guards against non-standard, legacy streams that emit another
    // 'error' event after finished has already been called.
    streamReadable.on('error', () => {});
    if (failure) {
      streamController.error(failure);
    } else {
      streamController.close();
    }
  });

  streamReadable.on('data', forwardChunk);

  const underlyingSource = {
    start(c) { streamController = c; },

    pull() { streamReadable.resume(); },

    cancel(reason) {
      destroy(streamReadable, reason);
    },
  };

  return new ReadableStream(underlyingSource, queuingStrategy);
}
/**
 * Adapts a web ReadableStream to a Node.js stream.Readable.
 *
 * A reader is acquired on the ReadableStream (locking it). Each `read()`
 * on the returned Readable pulls one chunk from the reader and pushes it,
 * pushing `null` once the reader reports it is done.
 * @param {ReadableStream} readableStream
 * @param {{
 *   highWaterMark? : number,
 *   encoding? : string,
 *   objectMode? : boolean,
 *   signal? : AbortSignal,
 * }} [options]
 * @returns {Readable}
 */
function newStreamReadableFromReadableStream(readableStream, options = kEmptyObject) {
  if (!isReadableStream(readableStream)) {
    throw new ERR_INVALID_ARG_TYPE(
      'readableStream',
      'ReadableStream',
      readableStream);
  }

  validateObject(options, 'options');
  const {
    highWaterMark,
    encoding,
    objectMode = false,
    signal,
  } = options;

  if (encoding !== undefined && !Buffer.isEncoding(encoding)) {
    throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding');
  }
  validateBoolean(objectMode, 'options.objectMode');

  const reader = readableStream.getReader();
  // Flipped to true once reader.closed has settled either way.
  let closed = false;

  const onReadResult = (result) => {
    // When done is set, value should always be undefined.
    readable.push(result.done ? null : result.value);
  };
  const onReadFailure = (error) => destroy(readable, error);

  const readable = new Readable({
    objectMode,
    highWaterMark,
    encoding,
    signal,

    read() {
      PromisePrototypeThen(reader.read(), onReadResult, onReadFailure);
    },

    destroy(error, callback) {
      // Reports the error given to destroy(), whatever the outcome of
      // reader.cancel() was.
      const finish = () => {
        try {
          callback(error);
        } catch (thrown) {
          // Deferred to the next tick: this runs inside a promise
          // reaction, and rethrowing there would surface as an unhandled
          // rejection rather than as the exception itself.
          process.nextTick(() => { throw thrown; });
        }
      };

      if (closed) {
        finish();
        return;
      }

      PromisePrototypeThen(reader.cancel(error), finish, finish);
    },
  });

  PromisePrototypeThen(
    reader.closed,
    () => {
      closed = true;
    },
    (error) => {
      closed = true;
      destroy(readable, error);
    });

  return readable;
}
/**
 * @typedef {import('./readablestream').ReadableWritablePair
 * } ReadableWritablePair
 * @typedef {import('../../stream').Duplex} Duplex
 */

/**
 * Splits a Node.js stream.Duplex into a web { readable, writable } pair.
 *
 * Each side that is still usable is adapted with the matching adapter in
 * this file; a side that is not usable (or both sides, when the duplex is
 * already destroyed) is replaced by a fresh stream that is immediately
 * closed (writable) or canceled (readable).
 * @param {Duplex} duplex
 * @returns {ReadableWritablePair}
 */
function newReadableWritablePairFromDuplex(duplex) {
  // The isWritableNodeStream / isReadableNodeStream helpers from
  // internal/streams/utils are deliberately not used: they return false
  // for a duplex created with writable or readable set to false. The
  // readable and writable state is inspected below instead, and a closed
  // WritableStream or closed ReadableStream is returned as necessary.
  const hasWritableState = typeof duplex?._writableState === 'object';
  if (!hasWritableState || typeof duplex?._readableState !== 'object') {
    throw new ERR_INVALID_ARG_TYPE('duplex', 'stream.Duplex', duplex);
  }

  if (isDestroyed(duplex)) {
    const closedWritable = new WritableStream();
    const canceledReadable = new ReadableStream();
    closedWritable.close();
    canceledReadable.cancel();
    return { readable: canceledReadable, writable: closedWritable };
  }

  let writable;
  if (isWritable(duplex)) {
    writable = newWritableStreamFromStreamWritable(duplex);
  } else {
    writable = new WritableStream();
    writable.close();
  }

  let readable;
  if (isReadable(duplex)) {
    readable = newReadableStreamFromStreamReadable(duplex);
  } else {
    readable = new ReadableStream();
    readable.cancel();
  }

  return { writable, readable };
}
: AbortSignal, 6191cb0ef41Sopenharmony_ci * }} [options] 6201cb0ef41Sopenharmony_ci * @returns {Duplex} 6211cb0ef41Sopenharmony_ci */ 6221cb0ef41Sopenharmony_cifunction newStreamDuplexFromReadableWritablePair(pair = kEmptyObject, options = kEmptyObject) { 6231cb0ef41Sopenharmony_ci validateObject(pair, 'pair'); 6241cb0ef41Sopenharmony_ci const { 6251cb0ef41Sopenharmony_ci readable: readableStream, 6261cb0ef41Sopenharmony_ci writable: writableStream, 6271cb0ef41Sopenharmony_ci } = pair; 6281cb0ef41Sopenharmony_ci 6291cb0ef41Sopenharmony_ci if (!isReadableStream(readableStream)) { 6301cb0ef41Sopenharmony_ci throw new ERR_INVALID_ARG_TYPE( 6311cb0ef41Sopenharmony_ci 'pair.readable', 6321cb0ef41Sopenharmony_ci 'ReadableStream', 6331cb0ef41Sopenharmony_ci readableStream); 6341cb0ef41Sopenharmony_ci } 6351cb0ef41Sopenharmony_ci if (!isWritableStream(writableStream)) { 6361cb0ef41Sopenharmony_ci throw new ERR_INVALID_ARG_TYPE( 6371cb0ef41Sopenharmony_ci 'pair.writable', 6381cb0ef41Sopenharmony_ci 'WritableStream', 6391cb0ef41Sopenharmony_ci writableStream); 6401cb0ef41Sopenharmony_ci } 6411cb0ef41Sopenharmony_ci 6421cb0ef41Sopenharmony_ci validateObject(options, 'options'); 6431cb0ef41Sopenharmony_ci const { 6441cb0ef41Sopenharmony_ci allowHalfOpen = false, 6451cb0ef41Sopenharmony_ci objectMode = false, 6461cb0ef41Sopenharmony_ci encoding, 6471cb0ef41Sopenharmony_ci decodeStrings = true, 6481cb0ef41Sopenharmony_ci highWaterMark, 6491cb0ef41Sopenharmony_ci signal, 6501cb0ef41Sopenharmony_ci } = options; 6511cb0ef41Sopenharmony_ci 6521cb0ef41Sopenharmony_ci validateBoolean(objectMode, 'options.objectMode'); 6531cb0ef41Sopenharmony_ci if (encoding !== undefined && !Buffer.isEncoding(encoding)) 6541cb0ef41Sopenharmony_ci throw new ERR_INVALID_ARG_VALUE(encoding, 'options.encoding'); 6551cb0ef41Sopenharmony_ci 6561cb0ef41Sopenharmony_ci const writer = writableStream.getWriter(); 6571cb0ef41Sopenharmony_ci const reader = readableStream.getReader(); 6581cb0ef41Sopenharmony_ci 
let writableClosed = false; 6591cb0ef41Sopenharmony_ci let readableClosed = false; 6601cb0ef41Sopenharmony_ci 6611cb0ef41Sopenharmony_ci const duplex = new Duplex({ 6621cb0ef41Sopenharmony_ci allowHalfOpen, 6631cb0ef41Sopenharmony_ci highWaterMark, 6641cb0ef41Sopenharmony_ci objectMode, 6651cb0ef41Sopenharmony_ci encoding, 6661cb0ef41Sopenharmony_ci decodeStrings, 6671cb0ef41Sopenharmony_ci signal, 6681cb0ef41Sopenharmony_ci 6691cb0ef41Sopenharmony_ci writev(chunks, callback) { 6701cb0ef41Sopenharmony_ci function done(error) { 6711cb0ef41Sopenharmony_ci error = error.filter((e) => e); 6721cb0ef41Sopenharmony_ci try { 6731cb0ef41Sopenharmony_ci callback(error.length === 0 ? undefined : error); 6741cb0ef41Sopenharmony_ci } catch (error) { 6751cb0ef41Sopenharmony_ci // In a next tick because this is happening within 6761cb0ef41Sopenharmony_ci // a promise context, and if there are any errors 6771cb0ef41Sopenharmony_ci // thrown we don't want those to cause an unhandled 6781cb0ef41Sopenharmony_ci // rejection. Let's just escape the promise and 6791cb0ef41Sopenharmony_ci // handle it separately. 
6801cb0ef41Sopenharmony_ci process.nextTick(() => destroy(duplex, error)); 6811cb0ef41Sopenharmony_ci } 6821cb0ef41Sopenharmony_ci } 6831cb0ef41Sopenharmony_ci 6841cb0ef41Sopenharmony_ci PromisePrototypeThen( 6851cb0ef41Sopenharmony_ci writer.ready, 6861cb0ef41Sopenharmony_ci () => { 6871cb0ef41Sopenharmony_ci return PromisePrototypeThen( 6881cb0ef41Sopenharmony_ci SafePromiseAll( 6891cb0ef41Sopenharmony_ci chunks, 6901cb0ef41Sopenharmony_ci (data) => writer.write(data.chunk)), 6911cb0ef41Sopenharmony_ci done, 6921cb0ef41Sopenharmony_ci done); 6931cb0ef41Sopenharmony_ci }, 6941cb0ef41Sopenharmony_ci done); 6951cb0ef41Sopenharmony_ci }, 6961cb0ef41Sopenharmony_ci 6971cb0ef41Sopenharmony_ci write(chunk, encoding, callback) { 6981cb0ef41Sopenharmony_ci if (typeof chunk === 'string' && decodeStrings && !objectMode) { 6991cb0ef41Sopenharmony_ci const enc = normalizeEncoding(encoding); 7001cb0ef41Sopenharmony_ci 7011cb0ef41Sopenharmony_ci if (enc === 'utf8') { 7021cb0ef41Sopenharmony_ci chunk = encoder.encode(chunk); 7031cb0ef41Sopenharmony_ci } else { 7041cb0ef41Sopenharmony_ci chunk = Buffer.from(chunk, encoding); 7051cb0ef41Sopenharmony_ci chunk = new Uint8Array( 7061cb0ef41Sopenharmony_ci TypedArrayPrototypeGetBuffer(chunk), 7071cb0ef41Sopenharmony_ci TypedArrayPrototypeGetByteOffset(chunk), 7081cb0ef41Sopenharmony_ci TypedArrayPrototypeGetByteLength(chunk), 7091cb0ef41Sopenharmony_ci ); 7101cb0ef41Sopenharmony_ci } 7111cb0ef41Sopenharmony_ci } 7121cb0ef41Sopenharmony_ci 7131cb0ef41Sopenharmony_ci function done(error) { 7141cb0ef41Sopenharmony_ci try { 7151cb0ef41Sopenharmony_ci callback(error); 7161cb0ef41Sopenharmony_ci } catch (error) { 7171cb0ef41Sopenharmony_ci destroy(duplex, error); 7181cb0ef41Sopenharmony_ci } 7191cb0ef41Sopenharmony_ci } 7201cb0ef41Sopenharmony_ci 7211cb0ef41Sopenharmony_ci PromisePrototypeThen( 7221cb0ef41Sopenharmony_ci writer.ready, 7231cb0ef41Sopenharmony_ci () => { 7241cb0ef41Sopenharmony_ci return PromisePrototypeThen( 
7251cb0ef41Sopenharmony_ci writer.write(chunk), 7261cb0ef41Sopenharmony_ci done, 7271cb0ef41Sopenharmony_ci done); 7281cb0ef41Sopenharmony_ci }, 7291cb0ef41Sopenharmony_ci done); 7301cb0ef41Sopenharmony_ci }, 7311cb0ef41Sopenharmony_ci 7321cb0ef41Sopenharmony_ci final(callback) { 7331cb0ef41Sopenharmony_ci function done(error) { 7341cb0ef41Sopenharmony_ci try { 7351cb0ef41Sopenharmony_ci callback(error); 7361cb0ef41Sopenharmony_ci } catch (error) { 7371cb0ef41Sopenharmony_ci // In a next tick because this is happening within 7381cb0ef41Sopenharmony_ci // a promise context, and if there are any errors 7391cb0ef41Sopenharmony_ci // thrown we don't want those to cause an unhandled 7401cb0ef41Sopenharmony_ci // rejection. Let's just escape the promise and 7411cb0ef41Sopenharmony_ci // handle it separately. 7421cb0ef41Sopenharmony_ci process.nextTick(() => destroy(duplex, error)); 7431cb0ef41Sopenharmony_ci } 7441cb0ef41Sopenharmony_ci } 7451cb0ef41Sopenharmony_ci 7461cb0ef41Sopenharmony_ci if (!writableClosed) { 7471cb0ef41Sopenharmony_ci PromisePrototypeThen( 7481cb0ef41Sopenharmony_ci writer.close(), 7491cb0ef41Sopenharmony_ci done, 7501cb0ef41Sopenharmony_ci done); 7511cb0ef41Sopenharmony_ci } 7521cb0ef41Sopenharmony_ci }, 7531cb0ef41Sopenharmony_ci 7541cb0ef41Sopenharmony_ci read() { 7551cb0ef41Sopenharmony_ci PromisePrototypeThen( 7561cb0ef41Sopenharmony_ci reader.read(), 7571cb0ef41Sopenharmony_ci (chunk) => { 7581cb0ef41Sopenharmony_ci if (chunk.done) { 7591cb0ef41Sopenharmony_ci duplex.push(null); 7601cb0ef41Sopenharmony_ci } else { 7611cb0ef41Sopenharmony_ci duplex.push(chunk.value); 7621cb0ef41Sopenharmony_ci } 7631cb0ef41Sopenharmony_ci }, 7641cb0ef41Sopenharmony_ci (error) => destroy(duplex, error)); 7651cb0ef41Sopenharmony_ci }, 7661cb0ef41Sopenharmony_ci 7671cb0ef41Sopenharmony_ci destroy(error, callback) { 7681cb0ef41Sopenharmony_ci function done() { 7691cb0ef41Sopenharmony_ci try { 7701cb0ef41Sopenharmony_ci callback(error); 7711cb0ef41Sopenharmony_ci } 
catch (error) { 7721cb0ef41Sopenharmony_ci // In a next tick because this is happening within 7731cb0ef41Sopenharmony_ci // a promise context, and if there are any errors 7741cb0ef41Sopenharmony_ci // thrown we don't want those to cause an unhandled 7751cb0ef41Sopenharmony_ci // rejection. Let's just escape the promise and 7761cb0ef41Sopenharmony_ci // handle it separately. 7771cb0ef41Sopenharmony_ci process.nextTick(() => { throw error; }); 7781cb0ef41Sopenharmony_ci } 7791cb0ef41Sopenharmony_ci } 7801cb0ef41Sopenharmony_ci 7811cb0ef41Sopenharmony_ci async function closeWriter() { 7821cb0ef41Sopenharmony_ci if (!writableClosed) 7831cb0ef41Sopenharmony_ci await writer.abort(error); 7841cb0ef41Sopenharmony_ci } 7851cb0ef41Sopenharmony_ci 7861cb0ef41Sopenharmony_ci async function closeReader() { 7871cb0ef41Sopenharmony_ci if (!readableClosed) 7881cb0ef41Sopenharmony_ci await reader.cancel(error); 7891cb0ef41Sopenharmony_ci } 7901cb0ef41Sopenharmony_ci 7911cb0ef41Sopenharmony_ci if (!writableClosed || !readableClosed) { 7921cb0ef41Sopenharmony_ci PromisePrototypeThen( 7931cb0ef41Sopenharmony_ci SafePromiseAll([ 7941cb0ef41Sopenharmony_ci closeWriter(), 7951cb0ef41Sopenharmony_ci closeReader(), 7961cb0ef41Sopenharmony_ci ]), 7971cb0ef41Sopenharmony_ci done, 7981cb0ef41Sopenharmony_ci done); 7991cb0ef41Sopenharmony_ci return; 8001cb0ef41Sopenharmony_ci } 8011cb0ef41Sopenharmony_ci 8021cb0ef41Sopenharmony_ci done(); 8031cb0ef41Sopenharmony_ci }, 8041cb0ef41Sopenharmony_ci }); 8051cb0ef41Sopenharmony_ci 8061cb0ef41Sopenharmony_ci PromisePrototypeThen( 8071cb0ef41Sopenharmony_ci writer.closed, 8081cb0ef41Sopenharmony_ci () => { 8091cb0ef41Sopenharmony_ci writableClosed = true; 8101cb0ef41Sopenharmony_ci if (!isWritableEnded(duplex)) 8111cb0ef41Sopenharmony_ci destroy(duplex, new ERR_STREAM_PREMATURE_CLOSE()); 8121cb0ef41Sopenharmony_ci }, 8131cb0ef41Sopenharmony_ci (error) => { 8141cb0ef41Sopenharmony_ci writableClosed = true; 8151cb0ef41Sopenharmony_ci readableClosed = 
true; 8161cb0ef41Sopenharmony_ci destroy(duplex, error); 8171cb0ef41Sopenharmony_ci }); 8181cb0ef41Sopenharmony_ci 8191cb0ef41Sopenharmony_ci PromisePrototypeThen( 8201cb0ef41Sopenharmony_ci reader.closed, 8211cb0ef41Sopenharmony_ci () => { 8221cb0ef41Sopenharmony_ci readableClosed = true; 8231cb0ef41Sopenharmony_ci }, 8241cb0ef41Sopenharmony_ci (error) => { 8251cb0ef41Sopenharmony_ci writableClosed = true; 8261cb0ef41Sopenharmony_ci readableClosed = true; 8271cb0ef41Sopenharmony_ci destroy(duplex, error); 8281cb0ef41Sopenharmony_ci }); 8291cb0ef41Sopenharmony_ci 8301cb0ef41Sopenharmony_ci return duplex; 8311cb0ef41Sopenharmony_ci} 8321cb0ef41Sopenharmony_ci 8331cb0ef41Sopenharmony_ci/** 8341cb0ef41Sopenharmony_ci * @typedef {import('./queuingstrategies').QueuingStrategy} QueuingStrategy 8351cb0ef41Sopenharmony_ci * @typedef {{}} StreamBase 8361cb0ef41Sopenharmony_ci * @param {StreamBase} streamBase 8371cb0ef41Sopenharmony_ci * @param {QueuingStrategy} strategy 8381cb0ef41Sopenharmony_ci * @returns {WritableStream} 8391cb0ef41Sopenharmony_ci */ 8401cb0ef41Sopenharmony_cifunction newWritableStreamFromStreamBase(streamBase, strategy) { 8411cb0ef41Sopenharmony_ci validateObject(streamBase, 'streamBase'); 8421cb0ef41Sopenharmony_ci 8431cb0ef41Sopenharmony_ci let current; 8441cb0ef41Sopenharmony_ci 8451cb0ef41Sopenharmony_ci function createWriteWrap(controller, promise) { 8461cb0ef41Sopenharmony_ci const req = new WriteWrap(); 8471cb0ef41Sopenharmony_ci req.handle = streamBase; 8481cb0ef41Sopenharmony_ci req.oncomplete = onWriteComplete; 8491cb0ef41Sopenharmony_ci req.async = false; 8501cb0ef41Sopenharmony_ci req.bytes = 0; 8511cb0ef41Sopenharmony_ci req.buffer = null; 8521cb0ef41Sopenharmony_ci req.controller = controller; 8531cb0ef41Sopenharmony_ci req.promise = promise; 8541cb0ef41Sopenharmony_ci return req; 8551cb0ef41Sopenharmony_ci } 8561cb0ef41Sopenharmony_ci 8571cb0ef41Sopenharmony_ci function onWriteComplete(status) { 8581cb0ef41Sopenharmony_ci if (status < 0) 
{ 8591cb0ef41Sopenharmony_ci const error = errnoException(status, 'write', this.error); 8601cb0ef41Sopenharmony_ci this.promise.reject(error); 8611cb0ef41Sopenharmony_ci this.controller.error(error); 8621cb0ef41Sopenharmony_ci return; 8631cb0ef41Sopenharmony_ci } 8641cb0ef41Sopenharmony_ci this.promise.resolve(); 8651cb0ef41Sopenharmony_ci } 8661cb0ef41Sopenharmony_ci 8671cb0ef41Sopenharmony_ci function doWrite(chunk, controller) { 8681cb0ef41Sopenharmony_ci const promise = createDeferredPromise(); 8691cb0ef41Sopenharmony_ci let ret; 8701cb0ef41Sopenharmony_ci let req; 8711cb0ef41Sopenharmony_ci try { 8721cb0ef41Sopenharmony_ci req = createWriteWrap(controller, promise); 8731cb0ef41Sopenharmony_ci ret = streamBase.writeBuffer(req, chunk); 8741cb0ef41Sopenharmony_ci if (streamBaseState[kLastWriteWasAsync]) 8751cb0ef41Sopenharmony_ci req.buffer = chunk; 8761cb0ef41Sopenharmony_ci req.async = !!streamBaseState[kLastWriteWasAsync]; 8771cb0ef41Sopenharmony_ci } catch (error) { 8781cb0ef41Sopenharmony_ci promise.reject(error); 8791cb0ef41Sopenharmony_ci } 8801cb0ef41Sopenharmony_ci 8811cb0ef41Sopenharmony_ci if (ret !== 0) 8821cb0ef41Sopenharmony_ci promise.reject(errnoException(ret, 'write', req)); 8831cb0ef41Sopenharmony_ci else if (!req.async) 8841cb0ef41Sopenharmony_ci promise.resolve(); 8851cb0ef41Sopenharmony_ci 8861cb0ef41Sopenharmony_ci return promise.promise; 8871cb0ef41Sopenharmony_ci } 8881cb0ef41Sopenharmony_ci 8891cb0ef41Sopenharmony_ci return new WritableStream({ 8901cb0ef41Sopenharmony_ci write(chunk, controller) { 8911cb0ef41Sopenharmony_ci current = current !== undefined ? 
8921cb0ef41Sopenharmony_ci PromisePrototypeThen( 8931cb0ef41Sopenharmony_ci current, 8941cb0ef41Sopenharmony_ci () => doWrite(chunk, controller), 8951cb0ef41Sopenharmony_ci (error) => controller.error(error)) : 8961cb0ef41Sopenharmony_ci doWrite(chunk, controller); 8971cb0ef41Sopenharmony_ci return current; 8981cb0ef41Sopenharmony_ci }, 8991cb0ef41Sopenharmony_ci 9001cb0ef41Sopenharmony_ci close() { 9011cb0ef41Sopenharmony_ci const promise = createDeferredPromise(); 9021cb0ef41Sopenharmony_ci const req = new ShutdownWrap(); 9031cb0ef41Sopenharmony_ci req.oncomplete = () => promise.resolve(); 9041cb0ef41Sopenharmony_ci const err = streamBase.shutdown(req); 9051cb0ef41Sopenharmony_ci if (err === 1) 9061cb0ef41Sopenharmony_ci promise.resolve(); 9071cb0ef41Sopenharmony_ci return promise.promise; 9081cb0ef41Sopenharmony_ci }, 9091cb0ef41Sopenharmony_ci }, strategy); 9101cb0ef41Sopenharmony_ci} 9111cb0ef41Sopenharmony_ci 9121cb0ef41Sopenharmony_ci/** 9131cb0ef41Sopenharmony_ci * @param {StreamBase} streamBase 9141cb0ef41Sopenharmony_ci * @param {QueuingStrategy} strategy 9151cb0ef41Sopenharmony_ci * @returns {ReadableStream} 9161cb0ef41Sopenharmony_ci */ 9171cb0ef41Sopenharmony_cifunction newReadableStreamFromStreamBase(streamBase, strategy, options = kEmptyObject) { 9181cb0ef41Sopenharmony_ci validateObject(streamBase, 'streamBase'); 9191cb0ef41Sopenharmony_ci validateObject(options, 'options'); 9201cb0ef41Sopenharmony_ci 9211cb0ef41Sopenharmony_ci const { 9221cb0ef41Sopenharmony_ci ondone = () => {}, 9231cb0ef41Sopenharmony_ci } = options; 9241cb0ef41Sopenharmony_ci 9251cb0ef41Sopenharmony_ci if (typeof streamBase.onread === 'function') 9261cb0ef41Sopenharmony_ci throw new ERR_INVALID_STATE('StreamBase already has a consumer'); 9271cb0ef41Sopenharmony_ci 9281cb0ef41Sopenharmony_ci validateFunction(ondone, 'options.ondone'); 9291cb0ef41Sopenharmony_ci 9301cb0ef41Sopenharmony_ci let controller; 9311cb0ef41Sopenharmony_ci 9321cb0ef41Sopenharmony_ci streamBase.onread = 
(arrayBuffer) => { 9331cb0ef41Sopenharmony_ci const nread = streamBaseState[kReadBytesOrError]; 9341cb0ef41Sopenharmony_ci 9351cb0ef41Sopenharmony_ci if (nread === 0) 9361cb0ef41Sopenharmony_ci return; 9371cb0ef41Sopenharmony_ci 9381cb0ef41Sopenharmony_ci try { 9391cb0ef41Sopenharmony_ci if (nread === UV_EOF) { 9401cb0ef41Sopenharmony_ci controller.close(); 9411cb0ef41Sopenharmony_ci streamBase.readStop(); 9421cb0ef41Sopenharmony_ci try { 9431cb0ef41Sopenharmony_ci ondone(); 9441cb0ef41Sopenharmony_ci } catch (error) { 9451cb0ef41Sopenharmony_ci controller.error(error); 9461cb0ef41Sopenharmony_ci } 9471cb0ef41Sopenharmony_ci return; 9481cb0ef41Sopenharmony_ci } 9491cb0ef41Sopenharmony_ci 9501cb0ef41Sopenharmony_ci controller.enqueue(arrayBuffer); 9511cb0ef41Sopenharmony_ci 9521cb0ef41Sopenharmony_ci if (controller.desiredSize <= 0) 9531cb0ef41Sopenharmony_ci streamBase.readStop(); 9541cb0ef41Sopenharmony_ci } catch (error) { 9551cb0ef41Sopenharmony_ci controller.error(error); 9561cb0ef41Sopenharmony_ci streamBase.readStop(); 9571cb0ef41Sopenharmony_ci } 9581cb0ef41Sopenharmony_ci }; 9591cb0ef41Sopenharmony_ci 9601cb0ef41Sopenharmony_ci return new ReadableStream({ 9611cb0ef41Sopenharmony_ci start(c) { controller = c; }, 9621cb0ef41Sopenharmony_ci 9631cb0ef41Sopenharmony_ci pull() { 9641cb0ef41Sopenharmony_ci streamBase.readStart(); 9651cb0ef41Sopenharmony_ci }, 9661cb0ef41Sopenharmony_ci 9671cb0ef41Sopenharmony_ci cancel() { 9681cb0ef41Sopenharmony_ci const promise = createDeferredPromise(); 9691cb0ef41Sopenharmony_ci try { 9701cb0ef41Sopenharmony_ci ondone(); 9711cb0ef41Sopenharmony_ci } catch (error) { 9721cb0ef41Sopenharmony_ci promise.reject(error); 9731cb0ef41Sopenharmony_ci return promise.promise; 9741cb0ef41Sopenharmony_ci } 9751cb0ef41Sopenharmony_ci const req = new ShutdownWrap(); 9761cb0ef41Sopenharmony_ci req.oncomplete = () => promise.resolve(); 9771cb0ef41Sopenharmony_ci const err = streamBase.shutdown(req); 9781cb0ef41Sopenharmony_ci if (err === 1) 
9791cb0ef41Sopenharmony_ci promise.resolve(); 9801cb0ef41Sopenharmony_ci return promise.promise; 9811cb0ef41Sopenharmony_ci }, 9821cb0ef41Sopenharmony_ci }, strategy); 9831cb0ef41Sopenharmony_ci} 9841cb0ef41Sopenharmony_ci 9851cb0ef41Sopenharmony_cimodule.exports = { 9861cb0ef41Sopenharmony_ci newWritableStreamFromStreamWritable, 9871cb0ef41Sopenharmony_ci newReadableStreamFromStreamReadable, 9881cb0ef41Sopenharmony_ci newStreamWritableFromWritableStream, 9891cb0ef41Sopenharmony_ci newStreamReadableFromReadableStream, 9901cb0ef41Sopenharmony_ci newReadableWritablePairFromDuplex, 9911cb0ef41Sopenharmony_ci newStreamDuplexFromReadableWritablePair, 9921cb0ef41Sopenharmony_ci newWritableStreamFromStreamBase, 9931cb0ef41Sopenharmony_ci newReadableStreamFromStreamBase, 9941cb0ef41Sopenharmony_ci}; 995