'use strict'

const {
  Readable,
  Duplex,
  PassThrough
} = require('stream')
const {
  InvalidArgumentError,
  InvalidReturnValueError,
  RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const { AsyncResource } = require('async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')
const assert = require('assert')

// Key under which the request/response streams keep their "resume" callback.
const kResume = Symbol('resume')

/**
 * Converts raw response headers into the shape selected by `responseHeaders`.
 *
 * @param {*} responseHeaders - The handler's `responseHeaders` option; the
 *   string 'raw' selects `util.parseRawHeaders`, anything else selects
 *   `util.parseHeaders`.
 * @param {*} rawHeaders - Raw headers as received by `onHeaders`.
 * @returns {*} Whatever the selected util parser returns.
 */
function parseResponseHeaders (responseHeaders, rawHeaders) {
  return responseHeaders === 'raw'
    ? util.parseRawHeaders(rawHeaders)
    : util.parseHeaders(rawHeaders)
}

/**
 * Readable that is handed to `dispatch` as the request body.
 *
 * Chunks written to the pipeline's Duplex are pushed into this stream. When a
 * push signals back-pressure, the Duplex parks its write callback under
 * `kResume`; the next `_read` releases it.
 */
class PipelineRequest extends Readable {
  constructor () {
    super({ autoDestroy: true })

    this[kResume] = null
  }

  /**
   * Releases the parked write callback, if any, so the writer may continue.
   */
  _read () {
    const { [kResume]: resume } = this

    if (resume) {
      // Clear before calling so the callback is invoked at most once.
      this[kResume] = null
      resume()
    }
  }

  /**
   * @param {Error|null} err
   * @param {Function} callback
   */
  _destroy (err, callback) {
    // Flush a parked write callback so a pending write is not left hanging.
    this._read()

    callback(err)
  }
}

/**
 * Readable that is passed to the user handler as `body`.
 *
 * Response chunks are pushed into it by `PipelineHandler#onData`; reading from
 * it calls the `resume` function supplied to `onHeaders`.
 */
class PipelineResponse extends Readable {
  /**
   * @param {Function} resume - Called every time the stream wants more data.
   */
  constructor (resume) {
    super({ autoDestroy: true })
    this[kResume] = resume
  }

  _read () {
    this[kResume]()
  }

  /**
   * @param {Error|null} err
   * @param {Function} callback
   */
  _destroy (err, callback) {
    // Destroyed without an error before 'end' was emitted: report an abort.
    if (!err && !this._readableState.endEmitted) {
      err = new RequestAbortedError()
    }

    callback(err)
  }
}

/**
 * Dispatch handler that glues three streams together:
 *
 * - `req`: the request body (`PipelineRequest`), fed by writes to `ret`;
 * - `res`: the response body (`PipelineResponse`), given to the user handler;
 * - `ret`: the Duplex returned to the caller; its readable side is fed by the
 *   stream the user handler returns (`body`).
 */
class PipelineHandler extends AsyncResource {
  /**
   * @param {object} opts - Options; `signal`, `method`, `opaque`, `onInfo`,
   *   `responseHeaders` and `objectMode` are read here.
   * @param {Function} handler - Called once from `onHeaders` with
   *   `{ statusCode, headers, opaque, body, context }`; must return an object
   *   with an `on` method.
   * @throws {InvalidArgumentError} If `opts`, `handler`, `signal` or `onInfo`
   *   is invalid, or `method` is 'CONNECT'.
   */
  constructor (opts, handler) {
    if (!opts || typeof opts !== 'object') {
      throw new InvalidArgumentError('invalid opts')
    }

    if (typeof handler !== 'function') {
      throw new InvalidArgumentError('invalid handler')
    }

    const { signal, method, opaque, onInfo, responseHeaders } = opts

    if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
      throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
    }

    if (method === 'CONNECT') {
      throw new InvalidArgumentError('invalid method')
    }

    if (onInfo && typeof onInfo !== 'function') {
      throw new InvalidArgumentError('invalid onInfo callback')
    }

    super('UNDICI_PIPELINE')

    this.opaque = opaque || null
    this.responseHeaders = responseHeaders || null
    this.handler = handler
    this.abort = null
    this.context = null
    this.onInfo = onInfo || null
    // Initialised up front because ret's read/destroy callbacks read it and may
    // run before onHeaders has produced a body.
    this.body = null

    // The request stream's errors are surfaced through `ret`, so ignore them here.
    this.req = new PipelineRequest().on('error', util.nop)

    this.ret = new Duplex({
      readableObjectMode: opts.objectMode,
      autoDestroy: true,
      read: () => {
        const { body } = this

        // The consumer wants more data: let the handler's stream flow again.
        if (body && body.resume) {
          body.resume()
        }
      },
      write: (chunk, encoding, callback) => {
        const { req } = this

        if (req.push(chunk, encoding) || req._readableState.destroyed) {
          callback()
        } else {
          // Back-pressure: park the callback until req._read() is called.
          req[kResume] = callback
        }
      },
      destroy: (err, callback) => {
        const { body, req, res, ret, abort } = this

        // Destroyed without an error before 'end' was emitted: report an abort.
        if (!err && !ret._readableState.endEmitted) {
          err = new RequestAbortedError()
        }

        if (abort && err) {
          abort()
        }

        util.destroy(body, err)
        util.destroy(req, err)
        util.destroy(res, err)

        removeSignal(this)

        callback(err)
      }
    }).on('prefinish', () => {
      const { req } = this

      // Node < 15 does not call _final in same tick.
      req.push(null)
    })

    this.res = null

    addSignal(this, signal)
  }

  /**
   * Stores the abort function and context supplied by the dispatcher.
   *
   * @param {Function} abort - Called from ret's destroy when it has an error.
   * @param {*} context - Forwarded to the user handler.
   * @throws {RequestAbortedError} If `ret` has already been destroyed.
   */
  onConnect (abort, context) {
    const { ret, res } = this

    assert(!res, 'pipeline cannot be retried')

    if (ret.destroyed) {
      throw new RequestAbortedError()
    }

    this.abort = abort
    this.context = context
  }

  /**
   * Handles a response head. Status codes below 200 are only reported to
   * `onInfo`; any other status creates the response stream, runs the user
   * handler and wires the stream it returns into `ret`.
   *
   * @param {number} statusCode
   * @param {*} rawHeaders
   * @param {Function} resume - Passed to the `PipelineResponse` as its read hook.
   * @throws {InvalidReturnValueError} If the handler does not return an object
   *   with an `on` method.
   */
  onHeaders (statusCode, rawHeaders, resume) {
    const { opaque, handler, context } = this

    if (statusCode < 200) {
      if (this.onInfo) {
        const headers = parseResponseHeaders(this.responseHeaders, rawHeaders)
        this.onInfo({ statusCode, headers })
      }
      return
    }

    this.res = new PipelineResponse(resume)

    let body
    try {
      // The handler runs at most once.
      this.handler = null
      const headers = parseResponseHeaders(this.responseHeaders, rawHeaders)
      body = this.runInAsyncScope(handler, null, {
        statusCode,
        headers,
        opaque,
        body: this.res,
        context
      })
    } catch (err) {
      // Attach a no-op 'error' listener to res before rethrowing.
      this.res.on('error', util.nop)
      throw err
    }

    if (!body || typeof body.on !== 'function') {
      throw new InvalidReturnValueError('expected Readable')
    }

    // Publish the body before wiring listeners: the 'data' listener and ret's
    // read/destroy callbacks all look it up through `this.body`.
    this.body = body

    body
      .on('data', (chunk) => {
        const { ret, body } = this

        // Propagate back-pressure from ret to the handler's stream.
        if (!ret.push(chunk) && body.pause) {
          body.pause()
        }
      })
      .on('error', (err) => {
        const { ret } = this

        util.destroy(ret, err)
      })
      .on('end', () => {
        const { ret } = this

        ret.push(null)
      })
      .on('close', () => {
        const { ret } = this

        // Closed before the readable side was ended: treat as an abort.
        if (!ret._readableState.ended) {
          util.destroy(ret, new RequestAbortedError())
        }
      })
  }

  /**
   * Pushes a response chunk into the response stream.
   *
   * @param {*} chunk
   * @returns {boolean} Result of `res.push(chunk)`.
   */
  onData (chunk) {
    const { res } = this
    return res.push(chunk)
  }

  /**
   * Ends the response stream.
   *
   * @param {*} trailers - Not used by this handler.
   */
  onComplete (trailers) {
    const { res } = this
    res.push(null)
  }

  /**
   * Destroys `ret` with the given error.
   *
   * @param {Error} err
   */
  onError (err) {
    const { ret } = this
    this.handler = null
    util.destroy(ret, err)
  }
}

/**
 * Dispatches a request whose body is fed by the returned Duplex and whose
 * readable side is fed by the stream returned from `handler`.
 *
 * Must be invoked with `this` bound to an object exposing `dispatch`.
 *
 * @param {object} opts - Dispatch options; see `PipelineHandler`.
 * @param {Function} handler - See `PipelineHandler`.
 * @returns {Duplex} The pipeline stream, or an already-destroyed PassThrough
 *   carrying the error if setup or dispatch threw synchronously.
 */
function pipeline (opts, handler) {
  try {
    const pipelineHandler = new PipelineHandler(opts, handler)
    this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)
    return pipelineHandler.ret
  } catch (err) {
    return new PassThrough().destroy(err)
  }
}

module.exports = pipeline