#
# Module providing the `Pool` class for managing a process pool
#
# multiprocessing/pool.py
#
# Copyright (c) 2006-2008, R Oudkerk
# Licensed to PSF under a Contributor Agreement.
#

__all__ = ['Pool', 'ThreadPool']

#
# Imports
#

import collections
import itertools
import os
import queue
import threading
import time
import traceback
import types
import warnings

# If threading is available then ThreadPool should be provided.  Therefore
# we avoid top-level imports which are liable to fail on some systems.
from . import util
from . import get_context, TimeoutError
from .connection import wait

#
# Constants representing the state of a pool
#

INIT = "INIT"
RUN = "RUN"
CLOSE = "CLOSE"
TERMINATE = "TERMINATE"

#
# Miscellaneous
#

# Module-wide counter; each next() call yields a new integer.
job_counter = itertools.count()

def mapstar(args):
    """Return ``list(map(*args))``.

    `args` is unpacked straight into map(), so its first item is the
    callable and the remaining items are the iterables to map over.
    """
    return list(map(*args))

def starmapstar(args):
    """Return ``list(itertools.starmap(args[0], args[1]))``.

    `args[0]` is the callable and `args[1]` an iterable of argument
    tuples; each tuple is unpacked into one call.
    """
    return list(itertools.starmap(args[0], args[1]))

#
# Hack to embed stringification of remote traceback in local traceback
#

class RemoteTraceback(Exception):
    """Exception whose str() is the traceback text it was created with."""

    def __init__(self, tb):
        # `tb` is returned unchanged by __str__().
        self.tb = tb

    def __str__(self):
        return self.tb

class ExceptionWithTraceback:
    """Pair an exception with the formatted text of its traceback.

    The object reduces (see __reduce__) to a call of rebuild_exc(), so
    unpickling yields the original exception with a RemoteTraceback
    attached as its __cause__ rather than an ExceptionWithTraceback.
    """

    def __init__(self, exc, tb):
        # Format eagerly: only the resulting string is kept, not `tb`.
        tb = traceback.format_exception(type(exc), exc, tb)
        tb = ''.join(tb)
        self.exc = exc
        self.tb = '\n"""\n%s"""' % tb

    def __reduce__(self):
        return rebuild_exc, (self.exc, self.tb)

def rebuild_exc(exc, tb):
    """Set ``exc.__cause__`` to ``RemoteTraceback(tb)`` and return `exc`."""
    exc.__cause__ = RemoteTraceback(tb)
    return exc

#
# Code run by worker processes
#

class MaybeEncodingError(Exception):
    """Wraps possible unpickleable errors, so they can be
    safely sent through the socket.

    Only the repr() strings of the failure and of the value that could
    not be sent are stored (as `exc` and `value`).
    """

    def __init__(self, exc, value):
        self.exc = repr(exc)
        self.value = repr(value)
        super().__init__(self.exc, self.value)

    def __str__(self):
        return "Error sending result: '%s'. Reason: '%s'" % (self.value,
                                                             self.exc)

    def __repr__(self):
        return "<%s: %s>" % (self.__class__.__name__, self)


def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
           wrap_exception=False):
    """Run tasks read from `inqueue` and put their results on `outqueue`.

    Each task is a tuple ``(job, i, func, args, kwds)``; the result put on
    `outqueue` is ``(job, i, (True, value))`` on success or
    ``(job, i, (False, exception))`` when `func` raised an Exception.

    The loop ends when a ``None`` sentinel is read, when reading raises
    EOFError/OSError, or after `maxtasks` tasks when `maxtasks` is given.

    Parameters:
        inqueue, outqueue: objects providing get() / put().
        initializer, initargs: if `initializer` is not None it is called
            once as ``initializer(*initargs)`` before the first task.
        maxtasks: None, or an int >= 1 limiting the number of tasks.
        wrap_exception: when true, exceptions raised by `func` are wrapped
            in ExceptionWithTraceback (except for tasks whose function is
            _helper_reraises_exception).

    Raises:
        AssertionError: if `maxtasks` is neither None nor an int >= 1.
    """
    if (maxtasks is not None) and not (isinstance(maxtasks, int)
                                       and maxtasks >= 1):
        raise AssertionError("Maxtasks {!r} is not valid".format(maxtasks))
    put = outqueue.put
    get = inqueue.get
    if hasattr(inqueue, '_writer'):
        # Close the queue ends this function never uses.
        inqueue._writer.close()
        outqueue._reader.close()

    if initializer is not None:
        initializer(*initargs)

    completed = 0
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()
        except (EOFError, OSError):
            util.debug('worker got EOFError or OSError -- exiting')
            break

        if task is None:
            util.debug('worker got sentinel -- exiting')
            break

        job, i, func, args, kwds = task
        try:
            result = (True, func(*args, **kwds))
        except Exception as e:
            if wrap_exception and func is not _helper_reraises_exception:
                e = ExceptionWithTraceback(e, e.__traceback__)
            result = (False, e)
        try:
            put((job, i, result))
        except Exception as e:
            # The result itself could not be sent; send a description of
            # the failure instead so the job still gets an answer.
            wrapped = MaybeEncodingError(e, result[1])
            util.debug("Possible encoding error while sending result: %s" % (
                wrapped))
            put((job, i, (False, wrapped)))

        # Drop references to the finished task before blocking in get().
        task = job = result = func = args = kwds = None
        completed += 1
    util.debug('worker exiting after %d tasks' % completed)

def _helper_reraises_exception(ex):
    'Pickle-able helper function for use by _guarded_task_generation.'
    raise ex

#
# Class representing a process pool
#

class _PoolCache(dict):
    """
    Class that implements a cache for the Pool class that will notify
    the pool management threads every time the cache is emptied. The
    notification is done by the use of a queue that is provided when
    instantiating the cache.
    """
    def __init__(self, /, *args, notifier=None, **kwds):
        self.notifier = notifier
        super().__init__(*args, **kwds)

    def __delitem__(self, item):
        super().__delitem__(item)

        # Notify that the cache is empty. This is important because the
        # pool keeps maintaining workers until the cache gets drained. This
        # eliminates a race condition in which a task is finished after
        # the pool's _handle_workers method has entered another iteration
        # of the loop. In this situation, the only event that can wake up
        # the pool is the cache being emptied (no more tasks available).
        if not self:
            self.notifier.put(None)

class Pool(object):
    '''
    Class which supports an async version of applying functions to arguments.
    '''
    # Passed to every worker as its `wrap_exception` argument.
    _wrap_exception = True

    @staticmethod
    def Process(ctx, *args, **kwds):
        '''
        Return `ctx.Process(*args, **kwds)`.
        '''
        return ctx.Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None):
        '''
        Start the worker processes and the three handler threads.

        `processes` defaults to `os.cpu_count() or 1`.  `initializer` and
        `initargs` are forwarded to every worker, `maxtasksperchild` is
        forwarded as the worker's `maxtasks`, and `context` (default:
        `get_context()`) supplies `Process` and `SimpleQueue`.

        Raises ValueError if `processes` is less than 1 or
        `maxtasksperchild` is not None or a positive int, and TypeError
        if `initializer` is neither None nor callable.
        '''
        # Attributes initialized early to make sure that they exist in
        # __del__() if __init__() raises an exception
        self._pool = []
        self._state = INIT

        # Validate the arguments before any queue is created, so that a
        # rejected call does not allocate queues only to discard them.
        if processes is None:
            processes = os.cpu_count() or 1
        if processes < 1:
            raise ValueError("Number of processes must be at least 1")
        if maxtasksperchild is not None:
            if not isinstance(maxtasksperchild, int) or maxtasksperchild <= 0:
                raise ValueError("maxtasksperchild must be a positive int or None")

        if initializer is not None and not callable(initializer):
            raise TypeError('initializer must be a callable')

        self._ctx = context or get_context()
        self._setup_queues()
        self._taskqueue = queue.SimpleQueue()
        # The _change_notifier queue exists to wake up self._handle_workers()
        # when the cache (self._cache) is empty or when there is a change in
        # the _state variable of the thread that runs _handle_workers.
        self._change_notifier = self._ctx.SimpleQueue()
        self._cache = _PoolCache(notifier=self._change_notifier)
        self._maxtasksperchild = maxtasksperchild
        self._initializer = initializer
        self._initargs = initargs

        self._processes = processes
        try:
            self._repopulate_pool()
        except Exception:
            # Starting a worker failed: stop and reap the ones that were
            # started before re-raising.
            for p in self._pool:
                if p.exitcode is None:
                    p.terminate()
            for p in self._pool:
                p.join()
            raise

        sentinels = self._get_sentinels()

        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self._cache, self._taskqueue, self._ctx, self.Process,
                  self._processes, self._pool, self._inqueue, self._outqueue,
                  self._initializer, self._initargs, self._maxtasksperchild,
                  self._wrap_exception, sentinels, self._change_notifier)
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()


        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

        self._terminate = util.Finalize(
            self, self._terminate_pool,
            args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                  self._change_notifier, self._worker_handler, self._task_handler,
                  self._result_handler, self._cache),
            exitpriority=15
            )
        self._state = RUN

    # Copy globals as function locals to make sure that they are available
    # during Python shutdown when the Pool is destroyed.
2667db96d56Sopenharmony_ci def __del__(self, _warn=warnings.warn, RUN=RUN): 2677db96d56Sopenharmony_ci if self._state == RUN: 2687db96d56Sopenharmony_ci _warn(f"unclosed running multiprocessing pool {self!r}", 2697db96d56Sopenharmony_ci ResourceWarning, source=self) 2707db96d56Sopenharmony_ci if getattr(self, '_change_notifier', None) is not None: 2717db96d56Sopenharmony_ci self._change_notifier.put(None) 2727db96d56Sopenharmony_ci 2737db96d56Sopenharmony_ci def __repr__(self): 2747db96d56Sopenharmony_ci cls = self.__class__ 2757db96d56Sopenharmony_ci return (f'<{cls.__module__}.{cls.__qualname__} ' 2767db96d56Sopenharmony_ci f'state={self._state} ' 2777db96d56Sopenharmony_ci f'pool_size={len(self._pool)}>') 2787db96d56Sopenharmony_ci 2797db96d56Sopenharmony_ci def _get_sentinels(self): 2807db96d56Sopenharmony_ci task_queue_sentinels = [self._outqueue._reader] 2817db96d56Sopenharmony_ci self_notifier_sentinels = [self._change_notifier._reader] 2827db96d56Sopenharmony_ci return [*task_queue_sentinels, *self_notifier_sentinels] 2837db96d56Sopenharmony_ci 2847db96d56Sopenharmony_ci @staticmethod 2857db96d56Sopenharmony_ci def _get_worker_sentinels(workers): 2867db96d56Sopenharmony_ci return [worker.sentinel for worker in 2877db96d56Sopenharmony_ci workers if hasattr(worker, "sentinel")] 2887db96d56Sopenharmony_ci 2897db96d56Sopenharmony_ci @staticmethod 2907db96d56Sopenharmony_ci def _join_exited_workers(pool): 2917db96d56Sopenharmony_ci """Cleanup after any worker processes which have exited due to reaching 2927db96d56Sopenharmony_ci their specified lifetime. Returns True if any workers were cleaned up. 
2937db96d56Sopenharmony_ci """ 2947db96d56Sopenharmony_ci cleaned = False 2957db96d56Sopenharmony_ci for i in reversed(range(len(pool))): 2967db96d56Sopenharmony_ci worker = pool[i] 2977db96d56Sopenharmony_ci if worker.exitcode is not None: 2987db96d56Sopenharmony_ci # worker exited 2997db96d56Sopenharmony_ci util.debug('cleaning up worker %d' % i) 3007db96d56Sopenharmony_ci worker.join() 3017db96d56Sopenharmony_ci cleaned = True 3027db96d56Sopenharmony_ci del pool[i] 3037db96d56Sopenharmony_ci return cleaned 3047db96d56Sopenharmony_ci 3057db96d56Sopenharmony_ci def _repopulate_pool(self): 3067db96d56Sopenharmony_ci return self._repopulate_pool_static(self._ctx, self.Process, 3077db96d56Sopenharmony_ci self._processes, 3087db96d56Sopenharmony_ci self._pool, self._inqueue, 3097db96d56Sopenharmony_ci self._outqueue, self._initializer, 3107db96d56Sopenharmony_ci self._initargs, 3117db96d56Sopenharmony_ci self._maxtasksperchild, 3127db96d56Sopenharmony_ci self._wrap_exception) 3137db96d56Sopenharmony_ci 3147db96d56Sopenharmony_ci @staticmethod 3157db96d56Sopenharmony_ci def _repopulate_pool_static(ctx, Process, processes, pool, inqueue, 3167db96d56Sopenharmony_ci outqueue, initializer, initargs, 3177db96d56Sopenharmony_ci maxtasksperchild, wrap_exception): 3187db96d56Sopenharmony_ci """Bring the number of pool processes up to the specified number, 3197db96d56Sopenharmony_ci for use after reaping workers which have exited. 
3207db96d56Sopenharmony_ci """ 3217db96d56Sopenharmony_ci for i in range(processes - len(pool)): 3227db96d56Sopenharmony_ci w = Process(ctx, target=worker, 3237db96d56Sopenharmony_ci args=(inqueue, outqueue, 3247db96d56Sopenharmony_ci initializer, 3257db96d56Sopenharmony_ci initargs, maxtasksperchild, 3267db96d56Sopenharmony_ci wrap_exception)) 3277db96d56Sopenharmony_ci w.name = w.name.replace('Process', 'PoolWorker') 3287db96d56Sopenharmony_ci w.daemon = True 3297db96d56Sopenharmony_ci w.start() 3307db96d56Sopenharmony_ci pool.append(w) 3317db96d56Sopenharmony_ci util.debug('added worker') 3327db96d56Sopenharmony_ci 3337db96d56Sopenharmony_ci @staticmethod 3347db96d56Sopenharmony_ci def _maintain_pool(ctx, Process, processes, pool, inqueue, outqueue, 3357db96d56Sopenharmony_ci initializer, initargs, maxtasksperchild, 3367db96d56Sopenharmony_ci wrap_exception): 3377db96d56Sopenharmony_ci """Clean up any exited workers and start replacements for them. 3387db96d56Sopenharmony_ci """ 3397db96d56Sopenharmony_ci if Pool._join_exited_workers(pool): 3407db96d56Sopenharmony_ci Pool._repopulate_pool_static(ctx, Process, processes, pool, 3417db96d56Sopenharmony_ci inqueue, outqueue, initializer, 3427db96d56Sopenharmony_ci initargs, maxtasksperchild, 3437db96d56Sopenharmony_ci wrap_exception) 3447db96d56Sopenharmony_ci 3457db96d56Sopenharmony_ci def _setup_queues(self): 3467db96d56Sopenharmony_ci self._inqueue = self._ctx.SimpleQueue() 3477db96d56Sopenharmony_ci self._outqueue = self._ctx.SimpleQueue() 3487db96d56Sopenharmony_ci self._quick_put = self._inqueue._writer.send 3497db96d56Sopenharmony_ci self._quick_get = self._outqueue._reader.recv 3507db96d56Sopenharmony_ci 3517db96d56Sopenharmony_ci def _check_running(self): 3527db96d56Sopenharmony_ci if self._state != RUN: 3537db96d56Sopenharmony_ci raise ValueError("Pool not running") 3547db96d56Sopenharmony_ci 3557db96d56Sopenharmony_ci def apply(self, func, args=(), kwds={}): 3567db96d56Sopenharmony_ci ''' 
3577db96d56Sopenharmony_ci Equivalent of `func(*args, **kwds)`. 3587db96d56Sopenharmony_ci Pool must be running. 3597db96d56Sopenharmony_ci ''' 3607db96d56Sopenharmony_ci return self.apply_async(func, args, kwds).get() 3617db96d56Sopenharmony_ci 3627db96d56Sopenharmony_ci def map(self, func, iterable, chunksize=None): 3637db96d56Sopenharmony_ci ''' 3647db96d56Sopenharmony_ci Apply `func` to each element in `iterable`, collecting the results 3657db96d56Sopenharmony_ci in a list that is returned. 3667db96d56Sopenharmony_ci ''' 3677db96d56Sopenharmony_ci return self._map_async(func, iterable, mapstar, chunksize).get() 3687db96d56Sopenharmony_ci 3697db96d56Sopenharmony_ci def starmap(self, func, iterable, chunksize=None): 3707db96d56Sopenharmony_ci ''' 3717db96d56Sopenharmony_ci Like `map()` method but the elements of the `iterable` are expected to 3727db96d56Sopenharmony_ci be iterables as well and will be unpacked as arguments. Hence 3737db96d56Sopenharmony_ci `func` and (a, b) becomes func(a, b). 3747db96d56Sopenharmony_ci ''' 3757db96d56Sopenharmony_ci return self._map_async(func, iterable, starmapstar, chunksize).get() 3767db96d56Sopenharmony_ci 3777db96d56Sopenharmony_ci def starmap_async(self, func, iterable, chunksize=None, callback=None, 3787db96d56Sopenharmony_ci error_callback=None): 3797db96d56Sopenharmony_ci ''' 3807db96d56Sopenharmony_ci Asynchronous version of `starmap()` method. 
3817db96d56Sopenharmony_ci ''' 3827db96d56Sopenharmony_ci return self._map_async(func, iterable, starmapstar, chunksize, 3837db96d56Sopenharmony_ci callback, error_callback) 3847db96d56Sopenharmony_ci 3857db96d56Sopenharmony_ci def _guarded_task_generation(self, result_job, func, iterable): 3867db96d56Sopenharmony_ci '''Provides a generator of tasks for imap and imap_unordered with 3877db96d56Sopenharmony_ci appropriate handling for iterables which throw exceptions during 3887db96d56Sopenharmony_ci iteration.''' 3897db96d56Sopenharmony_ci try: 3907db96d56Sopenharmony_ci i = -1 3917db96d56Sopenharmony_ci for i, x in enumerate(iterable): 3927db96d56Sopenharmony_ci yield (result_job, i, func, (x,), {}) 3937db96d56Sopenharmony_ci except Exception as e: 3947db96d56Sopenharmony_ci yield (result_job, i+1, _helper_reraises_exception, (e,), {}) 3957db96d56Sopenharmony_ci 3967db96d56Sopenharmony_ci def imap(self, func, iterable, chunksize=1): 3977db96d56Sopenharmony_ci ''' 3987db96d56Sopenharmony_ci Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. 
3997db96d56Sopenharmony_ci ''' 4007db96d56Sopenharmony_ci self._check_running() 4017db96d56Sopenharmony_ci if chunksize == 1: 4027db96d56Sopenharmony_ci result = IMapIterator(self) 4037db96d56Sopenharmony_ci self._taskqueue.put( 4047db96d56Sopenharmony_ci ( 4057db96d56Sopenharmony_ci self._guarded_task_generation(result._job, func, iterable), 4067db96d56Sopenharmony_ci result._set_length 4077db96d56Sopenharmony_ci )) 4087db96d56Sopenharmony_ci return result 4097db96d56Sopenharmony_ci else: 4107db96d56Sopenharmony_ci if chunksize < 1: 4117db96d56Sopenharmony_ci raise ValueError( 4127db96d56Sopenharmony_ci "Chunksize must be 1+, not {0:n}".format( 4137db96d56Sopenharmony_ci chunksize)) 4147db96d56Sopenharmony_ci task_batches = Pool._get_tasks(func, iterable, chunksize) 4157db96d56Sopenharmony_ci result = IMapIterator(self) 4167db96d56Sopenharmony_ci self._taskqueue.put( 4177db96d56Sopenharmony_ci ( 4187db96d56Sopenharmony_ci self._guarded_task_generation(result._job, 4197db96d56Sopenharmony_ci mapstar, 4207db96d56Sopenharmony_ci task_batches), 4217db96d56Sopenharmony_ci result._set_length 4227db96d56Sopenharmony_ci )) 4237db96d56Sopenharmony_ci return (item for chunk in result for item in chunk) 4247db96d56Sopenharmony_ci 4257db96d56Sopenharmony_ci def imap_unordered(self, func, iterable, chunksize=1): 4267db96d56Sopenharmony_ci ''' 4277db96d56Sopenharmony_ci Like `imap()` method but ordering of results is arbitrary. 
4287db96d56Sopenharmony_ci ''' 4297db96d56Sopenharmony_ci self._check_running() 4307db96d56Sopenharmony_ci if chunksize == 1: 4317db96d56Sopenharmony_ci result = IMapUnorderedIterator(self) 4327db96d56Sopenharmony_ci self._taskqueue.put( 4337db96d56Sopenharmony_ci ( 4347db96d56Sopenharmony_ci self._guarded_task_generation(result._job, func, iterable), 4357db96d56Sopenharmony_ci result._set_length 4367db96d56Sopenharmony_ci )) 4377db96d56Sopenharmony_ci return result 4387db96d56Sopenharmony_ci else: 4397db96d56Sopenharmony_ci if chunksize < 1: 4407db96d56Sopenharmony_ci raise ValueError( 4417db96d56Sopenharmony_ci "Chunksize must be 1+, not {0!r}".format(chunksize)) 4427db96d56Sopenharmony_ci task_batches = Pool._get_tasks(func, iterable, chunksize) 4437db96d56Sopenharmony_ci result = IMapUnorderedIterator(self) 4447db96d56Sopenharmony_ci self._taskqueue.put( 4457db96d56Sopenharmony_ci ( 4467db96d56Sopenharmony_ci self._guarded_task_generation(result._job, 4477db96d56Sopenharmony_ci mapstar, 4487db96d56Sopenharmony_ci task_batches), 4497db96d56Sopenharmony_ci result._set_length 4507db96d56Sopenharmony_ci )) 4517db96d56Sopenharmony_ci return (item for chunk in result for item in chunk) 4527db96d56Sopenharmony_ci 4537db96d56Sopenharmony_ci def apply_async(self, func, args=(), kwds={}, callback=None, 4547db96d56Sopenharmony_ci error_callback=None): 4557db96d56Sopenharmony_ci ''' 4567db96d56Sopenharmony_ci Asynchronous version of `apply()` method. 
4577db96d56Sopenharmony_ci ''' 4587db96d56Sopenharmony_ci self._check_running() 4597db96d56Sopenharmony_ci result = ApplyResult(self, callback, error_callback) 4607db96d56Sopenharmony_ci self._taskqueue.put(([(result._job, 0, func, args, kwds)], None)) 4617db96d56Sopenharmony_ci return result 4627db96d56Sopenharmony_ci 4637db96d56Sopenharmony_ci def map_async(self, func, iterable, chunksize=None, callback=None, 4647db96d56Sopenharmony_ci error_callback=None): 4657db96d56Sopenharmony_ci ''' 4667db96d56Sopenharmony_ci Asynchronous version of `map()` method. 4677db96d56Sopenharmony_ci ''' 4687db96d56Sopenharmony_ci return self._map_async(func, iterable, mapstar, chunksize, callback, 4697db96d56Sopenharmony_ci error_callback) 4707db96d56Sopenharmony_ci 4717db96d56Sopenharmony_ci def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, 4727db96d56Sopenharmony_ci error_callback=None): 4737db96d56Sopenharmony_ci ''' 4747db96d56Sopenharmony_ci Helper function to implement map, starmap and their async counterparts. 
4757db96d56Sopenharmony_ci ''' 4767db96d56Sopenharmony_ci self._check_running() 4777db96d56Sopenharmony_ci if not hasattr(iterable, '__len__'): 4787db96d56Sopenharmony_ci iterable = list(iterable) 4797db96d56Sopenharmony_ci 4807db96d56Sopenharmony_ci if chunksize is None: 4817db96d56Sopenharmony_ci chunksize, extra = divmod(len(iterable), len(self._pool) * 4) 4827db96d56Sopenharmony_ci if extra: 4837db96d56Sopenharmony_ci chunksize += 1 4847db96d56Sopenharmony_ci if len(iterable) == 0: 4857db96d56Sopenharmony_ci chunksize = 0 4867db96d56Sopenharmony_ci 4877db96d56Sopenharmony_ci task_batches = Pool._get_tasks(func, iterable, chunksize) 4887db96d56Sopenharmony_ci result = MapResult(self, chunksize, len(iterable), callback, 4897db96d56Sopenharmony_ci error_callback=error_callback) 4907db96d56Sopenharmony_ci self._taskqueue.put( 4917db96d56Sopenharmony_ci ( 4927db96d56Sopenharmony_ci self._guarded_task_generation(result._job, 4937db96d56Sopenharmony_ci mapper, 4947db96d56Sopenharmony_ci task_batches), 4957db96d56Sopenharmony_ci None 4967db96d56Sopenharmony_ci ) 4977db96d56Sopenharmony_ci ) 4987db96d56Sopenharmony_ci return result 4997db96d56Sopenharmony_ci 5007db96d56Sopenharmony_ci @staticmethod 5017db96d56Sopenharmony_ci def _wait_for_updates(sentinels, change_notifier, timeout=None): 5027db96d56Sopenharmony_ci wait(sentinels, timeout=timeout) 5037db96d56Sopenharmony_ci while not change_notifier.empty(): 5047db96d56Sopenharmony_ci change_notifier.get() 5057db96d56Sopenharmony_ci 5067db96d56Sopenharmony_ci @classmethod 5077db96d56Sopenharmony_ci def _handle_workers(cls, cache, taskqueue, ctx, Process, processes, 5087db96d56Sopenharmony_ci pool, inqueue, outqueue, initializer, initargs, 5097db96d56Sopenharmony_ci maxtasksperchild, wrap_exception, sentinels, 5107db96d56Sopenharmony_ci change_notifier): 5117db96d56Sopenharmony_ci thread = threading.current_thread() 5127db96d56Sopenharmony_ci 5137db96d56Sopenharmony_ci # Keep maintaining workers until the cache gets 
drained, unless the pool 5147db96d56Sopenharmony_ci # is terminated. 5157db96d56Sopenharmony_ci while thread._state == RUN or (cache and thread._state != TERMINATE): 5167db96d56Sopenharmony_ci cls._maintain_pool(ctx, Process, processes, pool, inqueue, 5177db96d56Sopenharmony_ci outqueue, initializer, initargs, 5187db96d56Sopenharmony_ci maxtasksperchild, wrap_exception) 5197db96d56Sopenharmony_ci 5207db96d56Sopenharmony_ci current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels] 5217db96d56Sopenharmony_ci 5227db96d56Sopenharmony_ci cls._wait_for_updates(current_sentinels, change_notifier) 5237db96d56Sopenharmony_ci # send sentinel to stop workers 5247db96d56Sopenharmony_ci taskqueue.put(None) 5257db96d56Sopenharmony_ci util.debug('worker handler exiting') 5267db96d56Sopenharmony_ci 5277db96d56Sopenharmony_ci @staticmethod 5287db96d56Sopenharmony_ci def _handle_tasks(taskqueue, put, outqueue, pool, cache): 5297db96d56Sopenharmony_ci thread = threading.current_thread() 5307db96d56Sopenharmony_ci 5317db96d56Sopenharmony_ci for taskseq, set_length in iter(taskqueue.get, None): 5327db96d56Sopenharmony_ci task = None 5337db96d56Sopenharmony_ci try: 5347db96d56Sopenharmony_ci # iterating taskseq cannot fail 5357db96d56Sopenharmony_ci for task in taskseq: 5367db96d56Sopenharmony_ci if thread._state != RUN: 5377db96d56Sopenharmony_ci util.debug('task handler found thread._state != RUN') 5387db96d56Sopenharmony_ci break 5397db96d56Sopenharmony_ci try: 5407db96d56Sopenharmony_ci put(task) 5417db96d56Sopenharmony_ci except Exception as e: 5427db96d56Sopenharmony_ci job, idx = task[:2] 5437db96d56Sopenharmony_ci try: 5447db96d56Sopenharmony_ci cache[job]._set(idx, (False, e)) 5457db96d56Sopenharmony_ci except KeyError: 5467db96d56Sopenharmony_ci pass 5477db96d56Sopenharmony_ci else: 5487db96d56Sopenharmony_ci if set_length: 5497db96d56Sopenharmony_ci util.debug('doing set_length()') 5507db96d56Sopenharmony_ci idx = task[1] if task else -1 5517db96d56Sopenharmony_ci 
set_length(idx + 1) 5527db96d56Sopenharmony_ci continue 5537db96d56Sopenharmony_ci break 5547db96d56Sopenharmony_ci finally: 5557db96d56Sopenharmony_ci task = taskseq = job = None 5567db96d56Sopenharmony_ci else: 5577db96d56Sopenharmony_ci util.debug('task handler got sentinel') 5587db96d56Sopenharmony_ci 5597db96d56Sopenharmony_ci try: 5607db96d56Sopenharmony_ci # tell result handler to finish when cache is empty 5617db96d56Sopenharmony_ci util.debug('task handler sending sentinel to result handler') 5627db96d56Sopenharmony_ci outqueue.put(None) 5637db96d56Sopenharmony_ci 5647db96d56Sopenharmony_ci # tell workers there is no more work 5657db96d56Sopenharmony_ci util.debug('task handler sending sentinel to workers') 5667db96d56Sopenharmony_ci for p in pool: 5677db96d56Sopenharmony_ci put(None) 5687db96d56Sopenharmony_ci except OSError: 5697db96d56Sopenharmony_ci util.debug('task handler got OSError when sending sentinels') 5707db96d56Sopenharmony_ci 5717db96d56Sopenharmony_ci util.debug('task handler exiting') 5727db96d56Sopenharmony_ci 5737db96d56Sopenharmony_ci @staticmethod 5747db96d56Sopenharmony_ci def _handle_results(outqueue, get, cache): 5757db96d56Sopenharmony_ci thread = threading.current_thread() 5767db96d56Sopenharmony_ci 5777db96d56Sopenharmony_ci while 1: 5787db96d56Sopenharmony_ci try: 5797db96d56Sopenharmony_ci task = get() 5807db96d56Sopenharmony_ci except (OSError, EOFError): 5817db96d56Sopenharmony_ci util.debug('result handler got EOFError/OSError -- exiting') 5827db96d56Sopenharmony_ci return 5837db96d56Sopenharmony_ci 5847db96d56Sopenharmony_ci if thread._state != RUN: 5857db96d56Sopenharmony_ci assert thread._state == TERMINATE, "Thread not in TERMINATE" 5867db96d56Sopenharmony_ci util.debug('result handler found thread._state=TERMINATE') 5877db96d56Sopenharmony_ci break 5887db96d56Sopenharmony_ci 5897db96d56Sopenharmony_ci if task is None: 5907db96d56Sopenharmony_ci util.debug('result handler got sentinel') 5917db96d56Sopenharmony_ci break 
5927db96d56Sopenharmony_ci 5937db96d56Sopenharmony_ci job, i, obj = task 5947db96d56Sopenharmony_ci try: 5957db96d56Sopenharmony_ci cache[job]._set(i, obj) 5967db96d56Sopenharmony_ci except KeyError: 5977db96d56Sopenharmony_ci pass 5987db96d56Sopenharmony_ci task = job = obj = None 5997db96d56Sopenharmony_ci 6007db96d56Sopenharmony_ci while cache and thread._state != TERMINATE: 6017db96d56Sopenharmony_ci try: 6027db96d56Sopenharmony_ci task = get() 6037db96d56Sopenharmony_ci except (OSError, EOFError): 6047db96d56Sopenharmony_ci util.debug('result handler got EOFError/OSError -- exiting') 6057db96d56Sopenharmony_ci return 6067db96d56Sopenharmony_ci 6077db96d56Sopenharmony_ci if task is None: 6087db96d56Sopenharmony_ci util.debug('result handler ignoring extra sentinel') 6097db96d56Sopenharmony_ci continue 6107db96d56Sopenharmony_ci job, i, obj = task 6117db96d56Sopenharmony_ci try: 6127db96d56Sopenharmony_ci cache[job]._set(i, obj) 6137db96d56Sopenharmony_ci except KeyError: 6147db96d56Sopenharmony_ci pass 6157db96d56Sopenharmony_ci task = job = obj = None 6167db96d56Sopenharmony_ci 6177db96d56Sopenharmony_ci if hasattr(outqueue, '_reader'): 6187db96d56Sopenharmony_ci util.debug('ensuring that outqueue is not full') 6197db96d56Sopenharmony_ci # If we don't make room available in outqueue then 6207db96d56Sopenharmony_ci # attempts to add the sentinel (None) to outqueue may 6217db96d56Sopenharmony_ci # block. There is guaranteed to be no more than 2 sentinels. 
6227db96d56Sopenharmony_ci try: 6237db96d56Sopenharmony_ci for i in range(10): 6247db96d56Sopenharmony_ci if not outqueue._reader.poll(): 6257db96d56Sopenharmony_ci break 6267db96d56Sopenharmony_ci get() 6277db96d56Sopenharmony_ci except (OSError, EOFError): 6287db96d56Sopenharmony_ci pass 6297db96d56Sopenharmony_ci 6307db96d56Sopenharmony_ci util.debug('result handler exiting: len(cache)=%s, thread._state=%s', 6317db96d56Sopenharmony_ci len(cache), thread._state) 6327db96d56Sopenharmony_ci 6337db96d56Sopenharmony_ci @staticmethod 6347db96d56Sopenharmony_ci def _get_tasks(func, it, size): 6357db96d56Sopenharmony_ci it = iter(it) 6367db96d56Sopenharmony_ci while 1: 6377db96d56Sopenharmony_ci x = tuple(itertools.islice(it, size)) 6387db96d56Sopenharmony_ci if not x: 6397db96d56Sopenharmony_ci return 6407db96d56Sopenharmony_ci yield (func, x) 6417db96d56Sopenharmony_ci 6427db96d56Sopenharmony_ci def __reduce__(self): 6437db96d56Sopenharmony_ci raise NotImplementedError( 6447db96d56Sopenharmony_ci 'pool objects cannot be passed between processes or pickled' 6457db96d56Sopenharmony_ci ) 6467db96d56Sopenharmony_ci 6477db96d56Sopenharmony_ci def close(self): 6487db96d56Sopenharmony_ci util.debug('closing pool') 6497db96d56Sopenharmony_ci if self._state == RUN: 6507db96d56Sopenharmony_ci self._state = CLOSE 6517db96d56Sopenharmony_ci self._worker_handler._state = CLOSE 6527db96d56Sopenharmony_ci self._change_notifier.put(None) 6537db96d56Sopenharmony_ci 6547db96d56Sopenharmony_ci def terminate(self): 6557db96d56Sopenharmony_ci util.debug('terminating pool') 6567db96d56Sopenharmony_ci self._state = TERMINATE 6577db96d56Sopenharmony_ci self._terminate() 6587db96d56Sopenharmony_ci 6597db96d56Sopenharmony_ci def join(self): 6607db96d56Sopenharmony_ci util.debug('joining pool') 6617db96d56Sopenharmony_ci if self._state == RUN: 6627db96d56Sopenharmony_ci raise ValueError("Pool is still running") 6637db96d56Sopenharmony_ci elif self._state not in (CLOSE, TERMINATE): 
6647db96d56Sopenharmony_ci raise ValueError("In unknown state") 6657db96d56Sopenharmony_ci self._worker_handler.join() 6667db96d56Sopenharmony_ci self._task_handler.join() 6677db96d56Sopenharmony_ci self._result_handler.join() 6687db96d56Sopenharmony_ci for p in self._pool: 6697db96d56Sopenharmony_ci p.join() 6707db96d56Sopenharmony_ci 6717db96d56Sopenharmony_ci @staticmethod 6727db96d56Sopenharmony_ci def _help_stuff_finish(inqueue, task_handler, size): 6737db96d56Sopenharmony_ci # task_handler may be blocked trying to put items on inqueue 6747db96d56Sopenharmony_ci util.debug('removing tasks from inqueue until task handler finished') 6757db96d56Sopenharmony_ci inqueue._rlock.acquire() 6767db96d56Sopenharmony_ci while task_handler.is_alive() and inqueue._reader.poll(): 6777db96d56Sopenharmony_ci inqueue._reader.recv() 6787db96d56Sopenharmony_ci time.sleep(0) 6797db96d56Sopenharmony_ci 6807db96d56Sopenharmony_ci @classmethod 6817db96d56Sopenharmony_ci def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, 6827db96d56Sopenharmony_ci worker_handler, task_handler, result_handler, cache): 6837db96d56Sopenharmony_ci # this is guaranteed to only be called once 6847db96d56Sopenharmony_ci util.debug('finalizing pool') 6857db96d56Sopenharmony_ci 6867db96d56Sopenharmony_ci # Notify that the worker_handler state has been changed so the 6877db96d56Sopenharmony_ci # _handle_workers loop can be unblocked (and exited) in order to 6887db96d56Sopenharmony_ci # send the finalization sentinel all the workers. 
6897db96d56Sopenharmony_ci worker_handler._state = TERMINATE 6907db96d56Sopenharmony_ci change_notifier.put(None) 6917db96d56Sopenharmony_ci 6927db96d56Sopenharmony_ci task_handler._state = TERMINATE 6937db96d56Sopenharmony_ci 6947db96d56Sopenharmony_ci util.debug('helping task handler/workers to finish') 6957db96d56Sopenharmony_ci cls._help_stuff_finish(inqueue, task_handler, len(pool)) 6967db96d56Sopenharmony_ci 6977db96d56Sopenharmony_ci if (not result_handler.is_alive()) and (len(cache) != 0): 6987db96d56Sopenharmony_ci raise AssertionError( 6997db96d56Sopenharmony_ci "Cannot have cache with result_hander not alive") 7007db96d56Sopenharmony_ci 7017db96d56Sopenharmony_ci result_handler._state = TERMINATE 7027db96d56Sopenharmony_ci change_notifier.put(None) 7037db96d56Sopenharmony_ci outqueue.put(None) # sentinel 7047db96d56Sopenharmony_ci 7057db96d56Sopenharmony_ci # We must wait for the worker handler to exit before terminating 7067db96d56Sopenharmony_ci # workers because we don't want workers to be restarted behind our back. 7077db96d56Sopenharmony_ci util.debug('joining worker handler') 7087db96d56Sopenharmony_ci if threading.current_thread() is not worker_handler: 7097db96d56Sopenharmony_ci worker_handler.join() 7107db96d56Sopenharmony_ci 7117db96d56Sopenharmony_ci # Terminate workers which haven't already finished. 
7127db96d56Sopenharmony_ci if pool and hasattr(pool[0], 'terminate'): 7137db96d56Sopenharmony_ci util.debug('terminating workers') 7147db96d56Sopenharmony_ci for p in pool: 7157db96d56Sopenharmony_ci if p.exitcode is None: 7167db96d56Sopenharmony_ci p.terminate() 7177db96d56Sopenharmony_ci 7187db96d56Sopenharmony_ci util.debug('joining task handler') 7197db96d56Sopenharmony_ci if threading.current_thread() is not task_handler: 7207db96d56Sopenharmony_ci task_handler.join() 7217db96d56Sopenharmony_ci 7227db96d56Sopenharmony_ci util.debug('joining result handler') 7237db96d56Sopenharmony_ci if threading.current_thread() is not result_handler: 7247db96d56Sopenharmony_ci result_handler.join() 7257db96d56Sopenharmony_ci 7267db96d56Sopenharmony_ci if pool and hasattr(pool[0], 'terminate'): 7277db96d56Sopenharmony_ci util.debug('joining pool workers') 7287db96d56Sopenharmony_ci for p in pool: 7297db96d56Sopenharmony_ci if p.is_alive(): 7307db96d56Sopenharmony_ci # worker has not yet exited 7317db96d56Sopenharmony_ci util.debug('cleaning up worker %d' % p.pid) 7327db96d56Sopenharmony_ci p.join() 7337db96d56Sopenharmony_ci 7347db96d56Sopenharmony_ci def __enter__(self): 7357db96d56Sopenharmony_ci self._check_running() 7367db96d56Sopenharmony_ci return self 7377db96d56Sopenharmony_ci 7387db96d56Sopenharmony_ci def __exit__(self, exc_type, exc_val, exc_tb): 7397db96d56Sopenharmony_ci self.terminate() 7407db96d56Sopenharmony_ci 7417db96d56Sopenharmony_ci# 7427db96d56Sopenharmony_ci# Class whose instances are returned by `Pool.apply_async()` 7437db96d56Sopenharmony_ci# 7447db96d56Sopenharmony_ci 7457db96d56Sopenharmony_ciclass ApplyResult(object): 7467db96d56Sopenharmony_ci 7477db96d56Sopenharmony_ci def __init__(self, pool, callback, error_callback): 7487db96d56Sopenharmony_ci self._pool = pool 7497db96d56Sopenharmony_ci self._event = threading.Event() 7507db96d56Sopenharmony_ci self._job = next(job_counter) 7517db96d56Sopenharmony_ci self._cache = pool._cache 
7527db96d56Sopenharmony_ci self._callback = callback 7537db96d56Sopenharmony_ci self._error_callback = error_callback 7547db96d56Sopenharmony_ci self._cache[self._job] = self 7557db96d56Sopenharmony_ci 7567db96d56Sopenharmony_ci def ready(self): 7577db96d56Sopenharmony_ci return self._event.is_set() 7587db96d56Sopenharmony_ci 7597db96d56Sopenharmony_ci def successful(self): 7607db96d56Sopenharmony_ci if not self.ready(): 7617db96d56Sopenharmony_ci raise ValueError("{0!r} not ready".format(self)) 7627db96d56Sopenharmony_ci return self._success 7637db96d56Sopenharmony_ci 7647db96d56Sopenharmony_ci def wait(self, timeout=None): 7657db96d56Sopenharmony_ci self._event.wait(timeout) 7667db96d56Sopenharmony_ci 7677db96d56Sopenharmony_ci def get(self, timeout=None): 7687db96d56Sopenharmony_ci self.wait(timeout) 7697db96d56Sopenharmony_ci if not self.ready(): 7707db96d56Sopenharmony_ci raise TimeoutError 7717db96d56Sopenharmony_ci if self._success: 7727db96d56Sopenharmony_ci return self._value 7737db96d56Sopenharmony_ci else: 7747db96d56Sopenharmony_ci raise self._value 7757db96d56Sopenharmony_ci 7767db96d56Sopenharmony_ci def _set(self, i, obj): 7777db96d56Sopenharmony_ci self._success, self._value = obj 7787db96d56Sopenharmony_ci if self._callback and self._success: 7797db96d56Sopenharmony_ci self._callback(self._value) 7807db96d56Sopenharmony_ci if self._error_callback and not self._success: 7817db96d56Sopenharmony_ci self._error_callback(self._value) 7827db96d56Sopenharmony_ci self._event.set() 7837db96d56Sopenharmony_ci del self._cache[self._job] 7847db96d56Sopenharmony_ci self._pool = None 7857db96d56Sopenharmony_ci 7867db96d56Sopenharmony_ci __class_getitem__ = classmethod(types.GenericAlias) 7877db96d56Sopenharmony_ci 7887db96d56Sopenharmony_ciAsyncResult = ApplyResult # create alias -- see #17805 7897db96d56Sopenharmony_ci 7907db96d56Sopenharmony_ci# 7917db96d56Sopenharmony_ci# Class whose instances are returned by `Pool.map_async()` 7927db96d56Sopenharmony_ci# 

class MapResult(ApplyResult):
    """Result handle returned by `Pool.map_async()`.

    The mapped iterable is processed in chunks.  Each chunk reports back
    through `_set()`; the handle only becomes ready once every chunk has
    reported, at which point `_value` is either the full list of results
    (in input order) or the first failure that was seen.
    """

    def __init__(self, pool, chunksize, length, callback, error_callback):
        """Register the job and pre-size the result list.

        pool           -- object exposing the shared `_cache` dict
        chunksize      -- number of items per chunk
        length         -- total number of items being mapped
        callback       -- called with the result list on overall success
        error_callback -- called with the stored failure otherwise
        """
        ApplyResult.__init__(self, pool, callback,
                             error_callback=error_callback)
        self._success = True
        self._value = [None] * length
        self._chunksize = chunksize
        if chunksize <= 0:
            # No chunk will ever report: mark the result ready right away
            # and drop the cache entry made by ApplyResult.__init__().
            # (Presumably the empty-iterable case -- the caller is outside
            # this section of the file.)
            self._number_left = 0
            self._event.set()
            del self._cache[self._job]
        else:
            # Number of chunks, i.e. ceil(length / chunksize).
            self._number_left = length//chunksize + bool(length % chunksize)

    def _set(self, i, success_result):
        """Record the outcome of chunk number `i`.

        `success_result` is a `(success, result)` pair.  On success
        `result` holds the chunk's values and is stored in the slice of
        `_value` belonging to chunk `i`.  On failure `result` replaces
        `_value`, unless an earlier failure was already stored.  Once a
        failure has been stored, later successful chunks are not stored.
        """
        self._number_left -= 1
        success, result = success_result
        if success and self._success:
            self._value[i*self._chunksize:(i+1)*self._chunksize] = result
            if self._number_left == 0:
                if self._callback:
                    self._callback(self._value)
                del self._cache[self._job]
                self._event.set()
                self._pool = None
        else:
            if not success and self._success:
                # only store first exception
                self._success = False
                self._value = result
            if self._number_left == 0:
                # only consider the result ready once all jobs are done
                if self._error_callback:
                    self._error_callback(self._value)
                del self._cache[self._job]
                self._event.set()
                self._pool = None

#
# Class whose instances are returned by `Pool.imap()`
#

class IMapIterator(object):
    """Iterator returned by `Pool.imap()`; yields results in index order.

    Results arrive through `_set()` in arbitrary order.  Those that are
    next in sequence go to `_items`; the others are parked in `_unsorted`
    until the gap before them is filled.  `_length` stays None until
    `_set_length()` announces the total number of results.
    """

    def __init__(self, pool):
        """Allocate a job id and register this iterator in `pool._cache`."""
        self._pool = pool
        self._cond = threading.Condition(threading.Lock())
        self._job = next(job_counter)
        self._cache = pool._cache
        # Ready-to-deliver (success, value) pairs, oldest first.
        self._items = collections.deque()
        # Index of the next result expected in sequence.
        self._index = 0
        self._length = None
        # Results that arrived ahead of `_index`, keyed by their index.
        self._unsorted = {}
        self._cache[self._job] = self

    def __iter__(self):
        return self

    def next(self, timeout=None):
        """Return the next result, waiting up to `timeout` seconds for it.

        Raises StopIteration when every result has been delivered,
        TimeoutError when nothing became available after the wait, and
        re-raises the stored value when the item is marked unsuccessful.
        """
        with self._cond:
            try:
                item = self._items.popleft()
            except IndexError:
                if self._index == self._length:
                    self._pool = None
                    raise StopIteration from None
                # Nothing queued yet: wait once for a notification, then
                # look again.
                self._cond.wait(timeout)
                try:
                    item = self._items.popleft()
                except IndexError:
                    if self._index == self._length:
                        self._pool = None
                        raise StopIteration from None
                    raise TimeoutError from None

        success, value = item
        if success:
            return value
        raise value

    __next__ = next                    # XXX

    def _set(self, i, obj):
        """Accept result `obj` for index `i`, preserving delivery order."""
        with self._cond:
            if self._index == i:
                self._items.append(obj)
                self._index += 1
                # Release any parked results that are now in sequence.
                while self._index in self._unsorted:
                    obj = self._unsorted.pop(self._index)
                    self._items.append(obj)
                    self._index += 1
                self._cond.notify()
            else:
                self._unsorted[i] = obj

            if self._index == self._length:
                # Everything has been queued: unregister the job.
                del self._cache[self._job]
                self._pool = None

    def _set_length(self, length):
        """Record the total number of results that will be produced."""
        with self._cond:
            self._length = length
            if self._index == self._length:
                # All results were already queued: wake a waiting
                # consumer so it can observe the end, and unregister.
                self._cond.notify()
                del self._cache[self._job]
                self._pool = None

#
# Class whose instances are returned by `Pool.imap_unordered()`
#

class IMapUnorderedIterator(IMapIterator):
    """Iterator returned by `Pool.imap_unordered()`.

    Same as IMapIterator except that results are delivered in the order
    they arrive rather than in index order.
    """

    def _set(self, i, obj):
        """Queue `obj` immediately; the index `i` is not used."""
        with self._cond:
            self._items.append(obj)
            self._index += 1
            self._cond.notify()
            if self._index == self._length:
                del self._cache[self._job]
                self._pool = None

#
#
#

class ThreadPool(Pool):
    """Pool variant whose workers come from the `.dummy` module."""

    # Read by code outside this section of the file.
    _wrap_exception = False

    @staticmethod
    def Process(ctx, *args, **kwds):
        """Create a worker with `.dummy.Process`; `ctx` is ignored."""
        from .dummy import Process
        return Process(*args, **kwds)

    def __init__(self, processes=None, initializer=None, initargs=()):
        Pool.__init__(self, processes, initializer, initargs)

    def _setup_queues(self):
        """Use in-process `queue.SimpleQueue` objects for tasks and results."""
        self._inqueue = queue.SimpleQueue()
        self._outqueue = queue.SimpleQueue()
        self._quick_put = self._inqueue.put
        self._quick_get = self._outqueue.get

    def _get_sentinels(self):
        """Return only the change notifier's reader as a sentinel."""
        return [self._change_notifier._reader]

    @staticmethod
    def _get_worker_sentinels(workers):
        """Return an empty list: no per-worker sentinels are reported."""
        return []

    @staticmethod
    def _help_stuff_finish(inqueue, task_handler, size):
        """Discard pending tasks, then enqueue `size` None sentinels.

        `task_handler` is accepted for signature compatibility and is
        not used here.
        """
        # drain inqueue, and put sentinels at its head to make workers finish
        try:
            while True:
                inqueue.get(block=False)
        except queue.Empty:
            pass
        for i in range(size):
            inqueue.put(None)

    def _wait_for_updates(self, sentinels, change_notifier, timeout):
        """Sleep for `timeout` seconds; the other arguments are ignored."""
        time.sleep(timeout)