1'use strict'
2
3const { finished, PassThrough } = require('stream')
4const {
5  InvalidArgumentError,
6  InvalidReturnValueError,
7  RequestAbortedError
8} = require('../core/errors')
9const util = require('../core/util')
10const { getResolveErrorBodyCallback } = require('./util')
11const { AsyncResource } = require('async_hooks')
12const { addSignal, removeSignal } = require('./abort-signal')
13
/**
 * Dispatch handler that forwards a response body into a writable destination.
 *
 * The destination is obtained by calling the user supplied `factory` once the
 * final (>= 200) response headers are known. When `throwOnError` is set and
 * the status code is >= 400, the body is instead written to an internal
 * `PassThrough` that is handed to `getResolveErrorBodyCallback`.
 *
 * `callback` and `factory` are cleared (set to `null`) as soon as they have
 * been consumed, so a `null` value doubles as an "already settled" marker.
 */
class StreamHandler extends AsyncResource {
  /**
   * @param {object} opts Request options. This class reads `signal`, `method`,
   *   `opaque`, `body`, `onInfo`, `responseHeaders` and `throwOnError`.
   * @param {Function} factory Invoked with `{ statusCode, headers, opaque, context }`;
   *   must return an object exposing `write`, `end` and `on`.
   * @param {Function} callback Invoked as `(err, { opaque, trailers })` once the
   *   destination has finished, or as `(err, { opaque })` when the request
   *   fails before a destination exists.
   * @throws {InvalidArgumentError} When `opts`, `factory`, `callback`, `signal`,
   *   `method` or `onInfo` is invalid.
   */
  constructor (opts, factory, callback) {
    if (!opts || typeof opts !== 'object') {
      throw new InvalidArgumentError('invalid opts')
    }

    const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError } = opts

    try {
      if (typeof callback !== 'function') {
        throw new InvalidArgumentError('invalid callback')
      }

      if (typeof factory !== 'function') {
        throw new InvalidArgumentError('invalid factory')
      }

      if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
        throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
      }

      if (method === 'CONNECT') {
        throw new InvalidArgumentError('invalid method')
      }

      if (onInfo && typeof onInfo !== 'function') {
        throw new InvalidArgumentError('invalid onInfo callback')
      }

      super('UNDICI_STREAM')
    } catch (err) {
      // Validation failed: a stream body will never be consumed, so tear it
      // down. `util.nop` is attached as an 'error' listener first, presumably
      // so the destroy error does not surface as an unhandled 'error' event.
      if (util.isStream(body)) {
        util.destroy(body.on('error', util.nop), err)
      }
      throw err
    }

    this.responseHeaders = responseHeaders || null
    this.opaque = opaque || null
    this.factory = factory
    this.callback = callback
    this.res = null
    this.abort = null
    this.context = null
    this.trailers = null
    this.body = body
    this.onInfo = onInfo || null
    this.throwOnError = throwOnError || false

    // A failing request body fails the whole request.
    if (util.isStream(body)) {
      body.on('error', (err) => {
        this.onError(err)
      })
    }

    addSignal(this, signal)
  }

  /**
   * Stores the abort function and context handed over by the dispatcher.
   *
   * @param {Function} abort Called later if the destination fails.
   * @param {*} context Passed through to `factory`.
   * @throws {RequestAbortedError} When the handler has already been settled
   *   (its callback was cleared).
   */
  onConnect (abort, context) {
    if (!this.callback) {
      throw new RequestAbortedError()
    }

    this.abort = abort
    this.context = context
  }

  /**
   * Handles response headers and sets up the destination stream.
   *
   * @param {number} statusCode Response status code.
   * @param {*} rawHeaders Raw headers, parsed with `util.parseRawHeaders`
   *   when `responseHeaders === 'raw'`, otherwise with `util.parseHeaders`.
   * @param {Function} resume Attached as the destination's 'drain' listener.
   * @param {string} statusMessage Forwarded to the error-body resolver.
   * @returns {boolean|undefined} `false` when the destination reports that it
   *   needs draining, `true` otherwise; `undefined` for informational
   *   responses and for handlers that have already been settled.
   * @throws {InvalidReturnValueError} When `factory` does not return a
   *   Writable-like object.
   */
  onHeaders (statusCode, rawHeaders, resume, statusMessage) {
    const { factory, opaque, context, callback, responseHeaders } = this

    if (statusCode < 200) {
      // Informational response: only pay for header parsing when somebody
      // actually listens for it.
      if (this.onInfo) {
        const infoHeaders = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
        this.onInfo({ statusCode, headers: infoHeaders })
      }
      return
    }

    this.factory = null

    // `factory` is only null once the handler has been settled (see
    // `onError`); in that state the callback is gone as well, so neither the
    // factory path nor the throwOnError path may run.
    if (factory === null) {
      return
    }

    const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)

    let res

    if (this.throwOnError && statusCode >= 400) {
      // The content-type lookup needs the object form even in 'raw' mode.
      const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
      const contentType = parsedHeaders['content-type']
      res = new PassThrough()

      // Ownership of the callback moves to the error-body resolver.
      this.callback = null
      this.runInAsyncScope(getResolveErrorBodyCallback, null,
        { callback, body: res, contentType, statusCode, statusMessage, headers }
      )
    } else {
      res = this.runInAsyncScope(factory, null, {
        statusCode,
        headers,
        opaque,
        context
      })

      if (
        !res ||
        typeof res.write !== 'function' ||
        typeof res.end !== 'function' ||
        typeof res.on !== 'function'
      ) {
        throw new InvalidReturnValueError('expected Writable')
      }

      // TODO: Avoid finished. It registers an unnecessary amount of listeners.
      finished(res, { readable: false }, (err) => {
        // Re-read state: it may have changed since the headers arrived.
        const { callback, res, opaque, trailers, abort } = this

        this.res = null
        if (err || !res.readable) {
          util.destroy(res, err)
        }

        this.callback = null
        this.runInAsyncScope(callback, null, err || null, { opaque, trailers })

        if (err) {
          abort()
        }
      })
    }

    res.on('drain', resume)

    this.res = res

    // Fall back to the internal writable state when the public
    // `writableNeedDrain` property is not available.
    const needDrain = res.writableNeedDrain !== undefined
      ? res.writableNeedDrain
      : res._writableState && res._writableState.needDrain

    return needDrain !== true
  }

  /**
   * Writes a body chunk to the destination.
   *
   * @param {*} chunk Body chunk.
   * @returns {boolean} The result of `res.write(chunk)`, or `true` when there
   *   is no destination.
   */
  onData (chunk) {
    const { res } = this

    return res ? res.write(chunk) : true
  }

  /**
   * Records the parsed trailers and ends the destination, if any.
   *
   * @param {*} trailers Raw trailers, parsed with `util.parseHeaders`.
   */
  onComplete (trailers) {
    const { res } = this

    removeSignal(this)

    if (!res) {
      return
    }

    this.trailers = util.parseHeaders(trailers)

    res.end()
  }

  /**
   * Fails the request: destroys the destination when one exists, otherwise
   * reports `err` to the callback on a microtask; the request body, if any,
   * is destroyed as well.
   *
   * @param {Error} err Failure reason.
   */
  onError (err) {
    const { res, callback, opaque, body } = this

    removeSignal(this)

    this.factory = null

    if (res) {
      this.res = null
      util.destroy(res, err)
    } else if (callback) {
      // Clear before invoking so the callback cannot be delivered twice.
      this.callback = null
      queueMicrotask(() => {
        this.runInAsyncScope(callback, null, err, { opaque })
      })
    }

    if (body) {
      this.body = null
      util.destroy(body, err)
    }
  }
}
199
/**
 * Dispatches a request through `this.dispatch` using a `StreamHandler`.
 *
 * Without a `callback` the call is promisified: the returned promise settles
 * with whatever the internally supplied callback receives. With a `callback`
 * nothing is returned; synchronous failures are reported through the callback
 * on a microtask, unless `callback` is not a function, in which case the error
 * is rethrown.
 *
 * @param {object} opts Request options, forwarded to `dispatch` and the handler.
 * @param {Function} factory Forwarded to `StreamHandler`.
 * @param {Function} [callback] Node-style completion callback.
 * @returns {Promise|undefined} A promise only when `callback` is `undefined`.
 */
function stream (opts, factory, callback) {
  if (callback !== undefined) {
    try {
      // Kept as a single expression so `this.dispatch` is looked up before
      // the handler is constructed.
      this.dispatch(opts, new StreamHandler(opts, factory, callback))
    } catch (failure) {
      const canReport = typeof callback === 'function'
      if (!canReport) {
        throw failure
      }
      const opaque = opts && opts.opaque
      queueMicrotask(() => callback(failure, { opaque }))
    }
    return
  }

  return new Promise((resolve, reject) => {
    const settle = (err, data) => {
      return err ? reject(err) : resolve(data)
    }
    stream.call(this, opts, factory, settle)
  })
}
219
220module.exports = stream
221