'use strict';

const {
  ArrayPrototypeJoin,
  FunctionPrototype,
  ObjectAssign,
  ReflectApply,
  SafeMap,
  SafeSet,
} = primordials;

const assert = require('internal/assert');
const path = require('path');
const EventEmitter = require('events');
const { owner_symbol } = require('internal/async_hooks').symbols;
const Worker = require('internal/cluster/worker');
const { internal, sendHelper } = require('internal/cluster/utils');
const { TIMEOUT_MAX } = require('internal/timers');
const { setInterval, clearInterval } = require('timers');

// The object exported by this module (worker-process side of `cluster`).
const cluster = new EventEmitter();
// Maps a handle key (taken from the primary's replies) to the real or faux
// handle registered under it in this process.
const handles = new SafeMap();
// Maps an "address:port:addressType:fd" string to
// `{ nextIndex, set }`, where `set` holds the indexes currently in use.
const indexes = new SafeMap();
// Used as the do-nothing callback for the keep-alive interval in rr().
const noop = FunctionPrototype;

module.exports = cluster;

cluster.isWorker = true;
cluster.isMaster = false;  // Deprecated alias. Must be same as isPrimary.
cluster.isPrimary = false;
cluster.worker = null;
cluster.Worker = Worker;

// Creates the Worker object that represents this process, publishes it as
// `cluster.worker`, installs the IPC listeners and finally reports
// `{ act: 'online' }` to the primary.
cluster._setupWorker = function() {
  const worker = new Worker({
    // `+x | 0` yields 0 when NODE_UNIQUE_ID is missing or not numeric.
    id: +process.env.NODE_UNIQUE_ID | 0,
    process: process,
    state: 'online',
  });

  cluster.worker = worker;

  process.once('disconnect', () => {
    worker.emit('disconnect');

    if (!worker.exitedAfterDisconnect) {
      // Unexpected disconnect, primary exited, or some such nastiness, so
      // worker exits immediately.
      process.exit(0);
    }
  });

  process.on('internalMessage', internal(worker, onmessage));
  send({ act: 'online' });

  // Dispatches internal messages: `newconn` carries a connection handle for
  // onconnection(); `disconnect` runs _disconnect() on the worker with
  // `primaryInitiated` set to true. Any other `act` is ignored here.
  function onmessage(message, handle) {
    if (message.act === 'newconn')
      onconnection(message, handle);
    else if (message.act === 'disconnect')
      ReflectApply(_disconnect, worker, [true]);
  }
};

// `obj` is a net#Server or a dgram#Socket object.
// Asks the primary for a server handle matching `options` and hands the
// result to `cb(errno, handle)`.
// `options` is read for `address`, `port`, `addressType` and `fd`, and is
// also spread into the `queryServer` message sent to the primary.
// When the reply carries a handle it is registered via shared(); otherwise
// a faux handle is created via rr().
cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // Resolve unix socket paths to absolute paths
  if (options.port < 0 && typeof address === 'string' &&
      process.platform !== 'win32')
    address = path.resolve(address);

  const indexesKey = ArrayPrototypeJoin(
    [
      address,
      options.port,
      options.addressType,
      options.fd,
    ], ':');

  let indexSet = indexes.get(indexesKey);

  if (indexSet === undefined) {
    indexSet = { nextIndex: 0, set: new SafeSet() };
    indexes.set(indexesKey, indexSet);
  }
  // Indexes are never reused for a given key: nextIndex only grows.
  const index = indexSet.nextIndex++;
  indexSet.set.add(index);

  // `options` is spread last, so its own properties take precedence over
  // the `act`, `index` and `data` defaults listed before it.
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options,
  };

  // Always send the (possibly resolved) address computed above.
  message.address = address;

  // Set custom data on handle (i.e. tls tickets key)
  if (typeof obj._getServerData === 'function')
    message.data = obj._getServerData();

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle) {
      // Shared listen socket
      shared(reply, { handle, indexesKey, index }, cb);
    } else {
      // Round-robin.
      rr(reply, { indexesKey, index }, cb);
    }
  });

  obj.once('listening', () => {
    // short-lived sockets might have been closed
    if (!indexes.has(indexesKey)) {
      return;
    }
    cluster.worker.state = 'listening';
    const address = obj.address();
    // The same message object is reused for the `listening` notification.
    message.act = 'listening';
    message.port = (address && address.port) || options.port;
    send(message);
  });
};

// Releases `index` for `indexesKey`; once no index remains in use for that
// key, the whole entry is dropped from `indexes`. Unknown keys are ignored.
function removeIndexesKey(indexesKey, index) {
  const indexSet = indexes.get(indexesKey);
  if (!indexSet) {
    return;
  }

  indexSet.set.delete(index);
  if (indexSet.set.size === 0) {
    indexes.delete(indexesKey);
  }
}

// Shared listen socket.
// Registers `handle` under `message.key` and calls
// `cb(message.errno, handle)`.
function shared(message, { handle, indexesKey, index }, cb) {
  const key = message.key;
  // Monkey-patch the close() method so we can keep track of when it's
  // closed. Avoids resource leaks when the handle is short-lived.
  const close = handle.close;

  handle.close = function() {
    send({ act: 'close', key });
    handles.delete(key);
    removeIndexesKey(indexesKey, index);
    // Forward to the original close() with the caller's arguments.
    return ReflectApply(close, handle, arguments);
  };
  assert(handles.has(key) === false);
  handles.set(key, handle);
  cb(message.errno, handle);
}

// Round-robin. Primary distributes handles across workers.
// Builds a faux handle for `message.key`, registers it in `handles` and
// calls `cb(0, handle)`. If the reply carries an `errno`, nothing is
// registered and `cb(message.errno, null)` is called instead.
function rr(message, { indexesKey, index }, cb) {
  if (message.errno)
    return cb(message.errno, null);

  // Reset to undefined by close(), which makes later close() calls no-ops
  // and stops getsockname() from filling in `out`.
  let key = message.key;

  // The pending interval timer while the faux handle is ref'ed, else null.
  let fakeHandle = null;

  function ref() {
    if (!fakeHandle) {
      fakeHandle = setInterval(noop, TIMEOUT_MAX);
    }
  }

  function unref() {
    if (fakeHandle) {
      clearInterval(fakeHandle);
      fakeHandle = null;
    }
  }

  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the primary that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the primary process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the primary.
    if (key === undefined)
      return;
    unref();
    // If the handle is the last handle in process,
    // the parent process will delete the handle when worker process exits.
    // So it is ok if the close message get lost.
    // See the comments of https://github.com/nodejs/node/pull/46161
    send({ act: 'close', key });
    handles.delete(key);
    removeIndexesKey(indexesKey, index);
    key = undefined;
  }

  // Copies the socket name from the primary's reply into `out` while the
  // handle is still open; always returns 0.
  function getsockname(out) {
    if (key)
      ObjectAssign(out, message.sockname);

    return 0;
  }

  // Faux handle. net.Server is not associated with handle,
  // so we control its state(ref or unref) by setInterval.
  const handle = { close, listen, ref, unref };
  handle.ref();
  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(handles.has(key) === false);
  handles.set(key, handle);
  cb(0, handle);
}

// Round-robin connection.
// Handles a connection handle delivered by the primary for `message.key`.
// The connection is refused when no handle is registered under that key, or
// when the registered handle's owner has a truthy `maxConnections` that
// `_connections` has already reached. The decision is acked to the primary
// before the handle is either passed on or closed.
function onconnection(message, handle) {
  const key = message.key;
  const server = handles.get(key);
  let accepted = server !== undefined;

  if (accepted && server[owner_symbol]) {
    const self = server[owner_symbol];
    if (self.maxConnections && self._connections >= self.maxConnections) {
      accepted = false;
    }
  }

  send({ ack: message.seq, accepted });

  if (accepted)
    server.onconnection(0, handle);
  else
    handle.close();
}

// Sends `message` over this process's IPC channel via sendHelper(), with no
// handle attached; `cb` is passed through as the reply callback.
function send(message, cb) {
  return sendHelper(process, message, null, cb);
}

// Closes every registered handle and then disconnects the IPC channel.
// Called with `this` set to the Worker. `primaryInitiated` is truthy when
// the primary requested the disconnect.
function _disconnect(primaryInitiated) {
  this.exitedAfterDisconnect = true;
  // Starts at 1 so the final checkWaitingCount() call below is what allows
  // the count to reach zero, even when there are no handles at all.
  let waitingCount = 1;

  function checkWaitingCount() {
    waitingCount--;

    if (waitingCount === 0) {
      // If disconnect is worker initiated, wait for ack to be sure
      // exitedAfterDisconnect is properly set in the primary, otherwise, if
      // it's primary initiated there's no need to send the
      // exitedAfterDisconnect message
      if (primaryInitiated) {
        process.disconnect();
      } else {
        send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
      }
    }
  }

  handles.forEach((handle) => {
    waitingCount++;

    // Prefer closing through the owning object when the handle has one.
    if (handle[owner_symbol])
      handle[owner_symbol].close(checkWaitingCount);
    else
      handle.close(checkWaitingCount);
  });

  handles.clear();
  checkWaitingCount();
}

// Extend generic Worker with methods specific to worker processes.
// disconnect() starts _disconnect() unless the worker is already
// disconnecting or destroying; it always returns `this`.
Worker.prototype.disconnect = function() {
  if (this.state !== 'disconnecting' && this.state !== 'destroying') {
    this.state = 'disconnecting';
    ReflectApply(_disconnect, this, []);
  }

  return this;
};

// destroy() exits the process: immediately when the worker is not
// connected, otherwise after telling the primary and disconnecting.
// Repeated calls while already destroying are ignored.
Worker.prototype.destroy = function() {
  if (this.state === 'destroying')
    return;

  this.exitedAfterDisconnect = true;
  if (!this.isConnected()) {
    process.exit(0);
  } else {
    this.state = 'destroying';
    send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
    process.once('disconnect', () => process.exit(0));
  }
};