1'use strict'
2
3const { Writable } = require('stream')
4const diagnosticsChannel = require('diagnostics_channel')
5const { parserStates, opcodes, states, emptyBuffer } = require('./constants')
6const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols')
7const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived } = require('./util')
8const { WebsocketFrameSend } = require('./frame')
9
10// This code was influenced by ws released under the MIT license.
11// Copyright (c) 2011 Einar Otto Stangvik <einaros@gmail.com>
12// Copyright (c) 2013 Arnout Kazemier and contributors
13// Copyright (c) 2016 Luigi Pinca and contributors
14
15const channels = {}
16channels.ping = diagnosticsChannel.channel('undici:websocket:ping')
17channels.pong = diagnosticsChannel.channel('undici:websocket:pong')
18
19class ByteParser extends Writable {
20  #buffers = []
21  #byteOffset = 0
22
23  #state = parserStates.INFO
24
25  #info = {}
26  #fragments = []
27
28  constructor (ws) {
29    super()
30
31    this.ws = ws
32  }
33
34  /**
35   * @param {Buffer} chunk
36   * @param {() => void} callback
37   */
38  _write (chunk, _, callback) {
39    this.#buffers.push(chunk)
40    this.#byteOffset += chunk.length
41
42    this.run(callback)
43  }
44
45  /**
46   * Runs whenever a new chunk is received.
47   * Callback is called whenever there are no more chunks buffering,
48   * or not enough bytes are buffered to parse.
49   */
50  run (callback) {
51    while (true) {
52      if (this.#state === parserStates.INFO) {
53        // If there aren't enough bytes to parse the payload length, etc.
54        if (this.#byteOffset < 2) {
55          return callback()
56        }
57
58        const buffer = this.consume(2)
59
60        this.#info.fin = (buffer[0] & 0x80) !== 0
61        this.#info.opcode = buffer[0] & 0x0F
62
63        // If we receive a fragmented message, we use the type of the first
64        // frame to parse the full message as binary/text, when it's terminated
65        this.#info.originalOpcode ??= this.#info.opcode
66
67        this.#info.fragmented = !this.#info.fin && this.#info.opcode !== opcodes.CONTINUATION
68
69        if (this.#info.fragmented && this.#info.opcode !== opcodes.BINARY && this.#info.opcode !== opcodes.TEXT) {
70          // Only text and binary frames can be fragmented
71          failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.')
72          return
73        }
74
75        const payloadLength = buffer[1] & 0x7F
76
77        if (payloadLength <= 125) {
78          this.#info.payloadLength = payloadLength
79          this.#state = parserStates.READ_DATA
80        } else if (payloadLength === 126) {
81          this.#state = parserStates.PAYLOADLENGTH_16
82        } else if (payloadLength === 127) {
83          this.#state = parserStates.PAYLOADLENGTH_64
84        }
85
86        if (this.#info.fragmented && payloadLength > 125) {
87          // A fragmented frame can't be fragmented itself
88          failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.')
89          return
90        } else if (
91          (this.#info.opcode === opcodes.PING ||
92            this.#info.opcode === opcodes.PONG ||
93            this.#info.opcode === opcodes.CLOSE) &&
94          payloadLength > 125
95        ) {
96          // Control frames can have a payload length of 125 bytes MAX
97          failWebsocketConnection(this.ws, 'Payload length for control frame exceeded 125 bytes.')
98          return
99        } else if (this.#info.opcode === opcodes.CLOSE) {
100          if (payloadLength === 1) {
101            failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.')
102            return
103          }
104
105          const body = this.consume(payloadLength)
106
107          this.#info.closeInfo = this.parseCloseBody(false, body)
108
109          if (!this.ws[kSentClose]) {
110            // If an endpoint receives a Close frame and did not previously send a
111            // Close frame, the endpoint MUST send a Close frame in response.  (When
112            // sending a Close frame in response, the endpoint typically echos the
113            // status code it received.)
114            const body = Buffer.allocUnsafe(2)
115            body.writeUInt16BE(this.#info.closeInfo.code, 0)
116            const closeFrame = new WebsocketFrameSend(body)
117
118            this.ws[kResponse].socket.write(
119              closeFrame.createFrame(opcodes.CLOSE),
120              (err) => {
121                if (!err) {
122                  this.ws[kSentClose] = true
123                }
124              }
125            )
126          }
127
128          // Upon either sending or receiving a Close control frame, it is said
129          // that _The WebSocket Closing Handshake is Started_ and that the
130          // WebSocket connection is in the CLOSING state.
131          this.ws[kReadyState] = states.CLOSING
132          this.ws[kReceivedClose] = true
133
134          this.end()
135
136          return
137        } else if (this.#info.opcode === opcodes.PING) {
138          // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
139          // response, unless it already received a Close frame.
140          // A Pong frame sent in response to a Ping frame must have identical
141          // "Application data"
142
143          const body = this.consume(payloadLength)
144
145          if (!this.ws[kReceivedClose]) {
146            const frame = new WebsocketFrameSend(body)
147
148            this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))
149
150            if (channels.ping.hasSubscribers) {
151              channels.ping.publish({
152                payload: body
153              })
154            }
155          }
156
157          this.#state = parserStates.INFO
158
159          if (this.#byteOffset > 0) {
160            continue
161          } else {
162            callback()
163            return
164          }
165        } else if (this.#info.opcode === opcodes.PONG) {
166          // A Pong frame MAY be sent unsolicited.  This serves as a
167          // unidirectional heartbeat.  A response to an unsolicited Pong frame is
168          // not expected.
169
170          const body = this.consume(payloadLength)
171
172          if (channels.pong.hasSubscribers) {
173            channels.pong.publish({
174              payload: body
175            })
176          }
177
178          if (this.#byteOffset > 0) {
179            continue
180          } else {
181            callback()
182            return
183          }
184        }
185      } else if (this.#state === parserStates.PAYLOADLENGTH_16) {
186        if (this.#byteOffset < 2) {
187          return callback()
188        }
189
190        const buffer = this.consume(2)
191
192        this.#info.payloadLength = buffer.readUInt16BE(0)
193        this.#state = parserStates.READ_DATA
194      } else if (this.#state === parserStates.PAYLOADLENGTH_64) {
195        if (this.#byteOffset < 8) {
196          return callback()
197        }
198
199        const buffer = this.consume(8)
200        const upper = buffer.readUInt32BE(0)
201
202        // 2^31 is the maxinimum bytes an arraybuffer can contain
203        // on 32-bit systems. Although, on 64-bit systems, this is
204        // 2^53-1 bytes.
205        // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Errors/Invalid_array_length
206        // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/common/globals.h;drc=1946212ac0100668f14eb9e2843bdd846e510a1e;bpv=1;bpt=1;l=1275
207        // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/objects/js-array-buffer.h;l=34;drc=1946212ac0100668f14eb9e2843bdd846e510a1e
208        if (upper > 2 ** 31 - 1) {
209          failWebsocketConnection(this.ws, 'Received payload length > 2^31 bytes.')
210          return
211        }
212
213        const lower = buffer.readUInt32BE(4)
214
215        this.#info.payloadLength = (upper << 8) + lower
216        this.#state = parserStates.READ_DATA
217      } else if (this.#state === parserStates.READ_DATA) {
218        if (this.#byteOffset < this.#info.payloadLength) {
219          // If there is still more data in this chunk that needs to be read
220          return callback()
221        } else if (this.#byteOffset >= this.#info.payloadLength) {
222          // If the server sent multiple frames in a single chunk
223
224          const body = this.consume(this.#info.payloadLength)
225
226          this.#fragments.push(body)
227
228          // If the frame is unfragmented, or a fragmented frame was terminated,
229          // a message was received
230          if (!this.#info.fragmented || (this.#info.fin && this.#info.opcode === opcodes.CONTINUATION)) {
231            const fullMessage = Buffer.concat(this.#fragments)
232
233            websocketMessageReceived(this.ws, this.#info.originalOpcode, fullMessage)
234
235            this.#info = {}
236            this.#fragments.length = 0
237          }
238
239          this.#state = parserStates.INFO
240        }
241      }
242
243      if (this.#byteOffset > 0) {
244        continue
245      } else {
246        callback()
247        break
248      }
249    }
250  }
251
252  /**
253   * Take n bytes from the buffered Buffers
254   * @param {number} n
255   * @returns {Buffer|null}
256   */
257  consume (n) {
258    if (n > this.#byteOffset) {
259      return null
260    } else if (n === 0) {
261      return emptyBuffer
262    }
263
264    if (this.#buffers[0].length === n) {
265      this.#byteOffset -= this.#buffers[0].length
266      return this.#buffers.shift()
267    }
268
269    const buffer = Buffer.allocUnsafe(n)
270    let offset = 0
271
272    while (offset !== n) {
273      const next = this.#buffers[0]
274      const { length } = next
275
276      if (length + offset === n) {
277        buffer.set(this.#buffers.shift(), offset)
278        break
279      } else if (length + offset > n) {
280        buffer.set(next.subarray(0, n - offset), offset)
281        this.#buffers[0] = next.subarray(n - offset)
282        break
283      } else {
284        buffer.set(this.#buffers.shift(), offset)
285        offset += next.length
286      }
287    }
288
289    this.#byteOffset -= n
290
291    return buffer
292  }
293
294  parseCloseBody (onlyCode, data) {
295    // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
296    /** @type {number|undefined} */
297    let code
298
299    if (data.length >= 2) {
300      // _The WebSocket Connection Close Code_ is
301      // defined as the status code (Section 7.4) contained in the first Close
302      // control frame received by the application
303      code = data.readUInt16BE(0)
304    }
305
306    if (onlyCode) {
307      if (!isValidStatusCode(code)) {
308        return null
309      }
310
311      return { code }
312    }
313
314    // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.6
315    /** @type {Buffer} */
316    let reason = data.subarray(2)
317
318    // Remove BOM
319    if (reason[0] === 0xEF && reason[1] === 0xBB && reason[2] === 0xBF) {
320      reason = reason.subarray(3)
321    }
322
323    if (code !== undefined && !isValidStatusCode(code)) {
324      return null
325    }
326
327    try {
328      // TODO: optimize this
329      reason = new TextDecoder('utf-8', { fatal: true }).decode(reason)
330    } catch {
331      return null
332    }
333
334    return { code, reason }
335  }
336
337  get closingInfo () {
338    return this.#info.closeInfo
339  }
340}
341
342module.exports = {
343  ByteParser
344}
345