'use strict';

const {
  ArrayIsArray,
  Boolean,
  ObjectCreate,
  SafeMap,
} = primordials;

const assert = require('internal/assert');
const net = require('net');
const { sendHelper } = require('internal/cluster/utils');
const { append, init, isEmpty, peek, remove } = require('internal/linkedlist');
const { constants } = internalBinding('tcp_wrap');

module.exports = RoundRobinHandle;

/**
 * Binds a single listening handle and distributes the connections accepted
 * on it to registered workers, one connection per idle worker at a time.
 *
 * Instance state:
 *   - `key`:     identifier echoed back to workers in every 'newconn' message.
 *   - `all`:     map of worker id -> worker for every registered worker.
 *   - `free`:    map of worker id -> worker for workers waiting for a
 *                connection.
 *   - `handles`: linked list of accepted handles not yet handed to a worker.
 *   - `handle`:  the listening handle; `null` until the server emits
 *                'listening' and again after the last worker is removed.
 *   - `server`:  the `net.Server` used only for binding; reset to `null` once
 *                it is listening.
 *
 * The listen target is picked in this order: `fd` when it is >= 0, otherwise
 * `port` when it is >= 0 (with `address` as the host), otherwise `address` is
 * used as a UNIX socket path.
 *
 * @param {*} key Identifier for this handle, sent to workers with each
 *   connection.
 * @param {string} address Host name/IP for TCP listeners, or the socket path
 *   for UNIX socket listeners.
 * @param {object} options
 * @param {number} [options.port] TCP port to listen on.
 * @param {number} [options.fd] Existing file descriptor to listen on.
 * @param {number} [options.flags] Bit flags; only `UV_TCP_IPV6ONLY` is read.
 * @param {number} [options.backlog] Passed through to `server.listen()`.
 * @param {boolean} [options.readableAll] Passed through for UNIX sockets.
 * @param {boolean} [options.writableAll] Passed through for UNIX sockets.
 */
function RoundRobinHandle(key, address, { port, fd, flags, backlog, readableAll, writableAll }) {
  this.key = key;
  this.all = new SafeMap();
  this.free = new SafeMap();
  this.handles = init(ObjectCreate(null));
  this.handle = null;
  // `assert.fail` is installed as the server's connection listener: once the
  // server is listening, its handle is detached below and connections are
  // routed through `distribute()` instead.
  this.server = net.createServer(assert.fail);

  if (fd >= 0)
    this.server.listen({ fd, backlog });
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
      backlog,
    });
  } else
    this.server.listen({
      path: address,
      backlog,
      readableAll,
      writableAll,
    });  // UNIX socket path.

  this.server.once('listening', () => {
    // Take over the underlying handle and drop the `net.Server` wrapper so
    // that accepted connections reach `distribute()` directly.
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}

/**
 * Registers a worker and replies through `send` once the listening handle is
 * available.
 *
 * On success `send(null, { sockname }, null)` is called when the handle
 * exposes `getsockname`, or `send(null, null, null)` otherwise, and the worker
 * is then offered any pending connection. If the server is still binding, the
 * reply is deferred until 'listening'; a bind failure is reported with
 * `send(err.errno, null)`.
 *
 * @param {object} worker Worker with a unique `id`; must not already be
 *   registered (asserted).
 * @param {Function} send Reply callback, invoked as described above.
 * @returns {undefined}
 */
RoundRobinHandle.prototype.add = function(worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const done = () => {
    if (this.handle.getsockname) {
      const out = {};
      this.handle.getsockname(out);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }

    this.handoff(worker);  // In case there are connections pending.
  };

  // `this.server` is reset to null by the constructor's 'listening' callback,
  // so null here means the handle is ready to use.
  if (this.server === null)
    return done();

  // Still busy binding.
  this.server.once('listening', done);
  this.server.once('error', (err) => {
    send(err.errno, null);
  });
};

/**
 * Unregisters a worker. When the last registered worker is removed, every
 * pending connection handle and the listening handle are closed.
 *
 * @param {object} worker Worker previously passed to `add()`.
 * @returns {boolean} `true` when this call removed the last worker and the
 *   handle was torn down; `false` when the worker was not registered or other
 *   workers remain.
 */
RoundRobinHandle.prototype.remove = function(worker) {
  const existed = this.all.delete(worker.id);

  if (!existed)
    return false;

  this.free.delete(worker.id);

  if (this.all.size !== 0)
    return false;

  // No workers left: close every connection that was accepted but never
  // handed off.
  while (!isEmpty(this.handles)) {
    const handle = peek(this.handles);
    handle.close();
    remove(handle);
  }

  // `this.handle` is only assigned by the constructor's 'listening' callback.
  // If the last worker is removed while the server is still binding it is
  // still null, so guard the close instead of throwing a TypeError.
  // NOTE(review): in that early-removal case the server keeps binding and the
  // handle it produces later is not closed by this path - confirm whether the
  // caller needs that handled as well.
  if (this.handle !== null) {
    this.handle.close();
    this.handle = null;
  }

  return true;
};

/**
 * Connection callback for the listening handle: queues the accepted handle
 * and, if a worker is waiting, hands it off immediately.
 *
 * @param {number} err Non-zero/truthy when `accept` failed.
 * @param {object} handle The accepted connection handle (undefined on error).
 * @returns {undefined}
 */
RoundRobinHandle.prototype.distribute = function(err, handle) {
  // If `accept` fails just skip it (handle is undefined)
  if (err) {
    return;
  }
  append(this.handles, handle);
  // Pick the first entry of `this.free` in iteration order; `workerEntry` is
  // undefined when no worker is waiting.
  // eslint-disable-next-line node-core/no-array-destructuring
  const [ workerEntry ] = this.free; // this.free is a SafeMap

  if (ArrayIsArray(workerEntry)) {
    const { 0: workerId, 1: worker } = workerEntry;
    this.free.delete(workerId);
    this.handoff(worker);
  }
};

/**
 * Sends the next pending connection to `worker`, or marks the worker as free
 * when nothing is pending. After the worker replies, the same worker is
 * offered the next pending connection.
 *
 * @param {object} worker Worker with `id` and `process` properties.
 * @returns {undefined}
 */
RoundRobinHandle.prototype.handoff = function(worker) {
  if (!this.all.has(worker.id)) {
    return;  // Worker is closing (or has closed) the server.
  }

  const handle = peek(this.handles);

  if (handle === null) {
    this.free.set(worker.id, worker);  // Add to ready queue again.
    return;
  }

  remove(handle);

  const message = { act: 'newconn', key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();  // The worker took the connection; close it on this side.
    else
      this.distribute(0, handle);  // Worker is shutting down. Send to another.

    this.handoff(worker);
  });
};