17db96d56Sopenharmony_ci# Copyright 2009 Brian Quinlan. All Rights Reserved. 27db96d56Sopenharmony_ci# Licensed to PSF under a Contributor Agreement. 37db96d56Sopenharmony_ci 47db96d56Sopenharmony_ci"""Implements ThreadPoolExecutor.""" 57db96d56Sopenharmony_ci 67db96d56Sopenharmony_ci__author__ = 'Brian Quinlan (brian@sweetapp.com)' 77db96d56Sopenharmony_ci 87db96d56Sopenharmony_cifrom concurrent.futures import _base 97db96d56Sopenharmony_ciimport itertools 107db96d56Sopenharmony_ciimport queue 117db96d56Sopenharmony_ciimport threading 127db96d56Sopenharmony_ciimport types 137db96d56Sopenharmony_ciimport weakref 147db96d56Sopenharmony_ciimport os 157db96d56Sopenharmony_ci 167db96d56Sopenharmony_ci 177db96d56Sopenharmony_ci_threads_queues = weakref.WeakKeyDictionary() 187db96d56Sopenharmony_ci_shutdown = False 197db96d56Sopenharmony_ci# Lock that ensures that new workers are not created while the interpreter is 207db96d56Sopenharmony_ci# shutting down. Must be held while mutating _threads_queues and _shutdown. 217db96d56Sopenharmony_ci_global_shutdown_lock = threading.Lock() 227db96d56Sopenharmony_ci 237db96d56Sopenharmony_cidef _python_exit(): 247db96d56Sopenharmony_ci global _shutdown 257db96d56Sopenharmony_ci with _global_shutdown_lock: 267db96d56Sopenharmony_ci _shutdown = True 277db96d56Sopenharmony_ci items = list(_threads_queues.items()) 287db96d56Sopenharmony_ci for t, q in items: 297db96d56Sopenharmony_ci q.put(None) 307db96d56Sopenharmony_ci for t, q in items: 317db96d56Sopenharmony_ci t.join() 327db96d56Sopenharmony_ci 337db96d56Sopenharmony_ci# Register for `_python_exit()` to be called just before joining all 347db96d56Sopenharmony_ci# non-daemon threads. This is used instead of `atexit.register()` for 357db96d56Sopenharmony_ci# compatibility with subinterpreters, which no longer support daemon threads. 367db96d56Sopenharmony_ci# See bpo-39812 for context. 377db96d56Sopenharmony_cithreading._register_atexit(_python_exit) 387db96d56Sopenharmony_ci 397db96d56Sopenharmony_ci# At fork, reinitialize the `_global_shutdown_lock` lock in the child process 407db96d56Sopenharmony_ciif hasattr(os, 'register_at_fork'): 417db96d56Sopenharmony_ci os.register_at_fork(before=_global_shutdown_lock.acquire, 427db96d56Sopenharmony_ci after_in_child=_global_shutdown_lock._at_fork_reinit, 437db96d56Sopenharmony_ci after_in_parent=_global_shutdown_lock.release) 447db96d56Sopenharmony_ci 457db96d56Sopenharmony_ci 467db96d56Sopenharmony_ciclass _WorkItem(object): 477db96d56Sopenharmony_ci def __init__(self, future, fn, args, kwargs): 487db96d56Sopenharmony_ci self.future = future 497db96d56Sopenharmony_ci self.fn = fn 507db96d56Sopenharmony_ci self.args = args 517db96d56Sopenharmony_ci self.kwargs = kwargs 527db96d56Sopenharmony_ci 537db96d56Sopenharmony_ci def run(self): 547db96d56Sopenharmony_ci if not self.future.set_running_or_notify_cancel(): 557db96d56Sopenharmony_ci return 567db96d56Sopenharmony_ci 577db96d56Sopenharmony_ci try: 587db96d56Sopenharmony_ci result = self.fn(*self.args, **self.kwargs) 597db96d56Sopenharmony_ci except BaseException as exc: 607db96d56Sopenharmony_ci self.future.set_exception(exc) 617db96d56Sopenharmony_ci # Break a reference cycle with the exception 'exc' 627db96d56Sopenharmony_ci self = None 637db96d56Sopenharmony_ci else: 647db96d56Sopenharmony_ci self.future.set_result(result) 657db96d56Sopenharmony_ci 667db96d56Sopenharmony_ci __class_getitem__ = classmethod(types.GenericAlias) 677db96d56Sopenharmony_ci 687db96d56Sopenharmony_ci 697db96d56Sopenharmony_cidef _worker(executor_reference, work_queue, initializer, initargs): 707db96d56Sopenharmony_ci if initializer is not None: 717db96d56Sopenharmony_ci try: 727db96d56Sopenharmony_ci initializer(*initargs) 737db96d56Sopenharmony_ci except BaseException: 747db96d56Sopenharmony_ci _base.LOGGER.critical('Exception in initializer:', exc_info=True) 757db96d56Sopenharmony_ci executor = executor_reference() 767db96d56Sopenharmony_ci if executor is not None: 777db96d56Sopenharmony_ci executor._initializer_failed() 787db96d56Sopenharmony_ci return 797db96d56Sopenharmony_ci try: 807db96d56Sopenharmony_ci while True: 817db96d56Sopenharmony_ci work_item = work_queue.get(block=True) 827db96d56Sopenharmony_ci if work_item is not None: 837db96d56Sopenharmony_ci work_item.run() 847db96d56Sopenharmony_ci # Delete references to object. See issue16284 857db96d56Sopenharmony_ci del work_item 867db96d56Sopenharmony_ci 877db96d56Sopenharmony_ci # attempt to increment idle count 887db96d56Sopenharmony_ci executor = executor_reference() 897db96d56Sopenharmony_ci if executor is not None: 907db96d56Sopenharmony_ci executor._idle_semaphore.release() 917db96d56Sopenharmony_ci del executor 927db96d56Sopenharmony_ci continue 937db96d56Sopenharmony_ci 947db96d56Sopenharmony_ci executor = executor_reference() 957db96d56Sopenharmony_ci # Exit if: 967db96d56Sopenharmony_ci # - The interpreter is shutting down OR 977db96d56Sopenharmony_ci # - The executor that owns the worker has been collected OR 987db96d56Sopenharmony_ci # - The executor that owns the worker has been shutdown. 997db96d56Sopenharmony_ci if _shutdown or executor is None or executor._shutdown: 1007db96d56Sopenharmony_ci # Flag the executor as shutting down as early as possible if it 1017db96d56Sopenharmony_ci # is not gc-ed yet. 1027db96d56Sopenharmony_ci if executor is not None: 1037db96d56Sopenharmony_ci executor._shutdown = True 1047db96d56Sopenharmony_ci # Notice other workers 1057db96d56Sopenharmony_ci work_queue.put(None) 1067db96d56Sopenharmony_ci return 1077db96d56Sopenharmony_ci del executor 1087db96d56Sopenharmony_ci except BaseException: 1097db96d56Sopenharmony_ci _base.LOGGER.critical('Exception in worker', exc_info=True) 1107db96d56Sopenharmony_ci 1117db96d56Sopenharmony_ci 1127db96d56Sopenharmony_ciclass BrokenThreadPool(_base.BrokenExecutor): 1137db96d56Sopenharmony_ci """ 1147db96d56Sopenharmony_ci Raised when a worker thread in a ThreadPoolExecutor failed initializing. 1157db96d56Sopenharmony_ci """ 1167db96d56Sopenharmony_ci 1177db96d56Sopenharmony_ci 1187db96d56Sopenharmony_ciclass ThreadPoolExecutor(_base.Executor): 1197db96d56Sopenharmony_ci 1207db96d56Sopenharmony_ci # Used to assign unique thread names when thread_name_prefix is not supplied. 1217db96d56Sopenharmony_ci _counter = itertools.count().__next__ 1227db96d56Sopenharmony_ci 1237db96d56Sopenharmony_ci def __init__(self, max_workers=None, thread_name_prefix='', 1247db96d56Sopenharmony_ci initializer=None, initargs=()): 1257db96d56Sopenharmony_ci """Initializes a new ThreadPoolExecutor instance. 1267db96d56Sopenharmony_ci 1277db96d56Sopenharmony_ci Args: 1287db96d56Sopenharmony_ci max_workers: The maximum number of threads that can be used to 1297db96d56Sopenharmony_ci execute the given calls. 1307db96d56Sopenharmony_ci thread_name_prefix: An optional name prefix to give our threads. 1317db96d56Sopenharmony_ci initializer: A callable used to initialize worker threads. 1327db96d56Sopenharmony_ci initargs: A tuple of arguments to pass to the initializer. 1337db96d56Sopenharmony_ci """ 1347db96d56Sopenharmony_ci if max_workers is None: 1357db96d56Sopenharmony_ci # ThreadPoolExecutor is often used to: 1367db96d56Sopenharmony_ci # * CPU bound task which releases GIL 1377db96d56Sopenharmony_ci # * I/O bound task (which releases GIL, of course) 1387db96d56Sopenharmony_ci # 1397db96d56Sopenharmony_ci # We use cpu_count + 4 for both types of tasks. 1407db96d56Sopenharmony_ci # But we limit it to 32 to avoid consuming surprisingly large resource 1417db96d56Sopenharmony_ci # on many core machine. 1427db96d56Sopenharmony_ci max_workers = min(32, (os.cpu_count() or 1) + 4) 1437db96d56Sopenharmony_ci if max_workers <= 0: 1447db96d56Sopenharmony_ci raise ValueError("max_workers must be greater than 0") 1457db96d56Sopenharmony_ci 1467db96d56Sopenharmony_ci if initializer is not None and not callable(initializer): 1477db96d56Sopenharmony_ci raise TypeError("initializer must be a callable") 1487db96d56Sopenharmony_ci 1497db96d56Sopenharmony_ci self._max_workers = max_workers 1507db96d56Sopenharmony_ci self._work_queue = queue.SimpleQueue() 1517db96d56Sopenharmony_ci self._idle_semaphore = threading.Semaphore(0) 1527db96d56Sopenharmony_ci self._threads = set() 1537db96d56Sopenharmony_ci self._broken = False 1547db96d56Sopenharmony_ci self._shutdown = False 1557db96d56Sopenharmony_ci self._shutdown_lock = threading.Lock() 1567db96d56Sopenharmony_ci self._thread_name_prefix = (thread_name_prefix or 1577db96d56Sopenharmony_ci ("ThreadPoolExecutor-%d" % self._counter())) 1587db96d56Sopenharmony_ci self._initializer = initializer 1597db96d56Sopenharmony_ci self._initargs = initargs 1607db96d56Sopenharmony_ci 1617db96d56Sopenharmony_ci def submit(self, fn, /, *args, **kwargs): 1627db96d56Sopenharmony_ci with self._shutdown_lock, _global_shutdown_lock: 1637db96d56Sopenharmony_ci if self._broken: 1647db96d56Sopenharmony_ci raise BrokenThreadPool(self._broken) 1657db96d56Sopenharmony_ci 1667db96d56Sopenharmony_ci if self._shutdown: 1677db96d56Sopenharmony_ci raise RuntimeError('cannot schedule new futures after shutdown') 1687db96d56Sopenharmony_ci if _shutdown: 1697db96d56Sopenharmony_ci raise RuntimeError('cannot schedule new futures after ' 1707db96d56Sopenharmony_ci 'interpreter shutdown') 1717db96d56Sopenharmony_ci 1727db96d56Sopenharmony_ci f = _base.Future() 1737db96d56Sopenharmony_ci w = _WorkItem(f, fn, args, kwargs) 1747db96d56Sopenharmony_ci 1757db96d56Sopenharmony_ci self._work_queue.put(w) 1767db96d56Sopenharmony_ci self._adjust_thread_count() 1777db96d56Sopenharmony_ci return f 1787db96d56Sopenharmony_ci submit.__doc__ = _base.Executor.submit.__doc__ 1797db96d56Sopenharmony_ci 1807db96d56Sopenharmony_ci def _adjust_thread_count(self): 1817db96d56Sopenharmony_ci # if idle threads are available, don't spin new threads 1827db96d56Sopenharmony_ci if self._idle_semaphore.acquire(timeout=0): 1837db96d56Sopenharmony_ci return 1847db96d56Sopenharmony_ci 1857db96d56Sopenharmony_ci # When the executor gets lost, the weakref callback will wake up 1867db96d56Sopenharmony_ci # the worker threads. 1877db96d56Sopenharmony_ci def weakref_cb(_, q=self._work_queue): 1887db96d56Sopenharmony_ci q.put(None) 1897db96d56Sopenharmony_ci 1907db96d56Sopenharmony_ci num_threads = len(self._threads) 1917db96d56Sopenharmony_ci if num_threads < self._max_workers: 1927db96d56Sopenharmony_ci thread_name = '%s_%d' % (self._thread_name_prefix or self, 1937db96d56Sopenharmony_ci num_threads) 1947db96d56Sopenharmony_ci t = threading.Thread(name=thread_name, target=_worker, 1957db96d56Sopenharmony_ci args=(weakref.ref(self, weakref_cb), 1967db96d56Sopenharmony_ci self._work_queue, 1977db96d56Sopenharmony_ci self._initializer, 1987db96d56Sopenharmony_ci self._initargs)) 1997db96d56Sopenharmony_ci t.start() 2007db96d56Sopenharmony_ci self._threads.add(t) 2017db96d56Sopenharmony_ci _threads_queues[t] = self._work_queue 2027db96d56Sopenharmony_ci 2037db96d56Sopenharmony_ci def _initializer_failed(self): 2047db96d56Sopenharmony_ci with self._shutdown_lock: 2057db96d56Sopenharmony_ci self._broken = ('A thread initializer failed, the thread pool ' 2067db96d56Sopenharmony_ci 'is not usable anymore') 2077db96d56Sopenharmony_ci # Drain work queue and mark pending futures failed 2087db96d56Sopenharmony_ci while True: 2097db96d56Sopenharmony_ci try: 2107db96d56Sopenharmony_ci work_item = self._work_queue.get_nowait() 2117db96d56Sopenharmony_ci except queue.Empty: 2127db96d56Sopenharmony_ci break 2137db96d56Sopenharmony_ci if work_item is not None: 2147db96d56Sopenharmony_ci work_item.future.set_exception(BrokenThreadPool(self._broken)) 2157db96d56Sopenharmony_ci 2167db96d56Sopenharmony_ci def shutdown(self, wait=True, *, cancel_futures=False): 2177db96d56Sopenharmony_ci with self._shutdown_lock: 2187db96d56Sopenharmony_ci self._shutdown = True 2197db96d56Sopenharmony_ci if cancel_futures: 2207db96d56Sopenharmony_ci # Drain all work items from the queue, and then cancel their 2217db96d56Sopenharmony_ci # associated futures. 2227db96d56Sopenharmony_ci while True: 2237db96d56Sopenharmony_ci try: 2247db96d56Sopenharmony_ci work_item = self._work_queue.get_nowait() 2257db96d56Sopenharmony_ci except queue.Empty: 2267db96d56Sopenharmony_ci break 2277db96d56Sopenharmony_ci if work_item is not None: 2287db96d56Sopenharmony_ci work_item.future.cancel() 2297db96d56Sopenharmony_ci 2307db96d56Sopenharmony_ci # Send a wake-up to prevent threads calling 2317db96d56Sopenharmony_ci # _work_queue.get(block=True) from permanently blocking. 2327db96d56Sopenharmony_ci self._work_queue.put(None) 2337db96d56Sopenharmony_ci if wait: 2347db96d56Sopenharmony_ci for t in self._threads: 2357db96d56Sopenharmony_ci t.join() 2367db96d56Sopenharmony_ci shutdown.__doc__ = _base.Executor.shutdown.__doc__ 237