1// Copyright Joyent, Inc. and other Node contributors. 2// 3// Permission is hereby granted, free of charge, to any person obtaining a 4// copy of this software and associated documentation files (the 5// "Software"), to deal in the Software without restriction, including 6// without limitation the rights to use, copy, modify, merge, publish, 7// distribute, sublicense, and/or sell copies of the Software, and to permit 8// persons to whom the Software is furnished to do so, subject to the 9// following conditions: 10// 11// The above copyright notice and this permission notice shall be included 12// in all copies or substantial portions of the Software. 13// 14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20// USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22'use strict'; 23 24const { 25 ArrayPrototypeIncludes, 26 ArrayPrototypeIndexOf, 27 ArrayPrototypePop, 28 ArrayPrototypePush, 29 ArrayPrototypeShift, 30 ArrayPrototypeSome, 31 ArrayPrototypeSplice, 32 FunctionPrototypeCall, 33 ObjectCreate, 34 ObjectKeys, 35 ObjectSetPrototypeOf, 36 ObjectValues, 37 StringPrototypeIndexOf, 38 StringPrototypeSplit, 39 StringPrototypeStartsWith, 40 StringPrototypeSubstr, 41 Symbol, 42} = primordials; 43 44const net = require('net'); 45const EventEmitter = require('events'); 46let debug = require('internal/util/debuglog').debuglog('http', (fn) => { 47 debug = fn; 48}); 49const { AsyncResource } = require('async_hooks'); 50const { async_id_symbol } = require('internal/async_hooks').symbols; 51const { 52 kEmptyObject, 53 once, 54} = require('internal/util'); 55const { 56 validateNumber, 57 validateOneOf, 58 validateString, 59} = require('internal/validators'); 60 61const kOnKeylog = Symbol('onkeylog'); 62const kRequestOptions = Symbol('requestOptions'); 63const kRequestAsyncResource = Symbol('requestAsyncResource'); 64// New Agent code. 65 66// The largest departure from the previous implementation is that 67// an Agent instance holds connections for a variable number of host:ports. 68// Surprisingly, this is still API compatible as far as third parties are 69// concerned. The only code that really notices the difference is the 70// request object. 71 72// Another departure is that all code related to HTTP parsing is in 73// ClientRequest.onSocket(). The Agent is now *strictly* 74// concerned with managing a connection pool. 75 76class ReusedHandle { 77 constructor(type, handle) { 78 this.type = type; 79 this.handle = handle; 80 } 81} 82 83function freeSocketErrorListener(err) { 84 const socket = this; 85 debug('SOCKET ERROR on FREE socket:', err.message, err.stack); 86 socket.destroy(); 87 socket.emit('agentRemove'); 88} 89 90function Agent(options) { 91 if (!(this instanceof Agent)) 92 return new Agent(options); 93 94 FunctionPrototypeCall(EventEmitter, this); 95 96 this.defaultPort = 80; 97 this.protocol = 'http:'; 98 99 this.options = { __proto__: null, ...options }; 100 101 if (this.options.noDelay === undefined) 102 this.options.noDelay = true; 103 104 // Don't confuse net and make it think that we're connecting to a pipe 105 this.options.path = null; 106 this.requests = ObjectCreate(null); 107 this.sockets = ObjectCreate(null); 108 this.freeSockets = ObjectCreate(null); 109 this.keepAliveMsecs = this.options.keepAliveMsecs || 1000; 110 this.keepAlive = this.options.keepAlive || false; 111 this.maxSockets = this.options.maxSockets || Agent.defaultMaxSockets; 112 this.maxFreeSockets = this.options.maxFreeSockets || 256; 113 this.scheduling = this.options.scheduling || 'lifo'; 114 this.maxTotalSockets = this.options.maxTotalSockets; 115 this.totalSocketCount = 0; 116 117 validateOneOf(this.scheduling, 'scheduling', ['fifo', 'lifo']); 118 119 if (this.maxTotalSockets !== undefined) { 120 validateNumber(this.maxTotalSockets, 'maxTotalSockets', 1); 121 } else { 122 this.maxTotalSockets = Infinity; 123 } 124 125 this.on('free', (socket, options) => { 126 const name = this.getName(options); 127 debug('agent.on(free)', name); 128 129 // TODO(ronag): socket.destroy(err) might have been called 130 // before coming here and have an 'error' scheduled. In the 131 // case of socket.destroy() below this 'error' has no handler 132 // and could cause unhandled exception. 133 134 if (!socket.writable) { 135 socket.destroy(); 136 return; 137 } 138 139 const requests = this.requests[name]; 140 if (requests && requests.length) { 141 const req = ArrayPrototypeShift(requests); 142 const reqAsyncRes = req[kRequestAsyncResource]; 143 if (reqAsyncRes) { 144 // Run request within the original async context. 145 reqAsyncRes.runInAsyncScope(() => { 146 asyncResetHandle(socket); 147 setRequestSocket(this, req, socket); 148 }); 149 req[kRequestAsyncResource] = null; 150 } else { 151 setRequestSocket(this, req, socket); 152 } 153 if (requests.length === 0) { 154 delete this.requests[name]; 155 } 156 return; 157 } 158 159 // If there are no pending requests, then put it in 160 // the freeSockets pool, but only if we're allowed to do so. 161 const req = socket._httpMessage; 162 if (!req || !req.shouldKeepAlive || !this.keepAlive) { 163 socket.destroy(); 164 return; 165 } 166 167 const freeSockets = this.freeSockets[name] || []; 168 const freeLen = freeSockets.length; 169 let count = freeLen; 170 if (this.sockets[name]) 171 count += this.sockets[name].length; 172 173 if (this.totalSocketCount > this.maxTotalSockets || 174 count > this.maxSockets || 175 freeLen >= this.maxFreeSockets || 176 !this.keepSocketAlive(socket)) { 177 socket.destroy(); 178 return; 179 } 180 181 this.freeSockets[name] = freeSockets; 182 socket[async_id_symbol] = -1; 183 socket._httpMessage = null; 184 this.removeSocket(socket, options); 185 186 socket.once('error', freeSocketErrorListener); 187 ArrayPrototypePush(freeSockets, socket); 188 }); 189 190 // Don't emit keylog events unless there is a listener for them. 191 this.on('newListener', maybeEnableKeylog); 192} 193ObjectSetPrototypeOf(Agent.prototype, EventEmitter.prototype); 194ObjectSetPrototypeOf(Agent, EventEmitter); 195 196function maybeEnableKeylog(eventName) { 197 if (eventName === 'keylog') { 198 this.removeListener('newListener', maybeEnableKeylog); 199 // Future sockets will listen on keylog at creation. 200 const agent = this; 201 this[kOnKeylog] = function onkeylog(keylog) { 202 agent.emit('keylog', keylog, this); 203 }; 204 // Existing sockets will start listening on keylog now. 205 const sockets = ObjectValues(this.sockets); 206 for (let i = 0; i < sockets.length; i++) { 207 sockets[i].on('keylog', this[kOnKeylog]); 208 } 209 } 210} 211 212Agent.defaultMaxSockets = Infinity; 213 214Agent.prototype.createConnection = net.createConnection; 215 216// Get the key for a given set of request options 217Agent.prototype.getName = function getName(options = kEmptyObject) { 218 let name = options.host || 'localhost'; 219 220 name += ':'; 221 if (options.port) 222 name += options.port; 223 224 name += ':'; 225 if (options.localAddress) 226 name += options.localAddress; 227 228 // Pacify parallel/test-http-agent-getname by only appending 229 // the ':' when options.family is set. 230 if (options.family === 4 || options.family === 6) 231 name += `:${options.family}`; 232 233 if (options.socketPath) 234 name += `:${options.socketPath}`; 235 236 return name; 237}; 238 239Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */, 240 localAddress/* legacy */) { 241 // Legacy API: addRequest(req, host, port, localAddress) 242 if (typeof options === 'string') { 243 options = { 244 __proto__: null, 245 host: options, 246 port, 247 localAddress, 248 }; 249 } 250 251 options = { __proto__: null, ...options, ...this.options }; 252 if (options.socketPath) 253 options.path = options.socketPath; 254 255 if (!options.servername && options.servername !== '') 256 options.servername = calculateServerName(options, req); 257 258 const name = this.getName(options); 259 if (!this.sockets[name]) { 260 this.sockets[name] = []; 261 } 262 263 const freeSockets = this.freeSockets[name]; 264 let socket; 265 if (freeSockets) { 266 while (freeSockets.length && freeSockets[0].destroyed) { 267 ArrayPrototypeShift(freeSockets); 268 } 269 socket = this.scheduling === 'fifo' ? 270 ArrayPrototypeShift(freeSockets) : 271 ArrayPrototypePop(freeSockets); 272 if (!freeSockets.length) 273 delete this.freeSockets[name]; 274 } 275 276 const freeLen = freeSockets ? freeSockets.length : 0; 277 const sockLen = freeLen + this.sockets[name].length; 278 279 if (socket) { 280 asyncResetHandle(socket); 281 this.reuseSocket(socket, req); 282 setRequestSocket(this, req, socket); 283 ArrayPrototypePush(this.sockets[name], socket); 284 } else if (sockLen < this.maxSockets && 285 this.totalSocketCount < this.maxTotalSockets) { 286 debug('call onSocket', sockLen, freeLen); 287 // If we are under maxSockets create a new one. 288 this.createSocket(req, options, (err, socket) => { 289 if (err) 290 req.onSocket(socket, err); 291 else 292 setRequestSocket(this, req, socket); 293 }); 294 } else { 295 debug('wait for socket'); 296 // We are over limit so we'll add it to the queue. 297 if (!this.requests[name]) { 298 this.requests[name] = []; 299 } 300 301 // Used to create sockets for pending requests from different origin 302 req[kRequestOptions] = options; 303 // Used to capture the original async context. 304 req[kRequestAsyncResource] = new AsyncResource('QueuedRequest'); 305 306 ArrayPrototypePush(this.requests[name], req); 307 } 308}; 309 310Agent.prototype.createSocket = function createSocket(req, options, cb) { 311 options = { __proto__: null, ...options, ...this.options }; 312 if (options.socketPath) 313 options.path = options.socketPath; 314 315 if (!options.servername && options.servername !== '') 316 options.servername = calculateServerName(options, req); 317 318 const name = this.getName(options); 319 options._agentKey = name; 320 321 debug('createConnection', name, options); 322 options.encoding = null; 323 324 const oncreate = once((err, s) => { 325 if (err) 326 return cb(err); 327 if (!this.sockets[name]) { 328 this.sockets[name] = []; 329 } 330 ArrayPrototypePush(this.sockets[name], s); 331 this.totalSocketCount++; 332 debug('sockets', name, this.sockets[name].length, this.totalSocketCount); 333 installListeners(this, s, options); 334 cb(null, s); 335 }); 336 // When keepAlive is true, pass the related options to createConnection 337 if (this.keepAlive) { 338 options.keepAlive = this.keepAlive; 339 options.keepAliveInitialDelay = this.keepAliveMsecs; 340 } 341 const newSocket = this.createConnection(options, oncreate); 342 if (newSocket) 343 oncreate(null, newSocket); 344}; 345 346function calculateServerName(options, req) { 347 let servername = options.host; 348 const hostHeader = req.getHeader('host'); 349 if (hostHeader) { 350 validateString(hostHeader, 'options.headers.host'); 351 352 // abc => abc 353 // abc:123 => abc 354 // [::1] => ::1 355 // [::1]:123 => ::1 356 if (StringPrototypeStartsWith(hostHeader, '[')) { 357 const index = StringPrototypeIndexOf(hostHeader, ']'); 358 if (index === -1) { 359 // Leading '[', but no ']'. Need to do something... 360 servername = hostHeader; 361 } else { 362 servername = StringPrototypeSubstr(hostHeader, 1, index - 1); 363 } 364 } else { 365 servername = StringPrototypeSplit(hostHeader, ':', 1)[0]; 366 } 367 } 368 // Don't implicitly set invalid (IP) servernames. 369 if (net.isIP(servername)) 370 servername = ''; 371 return servername; 372} 373 374function installListeners(agent, s, options) { 375 function onFree() { 376 debug('CLIENT socket onFree'); 377 agent.emit('free', s, options); 378 } 379 s.on('free', onFree); 380 381 function onClose(err) { 382 debug('CLIENT socket onClose'); 383 // This is the only place where sockets get removed from the Agent. 384 // If you want to remove a socket from the pool, just close it. 385 // All socket errors end in a close event anyway. 386 agent.totalSocketCount--; 387 agent.removeSocket(s, options); 388 } 389 s.on('close', onClose); 390 391 function onTimeout() { 392 debug('CLIENT socket onTimeout'); 393 394 // Destroy if in free list. 395 // TODO(ronag): Always destroy, even if not in free list. 396 const sockets = agent.freeSockets; 397 if (ArrayPrototypeSome(ObjectKeys(sockets), (name) => 398 ArrayPrototypeIncludes(sockets[name], s), 399 )) { 400 return s.destroy(); 401 } 402 } 403 s.on('timeout', onTimeout); 404 405 function onRemove() { 406 // We need this function for cases like HTTP 'upgrade' 407 // (defined by WebSockets) where we need to remove a socket from the 408 // pool because it'll be locked up indefinitely 409 debug('CLIENT socket onRemove'); 410 agent.totalSocketCount--; 411 agent.removeSocket(s, options); 412 s.removeListener('close', onClose); 413 s.removeListener('free', onFree); 414 s.removeListener('timeout', onTimeout); 415 s.removeListener('agentRemove', onRemove); 416 } 417 s.on('agentRemove', onRemove); 418 419 if (agent[kOnKeylog]) { 420 s.on('keylog', agent[kOnKeylog]); 421 } 422} 423 424Agent.prototype.removeSocket = function removeSocket(s, options) { 425 const name = this.getName(options); 426 debug('removeSocket', name, 'writable:', s.writable); 427 const sets = [this.sockets]; 428 429 // If the socket was destroyed, remove it from the free buffers too. 430 if (!s.writable) 431 ArrayPrototypePush(sets, this.freeSockets); 432 433 for (let sk = 0; sk < sets.length; sk++) { 434 const sockets = sets[sk]; 435 436 if (sockets[name]) { 437 const index = ArrayPrototypeIndexOf(sockets[name], s); 438 if (index !== -1) { 439 ArrayPrototypeSplice(sockets[name], index, 1); 440 // Don't leak 441 if (sockets[name].length === 0) 442 delete sockets[name]; 443 } 444 } 445 } 446 447 let req; 448 if (this.requests[name] && this.requests[name].length) { 449 debug('removeSocket, have a request, make a socket'); 450 req = this.requests[name][0]; 451 } else { 452 // TODO(rickyes): this logic will not be FIFO across origins. 453 // There might be older requests in a different origin, but 454 // if the origin which releases the socket has pending requests 455 // that will be prioritized. 456 const keys = ObjectKeys(this.requests); 457 for (let i = 0; i < keys.length; i++) { 458 const prop = keys[i]; 459 // Check whether this specific origin is already at maxSockets 460 if (this.sockets[prop] && this.sockets[prop].length) break; 461 debug('removeSocket, have a request with different origin,' + 462 ' make a socket'); 463 req = this.requests[prop][0]; 464 options = req[kRequestOptions]; 465 break; 466 } 467 } 468 469 if (req && options) { 470 req[kRequestOptions] = undefined; 471 // If we have pending requests and a socket gets closed make a new one 472 this.createSocket(req, options, (err, socket) => { 473 if (err) 474 req.onSocket(socket, err); 475 else 476 socket.emit('free'); 477 }); 478 } 479 480}; 481 482Agent.prototype.keepSocketAlive = function keepSocketAlive(socket) { 483 socket.setKeepAlive(true, this.keepAliveMsecs); 484 socket.unref(); 485 486 const agentTimeout = this.options.timeout || 0; 487 if (socket.timeout !== agentTimeout) { 488 socket.setTimeout(agentTimeout); 489 } 490 491 return true; 492}; 493 494Agent.prototype.reuseSocket = function reuseSocket(socket, req) { 495 debug('have free socket'); 496 socket.removeListener('error', freeSocketErrorListener); 497 req.reusedSocket = true; 498 socket.ref(); 499}; 500 501Agent.prototype.destroy = function destroy() { 502 const sets = [this.freeSockets, this.sockets]; 503 for (let s = 0; s < sets.length; s++) { 504 const set = sets[s]; 505 const keys = ObjectKeys(set); 506 for (let v = 0; v < keys.length; v++) { 507 const setName = set[keys[v]]; 508 for (let n = 0; n < setName.length; n++) { 509 setName[n].destroy(); 510 } 511 } 512 } 513}; 514 515function setRequestSocket(agent, req, socket) { 516 req.onSocket(socket); 517 const agentTimeout = agent.options.timeout || 0; 518 if (req.timeout === undefined || req.timeout === agentTimeout) { 519 return; 520 } 521 socket.setTimeout(req.timeout); 522} 523 524function asyncResetHandle(socket) { 525 // Guard against an uninitialized or user supplied Socket. 526 const handle = socket._handle; 527 if (handle && typeof handle.asyncReset === 'function') { 528 // Assign the handle a new asyncId and run any destroy()/init() hooks. 529 handle.asyncReset(new ReusedHandle(handle.getProviderType(), handle)); 530 socket[async_id_symbol] = handle.getAsyncId(); 531 } 532} 533 534module.exports = { 535 Agent, 536 globalAgent: new Agent(), 537}; 538