# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.

"""Implements ThreadPoolExecutor."""

# Module author metadata.
__author__ = 'Brian Quinlan (brian@sweetapp.com)'

from concurrent.futures import _base
import itertools
import os
import queue
import threading
import types
import weakref


# Maps each worker thread to the work queue it consumes.  Keys are held
# weakly, so an entry does not keep its thread object alive.
_threads_queues = weakref.WeakKeyDictionary()
# Becomes True once _python_exit() has run, i.e. interpreter shutdown began.
_shutdown = False
# Lock that ensures that new workers are not created while the interpreter is
# shutting down. Must be held while mutating _threads_queues and _shutdown.
_global_shutdown_lock = threading.Lock()

def _python_exit():
    """Stop every registered worker thread at interpreter shutdown.

    Sets the module-level ``_shutdown`` flag while holding
    ``_global_shutdown_lock``, then posts a ``None`` sentinel to the queue of
    every thread recorded in ``_threads_queues`` and joins each thread.
    """
    global _shutdown
    with _global_shutdown_lock:
        _shutdown = True
    # Take a strong snapshot: _threads_queues only holds its keys weakly, and
    # the same (thread, queue) pairs are needed for both passes below.
    items = list(_threads_queues.items())
    # First pass: post the wake-up sentinel to every queue ...
    for _thread, work_queue in items:
        work_queue.put(None)
    # ... second pass: wait for each thread to finish.
    for thread, _work_queue in items:
        thread.join()

# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)

# At fork, reinitialize the `_global_shutdown_lock` lock in the child process.
# The lock is acquired before forking; afterwards the parent releases it and
# the child reinitializes its copy instead of releasing it.
# Platforms without `os.register_at_fork` skip the registration entirely.
if hasattr(os, 'register_at_fork'):
    os.register_at_fork(before=_global_shutdown_lock.acquire,
                        after_in_child=_global_shutdown_lock._at_fork_reinit,
                        after_in_parent=_global_shutdown_lock.release)


class _WorkItem:
    """One scheduled call bundled with the future that reports its outcome."""

    def __init__(self, future, fn, args, kwargs):
        """Store the future and the pieces of the call ``fn(*args, **kwargs)``.

        Args:
            future: Object that receives the result or exception of the call.
            fn: The callable to invoke.
            args: Positional arguments for ``fn``.
            kwargs: Keyword arguments for ``fn``.
        """
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        """Invoke the stored call and publish its outcome on the future.

        Nothing is executed when ``future.set_running_or_notify_cancel()``
        returns a false value.  Otherwise the return value is passed to
        ``future.set_result()``; anything raised (any ``BaseException``) is
        passed to ``future.set_exception()`` instead of propagating.
        """
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as exc:
            self.future.set_exception(exc)
            # Break a reference cycle with the exception 'exc': its traceback
            # references this frame, whose 'self' reaches the future that now
            # stores 'exc'.
            self = None
        else:
            self.future.set_result(result)

    # Allow subscripting the class itself (e.g. ``_WorkItem[int]``).
    __class_getitem__ = classmethod(types.GenericAlias)


def _worker(executor_reference, work_queue, initializer, initargs):
    """Thread target: run work items from *work_queue* until told to stop.

    Args:
        executor_reference: Zero-argument callable returning the owning
            executor, or ``None`` once it no longer exists (the executor
            passes a ``weakref.ref`` to itself).
        work_queue: Queue yielding work items (objects with a ``run()``
            method) or ``None`` wake-up sentinels.
        initializer: Optional callable run once before any work item.
        initargs: Positional arguments for ``initializer``.

    If the initializer raises, the failure is logged, the executor (when still
    alive) is notified through ``_initializer_failed()`` and the thread exits
    without running any work.  Exceptions escaping the main loop are logged
    rather than propagated.
    """
    if initializer is not None:
        try:
            initializer(*initargs)
        except BaseException:
            _base.LOGGER.critical('Exception in initializer:', exc_info=True)
            executor = executor_reference()
            if executor is not None:
                executor._initializer_failed()
            return
    try:
        while True:
            try:
                work_item = work_queue.get_nowait()
            except queue.Empty:
                # Only advertise this thread as idle when there is really
                # nothing to run.  Releasing the semaphore after every item
                # (even with work still queued) leaves stale idle tokens that
                # make the executor skip spawning a thread although every
                # worker is busy.
                executor = executor_reference()
                if executor is not None:
                    executor._idle_semaphore.release()
                del executor
                work_item = work_queue.get(block=True)

            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue

            # A None sentinel was received: decide whether to exit.
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            if _shutdown or executor is None or executor._shutdown:
                # Flag the executor as shutting down as early as possible if it
                # is not gc-ed yet.
                if executor is not None:
                    executor._shutdown = True
                # Notice other workers: re-post the sentinel so the next
                # worker blocked on the queue wakes up and exits as well.
                work_queue.put(None)
                return
            # Do not keep the executor alive while blocked on the queue.
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)


class BrokenThreadPool(_base.BrokenExecutor):
    """
    Raised when a worker thread in a ThreadPoolExecutor failed initializing.
    """


class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted calls on a lazily grown pool of threads.

    Worker threads are started on demand by submit(), up to ``max_workers``,
    and all consume work items from one shared queue.
    """

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().__next__

    def __init__(self, max_workers=None, thread_name_prefix='',
                 initializer=None, initargs=()):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls.
            thread_name_prefix: An optional name prefix to give our threads.
            initializer: A callable used to initialize worker threads.
            initargs: A tuple of arguments to pass to the initializer.

        Raises:
            ValueError: If max_workers is less than or equal to 0.
            TypeError: If initializer is given but is not callable.
        """
        if max_workers is None:
            # ThreadPoolExecutor is often used to:
            # * CPU bound task which releases GIL
            # * I/O bound task (which releases GIL, of course)
            #
            # We use cpu_count + 4 for both types of tasks.
            # But we limit it to 32 to avoid consuming surprisingly large resource
            # on many core machine.
            max_workers = min(32, (os.cpu_count() or 1) + 4)
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        if initializer is not None and not callable(initializer):
            raise TypeError("initializer must be a callable")

        self._max_workers = max_workers
        # Shared queue of _WorkItem objects and None wake-up sentinels.
        self._work_queue = queue.SimpleQueue()
        # Counts workers that have reported themselves idle.
        self._idle_semaphore = threading.Semaphore(0)
        self._threads = set()
        # False while healthy; replaced by an error message string once a
        # worker initializer has failed.
        self._broken = False
        self._shutdown = False
        # Guards _shutdown/_broken and the queue operations that depend on
        # them.
        self._shutdown_lock = threading.Lock()
        # The counter is only advanced when no explicit prefix was supplied.
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))
        self._initializer = initializer
        self._initargs = initargs

    # The docstring is copied from the base class right after the definition.
    def submit(self, fn, /, *args, **kwargs):
        # Both locks are held (executor lock first, then the module-wide
        # shutdown lock) so the state checks, the enqueue and any thread
        # creation happen as one step with respect to shutdown.
        with self._shutdown_lock, _global_shutdown_lock:
            if self._broken:
                raise BrokenThreadPool(self._broken)

            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        """Start one more worker thread unless an idle one can take the work.

        Called by submit() after a work item has been queued.  A new thread
        is only started while fewer than ``_max_workers`` threads exist.
        """
        # if idle threads are available, don't spin new threads
        # (a non-blocking acquire consumes one "idle worker" token).
        if self._idle_semaphore.acquire(timeout=0):
            return

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads.  The queue is bound as a default argument so
        # the callback does not itself reference the executor.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)

        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            # Workers only get a weak reference to the executor.
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue,
                                       self._initializer,
                                       self._initargs))
            t.start()
            self._threads.add(t)
            # Record the thread so _python_exit() can wake and join it.
            _threads_queues[t] = self._work_queue

    def _initializer_failed(self):
        """Mark the pool broken and fail every future still in the queue.

        Called from a worker thread whose initializer raised.
        """
        with self._shutdown_lock:
            self._broken = ('A thread initializer failed, the thread pool '
                            'is not usable anymore')
            # Drain work queue and mark pending futures failed
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                # None entries are wake-up sentinels, not work.
                if work_item is not None:
                    work_item.future.set_exception(BrokenThreadPool(self._broken))

    # The docstring is copied from the base class right after the definition.
    def shutdown(self, wait=True, *, cancel_futures=False):
        with self._shutdown_lock:
            self._shutdown = True
            if cancel_futures:
                # Drain all work items from the queue, and then cancel their
                # associated futures.
                while True:
                    try:
                        work_item = self._work_queue.get_nowait()
                    except queue.Empty:
                        break
                    if work_item is not None:
                        work_item.future.cancel()

            # Send a wake-up to prevent threads calling
            # _work_queue.get(block=True) from permanently blocking.
            self._work_queue.put(None)
        # Join outside the lock so it is not held while waiting on workers.
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
237