11cb0ef41Sopenharmony_ci'use strict';
21cb0ef41Sopenharmony_ci
31cb0ef41Sopenharmony_ciconst {
41cb0ef41Sopenharmony_ci  ArrayPrototypeJoin,
51cb0ef41Sopenharmony_ci  FunctionPrototype,
61cb0ef41Sopenharmony_ci  ObjectAssign,
71cb0ef41Sopenharmony_ci  ReflectApply,
81cb0ef41Sopenharmony_ci  SafeMap,
91cb0ef41Sopenharmony_ci  SafeSet,
101cb0ef41Sopenharmony_ci} = primordials;
111cb0ef41Sopenharmony_ci
121cb0ef41Sopenharmony_ciconst assert = require('internal/assert');
131cb0ef41Sopenharmony_ciconst path = require('path');
141cb0ef41Sopenharmony_ciconst EventEmitter = require('events');
151cb0ef41Sopenharmony_ciconst { owner_symbol } = require('internal/async_hooks').symbols;
161cb0ef41Sopenharmony_ciconst Worker = require('internal/cluster/worker');
171cb0ef41Sopenharmony_ciconst { internal, sendHelper } = require('internal/cluster/utils');
181cb0ef41Sopenharmony_ciconst { TIMEOUT_MAX } = require('internal/timers');
191cb0ef41Sopenharmony_ciconst { setInterval, clearInterval } = require('timers');
201cb0ef41Sopenharmony_ci
// Shared no-op callback (used as the keep-alive interval body in rr()).
const noop = FunctionPrototype;
// Listen handles owned by this worker, keyed by the `key` of the reply
// message that delivered them (see shared() and rr()).
const handles = new SafeMap();
// Per-endpoint bookkeeping: indexesKey -> { nextIndex, set } as created in
// cluster._getServer() and pruned by removeIndexesKey().
const indexes = new SafeMap();
// The object exported as the worker-side `cluster` implementation.
const cluster = new EventEmitter();

module.exports = cluster;

ObjectAssign(cluster, {
  isWorker: true,
  isMaster: false, // Deprecated alias. Must be same as isPrimary.
  isPrimary: false,
  worker: null,
  Worker,
});
331cb0ef41Sopenharmony_ci
/**
 * Creates the Worker object for the current process, stores it on
 * `cluster.worker`, subscribes to the process 'disconnect' and
 * 'internalMessage' events and finally sends an `online` message via send().
 */
cluster._setupWorker = function() {
  const worker = new Worker({
    id: +process.env.NODE_UNIQUE_ID | 0,
    process,
    state: 'online',
  });

  // Dispatches the cluster-internal messages this worker reacts to; any
  // other `act` value is ignored here.
  const onmessage = (message, handle) => {
    switch (message.act) {
      case 'newconn':
        onconnection(message, handle);
        break;
      case 'disconnect':
        // `true` marks the disconnect as primary initiated.
        ReflectApply(_disconnect, worker, [true]);
        break;
    }
  };

  cluster.worker = worker;

  process.once('disconnect', () => {
    worker.emit('disconnect');

    if (worker.exitedAfterDisconnect)
      return;

    // Unexpected disconnect, primary exited, or some such nastiness, so
    // worker exits immediately.
    process.exit(0);
  });

  process.on('internalMessage', internal(worker, onmessage));
  send({ act: 'online' });
};
631cb0ef41Sopenharmony_ci
/**
 * Sends a `queryServer` message for `obj` and hands the reply to shared()
 * (when a handle comes back with the reply) or rr() (when none does).
 * Once `obj` emits 'listening', a follow-up `listening` message is sent.
 * @param {object} obj A net#Server or a dgram#Socket object.
 * @param {object} options Listen options. `address`, `port`, `addressType`
 *   and `fd` form the bookkeeping key; all properties are copied into the
 *   outgoing message.
 * @param {Function} cb Forwarded to shared()/rr(), which call it with
 *   `(errno, handle)`.
 */
cluster._getServer = function(obj, options, cb) {
  const requestedAddress = options.address;

  // Resolve unix socket paths to absolute paths
  const isUnixSocketPath = options.port < 0 &&
                           typeof requestedAddress === 'string' &&
                           process.platform !== 'win32';
  const address = isUnixSocketPath ?
    path.resolve(requestedAddress) :
    requestedAddress;

  const indexesKey = ArrayPrototypeJoin(
    [address, options.port, options.addressType, options.fd],
    ':',
  );

  // Every call for the same endpoint gets its own, increasing index.
  if (!indexes.has(indexesKey)) {
    indexes.set(indexesKey, { nextIndex: 0, set: new SafeSet() });
  }
  const indexSet = indexes.get(indexesKey);
  const index = indexSet.nextIndex;
  indexSet.nextIndex += 1;
  indexSet.set.add(index);

  // `options` is spread after the defaults so its properties win, except
  // for `address`, which always carries the (possibly resolved) value.
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options,
    address,
  };

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (!handle) {
      // Round-robin.
      rr(reply, { indexesKey, index }, cb);
      return;
    }

    // Shared listen socket
    shared(reply, { handle, indexesKey, index }, cb);
  });

  obj.once('listening', () => {
    // short-lived sockets might have been closed
    if (!indexes.has(indexesKey))
      return;

    cluster.worker.state = 'listening';
    const boundAddress = obj.address();
    // The original message object is reused for the notification.
    message.act = 'listening';
    message.port = (boundAddress && boundAddress.port) || options.port;
    send(message);
  });
};
1281cb0ef41Sopenharmony_ci
/**
 * Drops `index` from the bookkeeping entry stored under `indexesKey` and
 * removes the entry itself once no index is left in it. Does nothing when
 * there is no entry for `indexesKey`.
 * @param {string} indexesKey Key built in cluster._getServer().
 * @param {number} index Index handed out by cluster._getServer().
 */
function removeIndexesKey(indexesKey, index) {
  const entry = indexes.get(indexesKey);

  if (entry) {
    entry.set.delete(index);

    if (entry.set.size === 0)
      indexes.delete(indexesKey);
  }
}
1401cb0ef41Sopenharmony_ci
1411cb0ef41Sopenharmony_ci// Shared listen socket.
/**
 * Registers a listen handle received together with the reply message.
 * @param {object} message Reply message; `key` and `errno` are read.
 * @param {{ handle: object, indexesKey: string, index: number }} context
 *   The received handle plus the bookkeeping slot to release on close.
 * @param {Function} cb Called with `(message.errno, handle)`.
 */
function shared(message, { handle, indexesKey, index }, cb) {
  const { key } = message;

  // Monkey-patch the close() method so we can keep track of when it's
  // closed. Avoids resource leaks when the handle is short-lived.
  const originalClose = handle.close;

  handle.close = function(...args) {
    send({ act: 'close', key });
    handles.delete(key);
    removeIndexesKey(indexesKey, index);
    // The original close() always runs with the handle as receiver.
    return ReflectApply(originalClose, handle, args);
  };

  assert(!handles.has(key));
  handles.set(key, handle);
  cb(message.errno, handle);
}
1581cb0ef41Sopenharmony_ci
// Round-robin. The primary distributes handles across workers.
/**
 * Builds a faux listen handle for a reply that carried no real handle and
 * registers it under `message.key`.
 * @param {object} message Reply message; `errno`, `key` and `sockname` are
 *   read.
 * @param {{ indexesKey: string, index: number }} context Bookkeeping slot
 *   released when the faux handle is closed.
 * @param {Function} cb Called with `(message.errno, null)` when the reply
 *   carries an errno, otherwise with `(0, handle)`.
 */
function rr(message, { indexesKey, index }, cb) {
  if (message.errno)
    return cb(message.errno, null);

  // Reset to undefined by close() so a second close() is a no-op.
  let key = message.key;

  // Interval that stands in for a real handle while the faux handle is
  // ref'ed; null while unref'ed.
  let keepAliveTimer = null;

  const ref = () => {
    if (keepAliveTimer === null) {
      keepAliveTimer = setInterval(noop, TIMEOUT_MAX);
    }
  };

  const unref = () => {
    if (keepAliveTimer !== null) {
      clearInterval(keepAliveTimer);
      keepAliveTimer = null;
    }
  };

  // TODO(bnoordhuis) Send a message to the primary that tells it to
  // update the backlog size. The actual backlog should probably be
  // the largest requested size by any worker.
  const listen = (backlog) => 0;

  const close = () => {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the primary process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the primary.
    if (key === undefined)
      return;

    unref();

    // If the handle is the last handle in process,
    // the parent process will delete the handle when worker process exits.
    // So it is ok if the close message get lost.
    // See the comments of https://github.com/nodejs/node/pull/46161
    send({ act: 'close', key });
    handles.delete(key);
    removeIndexesKey(indexesKey, index);
    key = undefined;
  };

  // Copies the socket name from the reply into `out` while the handle is
  // still open; always reports success.
  const getsockname = (out) => {
    if (key)
      ObjectAssign(out, message.sockname);

    return 0;
  };

  // Faux handle. net.Server is not associated with handle,
  // so we control its state(ref or unref) by setInterval.
  const handle = { close, listen, ref, unref };
  handle.ref();

  if (message.sockname) {
    handle.getsockname = getsockname;  // TCP handles only.
  }

  assert(!handles.has(key));
  handles.set(key, handle);
  cb(0, handle);
}
2261cb0ef41Sopenharmony_ci
2271cb0ef41Sopenharmony_ci// Round-robin connection.
/**
 * Handles a `newconn` message: decides whether the connection handle can be
 * accepted, acknowledges the decision with `{ ack, accepted }` and then
 * either passes the handle to the registered server handle or closes it.
 * @param {object} message Incoming message; `key` and `seq` are read.
 * @param {object} handle The connection handle delivered with the message.
 */
function onconnection(message, handle) {
  const server = handles.get(message.key);

  // Rejected when no handle is registered for the key, or when the owner of
  // the registered handle has a truthy maxConnections that is already reached.
  let accepted = false;
  if (server !== undefined) {
    const owner = server[owner_symbol];
    accepted = !(owner &&
                 owner.maxConnections &&
                 owner._connections >= owner.maxConnections);
  }

  send({ ack: message.seq, accepted });

  if (!accepted) {
    handle.close();
    return;
  }

  server.onconnection(0, handle);
}
2471cb0ef41Sopenharmony_ci
/**
 * Thin wrapper around sendHelper() bound to the current process.
 * @param {object} message Message to send.
 * @param {Function} [cb] Passed through to sendHelper() unchanged.
 * @returns {*} Whatever sendHelper() returns.
 */
function send(message, cb) {
  // This module never attaches a handle to outgoing messages.
  const noHandle = null;
  return sendHelper(process, message, noHandle, cb);
}
2511cb0ef41Sopenharmony_ci
/**
 * Closes every registered listen handle and disconnects the process once
 * all close callbacks have fired. Must be invoked with the Worker as `this`.
 * @param {boolean} [primaryInitiated] Truthy when the disconnect was
 *   requested by a `disconnect` message rather than by the worker itself.
 */
function _disconnect(primaryInitiated) {
  this.exitedAfterDisconnect = true;

  // Starts at one so the count cannot hit zero before every handle has been
  // asked to close; the extra unit is released by the final call below.
  let pendingCount = 1;

  const onClosed = () => {
    pendingCount -= 1;

    if (pendingCount !== 0)
      return;

    // If disconnect is worker initiated, wait for ack to be sure
    // exitedAfterDisconnect is properly set in the primary, otherwise, if
    // it's primary initiated there's no need to send the
    // exitedAfterDisconnect message
    if (primaryInitiated) {
      process.disconnect();
      return;
    }

    send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
  };

  handles.forEach((handle) => {
    pendingCount += 1;

    // Close through the owner object when the handle has one.
    const closable = handle[owner_symbol] || handle;
    closable.close(onClosed);
  });

  handles.clear();
  onClosed();
}
2841cb0ef41Sopenharmony_ci
2851cb0ef41Sopenharmony_ci// Extend generic Worker with methods specific to worker processes.
/**
 * Starts disconnecting this worker unless it is already in the
 * 'disconnecting' or 'destroying' state.
 * @returns {Worker} This worker.
 */
Worker.prototype.disconnect = function() {
  if (this.state === 'disconnecting' || this.state === 'destroying')
    return this;

  this.state = 'disconnecting';
  // No arguments: the disconnect is worker initiated.
  ReflectApply(_disconnect, this, []);

  return this;
};
2941cb0ef41Sopenharmony_ci
/**
 * Terminates the worker process. While still connected, it first sends an
 * `exitedAfterDisconnect` message, disconnects from its callback and exits
 * on the resulting 'disconnect' event; otherwise it exits right away.
 * Repeated calls while already 'destroying' are ignored.
 */
Worker.prototype.destroy = function() {
  if (this.state === 'destroying')
    return;

  this.exitedAfterDisconnect = true;

  if (this.isConnected()) {
    this.state = 'destroying';
    send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
    process.once('disconnect', () => process.exit(0));
    return;
  }

  process.exit(0);
};
308