1'use strict'
2
3const Readable = require('./readable')
4const {
5  InvalidArgumentError,
6  RequestAbortedError
7} = require('../core/errors')
8const util = require('../core/util')
9const { getResolveErrorBodyCallback } = require('./util')
10const { AsyncResource } = require('async_hooks')
11const { addSignal, removeSignal } = require('./abort-signal')
12
/**
 * Dispatch handler used by `request()`.
 *
 * It receives the lifecycle hooks (`onConnect`, `onHeaders`, `onData`,
 * `onComplete`, `onError`) and turns them into a single
 * `callback(err, data)` invocation plus a `Readable` response body.
 * Extends `AsyncResource` so the user callback runs in the async scope in
 * which the handler was created.
 */
class RequestHandler extends AsyncResource {
  /**
   * @param {object} opts - Request options. The fields read here are
   *   `signal`, `method`, `opaque`, `body`, `onInfo`, `responseHeaders`,
   *   `throwOnError` and `highWaterMark`.
   * @param {Function} callback - Invoked as `callback(err, data)`.
   * @throws {InvalidArgumentError} When `opts` is not an object, `callback`
   *   is not a function, `highWaterMark` is truthy but not a non-negative
   *   number, `signal` exposes neither `on` nor `addEventListener`, `method`
   *   is `'CONNECT'`, or `onInfo` is truthy but not a function.
   */
  constructor (opts, callback) {
    if (!opts || typeof opts !== 'object') {
      throw new InvalidArgumentError('invalid opts')
    }

    const { signal, method, opaque, body, onInfo, responseHeaders, throwOnError, highWaterMark } = opts

    try {
      if (typeof callback !== 'function') {
        throw new InvalidArgumentError('invalid callback')
      }

      if (highWaterMark && (typeof highWaterMark !== 'number' || highWaterMark < 0)) {
        throw new InvalidArgumentError('invalid highWaterMark')
      }

      if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
        throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
      }

      if (method === 'CONNECT') {
        throw new InvalidArgumentError('invalid method')
      }

      if (onInfo && typeof onInfo !== 'function') {
        throw new InvalidArgumentError('invalid onInfo callback')
      }

      super('UNDICI_REQUEST')
    } catch (err) {
      // The request will never be dispatched, so a stream body is destroyed
      // here. A no-op 'error' listener is attached first so the destroy
      // error does not surface as an unhandled 'error' event.
      if (util.isStream(body)) {
        util.destroy(body.on('error', util.nop), err)
      }
      throw err
    }

    this.responseHeaders = responseHeaders || null
    // Default to null only when opaque is absent. Falsy values such as 0, ''
    // or false are legitimate pass-through data and must reach the callback
    // unchanged, matching what request() forwards on constructor failure.
    this.opaque = opaque == null ? null : opaque
    this.callback = callback
    this.res = null
    this.abort = null
    this.body = body
    this.trailers = {}
    this.context = null
    this.onInfo = onInfo || null
    this.throwOnError = throwOnError
    this.highWaterMark = highWaterMark

    if (util.isStream(body)) {
      // A failing request body fails the whole request.
      body.on('error', (err) => {
        this.onError(err)
      })
    }

    // NOTE(review): presumably wires `signal` so that aborting it aborts this
    // handler; addSignal is defined in ./abort-signal, confirm there.
    addSignal(this, signal)
  }

  /**
   * Stores the abort function and context for later hooks.
   *
   * @param {Function} abort - Stored on `this.abort` and later handed to the
   *   response body.
   * @param {*} context - Stored and later passed through to the callback.
   * @throws {RequestAbortedError} When the callback has already been consumed
   *   (for example `onError` ran before the connection was established).
   */
  onConnect (abort, context) {
    if (!this.callback) {
      throw new RequestAbortedError()
    }

    this.abort = abort
    this.context = context
  }

  /**
   * Handles a response head.
   *
   * Status codes below 200 are reported to `onInfo` (when set) and otherwise
   * ignored. Any other status creates the response body stream, consumes the
   * callback and invokes it inside this resource's async scope.
   *
   * @param {number} statusCode
   * @param {*} rawHeaders - Passed to `util.parseRawHeaders` /
   *   `util.parseHeaders`.
   * @param {Function} resume - Handed to the response body stream.
   * @param {string} statusMessage
   */
  onHeaders (statusCode, rawHeaders, resume, statusMessage) {
    const { callback, opaque, abort, context, responseHeaders, highWaterMark } = this

    const headers = responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)

    if (statusCode < 200) {
      if (this.onInfo) {
        this.onInfo({ statusCode, headers })
      }
      return
    }

    // content-type is looked up by key, so in 'raw' mode the headers are
    // parsed a second time with util.parseHeaders just for this lookup.
    const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers
    const contentType = parsedHeaders['content-type']
    const body = new Readable({ resume, abort, contentType, highWaterMark })

    // Clearing the callback ensures a later onError cannot invoke it again.
    this.callback = null
    this.res = body
    if (callback !== null) {
      if (this.throwOnError && statusCode >= 400) {
        // NOTE(review): presumably reads the error body and then calls
        // `callback` with an error; see getResolveErrorBodyCallback in ./util.
        this.runInAsyncScope(getResolveErrorBodyCallback, null,
          { callback, body, contentType, statusCode, statusMessage, headers }
        )
      } else {
        this.runInAsyncScope(callback, null, null, {
          statusCode,
          headers,
          trailers: this.trailers,
          opaque,
          body,
          context
        })
      }
    }
  }

  /**
   * Forwards a response chunk to the body stream.
   *
   * @param {*} chunk
   * @returns {*} Whatever `res.push(chunk)` returns.
   */
  onData (chunk) {
    const { res } = this
    return res.push(chunk)
  }

  /**
   * Finishes the response: releases the signal, parses the trailers into the
   * same `trailers` object that was handed to the callback, and ends the body
   * stream by pushing `null`.
   *
   * @param {*} trailers - Passed to `util.parseHeaders`.
   */
  onComplete (trailers) {
    const { res } = this

    removeSignal(this)

    util.parseHeaders(trailers, this.trailers)

    res.push(null)
  }

  /**
   * Fails the request. Invokes the callback if it has not been consumed yet,
   * destroys the response body if one exists, and destroys the request body.
   *
   * @param {Error} err
   */
  onError (err) {
    const { res, callback, body, opaque } = this

    removeSignal(this)

    if (callback) {
      // TODO: Does this need queueMicrotask?
      this.callback = null
      queueMicrotask(() => {
        this.runInAsyncScope(callback, null, err, { opaque })
      })
    }

    if (res) {
      this.res = null
      // Ensure all queued handlers are invoked before destroying res.
      queueMicrotask(() => {
        util.destroy(res, err)
      })
    }

    if (body) {
      this.body = null
      util.destroy(body, err)
    }
  }
}
158
/**
 * Dispatches a request through `this.dispatch` using a `RequestHandler`.
 *
 * With a callback, the result is delivered as `callback(err, data)`. If
 * creating the handler or dispatching throws, the error is delivered to the
 * callback on a microtask together with `{ opaque }`. Without a callback
 * (`undefined`), a Promise is returned that settles with the same result.
 *
 * @param {object} opts - Request options, forwarded to `RequestHandler` and
 *   `this.dispatch`.
 * @param {Function} [callback] - Node-style `(err, data)` callback.
 * @returns {Promise<*>|undefined} A Promise when no callback is given.
 * @throws Rethrows synchronously when `callback` is defined but is not a
 *   function, since there is nothing to deliver the error to.
 */
function request (opts, callback) {
  if (callback !== undefined) {
    try {
      this.dispatch(opts, new RequestHandler(opts, callback))
    } catch (err) {
      if (typeof callback === 'function') {
        // Mirrors `opts && opts.opaque`: a falsy `opts` is forwarded as-is.
        const opaque = opts ? opts.opaque : opts
        queueMicrotask(() => {
          return callback(err, { opaque })
        })
        return
      }
      throw err
    }
    return
  }

  // Promise flavour: re-enter with a callback that settles the promise.
  return new Promise((resolve, reject) => {
    const settle = (err, data) => (err ? reject(err) : resolve(data))
    request.call(this, opts, settle)
  })
}
178
179module.exports = request
180module.exports.RequestHandler = RequestHandler
181