'use strict';

const {
  ArrayPrototypeMap,
  DataViewPrototypeGetBuffer,
  Int32Array,
  PromisePrototypeThen,
  ReflectApply,
  SafeSet,
  TypedArrayPrototypeGetBuffer,
  globalThis: {
    Atomics: {
      add: AtomicsAdd,
      notify: AtomicsNotify,
    },
  },
} = primordials;
const assert = require('internal/assert');
const { clearImmediate, setImmediate } = require('timers');
const {
  hasUncaughtExceptionCaptureCallback,
} = require('internal/process/execution');
const {
  isArrayBuffer,
  isDataView,
  isTypedArray,
} = require('util/types');

const { receiveMessageOnPort } = require('internal/worker/io');
const {
  WORKER_TO_MAIN_THREAD_NOTIFICATION,
} = require('internal/modules/esm/shared_constants');
const { initializeHooks } = require('internal/modules/esm/utils');


/**
 * Builds the transfer list to pass alongside a hook response when it is posted back.
 * The list contains the `ArrayBuffer` backing `source`, so the caller can hand it to
 * `postMessage` as its second argument.
 * @param {boolean} hasError - Whether the hook failed; no transfer list is produced in that case.
 * @param {ArrayBuffer | TypedArray | DataView | unknown} source - The candidate value to transfer.
 * @returns {ArrayBuffer[] | undefined} A one-element list holding `source` itself (when it is an
 * `ArrayBuffer`) or the buffer behind the view; `undefined` when `hasError` is set, `source` is
 * nullish, or `source` is neither a buffer nor a view.
 */
function transferArrayBuffer(hasError, source) {
  if (hasError || source == null) { return; }
  if (isArrayBuffer(source)) { return [source]; }
  if (isTypedArray(source)) { return [TypedArrayPrototypeGetBuffer(source)]; }
  if (isDataView(source)) { return [DataViewPrototypeGetBuffer(source)]; }
}

471cb0ef41Sopenharmony_ci/**
481cb0ef41Sopenharmony_ci * Wraps a message with a status and body, and serializes the body if necessary.
491cb0ef41Sopenharmony_ci * @param {string} status - The status of the message.
501cb0ef41Sopenharmony_ci * @param {unknown} body - The body of the message.
511cb0ef41Sopenharmony_ci */
521cb0ef41Sopenharmony_cifunction wrapMessage(status, body) {
531cb0ef41Sopenharmony_ci  if (status === 'success' || body === null ||
541cb0ef41Sopenharmony_ci     (typeof body !== 'object' &&
551cb0ef41Sopenharmony_ci      typeof body !== 'function' &&
561cb0ef41Sopenharmony_ci      typeof body !== 'symbol')) {
571cb0ef41Sopenharmony_ci    return { status, body };
581cb0ef41Sopenharmony_ci  }
591cb0ef41Sopenharmony_ci
601cb0ef41Sopenharmony_ci  let serialized;
611cb0ef41Sopenharmony_ci  let serializationFailed;
621cb0ef41Sopenharmony_ci  try {
631cb0ef41Sopenharmony_ci    const { serializeError } = require('internal/error_serdes');
641cb0ef41Sopenharmony_ci    serialized = serializeError(body);
651cb0ef41Sopenharmony_ci  } catch {
661cb0ef41Sopenharmony_ci    serializationFailed = true;
671cb0ef41Sopenharmony_ci  }
681cb0ef41Sopenharmony_ci
691cb0ef41Sopenharmony_ci  return {
701cb0ef41Sopenharmony_ci    status,
711cb0ef41Sopenharmony_ci    body: {
721cb0ef41Sopenharmony_ci      serialized,
731cb0ef41Sopenharmony_ci      serializationFailed,
741cb0ef41Sopenharmony_ci    },
751cb0ef41Sopenharmony_ci  };
761cb0ef41Sopenharmony_ci}
771cb0ef41Sopenharmony_ci
/**
 * Initializes a worker thread for a customized module loader.
 *
 * Loads the hooks, reports the outcome to the main thread over `syncCommPort`, then serves hook
 * invocation requests. Every response (and every exit path) is followed by an increment and a
 * notify on `lock[WORKER_TO_MAIN_THREAD_NOTIFICATION]`.
 * @param {Int32Array} lock - The view over the shared lock buffer used to synchronize communication
 * between the worker and the main thread.
 * @param {MessagePort} syncCommPort - The message port used for synchronous communication between the worker and the
 * main thread.
 * @param {(err: Error, origin?: string) => void} errorHandler - The function to use for uncaught exceptions.
 * @returns {Promise<void>} A promise that resolves when the worker thread has been initialized.
 */
async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
  let hooks, preloadScripts, initializationError;
  let hasInitializationError = false;

  {
    // If a custom hook is calling `process.exit`, we should wake up the main thread
    // so it can detect the exit event.
    const { exit } = process;
    process.exit = function(code) {
      syncCommPort.postMessage(wrapMessage('exit', code ?? process.exitCode));
      AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
      AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
      return ReflectApply(exit, this, arguments);
    };
  }


  try {
    const initResult = await initializeHooks();
    hooks = initResult.hooks;
    preloadScripts = initResult.preloadScripts;
  } catch (exception) {
    // If there was an error while parsing and executing a user loader, for example because a
    // loader contained a syntax error, then we need to send the error to the main thread so it can
    // be thrown and printed.
    hasInitializationError = true;
    initializationError = exception;
  }

  syncCommPort.on('message', handleMessage);

  if (hasInitializationError) {
    syncCommPort.postMessage(wrapMessage('error', initializationError));
  } else {
    // The preload scripts' ports are passed as the transfer list. A primordial is used rather than
    // `Array.prototype.map`, consistent with the rest of this file.
    syncCommPort.postMessage(
      wrapMessage('success', { preloadScripts }),
      ArrayPrototypeMap(preloadScripts, ({ port }) => port),
    );
  }

  // We're ready, so unlock the main thread.
  AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
  AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

  // Handle of the currently scheduled `checkForMessages` poll, kept so it can be cancelled.
  let immediate;
  /**
   * Checks for messages on the syncCommPort and handles them asynchronously.
   * Re-schedules itself (unref'ed) on every call.
   */
  function checkForMessages() {
    immediate = setImmediate(checkForMessages).unref();
    // We need to let the event loop tick a few times to give the main thread a chance to send
    // follow-up messages.
    const response = receiveMessageOnPort(syncCommPort);

    if (response !== undefined) {
      PromisePrototypeThen(handleMessage(response.message), undefined, errorHandler);
    }
  }

  // Ports on which a hook invocation has started but no response has been posted yet.
  const unsettledResponsePorts = new SafeSet();

  process.on('beforeExit', () => {
    // Tell every requester still waiting for a response that its request will never settle.
    for (const port of unsettledResponsePorts) {
      port.postMessage(wrapMessage('never-settle'));
    }
    unsettledResponsePorts.clear();

    AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
    AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);

    // Attach back the event handler.
    syncCommPort.on('message', handleMessage);
    // Also check synchronously for a message, in case it's already there.
    clearImmediate(immediate);
    checkForMessages();
    // We don't need the sync check after this tick, as we already have added the event handler.
    clearImmediate(immediate);
    // Add some work for next tick so the worker cannot exit.
    setImmediate(() => {});
  });

  /**
   * Handles incoming messages from the main thread or other workers.
   * Invokes `hooks[method]` with `args` and posts the wrapped result (or error) on `port`,
   * falling back to `syncCommPort` when no `port` is given.
   * @param {object} options - The options object.
   * @param {string} options.method - The name of the hook.
   * @param {Array} options.args - The arguments to pass to the method.
   * @param {MessagePort} options.port - The message port to use for communication.
   */
  async function handleMessage({ method, args, port }) {
    // Each potential exception needs to be caught individually so that the correct error is sent to
    // the main thread.
    let hasError = false;
    let shouldRemoveGlobalErrorHandler = false;
    // Where the response goes: the dedicated port if one was provided, else the sync port.
    const responsePort = port ?? syncCommPort;
    assert(typeof hooks[method] === 'function');
    if (port == null && !hasUncaughtExceptionCaptureCallback()) {
      // When receiving sync messages, we want to unlock the main thread when there's an exception.
      process.on('uncaughtException', errorHandler);
      shouldRemoveGlobalErrorHandler = true;
    }

    // We are about to yield the execution with `await ReflectApply` below. In case the code
    // following the `await` never runs, we remove the message handler so the `beforeExit` event
    // can be triggered.
    syncCommPort.off('message', handleMessage);

    // We keep checking for new messages to not miss any.
    clearImmediate(immediate);
    immediate = setImmediate(checkForMessages).unref();

    unsettledResponsePorts.add(responsePort);

    let response;
    try {
      response = await ReflectApply(hooks[method], hooks, args);
    } catch (exception) {
      hasError = true;
      response = exception;
    }

    unsettledResponsePorts.delete(responsePort);

    // Send the method response (or exception) to the main thread.
    try {
      responsePort.postMessage(
        wrapMessage(hasError ? 'error' : 'success', response),
        transferArrayBuffer(hasError, response?.source),
      );
    } catch (exception) {
      // Or send the exception thrown when trying to send the response.
      responsePort.postMessage(wrapMessage('error', exception));
    }

    AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
    AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
    if (shouldRemoveGlobalErrorHandler) {
      process.off('uncaughtException', errorHandler);
    }

    syncCommPort.off('message', handleMessage);
    // We keep checking for new messages to not miss any.
    clearImmediate(immediate);
    immediate = setImmediate(checkForMessages).unref();
  }
}

2281cb0ef41Sopenharmony_ci/**
2291cb0ef41Sopenharmony_ci * Initializes a worker thread for a module with customized hooks.
2301cb0ef41Sopenharmony_ci * ! Run everything possible within this function so errors get reported.
2311cb0ef41Sopenharmony_ci * @param {{lock: SharedArrayBuffer}} workerData - The lock used to synchronize with the main thread.
2321cb0ef41Sopenharmony_ci * @param {MessagePort} syncCommPort - The communication port used to communicate with the main thread.
2331cb0ef41Sopenharmony_ci */
2341cb0ef41Sopenharmony_cimodule.exports = function setupModuleWorker(workerData, syncCommPort) {
2351cb0ef41Sopenharmony_ci  const lock = new Int32Array(workerData.lock);
2361cb0ef41Sopenharmony_ci
2371cb0ef41Sopenharmony_ci  /**
2381cb0ef41Sopenharmony_ci   * Handles errors that occur in the worker thread.
2391cb0ef41Sopenharmony_ci   * @param {Error} err - The error that occurred.
2401cb0ef41Sopenharmony_ci   * @param {string} [origin='unhandledRejection'] - The origin of the error.
2411cb0ef41Sopenharmony_ci   */
2421cb0ef41Sopenharmony_ci  function errorHandler(err, origin = 'unhandledRejection') {
2431cb0ef41Sopenharmony_ci    AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
2441cb0ef41Sopenharmony_ci    AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
2451cb0ef41Sopenharmony_ci    process.off('uncaughtException', errorHandler);
2461cb0ef41Sopenharmony_ci    if (hasUncaughtExceptionCaptureCallback()) {
2471cb0ef41Sopenharmony_ci      process._fatalException(err);
2481cb0ef41Sopenharmony_ci      return;
2491cb0ef41Sopenharmony_ci    }
2501cb0ef41Sopenharmony_ci    internalBinding('errors').triggerUncaughtException(
2511cb0ef41Sopenharmony_ci      err,
2521cb0ef41Sopenharmony_ci      origin === 'unhandledRejection',
2531cb0ef41Sopenharmony_ci    );
2541cb0ef41Sopenharmony_ci  }
2551cb0ef41Sopenharmony_ci
2561cb0ef41Sopenharmony_ci  return PromisePrototypeThen(
2571cb0ef41Sopenharmony_ci    customizedModuleWorker(lock, syncCommPort, errorHandler),
2581cb0ef41Sopenharmony_ci    undefined,
2591cb0ef41Sopenharmony_ci    errorHandler,
2601cb0ef41Sopenharmony_ci  );
2611cb0ef41Sopenharmony_ci};
262