'use strict'

const {
  Readable,
  Duplex,
  PassThrough
} = require('stream')
const {
  InvalidArgumentError,
  InvalidReturnValueError,
  RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const { AsyncResource } = require('async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')
const assert = require('assert')

// Private property key under which the pipeline streams keep their "resume"
// callback, so it cannot collide with any regular stream property.
const kResume = Symbol('resume')
191cb0ef41Sopenharmony_ci
/**
 * Readable used as the request body of a pipeline dispatch.
 *
 * Data is pushed into it from the outside. A producer that hits backpressure
 * can park a continuation under `kResume`; it is released the next time the
 * stream is asked for data, or when the stream is destroyed.
 */
class PipelineRequest extends Readable {
  constructor () {
    super({ autoDestroy: true })

    // Continuation of a producer waiting for the consumer to drain, if any.
    this[kResume] = null
  }

  /**
   * Invoked when more data is wanted: runs the parked continuation, if there
   * is one, exactly once.
   */
  _read () {
    const { [kResume]: resume } = this

    if (resume) {
      // Clear the slot first so a re-entrant call cannot run it twice.
      this[kResume] = null
      resume()
    }
  }

  /**
   * @param {Error|null} err - Reason for destruction, forwarded unchanged.
   * @param {Function} callback - Completion callback, called with `err`.
   */
  _destroy (err, callback) {
    // Release any parked continuation so it is not left pending forever.
    this._read()

    callback(err)
  }
}
421cb0ef41Sopenharmony_ci
/**
 * Readable handed to the user's pipeline handler as the response body.
 *
 * Response chunks are pushed into it from the outside; each time the stream
 * wants more data it calls the `resume` callback it was constructed with.
 */
class PipelineResponse extends Readable {
  /**
   * @param {Function} resume - Called from `_read` whenever more data is wanted.
   */
  constructor (resume) {
    super({ autoDestroy: true })
    this[kResume] = resume
  }

  _read () {
    this[kResume]()
  }

  /**
   * @param {Error|null} err - Reason for destruction, if any.
   * @param {Function} callback - Completion callback, called with the final error.
   */
  _destroy (err, callback) {
    // Being destroyed without an error before 'end' was emitted means the
    // response was cut short, so report it as an aborted request.
    if (!err && !this._readableState.endEmitted) {
      err = new RequestAbortedError()
    }

    callback(err)
  }
}
611cb0ef41Sopenharmony_ci
/**
 * Dispatch handler that backs `pipeline()`.
 *
 * It owns three streams:
 *  - `req`: a PipelineRequest fed by writes to `ret`; used as the request body.
 *  - `res`: a PipelineResponse fed by `onData`; given to the user handler.
 *  - `ret`: the Duplex returned to the caller. Its writable side feeds `req`,
 *    its readable side is fed by the stream the user handler returns (`body`).
 */
class PipelineHandler extends AsyncResource {
  /**
   * @param {object} opts - Dispatch options. Reads `signal`, `method`,
   *   `opaque`, `onInfo`, `responseHeaders` and `objectMode`.
   * @param {Function} handler - Called once with
   *   `{ statusCode, headers, opaque, body, context }`; must return an object
   *   with an `on` method (a Readable) whose data is exposed through `ret`.
   * @throws {InvalidArgumentError} When `opts`, `handler`, `signal`, `method`
   *   or `onInfo` are invalid.
   */
  constructor (opts, handler) {
    if (!opts || typeof opts !== 'object') {
      throw new InvalidArgumentError('invalid opts')
    }

    if (typeof handler !== 'function') {
      throw new InvalidArgumentError('invalid handler')
    }

    const { signal, method, opaque, onInfo, responseHeaders } = opts

    if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
      throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
    }

    if (method === 'CONNECT') {
      throw new InvalidArgumentError('invalid method')
    }

    if (onInfo && typeof onInfo !== 'function') {
      throw new InvalidArgumentError('invalid onInfo callback')
    }

    super('UNDICI_PIPELINE')

    this.opaque = opaque || null
    this.responseHeaders = responseHeaders || null
    this.handler = handler
    this.abort = null
    this.context = null
    this.onInfo = onInfo || null
    // Both are assigned in onHeaders; initialised here so the stream
    // callbacks below always read a defined value.
    this.res = null
    this.body = null

    // Errors on the request stream are reported through `ret`, so silence
    // the stream's own 'error' event.
    this.req = new PipelineRequest().on('error', util.nop)

    this.ret = new Duplex({
      readableObjectMode: opts.objectMode,
      autoDestroy: true,
      read: () => {
        const { body } = this

        // The consumer wants more data: un-pause the handler's stream.
        if (body && body.resume) {
          body.resume()
        }
      },
      write: (chunk, encoding, callback) => {
        const { req } = this

        if (req.push(chunk, encoding) || req._readableState.destroyed) {
          callback()
        } else {
          // Backpressure: hold the callback until `req` is read or destroyed.
          req[kResume] = callback
        }
      },
      destroy: (err, callback) => {
        const { body, req, res, abort } = this

        // Destroyed without an error before the readable side ended: treat
        // it as an abort.
        if (!err && !this.ret._readableState.endEmitted) {
          err = new RequestAbortedError()
        }

        if (abort && err) {
          abort()
        }

        util.destroy(body, err)
        util.destroy(req, err)
        util.destroy(res, err)

        removeSignal(this)

        callback(err)
      }
    }).on('prefinish', () => {
      const { req } = this

      // Node < 15 does not call _final in same tick.
      req.push(null)
    })

    addSignal(this, signal)
  }

  /**
   * @param {Function} abort - Function that aborts the in-flight request.
   * @param {*} context - Opaque value later passed on to the user handler.
   * @throws {RequestAbortedError} When `ret` has already been destroyed.
   */
  onConnect (abort, context) {
    const { ret, res } = this

    assert(!res, 'pipeline cannot be retried')

    if (ret.destroyed) {
      throw new RequestAbortedError()
    }

    this.abort = abort
    this.context = context
  }

  /**
   * Handles response headers. Informational (< 200) responses are only
   * reported to `onInfo`; for any other status the user handler is invoked
   * and the stream it returns is wired into `ret`.
   *
   * @param {number} statusCode
   * @param {*} rawHeaders - Passed to `util.parseRawHeaders` when
   *   `responseHeaders === 'raw'`, otherwise to `util.parseHeaders`.
   * @param {Function} resume - Given to the PipelineResponse, which calls it
   *   whenever it wants more data.
   * @throws {InvalidReturnValueError} When the handler does not return an
   *   object with an `on` method. Errors thrown by the handler are rethrown.
   */
  onHeaders (statusCode, rawHeaders, resume) {
    const { opaque, handler, context } = this

    if (statusCode < 200) {
      if (this.onInfo) {
        const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
        this.onInfo({ statusCode, headers })
      }
      return
    }

    this.res = new PipelineResponse(resume)

    let body
    try {
      this.handler = null
      const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
      body = this.runInAsyncScope(handler, null, {
        statusCode,
        headers,
        opaque,
        body: this.res,
        context
      })

      if (!body || typeof body.on !== 'function') {
        throw new InvalidReturnValueError('expected Readable')
      }
    } catch (err) {
      // Whether the handler threw or returned something unusable, nobody may
      // be listening on `res`; add a no-op listener so destroying it with an
      // error later cannot raise an unhandled 'error' event.
      this.res.on('error', util.nop)
      throw err
    }

    body
      .on('data', (chunk) => {
        // `body` is the local stream these listeners are attached to;
        // `this.body` is only assigned once the chain below is complete.
        if (!this.ret.push(chunk) && body.pause) {
          body.pause()
        }
      })
      .on('error', (err) => {
        util.destroy(this.ret, err)
      })
      .on('end', () => {
        this.ret.push(null)
      })
      .on('close', () => {
        const { ret } = this

        // Closed before signalling end-of-data: surface it as an abort.
        if (!ret._readableState.ended) {
          util.destroy(ret, new RequestAbortedError())
        }
      })

    this.body = body
  }

  /**
   * @param {*} chunk - Response body chunk.
   * @returns {boolean} Result of pushing the chunk into the response stream.
   */
  onData (chunk) {
    const { res } = this
    return res.push(chunk)
  }

  /**
   * Ends the response stream. `trailers` is accepted but not used.
   */
  onComplete (trailers) {
    const { res } = this
    res.push(null)
  }

  /**
   * @param {Error} err - Failure to propagate by destroying `ret`.
   */
  onError (err) {
    const { ret } = this
    this.handler = null
    util.destroy(ret, err)
  }
}
2381cb0ef41Sopenharmony_ci
/**
 * Dispatches a request whose body is written to, and whose (handler-produced)
 * response is read from, the returned Duplex.
 *
 * Must be called with `this` bound to an object exposing `dispatch(opts, handler)`.
 *
 * @param {object} opts - Dispatch options; forwarded to `this.dispatch` with
 *   `body` replaced by the pipeline's request stream.
 * @param {Function} handler - Receives the response and returns a Readable.
 * @returns {import('stream').Duplex} The pipeline stream. If setup or
 *   dispatch throws synchronously, a PassThrough already destroyed with that
 *   error is returned instead of throwing.
 */
function pipeline (opts, handler) {
  try {
    const pipelineHandler = new PipelineHandler(opts, handler)
    this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)
    return pipelineHandler.ret
  } catch (err) {
    // Report synchronous failures through the stream so callers only need
    // to handle errors in one place.
    return new PassThrough().destroy(err)
  }
}
2481cb0ef41Sopenharmony_ci
// The module's sole export is the `pipeline` function.
module.exports = pipeline