17db96d56Sopenharmony_ci# Copyright 2009 Brian Quinlan. All Rights Reserved. 27db96d56Sopenharmony_ci# Licensed to PSF under a Contributor Agreement. 37db96d56Sopenharmony_ci 47db96d56Sopenharmony_ci__author__ = 'Brian Quinlan (brian@sweetapp.com)' 57db96d56Sopenharmony_ci 67db96d56Sopenharmony_ciimport collections 77db96d56Sopenharmony_ciimport logging 87db96d56Sopenharmony_ciimport threading 97db96d56Sopenharmony_ciimport time 107db96d56Sopenharmony_ciimport types 117db96d56Sopenharmony_ci 127db96d56Sopenharmony_ciFIRST_COMPLETED = 'FIRST_COMPLETED' 137db96d56Sopenharmony_ciFIRST_EXCEPTION = 'FIRST_EXCEPTION' 147db96d56Sopenharmony_ciALL_COMPLETED = 'ALL_COMPLETED' 157db96d56Sopenharmony_ci_AS_COMPLETED = '_AS_COMPLETED' 167db96d56Sopenharmony_ci 177db96d56Sopenharmony_ci# Possible future states (for internal use by the futures package). 187db96d56Sopenharmony_ciPENDING = 'PENDING' 197db96d56Sopenharmony_ciRUNNING = 'RUNNING' 207db96d56Sopenharmony_ci# The future was cancelled by the user... 217db96d56Sopenharmony_ciCANCELLED = 'CANCELLED' 227db96d56Sopenharmony_ci# ...and _Waiter.add_cancelled() was called by a worker. 237db96d56Sopenharmony_ciCANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' 247db96d56Sopenharmony_ciFINISHED = 'FINISHED' 257db96d56Sopenharmony_ci 267db96d56Sopenharmony_ci_FUTURE_STATES = [ 277db96d56Sopenharmony_ci PENDING, 287db96d56Sopenharmony_ci RUNNING, 297db96d56Sopenharmony_ci CANCELLED, 307db96d56Sopenharmony_ci CANCELLED_AND_NOTIFIED, 317db96d56Sopenharmony_ci FINISHED 327db96d56Sopenharmony_ci] 337db96d56Sopenharmony_ci 347db96d56Sopenharmony_ci_STATE_TO_DESCRIPTION_MAP = { 357db96d56Sopenharmony_ci PENDING: "pending", 367db96d56Sopenharmony_ci RUNNING: "running", 377db96d56Sopenharmony_ci CANCELLED: "cancelled", 387db96d56Sopenharmony_ci CANCELLED_AND_NOTIFIED: "cancelled", 397db96d56Sopenharmony_ci FINISHED: "finished" 407db96d56Sopenharmony_ci} 417db96d56Sopenharmony_ci 427db96d56Sopenharmony_ci# Logger for internal use by the futures package. 437db96d56Sopenharmony_ciLOGGER = logging.getLogger("concurrent.futures") 447db96d56Sopenharmony_ci 457db96d56Sopenharmony_ciclass Error(Exception): 467db96d56Sopenharmony_ci """Base class for all future-related exceptions.""" 477db96d56Sopenharmony_ci pass 487db96d56Sopenharmony_ci 497db96d56Sopenharmony_ciclass CancelledError(Error): 507db96d56Sopenharmony_ci """The Future was cancelled.""" 517db96d56Sopenharmony_ci pass 527db96d56Sopenharmony_ci 537db96d56Sopenharmony_ciTimeoutError = TimeoutError # make local alias for the standard exception 547db96d56Sopenharmony_ci 557db96d56Sopenharmony_ciclass InvalidStateError(Error): 567db96d56Sopenharmony_ci """The operation is not allowed in this state.""" 577db96d56Sopenharmony_ci pass 587db96d56Sopenharmony_ci 597db96d56Sopenharmony_ciclass _Waiter(object): 607db96d56Sopenharmony_ci """Provides the event that wait() and as_completed() block on.""" 617db96d56Sopenharmony_ci def __init__(self): 627db96d56Sopenharmony_ci self.event = threading.Event() 637db96d56Sopenharmony_ci self.finished_futures = [] 647db96d56Sopenharmony_ci 657db96d56Sopenharmony_ci def add_result(self, future): 667db96d56Sopenharmony_ci self.finished_futures.append(future) 677db96d56Sopenharmony_ci 687db96d56Sopenharmony_ci def add_exception(self, future): 697db96d56Sopenharmony_ci self.finished_futures.append(future) 707db96d56Sopenharmony_ci 717db96d56Sopenharmony_ci def add_cancelled(self, future): 727db96d56Sopenharmony_ci self.finished_futures.append(future) 737db96d56Sopenharmony_ci 747db96d56Sopenharmony_ciclass _AsCompletedWaiter(_Waiter): 757db96d56Sopenharmony_ci """Used by as_completed().""" 767db96d56Sopenharmony_ci 777db96d56Sopenharmony_ci def __init__(self): 787db96d56Sopenharmony_ci super(_AsCompletedWaiter, self).__init__() 797db96d56Sopenharmony_ci self.lock = threading.Lock() 807db96d56Sopenharmony_ci 817db96d56Sopenharmony_ci def add_result(self, future): 827db96d56Sopenharmony_ci with self.lock: 837db96d56Sopenharmony_ci super(_AsCompletedWaiter, self).add_result(future) 847db96d56Sopenharmony_ci self.event.set() 857db96d56Sopenharmony_ci 867db96d56Sopenharmony_ci def add_exception(self, future): 877db96d56Sopenharmony_ci with self.lock: 887db96d56Sopenharmony_ci super(_AsCompletedWaiter, self).add_exception(future) 897db96d56Sopenharmony_ci self.event.set() 907db96d56Sopenharmony_ci 917db96d56Sopenharmony_ci def add_cancelled(self, future): 927db96d56Sopenharmony_ci with self.lock: 937db96d56Sopenharmony_ci super(_AsCompletedWaiter, self).add_cancelled(future) 947db96d56Sopenharmony_ci self.event.set() 957db96d56Sopenharmony_ci 967db96d56Sopenharmony_ciclass _FirstCompletedWaiter(_Waiter): 977db96d56Sopenharmony_ci """Used by wait(return_when=FIRST_COMPLETED).""" 987db96d56Sopenharmony_ci 997db96d56Sopenharmony_ci def add_result(self, future): 1007db96d56Sopenharmony_ci super().add_result(future) 1017db96d56Sopenharmony_ci self.event.set() 1027db96d56Sopenharmony_ci 1037db96d56Sopenharmony_ci def add_exception(self, future): 1047db96d56Sopenharmony_ci super().add_exception(future) 1057db96d56Sopenharmony_ci self.event.set() 1067db96d56Sopenharmony_ci 1077db96d56Sopenharmony_ci def add_cancelled(self, future): 1087db96d56Sopenharmony_ci super().add_cancelled(future) 1097db96d56Sopenharmony_ci self.event.set() 1107db96d56Sopenharmony_ci 1117db96d56Sopenharmony_ciclass _AllCompletedWaiter(_Waiter): 1127db96d56Sopenharmony_ci """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" 1137db96d56Sopenharmony_ci 1147db96d56Sopenharmony_ci def __init__(self, num_pending_calls, stop_on_exception): 1157db96d56Sopenharmony_ci self.num_pending_calls = num_pending_calls 1167db96d56Sopenharmony_ci self.stop_on_exception = stop_on_exception 1177db96d56Sopenharmony_ci self.lock = threading.Lock() 1187db96d56Sopenharmony_ci super().__init__() 1197db96d56Sopenharmony_ci 1207db96d56Sopenharmony_ci def _decrement_pending_calls(self): 1217db96d56Sopenharmony_ci with self.lock: 1227db96d56Sopenharmony_ci self.num_pending_calls -= 1 1237db96d56Sopenharmony_ci if not self.num_pending_calls: 1247db96d56Sopenharmony_ci self.event.set() 1257db96d56Sopenharmony_ci 1267db96d56Sopenharmony_ci def add_result(self, future): 1277db96d56Sopenharmony_ci super().add_result(future) 1287db96d56Sopenharmony_ci self._decrement_pending_calls() 1297db96d56Sopenharmony_ci 1307db96d56Sopenharmony_ci def add_exception(self, future): 1317db96d56Sopenharmony_ci super().add_exception(future) 1327db96d56Sopenharmony_ci if self.stop_on_exception: 1337db96d56Sopenharmony_ci self.event.set() 1347db96d56Sopenharmony_ci else: 1357db96d56Sopenharmony_ci self._decrement_pending_calls() 1367db96d56Sopenharmony_ci 1377db96d56Sopenharmony_ci def add_cancelled(self, future): 1387db96d56Sopenharmony_ci super().add_cancelled(future) 1397db96d56Sopenharmony_ci self._decrement_pending_calls() 1407db96d56Sopenharmony_ci 1417db96d56Sopenharmony_ciclass _AcquireFutures(object): 1427db96d56Sopenharmony_ci """A context manager that does an ordered acquire of Future conditions.""" 1437db96d56Sopenharmony_ci 1447db96d56Sopenharmony_ci def __init__(self, futures): 1457db96d56Sopenharmony_ci self.futures = sorted(futures, key=id) 1467db96d56Sopenharmony_ci 1477db96d56Sopenharmony_ci def __enter__(self): 1487db96d56Sopenharmony_ci for future in self.futures: 1497db96d56Sopenharmony_ci future._condition.acquire() 1507db96d56Sopenharmony_ci 1517db96d56Sopenharmony_ci def __exit__(self, *args): 1527db96d56Sopenharmony_ci for future in self.futures: 1537db96d56Sopenharmony_ci future._condition.release() 1547db96d56Sopenharmony_ci 1557db96d56Sopenharmony_cidef _create_and_install_waiters(fs, return_when): 1567db96d56Sopenharmony_ci if return_when == _AS_COMPLETED: 1577db96d56Sopenharmony_ci waiter = _AsCompletedWaiter() 1587db96d56Sopenharmony_ci elif return_when == FIRST_COMPLETED: 1597db96d56Sopenharmony_ci waiter = _FirstCompletedWaiter() 1607db96d56Sopenharmony_ci else: 1617db96d56Sopenharmony_ci pending_count = sum( 1627db96d56Sopenharmony_ci f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) 1637db96d56Sopenharmony_ci 1647db96d56Sopenharmony_ci if return_when == FIRST_EXCEPTION: 1657db96d56Sopenharmony_ci waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) 1667db96d56Sopenharmony_ci elif return_when == ALL_COMPLETED: 1677db96d56Sopenharmony_ci waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) 1687db96d56Sopenharmony_ci else: 1697db96d56Sopenharmony_ci raise ValueError("Invalid return condition: %r" % return_when) 1707db96d56Sopenharmony_ci 1717db96d56Sopenharmony_ci for f in fs: 1727db96d56Sopenharmony_ci f._waiters.append(waiter) 1737db96d56Sopenharmony_ci 1747db96d56Sopenharmony_ci return waiter 1757db96d56Sopenharmony_ci 1767db96d56Sopenharmony_ci 1777db96d56Sopenharmony_cidef _yield_finished_futures(fs, waiter, ref_collect): 1787db96d56Sopenharmony_ci """ 1797db96d56Sopenharmony_ci Iterate on the list *fs*, yielding finished futures one by one in 1807db96d56Sopenharmony_ci reverse order. 1817db96d56Sopenharmony_ci Before yielding a future, *waiter* is removed from its waiters 1827db96d56Sopenharmony_ci and the future is removed from each set in the collection of sets 1837db96d56Sopenharmony_ci *ref_collect*. 1847db96d56Sopenharmony_ci 1857db96d56Sopenharmony_ci The aim of this function is to avoid keeping stale references after 1867db96d56Sopenharmony_ci the future is yielded and before the iterator resumes. 1877db96d56Sopenharmony_ci """ 1887db96d56Sopenharmony_ci while fs: 1897db96d56Sopenharmony_ci f = fs[-1] 1907db96d56Sopenharmony_ci for futures_set in ref_collect: 1917db96d56Sopenharmony_ci futures_set.remove(f) 1927db96d56Sopenharmony_ci with f._condition: 1937db96d56Sopenharmony_ci f._waiters.remove(waiter) 1947db96d56Sopenharmony_ci del f 1957db96d56Sopenharmony_ci # Careful not to keep a reference to the popped value 1967db96d56Sopenharmony_ci yield fs.pop() 1977db96d56Sopenharmony_ci 1987db96d56Sopenharmony_ci 1997db96d56Sopenharmony_cidef as_completed(fs, timeout=None): 2007db96d56Sopenharmony_ci """An iterator over the given futures that yields each as it completes. 2017db96d56Sopenharmony_ci 2027db96d56Sopenharmony_ci Args: 2037db96d56Sopenharmony_ci fs: The sequence of Futures (possibly created by different Executors) to 2047db96d56Sopenharmony_ci iterate over. 2057db96d56Sopenharmony_ci timeout: The maximum number of seconds to wait. If None, then there 2067db96d56Sopenharmony_ci is no limit on the wait time. 2077db96d56Sopenharmony_ci 2087db96d56Sopenharmony_ci Returns: 2097db96d56Sopenharmony_ci An iterator that yields the given Futures as they complete (finished or 2107db96d56Sopenharmony_ci cancelled). If any given Futures are duplicated, they will be returned 2117db96d56Sopenharmony_ci once. 2127db96d56Sopenharmony_ci 2137db96d56Sopenharmony_ci Raises: 2147db96d56Sopenharmony_ci TimeoutError: If the entire result iterator could not be generated 2157db96d56Sopenharmony_ci before the given timeout. 2167db96d56Sopenharmony_ci """ 2177db96d56Sopenharmony_ci if timeout is not None: 2187db96d56Sopenharmony_ci end_time = timeout + time.monotonic() 2197db96d56Sopenharmony_ci 2207db96d56Sopenharmony_ci fs = set(fs) 2217db96d56Sopenharmony_ci total_futures = len(fs) 2227db96d56Sopenharmony_ci with _AcquireFutures(fs): 2237db96d56Sopenharmony_ci finished = set( 2247db96d56Sopenharmony_ci f for f in fs 2257db96d56Sopenharmony_ci if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) 2267db96d56Sopenharmony_ci pending = fs - finished 2277db96d56Sopenharmony_ci waiter = _create_and_install_waiters(fs, _AS_COMPLETED) 2287db96d56Sopenharmony_ci finished = list(finished) 2297db96d56Sopenharmony_ci try: 2307db96d56Sopenharmony_ci yield from _yield_finished_futures(finished, waiter, 2317db96d56Sopenharmony_ci ref_collect=(fs,)) 2327db96d56Sopenharmony_ci 2337db96d56Sopenharmony_ci while pending: 2347db96d56Sopenharmony_ci if timeout is None: 2357db96d56Sopenharmony_ci wait_timeout = None 2367db96d56Sopenharmony_ci else: 2377db96d56Sopenharmony_ci wait_timeout = end_time - time.monotonic() 2387db96d56Sopenharmony_ci if wait_timeout < 0: 2397db96d56Sopenharmony_ci raise TimeoutError( 2407db96d56Sopenharmony_ci '%d (of %d) futures unfinished' % ( 2417db96d56Sopenharmony_ci len(pending), total_futures)) 2427db96d56Sopenharmony_ci 2437db96d56Sopenharmony_ci waiter.event.wait(wait_timeout) 2447db96d56Sopenharmony_ci 2457db96d56Sopenharmony_ci with waiter.lock: 2467db96d56Sopenharmony_ci finished = waiter.finished_futures 2477db96d56Sopenharmony_ci waiter.finished_futures = [] 2487db96d56Sopenharmony_ci waiter.event.clear() 2497db96d56Sopenharmony_ci 2507db96d56Sopenharmony_ci # reverse to keep finishing order 2517db96d56Sopenharmony_ci finished.reverse() 2527db96d56Sopenharmony_ci yield from _yield_finished_futures(finished, waiter, 2537db96d56Sopenharmony_ci ref_collect=(fs, pending)) 2547db96d56Sopenharmony_ci 2557db96d56Sopenharmony_ci finally: 2567db96d56Sopenharmony_ci # Remove waiter from unfinished futures 2577db96d56Sopenharmony_ci for f in fs: 2587db96d56Sopenharmony_ci with f._condition: 2597db96d56Sopenharmony_ci f._waiters.remove(waiter) 2607db96d56Sopenharmony_ci 2617db96d56Sopenharmony_ciDoneAndNotDoneFutures = collections.namedtuple( 2627db96d56Sopenharmony_ci 'DoneAndNotDoneFutures', 'done not_done') 2637db96d56Sopenharmony_cidef wait(fs, timeout=None, return_when=ALL_COMPLETED): 2647db96d56Sopenharmony_ci """Wait for the futures in the given sequence to complete. 2657db96d56Sopenharmony_ci 2667db96d56Sopenharmony_ci Args: 2677db96d56Sopenharmony_ci fs: The sequence of Futures (possibly created by different Executors) to 2687db96d56Sopenharmony_ci wait upon. 2697db96d56Sopenharmony_ci timeout: The maximum number of seconds to wait. If None, then there 2707db96d56Sopenharmony_ci is no limit on the wait time. 2717db96d56Sopenharmony_ci return_when: Indicates when this function should return. The options 2727db96d56Sopenharmony_ci are: 2737db96d56Sopenharmony_ci 2747db96d56Sopenharmony_ci FIRST_COMPLETED - Return when any future finishes or is 2757db96d56Sopenharmony_ci cancelled. 2767db96d56Sopenharmony_ci FIRST_EXCEPTION - Return when any future finishes by raising an 2777db96d56Sopenharmony_ci exception. If no future raises an exception 2787db96d56Sopenharmony_ci then it is equivalent to ALL_COMPLETED. 2797db96d56Sopenharmony_ci ALL_COMPLETED - Return when all futures finish or are cancelled. 2807db96d56Sopenharmony_ci 2817db96d56Sopenharmony_ci Returns: 2827db96d56Sopenharmony_ci A named 2-tuple of sets. The first set, named 'done', contains the 2837db96d56Sopenharmony_ci futures that completed (is finished or cancelled) before the wait 2847db96d56Sopenharmony_ci completed. The second set, named 'not_done', contains uncompleted 2857db96d56Sopenharmony_ci futures. Duplicate futures given to *fs* are removed and will be 2867db96d56Sopenharmony_ci returned only once. 2877db96d56Sopenharmony_ci """ 2887db96d56Sopenharmony_ci fs = set(fs) 2897db96d56Sopenharmony_ci with _AcquireFutures(fs): 2907db96d56Sopenharmony_ci done = {f for f in fs 2917db96d56Sopenharmony_ci if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]} 2927db96d56Sopenharmony_ci not_done = fs - done 2937db96d56Sopenharmony_ci if (return_when == FIRST_COMPLETED) and done: 2947db96d56Sopenharmony_ci return DoneAndNotDoneFutures(done, not_done) 2957db96d56Sopenharmony_ci elif (return_when == FIRST_EXCEPTION) and done: 2967db96d56Sopenharmony_ci if any(f for f in done 2977db96d56Sopenharmony_ci if not f.cancelled() and f.exception() is not None): 2987db96d56Sopenharmony_ci return DoneAndNotDoneFutures(done, not_done) 2997db96d56Sopenharmony_ci 3007db96d56Sopenharmony_ci if len(done) == len(fs): 3017db96d56Sopenharmony_ci return DoneAndNotDoneFutures(done, not_done) 3027db96d56Sopenharmony_ci 3037db96d56Sopenharmony_ci waiter = _create_and_install_waiters(fs, return_when) 3047db96d56Sopenharmony_ci 3057db96d56Sopenharmony_ci waiter.event.wait(timeout) 3067db96d56Sopenharmony_ci for f in fs: 3077db96d56Sopenharmony_ci with f._condition: 3087db96d56Sopenharmony_ci f._waiters.remove(waiter) 3097db96d56Sopenharmony_ci 3107db96d56Sopenharmony_ci done.update(waiter.finished_futures) 3117db96d56Sopenharmony_ci return DoneAndNotDoneFutures(done, fs - done) 3127db96d56Sopenharmony_ci 3137db96d56Sopenharmony_ci 3147db96d56Sopenharmony_cidef _result_or_cancel(fut, timeout=None): 3157db96d56Sopenharmony_ci try: 3167db96d56Sopenharmony_ci try: 3177db96d56Sopenharmony_ci return fut.result(timeout) 3187db96d56Sopenharmony_ci finally: 3197db96d56Sopenharmony_ci fut.cancel() 3207db96d56Sopenharmony_ci finally: 3217db96d56Sopenharmony_ci # Break a reference cycle with the exception in self._exception 3227db96d56Sopenharmony_ci del fut 3237db96d56Sopenharmony_ci 3247db96d56Sopenharmony_ci 3257db96d56Sopenharmony_ciclass Future(object): 3267db96d56Sopenharmony_ci """Represents the result of an asynchronous computation.""" 3277db96d56Sopenharmony_ci 3287db96d56Sopenharmony_ci def __init__(self): 3297db96d56Sopenharmony_ci """Initializes the future. Should not be called by clients.""" 3307db96d56Sopenharmony_ci self._condition = threading.Condition() 3317db96d56Sopenharmony_ci self._state = PENDING 3327db96d56Sopenharmony_ci self._result = None 3337db96d56Sopenharmony_ci self._exception = None 3347db96d56Sopenharmony_ci self._waiters = [] 3357db96d56Sopenharmony_ci self._done_callbacks = [] 3367db96d56Sopenharmony_ci 3377db96d56Sopenharmony_ci def _invoke_callbacks(self): 3387db96d56Sopenharmony_ci for callback in self._done_callbacks: 3397db96d56Sopenharmony_ci try: 3407db96d56Sopenharmony_ci callback(self) 3417db96d56Sopenharmony_ci except Exception: 3427db96d56Sopenharmony_ci LOGGER.exception('exception calling callback for %r', self) 3437db96d56Sopenharmony_ci 3447db96d56Sopenharmony_ci def __repr__(self): 3457db96d56Sopenharmony_ci with self._condition: 3467db96d56Sopenharmony_ci if self._state == FINISHED: 3477db96d56Sopenharmony_ci if self._exception: 3487db96d56Sopenharmony_ci return '<%s at %#x state=%s raised %s>' % ( 3497db96d56Sopenharmony_ci self.__class__.__name__, 3507db96d56Sopenharmony_ci id(self), 3517db96d56Sopenharmony_ci _STATE_TO_DESCRIPTION_MAP[self._state], 3527db96d56Sopenharmony_ci self._exception.__class__.__name__) 3537db96d56Sopenharmony_ci else: 3547db96d56Sopenharmony_ci return '<%s at %#x state=%s returned %s>' % ( 3557db96d56Sopenharmony_ci self.__class__.__name__, 3567db96d56Sopenharmony_ci id(self), 3577db96d56Sopenharmony_ci _STATE_TO_DESCRIPTION_MAP[self._state], 3587db96d56Sopenharmony_ci self._result.__class__.__name__) 3597db96d56Sopenharmony_ci return '<%s at %#x state=%s>' % ( 3607db96d56Sopenharmony_ci self.__class__.__name__, 3617db96d56Sopenharmony_ci id(self), 3627db96d56Sopenharmony_ci _STATE_TO_DESCRIPTION_MAP[self._state]) 3637db96d56Sopenharmony_ci 3647db96d56Sopenharmony_ci def cancel(self): 3657db96d56Sopenharmony_ci """Cancel the future if possible. 3667db96d56Sopenharmony_ci 3677db96d56Sopenharmony_ci Returns True if the future was cancelled, False otherwise. A future 3687db96d56Sopenharmony_ci cannot be cancelled if it is running or has already completed. 3697db96d56Sopenharmony_ci """ 3707db96d56Sopenharmony_ci with self._condition: 3717db96d56Sopenharmony_ci if self._state in [RUNNING, FINISHED]: 3727db96d56Sopenharmony_ci return False 3737db96d56Sopenharmony_ci 3747db96d56Sopenharmony_ci if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 3757db96d56Sopenharmony_ci return True 3767db96d56Sopenharmony_ci 3777db96d56Sopenharmony_ci self._state = CANCELLED 3787db96d56Sopenharmony_ci self._condition.notify_all() 3797db96d56Sopenharmony_ci 3807db96d56Sopenharmony_ci self._invoke_callbacks() 3817db96d56Sopenharmony_ci return True 3827db96d56Sopenharmony_ci 3837db96d56Sopenharmony_ci def cancelled(self): 3847db96d56Sopenharmony_ci """Return True if the future was cancelled.""" 3857db96d56Sopenharmony_ci with self._condition: 3867db96d56Sopenharmony_ci return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] 3877db96d56Sopenharmony_ci 3887db96d56Sopenharmony_ci def running(self): 3897db96d56Sopenharmony_ci """Return True if the future is currently executing.""" 3907db96d56Sopenharmony_ci with self._condition: 3917db96d56Sopenharmony_ci return self._state == RUNNING 3927db96d56Sopenharmony_ci 3937db96d56Sopenharmony_ci def done(self): 3947db96d56Sopenharmony_ci """Return True if the future was cancelled or finished executing.""" 3957db96d56Sopenharmony_ci with self._condition: 3967db96d56Sopenharmony_ci return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] 3977db96d56Sopenharmony_ci 3987db96d56Sopenharmony_ci def __get_result(self): 3997db96d56Sopenharmony_ci if self._exception: 4007db96d56Sopenharmony_ci try: 4017db96d56Sopenharmony_ci raise self._exception 4027db96d56Sopenharmony_ci finally: 4037db96d56Sopenharmony_ci # Break a reference cycle with the exception in self._exception 4047db96d56Sopenharmony_ci self = None 4057db96d56Sopenharmony_ci else: 4067db96d56Sopenharmony_ci return self._result 4077db96d56Sopenharmony_ci 4087db96d56Sopenharmony_ci def add_done_callback(self, fn): 4097db96d56Sopenharmony_ci """Attaches a callable that will be called when the future finishes. 4107db96d56Sopenharmony_ci 4117db96d56Sopenharmony_ci Args: 4127db96d56Sopenharmony_ci fn: A callable that will be called with this future as its only 4137db96d56Sopenharmony_ci argument when the future completes or is cancelled. The callable 4147db96d56Sopenharmony_ci will always be called by a thread in the same process in which 4157db96d56Sopenharmony_ci it was added. If the future has already completed or been 4167db96d56Sopenharmony_ci cancelled then the callable will be called immediately. These 4177db96d56Sopenharmony_ci callables are called in the order that they were added. 4187db96d56Sopenharmony_ci """ 4197db96d56Sopenharmony_ci with self._condition: 4207db96d56Sopenharmony_ci if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: 4217db96d56Sopenharmony_ci self._done_callbacks.append(fn) 4227db96d56Sopenharmony_ci return 4237db96d56Sopenharmony_ci try: 4247db96d56Sopenharmony_ci fn(self) 4257db96d56Sopenharmony_ci except Exception: 4267db96d56Sopenharmony_ci LOGGER.exception('exception calling callback for %r', self) 4277db96d56Sopenharmony_ci 4287db96d56Sopenharmony_ci def result(self, timeout=None): 4297db96d56Sopenharmony_ci """Return the result of the call that the future represents. 4307db96d56Sopenharmony_ci 4317db96d56Sopenharmony_ci Args: 4327db96d56Sopenharmony_ci timeout: The number of seconds to wait for the result if the future 4337db96d56Sopenharmony_ci isn't done. If None, then there is no limit on the wait time. 4347db96d56Sopenharmony_ci 4357db96d56Sopenharmony_ci Returns: 4367db96d56Sopenharmony_ci The result of the call that the future represents. 4377db96d56Sopenharmony_ci 4387db96d56Sopenharmony_ci Raises: 4397db96d56Sopenharmony_ci CancelledError: If the future was cancelled. 4407db96d56Sopenharmony_ci TimeoutError: If the future didn't finish executing before the given 4417db96d56Sopenharmony_ci timeout. 4427db96d56Sopenharmony_ci Exception: If the call raised then that exception will be raised. 4437db96d56Sopenharmony_ci """ 4447db96d56Sopenharmony_ci try: 4457db96d56Sopenharmony_ci with self._condition: 4467db96d56Sopenharmony_ci if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 4477db96d56Sopenharmony_ci raise CancelledError() 4487db96d56Sopenharmony_ci elif self._state == FINISHED: 4497db96d56Sopenharmony_ci return self.__get_result() 4507db96d56Sopenharmony_ci 4517db96d56Sopenharmony_ci self._condition.wait(timeout) 4527db96d56Sopenharmony_ci 4537db96d56Sopenharmony_ci if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 4547db96d56Sopenharmony_ci raise CancelledError() 4557db96d56Sopenharmony_ci elif self._state == FINISHED: 4567db96d56Sopenharmony_ci return self.__get_result() 4577db96d56Sopenharmony_ci else: 4587db96d56Sopenharmony_ci raise TimeoutError() 4597db96d56Sopenharmony_ci finally: 4607db96d56Sopenharmony_ci # Break a reference cycle with the exception in self._exception 4617db96d56Sopenharmony_ci self = None 4627db96d56Sopenharmony_ci 4637db96d56Sopenharmony_ci def exception(self, timeout=None): 4647db96d56Sopenharmony_ci """Return the exception raised by the call that the future represents. 4657db96d56Sopenharmony_ci 4667db96d56Sopenharmony_ci Args: 4677db96d56Sopenharmony_ci timeout: The number of seconds to wait for the exception if the 4687db96d56Sopenharmony_ci future isn't done. If None, then there is no limit on the wait 4697db96d56Sopenharmony_ci time. 4707db96d56Sopenharmony_ci 4717db96d56Sopenharmony_ci Returns: 4727db96d56Sopenharmony_ci The exception raised by the call that the future represents or None 4737db96d56Sopenharmony_ci if the call completed without raising. 4747db96d56Sopenharmony_ci 4757db96d56Sopenharmony_ci Raises: 4767db96d56Sopenharmony_ci CancelledError: If the future was cancelled. 4777db96d56Sopenharmony_ci TimeoutError: If the future didn't finish executing before the given 4787db96d56Sopenharmony_ci timeout. 4797db96d56Sopenharmony_ci """ 4807db96d56Sopenharmony_ci 4817db96d56Sopenharmony_ci with self._condition: 4827db96d56Sopenharmony_ci if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 4837db96d56Sopenharmony_ci raise CancelledError() 4847db96d56Sopenharmony_ci elif self._state == FINISHED: 4857db96d56Sopenharmony_ci return self._exception 4867db96d56Sopenharmony_ci 4877db96d56Sopenharmony_ci self._condition.wait(timeout) 4887db96d56Sopenharmony_ci 4897db96d56Sopenharmony_ci if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: 4907db96d56Sopenharmony_ci raise CancelledError() 4917db96d56Sopenharmony_ci elif self._state == FINISHED: 4927db96d56Sopenharmony_ci return self._exception 4937db96d56Sopenharmony_ci else: 4947db96d56Sopenharmony_ci raise TimeoutError() 4957db96d56Sopenharmony_ci 4967db96d56Sopenharmony_ci # The following methods should only be used by Executors and in tests. 4977db96d56Sopenharmony_ci def set_running_or_notify_cancel(self): 4987db96d56Sopenharmony_ci """Mark the future as running or process any cancel notifications. 4997db96d56Sopenharmony_ci 5007db96d56Sopenharmony_ci Should only be used by Executor implementations and unit tests. 5017db96d56Sopenharmony_ci 5027db96d56Sopenharmony_ci If the future has been cancelled (cancel() was called and returned 5037db96d56Sopenharmony_ci True) then any threads waiting on the future completing (though calls 5047db96d56Sopenharmony_ci to as_completed() or wait()) are notified and False is returned. 5057db96d56Sopenharmony_ci 5067db96d56Sopenharmony_ci If the future was not cancelled then it is put in the running state 5077db96d56Sopenharmony_ci (future calls to running() will return True) and True is returned. 5087db96d56Sopenharmony_ci 5097db96d56Sopenharmony_ci This method should be called by Executor implementations before 5107db96d56Sopenharmony_ci executing the work associated with this future. If this method returns 5117db96d56Sopenharmony_ci False then the work should not be executed. 5127db96d56Sopenharmony_ci 5137db96d56Sopenharmony_ci Returns: 5147db96d56Sopenharmony_ci False if the Future was cancelled, True otherwise. 5157db96d56Sopenharmony_ci 5167db96d56Sopenharmony_ci Raises: 5177db96d56Sopenharmony_ci RuntimeError: if this method was already called or if set_result() 5187db96d56Sopenharmony_ci or set_exception() was called. 5197db96d56Sopenharmony_ci """ 5207db96d56Sopenharmony_ci with self._condition: 5217db96d56Sopenharmony_ci if self._state == CANCELLED: 5227db96d56Sopenharmony_ci self._state = CANCELLED_AND_NOTIFIED 5237db96d56Sopenharmony_ci for waiter in self._waiters: 5247db96d56Sopenharmony_ci waiter.add_cancelled(self) 5257db96d56Sopenharmony_ci # self._condition.notify_all() is not necessary because 5267db96d56Sopenharmony_ci # self.cancel() triggers a notification. 5277db96d56Sopenharmony_ci return False 5287db96d56Sopenharmony_ci elif self._state == PENDING: 5297db96d56Sopenharmony_ci self._state = RUNNING 5307db96d56Sopenharmony_ci return True 5317db96d56Sopenharmony_ci else: 5327db96d56Sopenharmony_ci LOGGER.critical('Future %s in unexpected state: %s', 5337db96d56Sopenharmony_ci id(self), 5347db96d56Sopenharmony_ci self._state) 5357db96d56Sopenharmony_ci raise RuntimeError('Future in unexpected state') 5367db96d56Sopenharmony_ci 5377db96d56Sopenharmony_ci def set_result(self, result): 5387db96d56Sopenharmony_ci """Sets the return value of work associated with the future. 5397db96d56Sopenharmony_ci 5407db96d56Sopenharmony_ci Should only be used by Executor implementations and unit tests. 5417db96d56Sopenharmony_ci """ 5427db96d56Sopenharmony_ci with self._condition: 5437db96d56Sopenharmony_ci if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: 5447db96d56Sopenharmony_ci raise InvalidStateError('{}: {!r}'.format(self._state, self)) 5457db96d56Sopenharmony_ci self._result = result 5467db96d56Sopenharmony_ci self._state = FINISHED 5477db96d56Sopenharmony_ci for waiter in self._waiters: 5487db96d56Sopenharmony_ci waiter.add_result(self) 5497db96d56Sopenharmony_ci self._condition.notify_all() 5507db96d56Sopenharmony_ci self._invoke_callbacks() 5517db96d56Sopenharmony_ci 5527db96d56Sopenharmony_ci def set_exception(self, exception): 5537db96d56Sopenharmony_ci """Sets the result of the future as being the given exception. 5547db96d56Sopenharmony_ci 5557db96d56Sopenharmony_ci Should only be used by Executor implementations and unit tests. 5567db96d56Sopenharmony_ci """ 5577db96d56Sopenharmony_ci with self._condition: 5587db96d56Sopenharmony_ci if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}: 5597db96d56Sopenharmony_ci raise InvalidStateError('{}: {!r}'.format(self._state, self)) 5607db96d56Sopenharmony_ci self._exception = exception 5617db96d56Sopenharmony_ci self._state = FINISHED 5627db96d56Sopenharmony_ci for waiter in self._waiters: 5637db96d56Sopenharmony_ci waiter.add_exception(self) 5647db96d56Sopenharmony_ci self._condition.notify_all() 5657db96d56Sopenharmony_ci self._invoke_callbacks() 5667db96d56Sopenharmony_ci 5677db96d56Sopenharmony_ci __class_getitem__ = classmethod(types.GenericAlias) 5687db96d56Sopenharmony_ci 5697db96d56Sopenharmony_ciclass Executor(object): 5707db96d56Sopenharmony_ci """This is an abstract base class for concrete asynchronous executors.""" 5717db96d56Sopenharmony_ci 5727db96d56Sopenharmony_ci def submit(self, fn, /, *args, **kwargs): 5737db96d56Sopenharmony_ci """Submits a callable to be executed with the given arguments. 5747db96d56Sopenharmony_ci 5757db96d56Sopenharmony_ci Schedules the callable to be executed as fn(*args, **kwargs) and returns 5767db96d56Sopenharmony_ci a Future instance representing the execution of the callable. 5777db96d56Sopenharmony_ci 5787db96d56Sopenharmony_ci Returns: 5797db96d56Sopenharmony_ci A Future representing the given call. 5807db96d56Sopenharmony_ci """ 5817db96d56Sopenharmony_ci raise NotImplementedError() 5827db96d56Sopenharmony_ci 5837db96d56Sopenharmony_ci def map(self, fn, *iterables, timeout=None, chunksize=1): 5847db96d56Sopenharmony_ci """Returns an iterator equivalent to map(fn, iter). 5857db96d56Sopenharmony_ci 5867db96d56Sopenharmony_ci Args: 5877db96d56Sopenharmony_ci fn: A callable that will take as many arguments as there are 5887db96d56Sopenharmony_ci passed iterables. 5897db96d56Sopenharmony_ci timeout: The maximum number of seconds to wait. If None, then there 5907db96d56Sopenharmony_ci is no limit on the wait time. 5917db96d56Sopenharmony_ci chunksize: The size of the chunks the iterable will be broken into 5927db96d56Sopenharmony_ci before being passed to a child process. This argument is only 5937db96d56Sopenharmony_ci used by ProcessPoolExecutor; it is ignored by 5947db96d56Sopenharmony_ci ThreadPoolExecutor. 5957db96d56Sopenharmony_ci 5967db96d56Sopenharmony_ci Returns: 5977db96d56Sopenharmony_ci An iterator equivalent to: map(func, *iterables) but the calls may 5987db96d56Sopenharmony_ci be evaluated out-of-order. 5997db96d56Sopenharmony_ci 6007db96d56Sopenharmony_ci Raises: 6017db96d56Sopenharmony_ci TimeoutError: If the entire result iterator could not be generated 6027db96d56Sopenharmony_ci before the given timeout. 6037db96d56Sopenharmony_ci Exception: If fn(*args) raises for any values. 6047db96d56Sopenharmony_ci """ 6057db96d56Sopenharmony_ci if timeout is not None: 6067db96d56Sopenharmony_ci end_time = timeout + time.monotonic() 6077db96d56Sopenharmony_ci 6087db96d56Sopenharmony_ci fs = [self.submit(fn, *args) for args in zip(*iterables)] 6097db96d56Sopenharmony_ci 6107db96d56Sopenharmony_ci # Yield must be hidden in closure so that the futures are submitted 6117db96d56Sopenharmony_ci # before the first iterator value is required. 6127db96d56Sopenharmony_ci def result_iterator(): 6137db96d56Sopenharmony_ci try: 6147db96d56Sopenharmony_ci # reverse to keep finishing order 6157db96d56Sopenharmony_ci fs.reverse() 6167db96d56Sopenharmony_ci while fs: 6177db96d56Sopenharmony_ci # Careful not to keep a reference to the popped future 6187db96d56Sopenharmony_ci if timeout is None: 6197db96d56Sopenharmony_ci yield _result_or_cancel(fs.pop()) 6207db96d56Sopenharmony_ci else: 6217db96d56Sopenharmony_ci yield _result_or_cancel(fs.pop(), end_time - time.monotonic()) 6227db96d56Sopenharmony_ci finally: 6237db96d56Sopenharmony_ci for future in fs: 6247db96d56Sopenharmony_ci future.cancel() 6257db96d56Sopenharmony_ci return result_iterator() 6267db96d56Sopenharmony_ci 6277db96d56Sopenharmony_ci def shutdown(self, wait=True, *, cancel_futures=False): 6287db96d56Sopenharmony_ci """Clean-up the resources associated with the Executor. 6297db96d56Sopenharmony_ci 6307db96d56Sopenharmony_ci It is safe to call this method several times. Otherwise, no other 6317db96d56Sopenharmony_ci methods can be called after this one. 6327db96d56Sopenharmony_ci 6337db96d56Sopenharmony_ci Args: 6347db96d56Sopenharmony_ci wait: If True then shutdown will not return until all running 6357db96d56Sopenharmony_ci futures have finished executing and the resources used by the 6367db96d56Sopenharmony_ci executor have been reclaimed. 6377db96d56Sopenharmony_ci cancel_futures: If True then shutdown will cancel all pending 6387db96d56Sopenharmony_ci futures. Futures that are completed or running will not be 6397db96d56Sopenharmony_ci cancelled. 6407db96d56Sopenharmony_ci """ 6417db96d56Sopenharmony_ci pass 6427db96d56Sopenharmony_ci 6437db96d56Sopenharmony_ci def __enter__(self): 6447db96d56Sopenharmony_ci return self 6457db96d56Sopenharmony_ci 6467db96d56Sopenharmony_ci def __exit__(self, exc_type, exc_val, exc_tb): 6477db96d56Sopenharmony_ci self.shutdown(wait=True) 6487db96d56Sopenharmony_ci return False 6497db96d56Sopenharmony_ci 6507db96d56Sopenharmony_ci 6517db96d56Sopenharmony_ciclass BrokenExecutor(RuntimeError): 6527db96d56Sopenharmony_ci """ 6537db96d56Sopenharmony_ci Raised when a executor has become non-functional after a severe failure. 6547db96d56Sopenharmony_ci """ 655