# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ProcessPoolExecutor.

The following diagram and text describe the data-flow through the system:

|======================= In-process =====================|== Out-of-process ==|

+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +------------+     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue

Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
  WorkItem from the "Work Items" dict: if the work item has been cancelled then
  it is simply removed from the dict, otherwise it is repackaged as a
  _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
  until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
  calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
  "Work Items" dict and deletes the dict entry

Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
  _ResultItems in "Result Q"
"""

__author__ = 'Brian Quinlan (brian@sweetapp.com)'

import os
from concurrent.futures import _base
import queue
import multiprocessing as mp
import multiprocessing.connection
from multiprocessing.queues import Queue
import threading
import weakref
from functools import partial
import itertools
import sys
from traceback import format_exception


# Maps each executor manager thread to the _ThreadWakeup used to wake it.
_threads_wakeups = weakref.WeakKeyDictionary()
# Set to True by _python_exit() once interpreter shutdown has started.
_global_shutdown = False


class _ThreadWakeup:
    """Pipe-based channel used to wake up a thread blocked on its reader.

    wakeup() writes an empty message to the pipe, clear() drains every
    pending message and close() closes both ends.  Once closed, wakeup()
    and clear() are no-ops.
    """

    def __init__(self):
        self._closed = False
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        if not self._closed:
            self._closed = True
            self._writer.close()
            self._reader.close()

    def wakeup(self):
        if not self._closed:
            self._writer.send_bytes(b"")

    def clear(self):
        if not self._closed:
            while self._reader.poll():
                self._reader.recv_bytes()


def _python_exit():
    """Flag global shutdown, wake every registered thread, then join them."""
    global _global_shutdown
    _global_shutdown = True
    items = list(_threads_wakeups.items())
    for _, thread_wakeup in items:
        # call not protected by ProcessPoolExecutor._shutdown_lock
        thread_wakeup.wakeup()
    for t, _ in items:
        t.join()

# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1


# On Windows, WaitForMultipleObjects is used to wait for processes to finish.
# It can wait on, at most, 63 objects. There is an overhead of two objects:
# - the result queue reader
# - the thread wakeup reader
_MAX_WINDOWS_WORKERS = 63 - 2

# Hack to embed stringification of remote traceback in local traceback

class _RemoteTraceback(Exception):
    """Exception whose str() is a pre-formatted traceback string."""
    def __init__(self, tb):
        self.tb = tb
    def __str__(self):
        return self.tb

class _ExceptionWithTraceback:
    """Pairs an exception with its formatted traceback for pickling.

    Unpickling goes through _rebuild_exc(), which returns the original
    exception with a _RemoteTraceback attached as its __cause__.
    """
    def __init__(self, exc, tb):
        tb = ''.join(format_exception(type(exc), exc, tb))
        self.exc = exc
        # Traceback object needs to be garbage-collected as its frames
        # contain references to all the objects in the exception scope
        self.exc.__traceback__ = None
        self.tb = '\n"""\n%s"""' % tb
    def __reduce__(self):
        return _rebuild_exc, (self.exc, self.tb)

def _rebuild_exc(exc, tb):
    """Return *exc* with a _RemoteTraceback built from *tb* as its cause."""
    exc.__cause__ = _RemoteTraceback(tb)
    return exc

class _WorkItem(object):
    """A submitted call together with the future that will hold its outcome."""
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

class _ResultItem(object):
    """Outcome of one call, keyed by work id.

    exit_pid is set by a worker that exits after sending this result.
    """
    def __init__(self, work_id, exception=None, result=None, exit_pid=None):
        self.work_id = work_id
        self.exception = exception
        self.result = result
        self.exit_pid = exit_pid

class _CallItem(object):
    """A call (function, args, kwargs) tagged with the id of its work item."""
    def __init__(self, work_id, fn, args, kwargs):
        self.work_id = work_id
        self.fn = fn
        self.args = args
        self.kwargs = kwargs


class _SafeQueue(Queue):
    """Safe Queue set exception to the future object linked to a job"""
    def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
                 thread_wakeup):
        self.pending_work_items = pending_work_items
        self.shutdown_lock = shutdown_lock
        self.thread_wakeup = thread_wakeup
        super().__init__(max_size, ctx=ctx)

    def _on_queue_feeder_error(self, e, obj):
        if isinstance(obj, _CallItem):
            tb = format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
            with self.shutdown_lock:
                self.thread_wakeup.wakeup()
            # work_item can be None if another process terminated. In this
            # case, the executor_manager_thread fails all work_items
            # with BrokenProcessPool
            if work_item is not None:
                work_item.future.set_exception(e)
        else:
            super()._on_queue_feeder_error(e, obj)


def _get_chunks(*iterables, chunksize):
    """ Iterates over zip()ed iterables in chunks. """
    it = zip(*iterables)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield chunk


def _process_chunk(fn, chunk):
    """ Processes a chunk of an iterable passed to map.

    Runs the function passed to map() on a chunk of the
    iterable passed to map.

    This function is run in a separate process.

    """
    return [fn(*args) for args in chunk]


def _sendback_result(result_queue, work_id, result=None, exception=None,
                     exit_pid=None):
    """Safely send back the given result or exception"""
    try:
        result_queue.put(_ResultItem(work_id, result=result,
                                     exception=exception, exit_pid=exit_pid))
    except BaseException as e:
        # Putting the item failed: report that failure for this work id
        # instead of the original payload.
        exc = _ExceptionWithTraceback(e, e.__traceback__)
        result_queue.put(_ResultItem(work_id, exception=exc,
                                     exit_pid=exit_pid))


def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
    """Evaluates calls from call_queue and places the results in result_queue.

    This worker is run in a separate process.

    Args:
        call_queue: A ctx.Queue of _CallItems that will be read and
            evaluated by the worker.
        result_queue: A ctx.Queue of _ResultItems that will be written
            to by the worker.
        initializer: A callable initializer, or None
        initargs: A tuple of args for the initializer
        max_tasks: The number of calls after which the worker tags its
            result with its own PID and returns, or None for no limit.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            # The parent will notice that the process stopped and
            # mark the pool broken
            return
    num_tasks = 0
    exit_pid = None
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            # Wake up queue management thread
            result_queue.put(os.getpid())
            return

        if max_tasks is not None:
            num_tasks += 1
            if num_tasks >= max_tasks:
                exit_pid = os.getpid()

        try:
            r = call_item.fn(*call_item.args, **call_item.kwargs)
        except BaseException as e:
            exc = _ExceptionWithTraceback(e, e.__traceback__)
            _sendback_result(result_queue, call_item.work_id, exception=exc,
                             exit_pid=exit_pid)
        else:
            _sendback_result(result_queue, call_item.work_id, result=r,
                             exit_pid=exit_pid)
            del r

        # Liberate the resource as soon as possible, to avoid holding onto
        # open files or shared memory that is not needed anymore
        del call_item

        if exit_pid is not None:
            return


class _ExecutorManagerThread(threading.Thread):
    """Manages the communication between this process and the worker processes.

    The manager is run in a local thread.

    Args:
        executor: A reference to the ProcessPoolExecutor that owns
            this thread. A weakref will be owned by the manager as well as
            references to internal objects used to introspect the state of
            the executor.
    """

    def __init__(self, executor):
        # Store references to necessary internals of the executor.

        # A _ThreadWakeup to allow waking up the queue_manager_thread from the
        # main Thread and avoid deadlocks caused by permanently locked queues.
        self.thread_wakeup = executor._executor_manager_thread_wakeup
        self.shutdown_lock = executor._shutdown_lock

        # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
        # to determine if the ProcessPoolExecutor has been garbage collected
        # and that the manager can exit.
        # When the executor gets garbage collected, the weakref callback
        # will wake up the queue management thread so that it can terminate
        # if there is no pending work item.
        def weakref_cb(_,
                       thread_wakeup=self.thread_wakeup,
                       shutdown_lock=self.shutdown_lock):
            mp.util.debug('Executor collected: triggering callback for'
                          ' QueueManager wakeup')
            with shutdown_lock:
                thread_wakeup.wakeup()

        self.executor_reference = weakref.ref(executor, weakref_cb)

        # A list of the ctx.Process instances used as workers.
        self.processes = executor._processes

        # A ctx.Queue that will be filled with _CallItems derived from
        # _WorkItems for processing by the process workers.
        self.call_queue = executor._call_queue

        # A ctx.SimpleQueue of _ResultItems generated by the process workers.
        self.result_queue = executor._result_queue

        # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
        self.work_ids_queue = executor._work_ids

        # Maximum number of tasks a worker process can execute before
        # exiting safely
        self.max_tasks_per_child = executor._max_tasks_per_child

        # A dict mapping work ids to _WorkItems e.g.
        #     {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
        self.pending_work_items = executor._pending_work_items

        super().__init__()

    def run(self):
        # Main loop for the executor manager thread.

        while True:
            self.add_call_item_to_queue()

            result_item, is_broken, cause = self.wait_result_broken_or_wakeup()

            if is_broken:
                self.terminate_broken(cause)
                return
            if result_item is not None:
                self.process_result_item(result_item)

                # NOTE(review): process_result_item() also accepts a bare
                # int PID, but the attribute access below assumes a
                # _ResultItem -- confirm that ints never reach this point.
                process_exited = result_item.exit_pid is not None
                if process_exited:
                    p = self.processes.pop(result_item.exit_pid)
                    p.join()

                # Delete reference to result_item to avoid keeping references
                # while waiting on new results.
                del result_item

                if executor := self.executor_reference():
                    if process_exited:
                        with self.shutdown_lock:
                            executor._adjust_process_count()
                    else:
                        executor._idle_worker_semaphore.release()
                    del executor

            if self.is_shutting_down():
                self.flag_executor_shutting_down()

                # When only canceled futures remain in pending_work_items, our
                # next call to wait_result_broken_or_wakeup would hang forever.
                # This makes sure we have some running futures or none at all.
                self.add_call_item_to_queue()

                # Since no new work items can be added, it is safe to shutdown
                # this thread if there are no pending work items.
                if not self.pending_work_items:
                    self.join_executor_internals()
                    return

    def add_call_item_to_queue(self):
        # Fills call_queue with _WorkItems from pending_work_items.
        # This function never blocks.
        while True:
            if self.call_queue.full():
                return
            try:
                work_id = self.work_ids_queue.get(block=False)
            except queue.Empty:
                return
            else:
                work_item = self.pending_work_items[work_id]

                if work_item.future.set_running_or_notify_cancel():
                    self.call_queue.put(_CallItem(work_id,
                                                  work_item.fn,
                                                  work_item.args,
                                                  work_item.kwargs),
                                        block=True)
                else:
                    del self.pending_work_items[work_id]
                    continue

    def wait_result_broken_or_wakeup(self):
        # Wait for a result to be ready in the result_queue while checking
        # that all worker processes are still running, or for a wake up
        # signal sent. The wake up signals come either from new tasks being
        # submitted, from the executor being shutdown/gc-ed, or from the
        # shutdown of the python interpreter.
        result_reader = self.result_queue._reader
        assert not self.thread_wakeup._closed
        wakeup_reader = self.thread_wakeup._reader
        readers = [result_reader, wakeup_reader]
        worker_sentinels = [p.sentinel for p in list(self.processes.values())]
        ready = mp.connection.wait(readers + worker_sentinels)

        # Broken unless a result was read successfully or only the wakeup
        # pipe fired; a ready worker sentinel alone leaves is_broken True.
        cause = None
        is_broken = True
        result_item = None
        if result_reader in ready:
            try:
                result_item = result_reader.recv()
                is_broken = False
            except BaseException as e:
                cause = format_exception(type(e), e, e.__traceback__)

        elif wakeup_reader in ready:
            is_broken = False

        with self.shutdown_lock:
            self.thread_wakeup.clear()

        return result_item, is_broken, cause

    def process_result_item(self, result_item):
        # Process the received result_item. This can be either the PID of a
        # worker that exited gracefully or a _ResultItem

        if isinstance(result_item, int):
            # Clean shutdown of a worker using its PID
            # (avoids marking the executor broken)
            assert self.is_shutting_down()
            p = self.processes.pop(result_item)
            p.join()
            if not self.processes:
                self.join_executor_internals()
                return
        else:
            # Received a _ResultItem so mark the future as completed.
            work_item = self.pending_work_items.pop(result_item.work_id, None)
            # work_item can be None if another process terminated (see above)
            if work_item is not None:
                if result_item.exception:
                    work_item.future.set_exception(result_item.exception)
                else:
                    work_item.future.set_result(result_item.result)

    def is_shutting_down(self):
        # Check whether we should start shutting down the executor.
        executor = self.executor_reference()
        # No more work items can be added if:
        #   - The interpreter is shutting down OR
        #   - The executor that owns this worker has been collected OR
        #   - The executor that owns this worker has been shutdown.
        return (_global_shutdown or executor is None
                or executor._shutdown_thread)

    def terminate_broken(self, cause):
        # Terminate the executor because it is in a broken state. The cause
        # argument can be used to display more information on the error that
        # led the executor into becoming broken.

        # Mark the process pool broken so that submits fail right now.
        executor = self.executor_reference()
        if executor is not None:
            executor._broken = ('A child process terminated '
                                'abruptly, the process pool is not '
                                'usable anymore')
            executor._shutdown_thread = True
            executor = None

        # All pending tasks are to be marked failed with the following
        # BrokenProcessPool error
        bpe = BrokenProcessPool("A process in the process pool was "
                                "terminated abruptly while the future was "
                                "running or pending.")
        if cause is not None:
            bpe.__cause__ = _RemoteTraceback(
                f"\n'''\n{''.join(cause)}'''")

        # Mark pending tasks as failed.
        for work_item in self.pending_work_items.values():
            work_item.future.set_exception(bpe)
            # Delete references to object. See issue16284
            del work_item
        self.pending_work_items.clear()

        # Terminate remaining workers forcibly: the queues or their
        # locks may be in a dirty state and block forever.
        for p in self.processes.values():
            p.terminate()

        # clean up resources
        self.join_executor_internals()

    def flag_executor_shutting_down(self):
        # Flag the executor as shutting down and cancel remaining tasks if
        # requested as early as possible if it is not gc-ed yet.
        executor = self.executor_reference()
        if executor is not None:
            executor._shutdown_thread = True
            # Cancel pending work items if requested.
            if executor._cancel_pending_futures:
                # Cancel all pending futures and update pending_work_items
                # to only have futures that are currently running.
                new_pending_work_items = {}
                for work_id, work_item in self.pending_work_items.items():
                    if not work_item.future.cancel():
                        new_pending_work_items[work_id] = work_item
                self.pending_work_items = new_pending_work_items
                # Drain work_ids_queue since we no longer need to
                # add items to the call queue.
                while True:
                    try:
                        self.work_ids_queue.get_nowait()
                    except queue.Empty:
                        break
                # Make sure we do this only once to not waste time looping
                # on running processes over and over.
                executor._cancel_pending_futures = False

    def shutdown_workers(self):
        n_children_to_stop = self.get_n_children_alive()
        n_sentinels_sent = 0
        # Send the right number of sentinels, to make sure all children are
        # properly terminated.
        while (n_sentinels_sent < n_children_to_stop
                and self.get_n_children_alive() > 0):
            for _ in range(n_children_to_stop - n_sentinels_sent):
                try:
                    self.call_queue.put_nowait(None)
                    n_sentinels_sent += 1
                except queue.Full:
                    break

    def join_executor_internals(self):
        self.shutdown_workers()
        # Release the queue's resources as soon as possible.
        self.call_queue.close()
        self.call_queue.join_thread()
        with self.shutdown_lock:
            self.thread_wakeup.close()
        # If .join() is not called on the created processes then
        # some ctx.Queue methods may deadlock on Mac OS X.
        for p in self.processes.values():
            p.join()

    def get_n_children_alive(self):
        # This is an upper bound on the number of children alive.
        return sum(p.is_alive() for p in self.processes.values())


# Cached outcome of _check_system_limits(): whether the probe already ran
# and, if the platform is unusable, the message to raise again.
_system_limits_checked = False
_system_limited = None


def _check_system_limits():
    """Raise NotImplementedError if the platform cannot support the pool.

    The outcome is cached in module globals: a recorded failure is raised
    again with the same message, and a recorded success returns at once
    without probing the platform again.
    """
    global _system_limits_checked, _system_limited
    if _system_limits_checked:
        if _system_limited:
            raise NotImplementedError(_system_limited)
        # Already verified: skip the import and sysconf probes below.
        return
    _system_limits_checked = True
    try:
        import multiprocessing.synchronize
    except ImportError:
        _system_limited = (
            "This Python build lacks multiprocessing.synchronize, usually due "
            "to named semaphores being unavailable on this platform."
        )
        raise NotImplementedError(_system_limited)
    try:
        nsems_max = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems_max == -1:
        # undetermined limit, assume that limit is determined
        # by available memory only
        return
    if nsems_max >= 256:
        # minimum number of semaphores available
        # according to POSIX
        return
    _system_limited = ("system provides too few semaphores (%d"
                       " available, 256 necessary)" % nsems_max)
    raise NotImplementedError(_system_limited)


def _chain_from_iterable_of_lists(iterable):
    """
    Specialized implementation of itertools.chain.from_iterable.
    Each item in *iterable* should be a list. This function is
    careful not to keep references to yielded objects.
    """
    for element in iterable:
        element.reverse()
        while element:
            yield element.pop()


class BrokenProcessPool(_base.BrokenExecutor):
    """
    Raised when a process in a ProcessPoolExecutor terminated abruptly
    while a future was in the running state.
    """
6127db96d56Sopenharmony_ci """ 6137db96d56Sopenharmony_ci 6147db96d56Sopenharmony_ci 6157db96d56Sopenharmony_ciclass ProcessPoolExecutor(_base.Executor): 6167db96d56Sopenharmony_ci def __init__(self, max_workers=None, mp_context=None, 6177db96d56Sopenharmony_ci initializer=None, initargs=(), *, max_tasks_per_child=None): 6187db96d56Sopenharmony_ci """Initializes a new ProcessPoolExecutor instance. 6197db96d56Sopenharmony_ci 6207db96d56Sopenharmony_ci Args: 6217db96d56Sopenharmony_ci max_workers: The maximum number of processes that can be used to 6227db96d56Sopenharmony_ci execute the given calls. If None or not given then as many 6237db96d56Sopenharmony_ci worker processes will be created as the machine has processors. 6247db96d56Sopenharmony_ci mp_context: A multiprocessing context to launch the workers. This 6257db96d56Sopenharmony_ci object should provide SimpleQueue, Queue and Process. Useful 6267db96d56Sopenharmony_ci to allow specific multiprocessing start methods. 6277db96d56Sopenharmony_ci initializer: A callable used to initialize worker processes. 6287db96d56Sopenharmony_ci initargs: A tuple of arguments to pass to the initializer. 6297db96d56Sopenharmony_ci max_tasks_per_child: The maximum number of tasks a worker process 6307db96d56Sopenharmony_ci can complete before it will exit and be replaced with a fresh 6317db96d56Sopenharmony_ci worker process. The default of None means worker process will 6327db96d56Sopenharmony_ci live as long as the executor. Requires a non-'fork' mp_context 6337db96d56Sopenharmony_ci start method. When given, we default to using 'spawn' if no 6347db96d56Sopenharmony_ci mp_context is supplied. 
6357db96d56Sopenharmony_ci """ 6367db96d56Sopenharmony_ci _check_system_limits() 6377db96d56Sopenharmony_ci 6387db96d56Sopenharmony_ci if max_workers is None: 6397db96d56Sopenharmony_ci self._max_workers = os.cpu_count() or 1 6407db96d56Sopenharmony_ci if sys.platform == 'win32': 6417db96d56Sopenharmony_ci self._max_workers = min(_MAX_WINDOWS_WORKERS, 6427db96d56Sopenharmony_ci self._max_workers) 6437db96d56Sopenharmony_ci else: 6447db96d56Sopenharmony_ci if max_workers <= 0: 6457db96d56Sopenharmony_ci raise ValueError("max_workers must be greater than 0") 6467db96d56Sopenharmony_ci elif (sys.platform == 'win32' and 6477db96d56Sopenharmony_ci max_workers > _MAX_WINDOWS_WORKERS): 6487db96d56Sopenharmony_ci raise ValueError( 6497db96d56Sopenharmony_ci f"max_workers must be <= {_MAX_WINDOWS_WORKERS}") 6507db96d56Sopenharmony_ci 6517db96d56Sopenharmony_ci self._max_workers = max_workers 6527db96d56Sopenharmony_ci 6537db96d56Sopenharmony_ci if mp_context is None: 6547db96d56Sopenharmony_ci if max_tasks_per_child is not None: 6557db96d56Sopenharmony_ci mp_context = mp.get_context("spawn") 6567db96d56Sopenharmony_ci else: 6577db96d56Sopenharmony_ci mp_context = mp.get_context() 6587db96d56Sopenharmony_ci self._mp_context = mp_context 6597db96d56Sopenharmony_ci 6607db96d56Sopenharmony_ci # https://github.com/python/cpython/issues/90622 6617db96d56Sopenharmony_ci self._safe_to_dynamically_spawn_children = ( 6627db96d56Sopenharmony_ci self._mp_context.get_start_method(allow_none=False) != "fork") 6637db96d56Sopenharmony_ci 6647db96d56Sopenharmony_ci if initializer is not None and not callable(initializer): 6657db96d56Sopenharmony_ci raise TypeError("initializer must be a callable") 6667db96d56Sopenharmony_ci self._initializer = initializer 6677db96d56Sopenharmony_ci self._initargs = initargs 6687db96d56Sopenharmony_ci 6697db96d56Sopenharmony_ci if max_tasks_per_child is not None: 6707db96d56Sopenharmony_ci if not isinstance(max_tasks_per_child, int): 
6717db96d56Sopenharmony_ci raise TypeError("max_tasks_per_child must be an integer") 6727db96d56Sopenharmony_ci elif max_tasks_per_child <= 0: 6737db96d56Sopenharmony_ci raise ValueError("max_tasks_per_child must be >= 1") 6747db96d56Sopenharmony_ci if self._mp_context.get_start_method(allow_none=False) == "fork": 6757db96d56Sopenharmony_ci # https://github.com/python/cpython/issues/90622 6767db96d56Sopenharmony_ci raise ValueError("max_tasks_per_child is incompatible with" 6777db96d56Sopenharmony_ci " the 'fork' multiprocessing start method;" 6787db96d56Sopenharmony_ci " supply a different mp_context.") 6797db96d56Sopenharmony_ci self._max_tasks_per_child = max_tasks_per_child 6807db96d56Sopenharmony_ci 6817db96d56Sopenharmony_ci # Management thread 6827db96d56Sopenharmony_ci self._executor_manager_thread = None 6837db96d56Sopenharmony_ci 6847db96d56Sopenharmony_ci # Map of pids to processes 6857db96d56Sopenharmony_ci self._processes = {} 6867db96d56Sopenharmony_ci 6877db96d56Sopenharmony_ci # Shutdown is a two-step process. 6887db96d56Sopenharmony_ci self._shutdown_thread = False 6897db96d56Sopenharmony_ci self._shutdown_lock = threading.Lock() 6907db96d56Sopenharmony_ci self._idle_worker_semaphore = threading.Semaphore(0) 6917db96d56Sopenharmony_ci self._broken = False 6927db96d56Sopenharmony_ci self._queue_count = 0 6937db96d56Sopenharmony_ci self._pending_work_items = {} 6947db96d56Sopenharmony_ci self._cancel_pending_futures = False 6957db96d56Sopenharmony_ci 6967db96d56Sopenharmony_ci # _ThreadWakeup is a communication channel used to interrupt the wait 6977db96d56Sopenharmony_ci # of the main loop of executor_manager_thread from another thread (e.g. 6987db96d56Sopenharmony_ci # when calling executor.submit or executor.shutdown). 
We do not use the 6997db96d56Sopenharmony_ci # _result_queue to send wakeup signals to the executor_manager_thread 7007db96d56Sopenharmony_ci # as it could result in a deadlock if a worker process dies with the 7017db96d56Sopenharmony_ci # _result_queue write lock still acquired. 7027db96d56Sopenharmony_ci # 7037db96d56Sopenharmony_ci # _shutdown_lock must be locked to access _ThreadWakeup. 7047db96d56Sopenharmony_ci self._executor_manager_thread_wakeup = _ThreadWakeup() 7057db96d56Sopenharmony_ci 7067db96d56Sopenharmony_ci # Create communication channels for the executor 7077db96d56Sopenharmony_ci # Make the call queue slightly larger than the number of processes to 7087db96d56Sopenharmony_ci # prevent the worker processes from idling. But don't make it too big 7097db96d56Sopenharmony_ci # because futures in the call queue cannot be cancelled. 7107db96d56Sopenharmony_ci queue_size = self._max_workers + EXTRA_QUEUED_CALLS 7117db96d56Sopenharmony_ci self._call_queue = _SafeQueue( 7127db96d56Sopenharmony_ci max_size=queue_size, ctx=self._mp_context, 7137db96d56Sopenharmony_ci pending_work_items=self._pending_work_items, 7147db96d56Sopenharmony_ci shutdown_lock=self._shutdown_lock, 7157db96d56Sopenharmony_ci thread_wakeup=self._executor_manager_thread_wakeup) 7167db96d56Sopenharmony_ci # Killed worker processes can produce spurious "broken pipe" 7177db96d56Sopenharmony_ci # tracebacks in the queue's own worker thread. But we detect killed 7187db96d56Sopenharmony_ci # processes anyway, so silence the tracebacks. 7197db96d56Sopenharmony_ci self._call_queue._ignore_epipe = True 7207db96d56Sopenharmony_ci self._result_queue = mp_context.SimpleQueue() 7217db96d56Sopenharmony_ci self._work_ids = queue.Queue() 7227db96d56Sopenharmony_ci 7237db96d56Sopenharmony_ci def _start_executor_manager_thread(self): 7247db96d56Sopenharmony_ci if self._executor_manager_thread is None: 7257db96d56Sopenharmony_ci # Start the processes so that their sentinels are known. 
7267db96d56Sopenharmony_ci if not self._safe_to_dynamically_spawn_children: # ie, using fork. 7277db96d56Sopenharmony_ci self._launch_processes() 7287db96d56Sopenharmony_ci self._executor_manager_thread = _ExecutorManagerThread(self) 7297db96d56Sopenharmony_ci self._executor_manager_thread.start() 7307db96d56Sopenharmony_ci _threads_wakeups[self._executor_manager_thread] = \ 7317db96d56Sopenharmony_ci self._executor_manager_thread_wakeup 7327db96d56Sopenharmony_ci 7337db96d56Sopenharmony_ci def _adjust_process_count(self): 7347db96d56Sopenharmony_ci # if there's an idle process, we don't need to spawn a new one. 7357db96d56Sopenharmony_ci if self._idle_worker_semaphore.acquire(blocking=False): 7367db96d56Sopenharmony_ci return 7377db96d56Sopenharmony_ci 7387db96d56Sopenharmony_ci process_count = len(self._processes) 7397db96d56Sopenharmony_ci if process_count < self._max_workers: 7407db96d56Sopenharmony_ci # Assertion disabled as this codepath is also used to replace a 7417db96d56Sopenharmony_ci # worker that unexpectedly dies, even when using the 'fork' start 7427db96d56Sopenharmony_ci # method. That means there is still a potential deadlock bug. If a 7437db96d56Sopenharmony_ci # 'fork' mp_context worker dies, we'll be forking a new one when 7447db96d56Sopenharmony_ci # we know a thread is running (self._executor_manager_thread). 
7457db96d56Sopenharmony_ci #assert self._safe_to_dynamically_spawn_children or not self._executor_manager_thread, 'https://github.com/python/cpython/issues/90622' 7467db96d56Sopenharmony_ci self._spawn_process() 7477db96d56Sopenharmony_ci 7487db96d56Sopenharmony_ci def _launch_processes(self): 7497db96d56Sopenharmony_ci # https://github.com/python/cpython/issues/90622 7507db96d56Sopenharmony_ci assert not self._executor_manager_thread, ( 7517db96d56Sopenharmony_ci 'Processes cannot be fork()ed after the thread has started, ' 7527db96d56Sopenharmony_ci 'deadlock in the child processes could result.') 7537db96d56Sopenharmony_ci for _ in range(len(self._processes), self._max_workers): 7547db96d56Sopenharmony_ci self._spawn_process() 7557db96d56Sopenharmony_ci 7567db96d56Sopenharmony_ci def _spawn_process(self): 7577db96d56Sopenharmony_ci p = self._mp_context.Process( 7587db96d56Sopenharmony_ci target=_process_worker, 7597db96d56Sopenharmony_ci args=(self._call_queue, 7607db96d56Sopenharmony_ci self._result_queue, 7617db96d56Sopenharmony_ci self._initializer, 7627db96d56Sopenharmony_ci self._initargs, 7637db96d56Sopenharmony_ci self._max_tasks_per_child)) 7647db96d56Sopenharmony_ci p.start() 7657db96d56Sopenharmony_ci self._processes[p.pid] = p 7667db96d56Sopenharmony_ci 7677db96d56Sopenharmony_ci def submit(self, fn, /, *args, **kwargs): 7687db96d56Sopenharmony_ci with self._shutdown_lock: 7697db96d56Sopenharmony_ci if self._broken: 7707db96d56Sopenharmony_ci raise BrokenProcessPool(self._broken) 7717db96d56Sopenharmony_ci if self._shutdown_thread: 7727db96d56Sopenharmony_ci raise RuntimeError('cannot schedule new futures after shutdown') 7737db96d56Sopenharmony_ci if _global_shutdown: 7747db96d56Sopenharmony_ci raise RuntimeError('cannot schedule new futures after ' 7757db96d56Sopenharmony_ci 'interpreter shutdown') 7767db96d56Sopenharmony_ci 7777db96d56Sopenharmony_ci f = _base.Future() 7787db96d56Sopenharmony_ci w = _WorkItem(f, fn, args, kwargs) 
7797db96d56Sopenharmony_ci 7807db96d56Sopenharmony_ci self._pending_work_items[self._queue_count] = w 7817db96d56Sopenharmony_ci self._work_ids.put(self._queue_count) 7827db96d56Sopenharmony_ci self._queue_count += 1 7837db96d56Sopenharmony_ci # Wake up queue management thread 7847db96d56Sopenharmony_ci self._executor_manager_thread_wakeup.wakeup() 7857db96d56Sopenharmony_ci 7867db96d56Sopenharmony_ci if self._safe_to_dynamically_spawn_children: 7877db96d56Sopenharmony_ci self._adjust_process_count() 7887db96d56Sopenharmony_ci self._start_executor_manager_thread() 7897db96d56Sopenharmony_ci return f 7907db96d56Sopenharmony_ci submit.__doc__ = _base.Executor.submit.__doc__ 7917db96d56Sopenharmony_ci 7927db96d56Sopenharmony_ci def map(self, fn, *iterables, timeout=None, chunksize=1): 7937db96d56Sopenharmony_ci """Returns an iterator equivalent to map(fn, iter). 7947db96d56Sopenharmony_ci 7957db96d56Sopenharmony_ci Args: 7967db96d56Sopenharmony_ci fn: A callable that will take as many arguments as there are 7977db96d56Sopenharmony_ci passed iterables. 7987db96d56Sopenharmony_ci timeout: The maximum number of seconds to wait. If None, then there 7997db96d56Sopenharmony_ci is no limit on the wait time. 8007db96d56Sopenharmony_ci chunksize: If greater than one, the iterables will be chopped into 8017db96d56Sopenharmony_ci chunks of size chunksize and submitted to the process pool. 8027db96d56Sopenharmony_ci If set to one, the items in the list will be sent one at a time. 8037db96d56Sopenharmony_ci 8047db96d56Sopenharmony_ci Returns: 8057db96d56Sopenharmony_ci An iterator equivalent to: map(func, *iterables) but the calls may 8067db96d56Sopenharmony_ci be evaluated out-of-order. 8077db96d56Sopenharmony_ci 8087db96d56Sopenharmony_ci Raises: 8097db96d56Sopenharmony_ci TimeoutError: If the entire result iterator could not be generated 8107db96d56Sopenharmony_ci before the given timeout. 8117db96d56Sopenharmony_ci Exception: If fn(*args) raises for any values. 
8127db96d56Sopenharmony_ci """ 8137db96d56Sopenharmony_ci if chunksize < 1: 8147db96d56Sopenharmony_ci raise ValueError("chunksize must be >= 1.") 8157db96d56Sopenharmony_ci 8167db96d56Sopenharmony_ci results = super().map(partial(_process_chunk, fn), 8177db96d56Sopenharmony_ci _get_chunks(*iterables, chunksize=chunksize), 8187db96d56Sopenharmony_ci timeout=timeout) 8197db96d56Sopenharmony_ci return _chain_from_iterable_of_lists(results) 8207db96d56Sopenharmony_ci 8217db96d56Sopenharmony_ci def shutdown(self, wait=True, *, cancel_futures=False): 8227db96d56Sopenharmony_ci with self._shutdown_lock: 8237db96d56Sopenharmony_ci self._cancel_pending_futures = cancel_futures 8247db96d56Sopenharmony_ci self._shutdown_thread = True 8257db96d56Sopenharmony_ci if self._executor_manager_thread_wakeup is not None: 8267db96d56Sopenharmony_ci # Wake up queue management thread 8277db96d56Sopenharmony_ci self._executor_manager_thread_wakeup.wakeup() 8287db96d56Sopenharmony_ci 8297db96d56Sopenharmony_ci if self._executor_manager_thread is not None and wait: 8307db96d56Sopenharmony_ci self._executor_manager_thread.join() 8317db96d56Sopenharmony_ci # To reduce the risk of opening too many files, remove references to 8327db96d56Sopenharmony_ci # objects that use file descriptors. 8337db96d56Sopenharmony_ci self._executor_manager_thread = None 8347db96d56Sopenharmony_ci self._call_queue = None 8357db96d56Sopenharmony_ci if self._result_queue is not None and wait: 8367db96d56Sopenharmony_ci self._result_queue.close() 8377db96d56Sopenharmony_ci self._result_queue = None 8387db96d56Sopenharmony_ci self._processes = None 8397db96d56Sopenharmony_ci self._executor_manager_thread_wakeup = None 8407db96d56Sopenharmony_ci 8417db96d56Sopenharmony_ci shutdown.__doc__ = _base.Executor.shutdown.__doc__ 842