1'use strict';
2
3const {
4  ArrayIsArray,
5  Boolean,
6  ObjectCreate,
7  SafeMap,
8} = primordials;
9
10const assert = require('internal/assert');
11const net = require('net');
12const { sendHelper } = require('internal/cluster/utils');
13const { append, init, isEmpty, peek, remove } = require('internal/linkedlist');
14const { constants } = internalBinding('tcp_wrap');
15
16module.exports = RoundRobinHandle;
17
/**
 * Owns a single listening server and the bookkeeping needed to pass its
 * incoming connection handles to workers one at a time.
 *
 * State set up here:
 *  - `all`:     SafeMap of worker id -> worker for every worker added.
 *  - `free`:    SafeMap of worker id -> worker for workers waiting for work.
 *  - `handles`: linked list of connection handles not yet handed to a worker.
 *  - `handle`:  the listening handle; stays `null` until 'listening' fires.
 *  - `server`:  the `net.Server` used for binding; reset to `null` once the
 *               listening handle has been taken from it.
 *
 * @param {*} key Identifier stored on the instance and echoed in the
 *   messages sent to workers.
 * @param {*} address Used as `host` when listening on a port, or as `path`
 *   when neither `fd` nor `port` is usable.
 * @param {object} options
 * @param {number} [options.port] Used when `fd` is not >= 0 and `port >= 0`.
 * @param {number} [options.fd] Takes precedence over `port` when `fd >= 0`.
 * @param {number} [options.flags] Bit field; only `UV_TCP_IPV6ONLY` is read.
 * @param {number} [options.backlog] Forwarded to `listen()` in every case.
 * @param {boolean} [options.readableAll] Forwarded only for path listening.
 * @param {boolean} [options.writableAll] Forwarded only for path listening.
 */
function RoundRobinHandle(key, address, { port, fd, flags, backlog, readableAll, writableAll }) {
  this.key = key;
  this.all = new SafeMap();
  this.free = new SafeMap();
  this.handles = init(ObjectCreate(null));
  this.handle = null;
  // The connection listener must never run: once listening, connections are
  // intercepted through the handle's `onconnection` hook installed below.
  this.server = net.createServer(assert.fail);

  let listenOptions;
  if (fd >= 0) {
    listenOptions = { fd, backlog };
  } else if (port >= 0) {
    listenOptions = {
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
      backlog,
    };
  } else {
    // UNIX socket path.
    listenOptions = {
      path: address,
      backlog,
      readableAll,
      writableAll,
    };
  }
  this.server.listen(listenOptions);

  this.server.once('listening', () => {
    // Take the listening handle from the server, route its connections
    // through distribute(), and drop both references to the server.
    const boundServer = this.server;
    this.handle = boundServer._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    boundServer._handle = null;
    this.server = null;
  });
}
50
/**
 * Registers a worker with this handle and reports the outcome of binding
 * through `send`.
 *
 * If the server has already been bound (`this.server === null`), the reply
 * is sent immediately; otherwise it is deferred until the server emits
 * 'listening', or an errno is sent if the server emits 'error'.
 *
 * @param {object} worker Worker to register; must not already be present
 *   in `this.all` (asserted). Its `id` is used as the map key.
 * @param {Function} send Reply callback. Called as
 *   `send(null, { sockname }, null)` when the handle has `getsockname`,
 *   `send(null, null, null)` when it does not, and `send(errno, null)` on a
 *   bind error.
 */
RoundRobinHandle.prototype.add = function(worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const replyAndHandoff = () => {
    let reply = null;  // Stays null for handles without getsockname (UNIX socket).

    if (this.handle.getsockname) {
      const sockname = {};
      this.handle.getsockname(sockname);
      // TODO(bnoordhuis) Check err.
      reply = { sockname };
    }

    send(null, reply, null);
    this.handoff(worker);  // In case there are connections pending.
  };

  const pendingServer = this.server;

  if (pendingServer !== null) {
    // Still busy binding.
    pendingServer.once('listening', replyAndHandoff);
    pendingServer.once('error', (err) => {
      send(err.errno, null);
    });
    return;
  }

  return replyAndHandoff();
};
77
/**
 * Unregisters a worker. When the last registered worker is removed, every
 * connection handle still queued in `this.handles` is closed and the
 * listening handle (if one exists yet) is closed and released.
 *
 * @param {object} worker Worker to remove; looked up by `worker.id`.
 * @returns {boolean} `true` only when this call removed the last worker and
 *   the handle's resources were released; `false` when the worker was not
 *   registered or other workers remain.
 */
RoundRobinHandle.prototype.remove = function(worker) {
  const existed = this.all.delete(worker.id);

  if (!existed)
    return false;

  this.free.delete(worker.id);

  if (this.all.size !== 0)
    return false;

  // No workers left: nobody can take the queued connections, so close them.
  while (!isEmpty(this.handles)) {
    const handle = peek(this.handles);
    handle.close();
    remove(handle);
  }

  // `this.handle` is only assigned when the server emits 'listening'. If the
  // last worker goes away before that (or after a bind error), it is still
  // null and calling close() on it would throw a TypeError.
  // NOTE(review): in that case `this.server` is left untouched here; confirm
  // whether a still-binding server should also be shut down.
  if (this.handle !== null) {
    this.handle.close();
    this.handle = null;
  }

  return true;
};
99
/**
 * Queues an incoming connection handle and, if a worker is waiting in
 * `this.free`, takes the first such worker out of the map and starts a
 * handoff to it.
 *
 * @param {*} err Truthy when accepting the connection failed; the call is
 *   then a no-op.
 * @param {object} handle Connection handle to append to `this.handles`.
 */
RoundRobinHandle.prototype.distribute = function(err, handle) {
  // If `accept` fails just skip it (handle is undefined)
  if (err)
    return;

  append(this.handles, handle);

  // `this.free` is a SafeMap, so its first iterated item (if any) is an
  // `[id, worker]` pair; it is undefined when no worker is waiting.
  // eslint-disable-next-line node-core/no-array-destructuring
  const [ firstIdle ] = this.free;

  if (!ArrayIsArray(firstIdle))
    return;

  const idleId = firstIdle[0];
  const idleWorker = firstIdle[1];
  this.free.delete(idleId);
  this.handoff(idleWorker);
};
115
/**
 * Gives the next queued connection handle to `worker`, or marks the worker
 * as free when nothing is queued.
 *
 * The handle is sent to `worker.process` in a 'newconn' message. When the
 * worker replies, the local copy of the handle is closed if it was
 * accepted, otherwise the handle is queued again through distribute();
 * either way another handoff to the same worker is then attempted.
 *
 * @param {object} worker Worker to serve; ignored if it is no longer in
 *   `this.all`.
 */
RoundRobinHandle.prototype.handoff = function(worker) {
  // Worker is closing (or has closed) the server.
  if (!this.all.has(worker.id))
    return;

  const pending = peek(this.handles);

  if (pending === null) {
    // Add to ready queue again.
    this.free.set(worker.id, worker);
    return;
  }

  remove(pending);

  const onReply = (reply) => {
    if (!reply.accepted) {
      // Worker is shutting down. Send to another.
      this.distribute(0, pending);
    } else {
      pending.close();
    }

    this.handoff(worker);
  };

  const message = { act: 'newconn', key: this.key };
  sendHelper(worker.process, message, pending, onReply);
};
141