1'use strict' 2 3const { Writable } = require('stream') 4const diagnosticsChannel = require('diagnostics_channel') 5const { parserStates, opcodes, states, emptyBuffer } = require('./constants') 6const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols') 7const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived } = require('./util') 8const { WebsocketFrameSend } = require('./frame') 9 10// This code was influenced by ws released under the MIT license. 11// Copyright (c) 2011 Einar Otto Stangvik <einaros@gmail.com> 12// Copyright (c) 2013 Arnout Kazemier and contributors 13// Copyright (c) 2016 Luigi Pinca and contributors 14 15const channels = {} 16channels.ping = diagnosticsChannel.channel('undici:websocket:ping') 17channels.pong = diagnosticsChannel.channel('undici:websocket:pong') 18 19class ByteParser extends Writable { 20 #buffers = [] 21 #byteOffset = 0 22 23 #state = parserStates.INFO 24 25 #info = {} 26 #fragments = [] 27 28 constructor (ws) { 29 super() 30 31 this.ws = ws 32 } 33 34 /** 35 * @param {Buffer} chunk 36 * @param {() => void} callback 37 */ 38 _write (chunk, _, callback) { 39 this.#buffers.push(chunk) 40 this.#byteOffset += chunk.length 41 42 this.run(callback) 43 } 44 45 /** 46 * Runs whenever a new chunk is received. 47 * Callback is called whenever there are no more chunks buffering, 48 * or not enough bytes are buffered to parse. 49 */ 50 run (callback) { 51 while (true) { 52 if (this.#state === parserStates.INFO) { 53 // If there aren't enough bytes to parse the payload length, etc. 54 if (this.#byteOffset < 2) { 55 return callback() 56 } 57 58 const buffer = this.consume(2) 59 60 this.#info.fin = (buffer[0] & 0x80) !== 0 61 this.#info.opcode = buffer[0] & 0x0F 62 63 // If we receive a fragmented message, we use the type of the first 64 // frame to parse the full message as binary/text, when it's terminated 65 this.#info.originalOpcode ??= this.#info.opcode 66 67 this.#info.fragmented = !this.#info.fin && this.#info.opcode !== opcodes.CONTINUATION 68 69 if (this.#info.fragmented && this.#info.opcode !== opcodes.BINARY && this.#info.opcode !== opcodes.TEXT) { 70 // Only text and binary frames can be fragmented 71 failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.') 72 return 73 } 74 75 const payloadLength = buffer[1] & 0x7F 76 77 if (payloadLength <= 125) { 78 this.#info.payloadLength = payloadLength 79 this.#state = parserStates.READ_DATA 80 } else if (payloadLength === 126) { 81 this.#state = parserStates.PAYLOADLENGTH_16 82 } else if (payloadLength === 127) { 83 this.#state = parserStates.PAYLOADLENGTH_64 84 } 85 86 if (this.#info.fragmented && payloadLength > 125) { 87 // A fragmented frame can't be fragmented itself 88 failWebsocketConnection(this.ws, 'Fragmented frame exceeded 125 bytes.') 89 return 90 } else if ( 91 (this.#info.opcode === opcodes.PING || 92 this.#info.opcode === opcodes.PONG || 93 this.#info.opcode === opcodes.CLOSE) && 94 payloadLength > 125 95 ) { 96 // Control frames can have a payload length of 125 bytes MAX 97 failWebsocketConnection(this.ws, 'Payload length for control frame exceeded 125 bytes.') 98 return 99 } else if (this.#info.opcode === opcodes.CLOSE) { 100 if (payloadLength === 1) { 101 failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.') 102 return 103 } 104 105 const body = this.consume(payloadLength) 106 107 this.#info.closeInfo = this.parseCloseBody(false, body) 108 109 if (!this.ws[kSentClose]) { 110 // If an endpoint receives a Close frame and did not previously send a 111 // Close frame, the endpoint MUST send a Close frame in response. (When 112 // sending a Close frame in response, the endpoint typically echos the 113 // status code it received.) 114 const body = Buffer.allocUnsafe(2) 115 body.writeUInt16BE(this.#info.closeInfo.code, 0) 116 const closeFrame = new WebsocketFrameSend(body) 117 118 this.ws[kResponse].socket.write( 119 closeFrame.createFrame(opcodes.CLOSE), 120 (err) => { 121 if (!err) { 122 this.ws[kSentClose] = true 123 } 124 } 125 ) 126 } 127 128 // Upon either sending or receiving a Close control frame, it is said 129 // that _The WebSocket Closing Handshake is Started_ and that the 130 // WebSocket connection is in the CLOSING state. 131 this.ws[kReadyState] = states.CLOSING 132 this.ws[kReceivedClose] = true 133 134 this.end() 135 136 return 137 } else if (this.#info.opcode === opcodes.PING) { 138 // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in 139 // response, unless it already received a Close frame. 140 // A Pong frame sent in response to a Ping frame must have identical 141 // "Application data" 142 143 const body = this.consume(payloadLength) 144 145 if (!this.ws[kReceivedClose]) { 146 const frame = new WebsocketFrameSend(body) 147 148 this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG)) 149 150 if (channels.ping.hasSubscribers) { 151 channels.ping.publish({ 152 payload: body 153 }) 154 } 155 } 156 157 this.#state = parserStates.INFO 158 159 if (this.#byteOffset > 0) { 160 continue 161 } else { 162 callback() 163 return 164 } 165 } else if (this.#info.opcode === opcodes.PONG) { 166 // A Pong frame MAY be sent unsolicited. This serves as a 167 // unidirectional heartbeat. A response to an unsolicited Pong frame is 168 // not expected. 169 170 const body = this.consume(payloadLength) 171 172 if (channels.pong.hasSubscribers) { 173 channels.pong.publish({ 174 payload: body 175 }) 176 } 177 178 if (this.#byteOffset > 0) { 179 continue 180 } else { 181 callback() 182 return 183 } 184 } 185 } else if (this.#state === parserStates.PAYLOADLENGTH_16) { 186 if (this.#byteOffset < 2) { 187 return callback() 188 } 189 190 const buffer = this.consume(2) 191 192 this.#info.payloadLength = buffer.readUInt16BE(0) 193 this.#state = parserStates.READ_DATA 194 } else if (this.#state === parserStates.PAYLOADLENGTH_64) { 195 if (this.#byteOffset < 8) { 196 return callback() 197 } 198 199 const buffer = this.consume(8) 200 const upper = buffer.readUInt32BE(0) 201 202 // 2^31 is the maxinimum bytes an arraybuffer can contain 203 // on 32-bit systems. Although, on 64-bit systems, this is 204 // 2^53-1 bytes. 205 // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Errors/Invalid_array_length 206 // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/common/globals.h;drc=1946212ac0100668f14eb9e2843bdd846e510a1e;bpv=1;bpt=1;l=1275 207 // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/objects/js-array-buffer.h;l=34;drc=1946212ac0100668f14eb9e2843bdd846e510a1e 208 if (upper > 2 ** 31 - 1) { 209 failWebsocketConnection(this.ws, 'Received payload length > 2^31 bytes.') 210 return 211 } 212 213 const lower = buffer.readUInt32BE(4) 214 215 this.#info.payloadLength = (upper << 8) + lower 216 this.#state = parserStates.READ_DATA 217 } else if (this.#state === parserStates.READ_DATA) { 218 if (this.#byteOffset < this.#info.payloadLength) { 219 // If there is still more data in this chunk that needs to be read 220 return callback() 221 } else if (this.#byteOffset >= this.#info.payloadLength) { 222 // If the server sent multiple frames in a single chunk 223 224 const body = this.consume(this.#info.payloadLength) 225 226 this.#fragments.push(body) 227 228 // If the frame is unfragmented, or a fragmented frame was terminated, 229 // a message was received 230 if (!this.#info.fragmented || (this.#info.fin && this.#info.opcode === opcodes.CONTINUATION)) { 231 const fullMessage = Buffer.concat(this.#fragments) 232 233 websocketMessageReceived(this.ws, this.#info.originalOpcode, fullMessage) 234 235 this.#info = {} 236 this.#fragments.length = 0 237 } 238 239 this.#state = parserStates.INFO 240 } 241 } 242 243 if (this.#byteOffset > 0) { 244 continue 245 } else { 246 callback() 247 break 248 } 249 } 250 } 251 252 /** 253 * Take n bytes from the buffered Buffers 254 * @param {number} n 255 * @returns {Buffer|null} 256 */ 257 consume (n) { 258 if (n > this.#byteOffset) { 259 return null 260 } else if (n === 0) { 261 return emptyBuffer 262 } 263 264 if (this.#buffers[0].length === n) { 265 this.#byteOffset -= this.#buffers[0].length 266 return this.#buffers.shift() 267 } 268 269 const buffer = Buffer.allocUnsafe(n) 270 let offset = 0 271 272 while (offset !== n) { 273 const next = this.#buffers[0] 274 const { length } = next 275 276 if (length + offset === n) { 277 buffer.set(this.#buffers.shift(), offset) 278 break 279 } else if (length + offset > n) { 280 buffer.set(next.subarray(0, n - offset), offset) 281 this.#buffers[0] = next.subarray(n - offset) 282 break 283 } else { 284 buffer.set(this.#buffers.shift(), offset) 285 offset += next.length 286 } 287 } 288 289 this.#byteOffset -= n 290 291 return buffer 292 } 293 294 parseCloseBody (onlyCode, data) { 295 // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5 296 /** @type {number|undefined} */ 297 let code 298 299 if (data.length >= 2) { 300 // _The WebSocket Connection Close Code_ is 301 // defined as the status code (Section 7.4) contained in the first Close 302 // control frame received by the application 303 code = data.readUInt16BE(0) 304 } 305 306 if (onlyCode) { 307 if (!isValidStatusCode(code)) { 308 return null 309 } 310 311 return { code } 312 } 313 314 // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.6 315 /** @type {Buffer} */ 316 let reason = data.subarray(2) 317 318 // Remove BOM 319 if (reason[0] === 0xEF && reason[1] === 0xBB && reason[2] === 0xBF) { 320 reason = reason.subarray(3) 321 } 322 323 if (code !== undefined && !isValidStatusCode(code)) { 324 return null 325 } 326 327 try { 328 // TODO: optimize this 329 reason = new TextDecoder('utf-8', { fatal: true }).decode(reason) 330 } catch { 331 return null 332 } 333 334 return { code, reason } 335 } 336 337 get closingInfo () { 338 return this.#info.closeInfo 339 } 340} 341 342module.exports = { 343 ByteParser 344} 345