11cb0ef41Sopenharmony_ci'use strict'; 21cb0ef41Sopenharmony_ci 31cb0ef41Sopenharmony_ciconst { 41cb0ef41Sopenharmony_ci DataViewPrototypeGetBuffer, 51cb0ef41Sopenharmony_ci Int32Array, 61cb0ef41Sopenharmony_ci PromisePrototypeThen, 71cb0ef41Sopenharmony_ci ReflectApply, 81cb0ef41Sopenharmony_ci SafeSet, 91cb0ef41Sopenharmony_ci TypedArrayPrototypeGetBuffer, 101cb0ef41Sopenharmony_ci globalThis: { 111cb0ef41Sopenharmony_ci Atomics: { 121cb0ef41Sopenharmony_ci add: AtomicsAdd, 131cb0ef41Sopenharmony_ci notify: AtomicsNotify, 141cb0ef41Sopenharmony_ci }, 151cb0ef41Sopenharmony_ci }, 161cb0ef41Sopenharmony_ci} = primordials; 171cb0ef41Sopenharmony_ciconst assert = require('internal/assert'); 181cb0ef41Sopenharmony_ciconst { clearImmediate, setImmediate } = require('timers'); 191cb0ef41Sopenharmony_ciconst { 201cb0ef41Sopenharmony_ci hasUncaughtExceptionCaptureCallback, 211cb0ef41Sopenharmony_ci} = require('internal/process/execution'); 221cb0ef41Sopenharmony_ciconst { 231cb0ef41Sopenharmony_ci isArrayBuffer, 241cb0ef41Sopenharmony_ci isDataView, 251cb0ef41Sopenharmony_ci isTypedArray, 261cb0ef41Sopenharmony_ci} = require('util/types'); 271cb0ef41Sopenharmony_ci 281cb0ef41Sopenharmony_ciconst { receiveMessageOnPort } = require('internal/worker/io'); 291cb0ef41Sopenharmony_ciconst { 301cb0ef41Sopenharmony_ci WORKER_TO_MAIN_THREAD_NOTIFICATION, 311cb0ef41Sopenharmony_ci} = require('internal/modules/esm/shared_constants'); 321cb0ef41Sopenharmony_ciconst { initializeHooks } = require('internal/modules/esm/utils'); 331cb0ef41Sopenharmony_ci 341cb0ef41Sopenharmony_ci 351cb0ef41Sopenharmony_ci/** 361cb0ef41Sopenharmony_ci * Transfers an ArrayBuffer, TypedArray, or DataView to a worker thread. 371cb0ef41Sopenharmony_ci * @param {boolean} hasError - Whether an error occurred during transfer. 381cb0ef41Sopenharmony_ci * @param {ArrayBuffer | TypedArray | DataView} source - The data to transfer. 391cb0ef41Sopenharmony_ci */ 401cb0ef41Sopenharmony_cifunction transferArrayBuffer(hasError, source) { 411cb0ef41Sopenharmony_ci if (hasError || source == null) { return; } 421cb0ef41Sopenharmony_ci if (isArrayBuffer(source)) { return [source]; } 431cb0ef41Sopenharmony_ci if (isTypedArray(source)) { return [TypedArrayPrototypeGetBuffer(source)]; } 441cb0ef41Sopenharmony_ci if (isDataView(source)) { return [DataViewPrototypeGetBuffer(source)]; } 451cb0ef41Sopenharmony_ci} 461cb0ef41Sopenharmony_ci 471cb0ef41Sopenharmony_ci/** 481cb0ef41Sopenharmony_ci * Wraps a message with a status and body, and serializes the body if necessary. 491cb0ef41Sopenharmony_ci * @param {string} status - The status of the message. 501cb0ef41Sopenharmony_ci * @param {unknown} body - The body of the message. 511cb0ef41Sopenharmony_ci */ 521cb0ef41Sopenharmony_cifunction wrapMessage(status, body) { 531cb0ef41Sopenharmony_ci if (status === 'success' || body === null || 541cb0ef41Sopenharmony_ci (typeof body !== 'object' && 551cb0ef41Sopenharmony_ci typeof body !== 'function' && 561cb0ef41Sopenharmony_ci typeof body !== 'symbol')) { 571cb0ef41Sopenharmony_ci return { status, body }; 581cb0ef41Sopenharmony_ci } 591cb0ef41Sopenharmony_ci 601cb0ef41Sopenharmony_ci let serialized; 611cb0ef41Sopenharmony_ci let serializationFailed; 621cb0ef41Sopenharmony_ci try { 631cb0ef41Sopenharmony_ci const { serializeError } = require('internal/error_serdes'); 641cb0ef41Sopenharmony_ci serialized = serializeError(body); 651cb0ef41Sopenharmony_ci } catch { 661cb0ef41Sopenharmony_ci serializationFailed = true; 671cb0ef41Sopenharmony_ci } 681cb0ef41Sopenharmony_ci 691cb0ef41Sopenharmony_ci return { 701cb0ef41Sopenharmony_ci status, 711cb0ef41Sopenharmony_ci body: { 721cb0ef41Sopenharmony_ci serialized, 731cb0ef41Sopenharmony_ci serializationFailed, 741cb0ef41Sopenharmony_ci }, 751cb0ef41Sopenharmony_ci }; 761cb0ef41Sopenharmony_ci} 771cb0ef41Sopenharmony_ci 781cb0ef41Sopenharmony_ci/** 791cb0ef41Sopenharmony_ci * Initializes a worker thread for a customized module loader. 801cb0ef41Sopenharmony_ci * @param {SharedArrayBuffer} lock - The lock used to synchronize communication between the worker and the main thread. 811cb0ef41Sopenharmony_ci * @param {MessagePort} syncCommPort - The message port used for synchronous communication between the worker and the 821cb0ef41Sopenharmony_ci * main thread. 831cb0ef41Sopenharmony_ci * @param {(err: Error, origin?: string) => void} errorHandler - The function to use for uncaught exceptions. 841cb0ef41Sopenharmony_ci * @returns {Promise<void>} A promise that resolves when the worker thread has been initialized. 851cb0ef41Sopenharmony_ci */ 861cb0ef41Sopenharmony_ciasync function customizedModuleWorker(lock, syncCommPort, errorHandler) { 871cb0ef41Sopenharmony_ci let hooks, preloadScripts, initializationError; 881cb0ef41Sopenharmony_ci let hasInitializationError = false; 891cb0ef41Sopenharmony_ci 901cb0ef41Sopenharmony_ci { 911cb0ef41Sopenharmony_ci // If a custom hook is calling `process.exit`, we should wake up the main thread 921cb0ef41Sopenharmony_ci // so it can detect the exit event. 931cb0ef41Sopenharmony_ci const { exit } = process; 941cb0ef41Sopenharmony_ci process.exit = function(code) { 951cb0ef41Sopenharmony_ci syncCommPort.postMessage(wrapMessage('exit', code ?? process.exitCode)); 961cb0ef41Sopenharmony_ci AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); 971cb0ef41Sopenharmony_ci AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); 981cb0ef41Sopenharmony_ci return ReflectApply(exit, this, arguments); 991cb0ef41Sopenharmony_ci }; 1001cb0ef41Sopenharmony_ci } 1011cb0ef41Sopenharmony_ci 1021cb0ef41Sopenharmony_ci 1031cb0ef41Sopenharmony_ci try { 1041cb0ef41Sopenharmony_ci const initResult = await initializeHooks(); 1051cb0ef41Sopenharmony_ci hooks = initResult.hooks; 1061cb0ef41Sopenharmony_ci preloadScripts = initResult.preloadScripts; 1071cb0ef41Sopenharmony_ci } catch (exception) { 1081cb0ef41Sopenharmony_ci // If there was an error while parsing and executing a user loader, for example if because a 1091cb0ef41Sopenharmony_ci // loader contained a syntax error, then we need to send the error to the main thread so it can 1101cb0ef41Sopenharmony_ci // be thrown and printed. 1111cb0ef41Sopenharmony_ci hasInitializationError = true; 1121cb0ef41Sopenharmony_ci initializationError = exception; 1131cb0ef41Sopenharmony_ci } 1141cb0ef41Sopenharmony_ci 1151cb0ef41Sopenharmony_ci syncCommPort.on('message', handleMessage); 1161cb0ef41Sopenharmony_ci 1171cb0ef41Sopenharmony_ci if (hasInitializationError) { 1181cb0ef41Sopenharmony_ci syncCommPort.postMessage(wrapMessage('error', initializationError)); 1191cb0ef41Sopenharmony_ci } else { 1201cb0ef41Sopenharmony_ci syncCommPort.postMessage(wrapMessage('success', { preloadScripts }), preloadScripts.map(({ port }) => port)); 1211cb0ef41Sopenharmony_ci } 1221cb0ef41Sopenharmony_ci 1231cb0ef41Sopenharmony_ci // We're ready, so unlock the main thread. 1241cb0ef41Sopenharmony_ci AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); 1251cb0ef41Sopenharmony_ci AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); 1261cb0ef41Sopenharmony_ci 1271cb0ef41Sopenharmony_ci let immediate; 1281cb0ef41Sopenharmony_ci /** 1291cb0ef41Sopenharmony_ci * Checks for messages on the syncCommPort and handles them asynchronously. 1301cb0ef41Sopenharmony_ci */ 1311cb0ef41Sopenharmony_ci function checkForMessages() { 1321cb0ef41Sopenharmony_ci immediate = setImmediate(checkForMessages).unref(); 1331cb0ef41Sopenharmony_ci // We need to let the event loop tick a few times to give the main thread a chance to send 1341cb0ef41Sopenharmony_ci // follow-up messages. 1351cb0ef41Sopenharmony_ci const response = receiveMessageOnPort(syncCommPort); 1361cb0ef41Sopenharmony_ci 1371cb0ef41Sopenharmony_ci if (response !== undefined) { 1381cb0ef41Sopenharmony_ci PromisePrototypeThen(handleMessage(response.message), undefined, errorHandler); 1391cb0ef41Sopenharmony_ci } 1401cb0ef41Sopenharmony_ci } 1411cb0ef41Sopenharmony_ci 1421cb0ef41Sopenharmony_ci const unsettledResponsePorts = new SafeSet(); 1431cb0ef41Sopenharmony_ci 1441cb0ef41Sopenharmony_ci process.on('beforeExit', () => { 1451cb0ef41Sopenharmony_ci for (const port of unsettledResponsePorts) { 1461cb0ef41Sopenharmony_ci port.postMessage(wrapMessage('never-settle')); 1471cb0ef41Sopenharmony_ci } 1481cb0ef41Sopenharmony_ci unsettledResponsePorts.clear(); 1491cb0ef41Sopenharmony_ci 1501cb0ef41Sopenharmony_ci AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); 1511cb0ef41Sopenharmony_ci AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); 1521cb0ef41Sopenharmony_ci 1531cb0ef41Sopenharmony_ci // Attach back the event handler. 1541cb0ef41Sopenharmony_ci syncCommPort.on('message', handleMessage); 1551cb0ef41Sopenharmony_ci // Also check synchronously for a message, in case it's already there. 1561cb0ef41Sopenharmony_ci clearImmediate(immediate); 1571cb0ef41Sopenharmony_ci checkForMessages(); 1581cb0ef41Sopenharmony_ci // We don't need the sync check after this tick, as we already have added the event handler. 1591cb0ef41Sopenharmony_ci clearImmediate(immediate); 1601cb0ef41Sopenharmony_ci // Add some work for next tick so the worker cannot exit. 1611cb0ef41Sopenharmony_ci setImmediate(() => {}); 1621cb0ef41Sopenharmony_ci }); 1631cb0ef41Sopenharmony_ci 1641cb0ef41Sopenharmony_ci /** 1651cb0ef41Sopenharmony_ci * Handles incoming messages from the main thread or other workers. 1661cb0ef41Sopenharmony_ci * @param {object} options - The options object. 1671cb0ef41Sopenharmony_ci * @param {string} options.method - The name of the hook. 1681cb0ef41Sopenharmony_ci * @param {Array} options.args - The arguments to pass to the method. 1691cb0ef41Sopenharmony_ci * @param {MessagePort} options.port - The message port to use for communication. 1701cb0ef41Sopenharmony_ci */ 1711cb0ef41Sopenharmony_ci async function handleMessage({ method, args, port }) { 1721cb0ef41Sopenharmony_ci // Each potential exception needs to be caught individually so that the correct error is sent to 1731cb0ef41Sopenharmony_ci // the main thread. 1741cb0ef41Sopenharmony_ci let hasError = false; 1751cb0ef41Sopenharmony_ci let shouldRemoveGlobalErrorHandler = false; 1761cb0ef41Sopenharmony_ci assert(typeof hooks[method] === 'function'); 1771cb0ef41Sopenharmony_ci if (port == null && !hasUncaughtExceptionCaptureCallback()) { 1781cb0ef41Sopenharmony_ci // When receiving sync messages, we want to unlock the main thread when there's an exception. 1791cb0ef41Sopenharmony_ci process.on('uncaughtException', errorHandler); 1801cb0ef41Sopenharmony_ci shouldRemoveGlobalErrorHandler = true; 1811cb0ef41Sopenharmony_ci } 1821cb0ef41Sopenharmony_ci 1831cb0ef41Sopenharmony_ci // We are about to yield the execution with `await ReflectApply` below. In case the code 1841cb0ef41Sopenharmony_ci // following the `await` never runs, we remove the message handler so the `beforeExit` event 1851cb0ef41Sopenharmony_ci // can be triggered. 1861cb0ef41Sopenharmony_ci syncCommPort.off('message', handleMessage); 1871cb0ef41Sopenharmony_ci 1881cb0ef41Sopenharmony_ci // We keep checking for new messages to not miss any. 1891cb0ef41Sopenharmony_ci clearImmediate(immediate); 1901cb0ef41Sopenharmony_ci immediate = setImmediate(checkForMessages).unref(); 1911cb0ef41Sopenharmony_ci 1921cb0ef41Sopenharmony_ci unsettledResponsePorts.add(port ?? syncCommPort); 1931cb0ef41Sopenharmony_ci 1941cb0ef41Sopenharmony_ci let response; 1951cb0ef41Sopenharmony_ci try { 1961cb0ef41Sopenharmony_ci response = await ReflectApply(hooks[method], hooks, args); 1971cb0ef41Sopenharmony_ci } catch (exception) { 1981cb0ef41Sopenharmony_ci hasError = true; 1991cb0ef41Sopenharmony_ci response = exception; 2001cb0ef41Sopenharmony_ci } 2011cb0ef41Sopenharmony_ci 2021cb0ef41Sopenharmony_ci unsettledResponsePorts.delete(port ?? syncCommPort); 2031cb0ef41Sopenharmony_ci 2041cb0ef41Sopenharmony_ci // Send the method response (or exception) to the main thread. 2051cb0ef41Sopenharmony_ci try { 2061cb0ef41Sopenharmony_ci (port ?? syncCommPort).postMessage( 2071cb0ef41Sopenharmony_ci wrapMessage(hasError ? 'error' : 'success', response), 2081cb0ef41Sopenharmony_ci transferArrayBuffer(hasError, response?.source), 2091cb0ef41Sopenharmony_ci ); 2101cb0ef41Sopenharmony_ci } catch (exception) { 2111cb0ef41Sopenharmony_ci // Or send the exception thrown when trying to send the response. 2121cb0ef41Sopenharmony_ci (port ?? syncCommPort).postMessage(wrapMessage('error', exception)); 2131cb0ef41Sopenharmony_ci } 2141cb0ef41Sopenharmony_ci 2151cb0ef41Sopenharmony_ci AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); 2161cb0ef41Sopenharmony_ci AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); 2171cb0ef41Sopenharmony_ci if (shouldRemoveGlobalErrorHandler) { 2181cb0ef41Sopenharmony_ci process.off('uncaughtException', errorHandler); 2191cb0ef41Sopenharmony_ci } 2201cb0ef41Sopenharmony_ci 2211cb0ef41Sopenharmony_ci syncCommPort.off('message', handleMessage); 2221cb0ef41Sopenharmony_ci // We keep checking for new messages to not miss any. 2231cb0ef41Sopenharmony_ci clearImmediate(immediate); 2241cb0ef41Sopenharmony_ci immediate = setImmediate(checkForMessages).unref(); 2251cb0ef41Sopenharmony_ci } 2261cb0ef41Sopenharmony_ci} 2271cb0ef41Sopenharmony_ci 2281cb0ef41Sopenharmony_ci/** 2291cb0ef41Sopenharmony_ci * Initializes a worker thread for a module with customized hooks. 2301cb0ef41Sopenharmony_ci * ! Run everything possible within this function so errors get reported. 2311cb0ef41Sopenharmony_ci * @param {{lock: SharedArrayBuffer}} workerData - The lock used to synchronize with the main thread. 2321cb0ef41Sopenharmony_ci * @param {MessagePort} syncCommPort - The communication port used to communicate with the main thread. 2331cb0ef41Sopenharmony_ci */ 2341cb0ef41Sopenharmony_cimodule.exports = function setupModuleWorker(workerData, syncCommPort) { 2351cb0ef41Sopenharmony_ci const lock = new Int32Array(workerData.lock); 2361cb0ef41Sopenharmony_ci 2371cb0ef41Sopenharmony_ci /** 2381cb0ef41Sopenharmony_ci * Handles errors that occur in the worker thread. 2391cb0ef41Sopenharmony_ci * @param {Error} err - The error that occurred. 2401cb0ef41Sopenharmony_ci * @param {string} [origin='unhandledRejection'] - The origin of the error. 2411cb0ef41Sopenharmony_ci */ 2421cb0ef41Sopenharmony_ci function errorHandler(err, origin = 'unhandledRejection') { 2431cb0ef41Sopenharmony_ci AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); 2441cb0ef41Sopenharmony_ci AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); 2451cb0ef41Sopenharmony_ci process.off('uncaughtException', errorHandler); 2461cb0ef41Sopenharmony_ci if (hasUncaughtExceptionCaptureCallback()) { 2471cb0ef41Sopenharmony_ci process._fatalException(err); 2481cb0ef41Sopenharmony_ci return; 2491cb0ef41Sopenharmony_ci } 2501cb0ef41Sopenharmony_ci internalBinding('errors').triggerUncaughtException( 2511cb0ef41Sopenharmony_ci err, 2521cb0ef41Sopenharmony_ci origin === 'unhandledRejection', 2531cb0ef41Sopenharmony_ci ); 2541cb0ef41Sopenharmony_ci } 2551cb0ef41Sopenharmony_ci 2561cb0ef41Sopenharmony_ci return PromisePrototypeThen( 2571cb0ef41Sopenharmony_ci customizedModuleWorker(lock, syncCommPort, errorHandler), 2581cb0ef41Sopenharmony_ci undefined, 2591cb0ef41Sopenharmony_ci errorHandler, 2601cb0ef41Sopenharmony_ci ); 2611cb0ef41Sopenharmony_ci}; 262