1// Copyright Joyent, Inc. and other Node contributors.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the
5// "Software"), to deal in the Software without restriction, including
6// without limitation the rights to use, copy, modify, merge, publish,
7// distribute, sublicense, and/or sell copies of the Software, and to permit
8// persons to whom the Software is furnished to do so, subject to the
9// following conditions:
10//
11// The above copyright notice and this permission notice shall be included
12// in all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22'use strict';
23
24const {
25  ArrayPrototypeIncludes,
26  ArrayPrototypeIndexOf,
27  ArrayPrototypePop,
28  ArrayPrototypePush,
29  ArrayPrototypeShift,
30  ArrayPrototypeSome,
31  ArrayPrototypeSplice,
32  FunctionPrototypeCall,
33  ObjectCreate,
34  ObjectKeys,
35  ObjectSetPrototypeOf,
36  ObjectValues,
37  StringPrototypeIndexOf,
38  StringPrototypeSplit,
39  StringPrototypeStartsWith,
40  StringPrototypeSubstr,
41  Symbol,
42} = primordials;
43
44const net = require('net');
45const EventEmitter = require('events');
46let debug = require('internal/util/debuglog').debuglog('http', (fn) => {
47  debug = fn;
48});
49const { AsyncResource } = require('async_hooks');
50const { async_id_symbol } = require('internal/async_hooks').symbols;
51const {
52  kEmptyObject,
53  once,
54} = require('internal/util');
55const {
56  validateNumber,
57  validateOneOf,
58  validateString,
59} = require('internal/validators');
60
// Agent property key for the handler that forwards a socket's 'keylog' event
// to the agent. It is only set once a 'keylog' listener is added.
const kOnKeylog = Symbol('onkeylog');
// Request property key holding the connection options of a queued request.
const kRequestOptions = Symbol('requestOptions');
// Request property key holding the AsyncResource created when a request is
// queued, used to run it later within its original async context.
const kRequestAsyncResource = Symbol('requestAsyncResource');
64// New Agent code.
65
66// The largest departure from the previous implementation is that
67// an Agent instance holds connections for a variable number of host:ports.
68// Surprisingly, this is still API compatible as far as third parties are
69// concerned. The only code that really notices the difference is the
70// request object.
71
72// Another departure is that all code related to HTTP parsing is in
73// ClientRequest.onSocket(). The Agent is now *strictly*
74// concerned with managing a connection pool.
75
// Plain record passed to handle.asyncReset() when a socket is handed to a new
// request (see asyncResetHandle()): `type` is the value returned by
// handle.getProviderType() and `handle` is the handle itself.
class ReusedHandle {
  constructor(type, handle) {
    this.type = type;
    this.handle = handle;
  }
}
82
/**
 * 'error' listener installed on sockets while they sit in the free pool.
 * Invoked with the socket as `this`: logs the error, destroys the socket and
 * emits 'agentRemove' on it.
 * @param {Error} err The error emitted by the idle socket.
 */
function freeSocketErrorListener(err) {
  const { message, stack } = err;
  debug('SOCKET ERROR on FREE socket:', message, stack);
  this.destroy();
  this.emit('agentRemove');
}
89
/**
 * Connection pool for HTTP client requests. Sockets and queued requests are
 * grouped under the key returned by getName(). May be called with or without
 * `new`.
 *
 * @param {object} [options] Copied into a null-prototype `this.options`.
 *   Fields read here: `noDelay` (set to true when undefined),
 *   `keepAliveMsecs`, `keepAlive`, `maxSockets`, `maxFreeSockets`,
 *   `scheduling` (must be 'fifo' or 'lifo') and `maxTotalSockets`
 *   (checked with validateNumber(..., 1) when provided, otherwise Infinity).
 */
function Agent(options) {
  // Support calling Agent() without `new`.
  if (!(this instanceof Agent))
    return new Agent(options);

  FunctionPrototypeCall(EventEmitter, this);

  this.defaultPort = 80;
  this.protocol = 'http:';

  // Shallow copy, so the assignments below never touch the caller's object.
  this.options = { __proto__: null, ...options };

  if (this.options.noDelay === undefined)
    this.options.noDelay = true;

  // Don't confuse net and make it think that we're connecting to a pipe
  this.options.path = null;
  // Null-prototype maps from name to an array: queued requests, sockets in
  // use, and idle sockets kept for reuse.
  this.requests = ObjectCreate(null);
  this.sockets = ObjectCreate(null);
  this.freeSockets = ObjectCreate(null);
  // `||` is used for defaults, so any falsy option value (including 0)
  // selects the default.
  this.keepAliveMsecs = this.options.keepAliveMsecs || 1000;
  this.keepAlive = this.options.keepAlive || false;
  this.maxSockets = this.options.maxSockets || Agent.defaultMaxSockets;
  this.maxFreeSockets = this.options.maxFreeSockets || 256;
  this.scheduling = this.options.scheduling || 'lifo';
  this.maxTotalSockets = this.options.maxTotalSockets;
  // Incremented in createSocket(); decremented by the socket's 'close' and
  // 'agentRemove' listeners.
  this.totalSocketCount = 0;

  validateOneOf(this.scheduling, 'scheduling', ['fifo', 'lifo']);

  if (this.maxTotalSockets !== undefined) {
    validateNumber(this.maxTotalSockets, 'maxTotalSockets', 1);
  } else {
    this.maxTotalSockets = Infinity;
  }

  // Emitted (via the socket's own 'free' event) when a socket is no longer
  // needed by its request. The socket is destroyed, handed to a queued
  // request with the same name, or parked in the free pool.
  this.on('free', (socket, options) => {
    const name = this.getName(options);
    debug('agent.on(free)', name);

    // TODO(ronag): socket.destroy(err) might have been called
    // before coming here and have an 'error' scheduled. In the
    // case of socket.destroy() below this 'error' has no handler
    // and could cause unhandled exception.

    if (!socket.writable) {
      socket.destroy();
      return;
    }

    // A request is already waiting under this name: give it the socket.
    const requests = this.requests[name];
    if (requests && requests.length) {
      const req = ArrayPrototypeShift(requests);
      const reqAsyncRes = req[kRequestAsyncResource];
      if (reqAsyncRes) {
        // Run request within the original async context.
        reqAsyncRes.runInAsyncScope(() => {
          asyncResetHandle(socket);
          setRequestSocket(this, req, socket);
        });
        req[kRequestAsyncResource] = null;
      } else {
        setRequestSocket(this, req, socket);
      }
      // Don't keep empty queues around.
      if (requests.length === 0) {
        delete this.requests[name];
      }
      return;
    }

    // If there are no pending requests, then put it in
    // the freeSockets pool, but only if we're allowed to do so.
    const req = socket._httpMessage;
    if (!req || !req.shouldKeepAlive || !this.keepAlive) {
      socket.destroy();
      return;
    }

    const freeSockets = this.freeSockets[name] || [];
    const freeLen = freeSockets.length;
    // Idle plus in-use sockets for this name.
    let count = freeLen;
    if (this.sockets[name])
      count += this.sockets[name].length;

    // Over a limit, or keepSocketAlive() declined: drop the socket.
    // Note that keepSocketAlive() is only called when no limit is exceeded.
    if (this.totalSocketCount > this.maxTotalSockets ||
        count > this.maxSockets ||
        freeLen >= this.maxFreeSockets ||
        !this.keepSocketAlive(socket)) {
      socket.destroy();
      return;
    }

    this.freeSockets[name] = freeSockets;
    socket[async_id_symbol] = -1;
    socket._httpMessage = null;
    // Takes the socket out of this.sockets before it joins the free pool.
    this.removeSocket(socket, options);

    // Removed again by reuseSocket() when the socket is picked up.
    socket.once('error', freeSocketErrorListener);
    ArrayPrototypePush(freeSockets, socket);
  });

  // Don't emit keylog events unless there is a listener for them.
  this.on('newListener', maybeEnableKeylog);
}
// Agent inherits from EventEmitter, both instance-side and static-side.
ObjectSetPrototypeOf(Agent.prototype, EventEmitter.prototype);
ObjectSetPrototypeOf(Agent, EventEmitter);
195
/**
 * 'newListener' hook on the agent (`this`). The first time a 'keylog'
 * listener is added it unhooks itself, installs the forwarding handler under
 * `kOnKeylog`, and subscribes every socket currently in use to it.
 * @param {string|symbol} eventName Name of the event being listened for.
 */
function maybeEnableKeylog(eventName) {
  if (eventName !== 'keylog')
    return;

  this.removeListener('newListener', maybeEnableKeylog);
  // Future sockets will listen on keylog at creation.
  const agent = this;
  this[kOnKeylog] = function onkeylog(keylog) {
    // `this` is the socket that emitted 'keylog'.
    agent.emit('keylog', keylog, this);
  };
  // Existing sockets will start listening on keylog now. `this.sockets`
  // maps each name to an array of sockets, so every array has to be walked;
  // calling `.on()` on the map's values directly would throw.
  const socketLists = ObjectValues(this.sockets);
  for (let i = 0; i < socketLists.length; i++) {
    const list = socketLists[i];
    for (let j = 0; j < list.length; j++) {
      list[j].on('keylog', this[kOnKeylog]);
    }
  }
}
211
// Per-name socket limit used when options.maxSockets is not set (or falsy).
Agent.defaultMaxSockets = Infinity;

// Connection factory invoked by createSocket() as
// `this.createConnection(options, oncreate)`.
Agent.prototype.createConnection = net.createConnection;
215
/**
 * Get the key for a given set of request options. The key has the shape
 * `host:port:localAddress`, optionally followed by `:family` and
 * `:socketPath`.
 * @param {object} [options] Reads `host` (defaults to 'localhost'), `port`,
 *   `localAddress`, `family` and `socketPath`.
 * @returns {string} The pool key.
 */
Agent.prototype.getName = function getName(options = kEmptyObject) {
  const { host, port, localAddress, family, socketPath } = options;

  let key = (host || 'localhost') + ':';
  if (port)
    key += port;

  key += ':';
  if (localAddress)
    key += localAddress;

  // Pacify parallel/test-http-agent-getname by only appending
  // the ':' when options.family is set.
  if (family === 4 || family === 6)
    key += `:${family}`;

  if (socketPath)
    key += `:${socketPath}`;

  return key;
};
238
/**
 * Give `req` a socket: reuse an idle one, create a new one if the limits
 * allow it, or queue the request until a socket becomes available.
 * @param {object} req The client request needing a socket.
 * @param {object|string} options Connection options, or the host when the
 *   legacy `(req, host, port, localAddress)` signature is used.
 * @param {number} [port] Legacy signature only.
 * @param {string} [localAddress] Legacy signature only.
 */
Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */,
                                                 localAddress/* legacy */) {
  // Legacy API: addRequest(req, host, port, localAddress)
  if (typeof options === 'string') {
    options = { __proto__: null, host: options, port, localAddress };
  }

  // Agent-level options are spread last, so they win over per-request ones.
  options = { __proto__: null, ...options, ...this.options };
  if (options.socketPath)
    options.path = options.socketPath;

  // An explicit empty-string servername is kept as is.
  if (!options.servername && options.servername !== '')
    options.servername = calculateServerName(options, req);

  const name = this.getName(options);
  if (!this.sockets[name]) {
    this.sockets[name] = [];
  }

  const idlePool = this.freeSockets[name];
  let idleSocket;
  let freeLen = 0;
  if (idlePool) {
    // Drop destroyed sockets from the head of the pool.
    while (idlePool.length && idlePool[0].destroyed) {
      ArrayPrototypeShift(idlePool);
    }
    if (this.scheduling === 'fifo')
      idleSocket = ArrayPrototypeShift(idlePool);
    else
      idleSocket = ArrayPrototypePop(idlePool);
    freeLen = idlePool.length;
    if (!freeLen)
      delete this.freeSockets[name];
  }

  const sockLen = freeLen + this.sockets[name].length;

  if (idleSocket) {
    asyncResetHandle(idleSocket);
    this.reuseSocket(idleSocket, req);
    setRequestSocket(this, req, idleSocket);
    ArrayPrototypePush(this.sockets[name], idleSocket);
    return;
  }

  if (sockLen < this.maxSockets &&
      this.totalSocketCount < this.maxTotalSockets) {
    debug('call onSocket', sockLen, freeLen);
    // If we are under maxSockets create a new one.
    this.createSocket(req, options, (err, socket) => {
      if (err)
        req.onSocket(socket, err);
      else
        setRequestSocket(this, req, socket);
    });
    return;
  }

  debug('wait for socket');
  // We are over limit so we'll add it to the queue.
  if (!this.requests[name]) {
    this.requests[name] = [];
  }

  // Used to create sockets for pending requests from different origin
  req[kRequestOptions] = options;
  // Used to capture the original async context.
  req[kRequestAsyncResource] = new AsyncResource('QueuedRequest');

  ArrayPrototypePush(this.requests[name], req);
};
309
/**
 * Open a new connection for `req` and register it with the agent.
 * @param {object} req The request the socket is created for.
 * @param {object} options Connection options; merged with the agent's
 *   options (agent values win) without mutating the caller's object.
 * @param {Function} cb Called as `cb(err)` on failure or `cb(null, socket)`
 *   once the socket has been registered.
 */
Agent.prototype.createSocket = function createSocket(req, options, cb) {
  options = { __proto__: null, ...options, ...this.options };
  if (options.socketPath)
    options.path = options.socketPath;

  // An explicit empty-string servername is kept as is.
  if (!options.servername && options.servername !== '')
    options.servername = calculateServerName(options, req);

  const name = this.getName(options);
  options._agentKey = name;

  debug('createConnection', name, options);
  options.encoding = null;

  // Records the new socket under `name` and wires up its listeners.
  const register = (err, s) => {
    if (err)
      return cb(err);
    if (!this.sockets[name]) {
      this.sockets[name] = [];
    }
    ArrayPrototypePush(this.sockets[name], s);
    this.totalSocketCount++;
    debug('sockets', name, this.sockets[name].length, this.totalSocketCount);
    installListeners(this, s, options);
    cb(null, s);
  };
  // createConnection() may both return a socket and invoke the callback;
  // once() makes sure the socket is registered a single time.
  const oncreate = once(register);

  // When keepAlive is true, pass the related options to createConnection
  if (this.keepAlive) {
    options.keepAlive = this.keepAlive;
    options.keepAliveInitialDelay = this.keepAliveMsecs;
  }

  const newSocket = this.createConnection(options, oncreate);
  if (newSocket)
    oncreate(null, newSocket);
};
345
/**
 * Derive the servername for a request. The request's 'host' header takes
 * precedence over `options.host`; any port and IPv6 brackets are stripped.
 * @param {object} options Connection options (`host` is read).
 * @param {object} req Request exposing `getHeader('host')`.
 * @returns {string|undefined} The host name, or '' when it is an IP address.
 */
function calculateServerName(options, req) {
  let candidate = options.host;
  const hostHeader = req.getHeader('host');

  if (hostHeader) {
    validateString(hostHeader, 'options.headers.host');

    // abc => abc
    // abc:123 => abc
    // [::1] => ::1
    // [::1]:123 => ::1
    if (!StringPrototypeStartsWith(hostHeader, '[')) {
      candidate = StringPrototypeSplit(hostHeader, ':', 1)[0];
    } else {
      const closing = StringPrototypeIndexOf(hostHeader, ']');
      // Leading '[', but no ']': fall back to the raw header value.
      candidate = closing === -1 ?
        hostHeader :
        StringPrototypeSubstr(hostHeader, 1, closing - 1);
    }
  }

  // Don't implicitly set invalid (IP) servernames.
  return net.isIP(candidate) ? '' : candidate;
}
373
/**
 * Attach the agent's bookkeeping listeners ('free', 'close', 'timeout',
 * 'agentRemove' and, when enabled, 'keylog') to a newly created socket.
 * @param {object} agent The owning agent.
 * @param {object} s The socket to observe.
 * @param {object} options The options the socket was created with; passed
 *   back to the agent so it can recompute the socket's name.
 */
function installListeners(agent, s, options) {
  const onFree = () => {
    debug('CLIENT socket onFree');
    agent.emit('free', s, options);
  };
  s.on('free', onFree);

  const onClose = (err) => {
    debug('CLIENT socket onClose');
    // This is the only place where sockets get removed from the Agent.
    // If you want to remove a socket from the pool, just close it.
    // All socket errors end in a close event anyway.
    agent.totalSocketCount--;
    agent.removeSocket(s, options);
  };
  s.on('close', onClose);

  const onTimeout = () => {
    debug('CLIENT socket onTimeout');

    // Destroy if in free list.
    // TODO(ronag): Always destroy, even if not in free list.
    const pools = agent.freeSockets;
    const isIdle = ArrayPrototypeSome(
      ObjectKeys(pools),
      (name) => ArrayPrototypeIncludes(pools[name], s),
    );
    if (isIdle) {
      return s.destroy();
    }
  };
  s.on('timeout', onTimeout);

  const onRemove = () => {
    // We need this function for cases like HTTP 'upgrade'
    // (defined by WebSockets) where we need to remove a socket from the
    // pool because it'll be locked up indefinitely
    debug('CLIENT socket onRemove');
    agent.totalSocketCount--;
    agent.removeSocket(s, options);
    // Detach every agent listener; 'keylog' is left in place.
    s.removeListener('close', onClose);
    s.removeListener('free', onFree);
    s.removeListener('timeout', onTimeout);
    s.removeListener('agentRemove', onRemove);
  };
  s.on('agentRemove', onRemove);

  const keylogHandler = agent[kOnKeylog];
  if (keylogHandler) {
    s.on('keylog', keylogHandler);
  }
}
423
/**
 * Remove `s` from the agent's in-use pool (and from the free pool too when
 * the socket is no longer writable). If a request is still queued, create a
 * replacement socket for it.
 * @param {object} s The socket to remove.
 * @param {object} options Options used to compute the socket's name. Also
 *   used to create the replacement socket, unless a queued request from a
 *   different name is picked, in which case that request's saved options
 *   are used.
 */
Agent.prototype.removeSocket = function removeSocket(s, options) {
  const name = this.getName(options);
  debug('removeSocket', name, 'writable:', s.writable);
  const sets = [this.sockets];

  // If the socket was destroyed, remove it from the free buffers too.
  if (!s.writable)
    ArrayPrototypePush(sets, this.freeSockets);

  for (let sk = 0; sk < sets.length; sk++) {
    const sockets = sets[sk];

    if (sockets[name]) {
      const index = ArrayPrototypeIndexOf(sockets[name], s);
      if (index !== -1) {
        ArrayPrototypeSplice(sockets[name], index, 1);
        // Don't leak
        if (sockets[name].length === 0)
          delete sockets[name];
      }
    }
  }

  // Pick the queued request (if any) that should get a new socket. The
  // request stays in its queue here; it is dequeued by the agent's 'free'
  // handler once the new socket emits 'free'.
  let req;
  if (this.requests[name] && this.requests[name].length) {
    debug('removeSocket, have a request, make a socket');
    req = this.requests[name][0];
  } else {
    // TODO(rickyes): this logic will not be FIFO across origins.
    // There might be older requests in a different origin, but
    // if the origin which releases the socket has pending requests
    // that will be prioritized.
    // NOTE(review): both paths through the loop body end in `break`, so only
    // the first key of this.requests is ever examined - confirm whether the
    // first `break` was meant to be `continue`.
    const keys = ObjectKeys(this.requests);
    for (let i = 0; i < keys.length; i++) {
      const prop = keys[i];
      // Check whether this specific origin is already at maxSockets
      if (this.sockets[prop] && this.sockets[prop].length) break;
      debug('removeSocket, have a request with different origin,' +
        ' make a socket');
      req = this.requests[prop][0];
      options = req[kRequestOptions];
      break;
    }
  }

  if (req && options) {
    req[kRequestOptions] = undefined;
    // If we have pending requests and a socket gets closed make a new one
    this.createSocket(req, options, (err, socket) => {
      if (err)
        req.onSocket(socket, err);
      else
        // Goes through the socket's 'free' listener to the agent's 'free'
        // handler.
        socket.emit('free');
    });
  }

};
481
/**
 * Prepare a socket for the free pool: enable TCP keep-alive, unref it, and
 * align its timeout with the agent's `options.timeout` (0 when unset).
 * @param {object} socket The socket about to be pooled.
 * @returns {boolean} Always true here; the agent destroys the socket instead
 *   of pooling it when this returns a falsy value.
 */
Agent.prototype.keepSocketAlive = function keepSocketAlive(socket) {
  const { keepAliveMsecs, options } = this;

  socket.setKeepAlive(true, keepAliveMsecs);
  socket.unref();

  // Only touch the timer when the socket's timeout differs from the agent's.
  const desiredTimeout = options.timeout || 0;
  if (socket.timeout !== desiredTimeout)
    socket.setTimeout(desiredTimeout);

  return true;
};
493
/**
 * Take an idle socket back into service for `req`.
 * @param {object} socket Socket being taken from the free pool.
 * @param {object} req Request that will use it; flagged via `reusedSocket`.
 */
Agent.prototype.reuseSocket = function reuseSocket(socket, req) {
  debug('have free socket');
  // Undo the 'error' listener installed when the socket entered the free
  // pool.
  socket.removeListener('error', freeSocketErrorListener);
  req.reusedSocket = true;
  // Counterpart of the unref() done in keepSocketAlive().
  socket.ref();
};
500
/**
 * Destroy every socket the agent currently tracks: idle sockets first, then
 * the ones in use. The pool maps themselves are not modified here.
 */
Agent.prototype.destroy = function destroy() {
  const { freeSockets, sockets } = this;

  // Destroys each socket of each per-name list in `pool`.
  const destroyPool = (pool) => {
    const names = ObjectKeys(pool);
    for (let k = 0; k < names.length; k++) {
      const list = pool[names[k]];
      let i = 0;
      while (i < list.length) {
        list[i].destroy();
        i++;
      }
    }
  };

  destroyPool(freeSockets);
  destroyPool(sockets);
};
514
/**
 * Hand `socket` to `req`, then apply the request's own timeout to the socket
 * when one is set and it differs from the agent's `options.timeout`
 * (treated as 0 when unset).
 * @param {object} agent The owning agent.
 * @param {object} req The request receiving the socket.
 * @param {object} socket The socket being assigned.
 */
function setRequestSocket(agent, req, socket) {
  req.onSocket(socket);

  const agentTimeout = agent.options.timeout || 0;
  const requestTimeout = req.timeout;
  if (requestTimeout !== undefined && requestTimeout !== agentTimeout) {
    socket.setTimeout(requestTimeout);
  }
}
523
/**
 * Give a socket's underlying handle a fresh async id and mirror that id onto
 * the socket. Does nothing when the socket has no handle or the handle
 * lacks `asyncReset()`.
 * @param {object} socket The socket being handed to a new request.
 */
function asyncResetHandle(socket) {
  const handle = socket._handle;

  // Guard against an uninitialized or user supplied Socket.
  if (!handle || typeof handle.asyncReset !== 'function')
    return;

  // Assign the handle a new asyncId and run any destroy()/init() hooks.
  const resource = new ReusedHandle(handle.getProviderType(), handle);
  handle.asyncReset(resource);
  socket[async_id_symbol] = handle.getAsyncId();
}
533
module.exports = {
  Agent,
  // Agent instance created at module load with default options.
  globalAgent: new Agent(),
};
538