17db96d56Sopenharmony_ci# Copyright 2009 Brian Quinlan. All Rights Reserved.
27db96d56Sopenharmony_ci# Licensed to PSF under a Contributor Agreement.
37db96d56Sopenharmony_ci
47db96d56Sopenharmony_ci__author__ = 'Brian Quinlan (brian@sweetapp.com)'
57db96d56Sopenharmony_ci
67db96d56Sopenharmony_ciimport collections
77db96d56Sopenharmony_ciimport logging
87db96d56Sopenharmony_ciimport threading
97db96d56Sopenharmony_ciimport time
107db96d56Sopenharmony_ciimport types
117db96d56Sopenharmony_ci
127db96d56Sopenharmony_ciFIRST_COMPLETED = 'FIRST_COMPLETED'
137db96d56Sopenharmony_ciFIRST_EXCEPTION = 'FIRST_EXCEPTION'
147db96d56Sopenharmony_ciALL_COMPLETED = 'ALL_COMPLETED'
157db96d56Sopenharmony_ci_AS_COMPLETED = '_AS_COMPLETED'
167db96d56Sopenharmony_ci
177db96d56Sopenharmony_ci# Possible future states (for internal use by the futures package).
187db96d56Sopenharmony_ciPENDING = 'PENDING'
197db96d56Sopenharmony_ciRUNNING = 'RUNNING'
207db96d56Sopenharmony_ci# The future was cancelled by the user...
217db96d56Sopenharmony_ciCANCELLED = 'CANCELLED'
227db96d56Sopenharmony_ci# ...and _Waiter.add_cancelled() was called by a worker.
237db96d56Sopenharmony_ciCANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
247db96d56Sopenharmony_ciFINISHED = 'FINISHED'
257db96d56Sopenharmony_ci
267db96d56Sopenharmony_ci_FUTURE_STATES = [
277db96d56Sopenharmony_ci    PENDING,
287db96d56Sopenharmony_ci    RUNNING,
297db96d56Sopenharmony_ci    CANCELLED,
307db96d56Sopenharmony_ci    CANCELLED_AND_NOTIFIED,
317db96d56Sopenharmony_ci    FINISHED
327db96d56Sopenharmony_ci]
337db96d56Sopenharmony_ci
347db96d56Sopenharmony_ci_STATE_TO_DESCRIPTION_MAP = {
357db96d56Sopenharmony_ci    PENDING: "pending",
367db96d56Sopenharmony_ci    RUNNING: "running",
377db96d56Sopenharmony_ci    CANCELLED: "cancelled",
387db96d56Sopenharmony_ci    CANCELLED_AND_NOTIFIED: "cancelled",
397db96d56Sopenharmony_ci    FINISHED: "finished"
407db96d56Sopenharmony_ci}
417db96d56Sopenharmony_ci
427db96d56Sopenharmony_ci# Logger for internal use by the futures package.
437db96d56Sopenharmony_ciLOGGER = logging.getLogger("concurrent.futures")
447db96d56Sopenharmony_ci
457db96d56Sopenharmony_ciclass Error(Exception):
467db96d56Sopenharmony_ci    """Base class for all future-related exceptions."""
477db96d56Sopenharmony_ci    pass
487db96d56Sopenharmony_ci
497db96d56Sopenharmony_ciclass CancelledError(Error):
507db96d56Sopenharmony_ci    """The Future was cancelled."""
517db96d56Sopenharmony_ci    pass
527db96d56Sopenharmony_ci
537db96d56Sopenharmony_ciTimeoutError = TimeoutError  # make local alias for the standard exception
547db96d56Sopenharmony_ci
557db96d56Sopenharmony_ciclass InvalidStateError(Error):
567db96d56Sopenharmony_ci    """The operation is not allowed in this state."""
577db96d56Sopenharmony_ci    pass
587db96d56Sopenharmony_ci
597db96d56Sopenharmony_ciclass _Waiter(object):
607db96d56Sopenharmony_ci    """Provides the event that wait() and as_completed() block on."""
617db96d56Sopenharmony_ci    def __init__(self):
627db96d56Sopenharmony_ci        self.event = threading.Event()
637db96d56Sopenharmony_ci        self.finished_futures = []
647db96d56Sopenharmony_ci
657db96d56Sopenharmony_ci    def add_result(self, future):
667db96d56Sopenharmony_ci        self.finished_futures.append(future)
677db96d56Sopenharmony_ci
687db96d56Sopenharmony_ci    def add_exception(self, future):
697db96d56Sopenharmony_ci        self.finished_futures.append(future)
707db96d56Sopenharmony_ci
717db96d56Sopenharmony_ci    def add_cancelled(self, future):
727db96d56Sopenharmony_ci        self.finished_futures.append(future)
737db96d56Sopenharmony_ci
747db96d56Sopenharmony_ciclass _AsCompletedWaiter(_Waiter):
757db96d56Sopenharmony_ci    """Used by as_completed()."""
767db96d56Sopenharmony_ci
777db96d56Sopenharmony_ci    def __init__(self):
787db96d56Sopenharmony_ci        super(_AsCompletedWaiter, self).__init__()
797db96d56Sopenharmony_ci        self.lock = threading.Lock()
807db96d56Sopenharmony_ci
817db96d56Sopenharmony_ci    def add_result(self, future):
827db96d56Sopenharmony_ci        with self.lock:
837db96d56Sopenharmony_ci            super(_AsCompletedWaiter, self).add_result(future)
847db96d56Sopenharmony_ci            self.event.set()
857db96d56Sopenharmony_ci
867db96d56Sopenharmony_ci    def add_exception(self, future):
877db96d56Sopenharmony_ci        with self.lock:
887db96d56Sopenharmony_ci            super(_AsCompletedWaiter, self).add_exception(future)
897db96d56Sopenharmony_ci            self.event.set()
907db96d56Sopenharmony_ci
917db96d56Sopenharmony_ci    def add_cancelled(self, future):
927db96d56Sopenharmony_ci        with self.lock:
937db96d56Sopenharmony_ci            super(_AsCompletedWaiter, self).add_cancelled(future)
947db96d56Sopenharmony_ci            self.event.set()
957db96d56Sopenharmony_ci
967db96d56Sopenharmony_ciclass _FirstCompletedWaiter(_Waiter):
977db96d56Sopenharmony_ci    """Used by wait(return_when=FIRST_COMPLETED)."""
987db96d56Sopenharmony_ci
997db96d56Sopenharmony_ci    def add_result(self, future):
1007db96d56Sopenharmony_ci        super().add_result(future)
1017db96d56Sopenharmony_ci        self.event.set()
1027db96d56Sopenharmony_ci
1037db96d56Sopenharmony_ci    def add_exception(self, future):
1047db96d56Sopenharmony_ci        super().add_exception(future)
1057db96d56Sopenharmony_ci        self.event.set()
1067db96d56Sopenharmony_ci
1077db96d56Sopenharmony_ci    def add_cancelled(self, future):
1087db96d56Sopenharmony_ci        super().add_cancelled(future)
1097db96d56Sopenharmony_ci        self.event.set()
1107db96d56Sopenharmony_ci
1117db96d56Sopenharmony_ciclass _AllCompletedWaiter(_Waiter):
1127db96d56Sopenharmony_ci    """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
1137db96d56Sopenharmony_ci
1147db96d56Sopenharmony_ci    def __init__(self, num_pending_calls, stop_on_exception):
1157db96d56Sopenharmony_ci        self.num_pending_calls = num_pending_calls
1167db96d56Sopenharmony_ci        self.stop_on_exception = stop_on_exception
1177db96d56Sopenharmony_ci        self.lock = threading.Lock()
1187db96d56Sopenharmony_ci        super().__init__()
1197db96d56Sopenharmony_ci
1207db96d56Sopenharmony_ci    def _decrement_pending_calls(self):
1217db96d56Sopenharmony_ci        with self.lock:
1227db96d56Sopenharmony_ci            self.num_pending_calls -= 1
1237db96d56Sopenharmony_ci            if not self.num_pending_calls:
1247db96d56Sopenharmony_ci                self.event.set()
1257db96d56Sopenharmony_ci
1267db96d56Sopenharmony_ci    def add_result(self, future):
1277db96d56Sopenharmony_ci        super().add_result(future)
1287db96d56Sopenharmony_ci        self._decrement_pending_calls()
1297db96d56Sopenharmony_ci
1307db96d56Sopenharmony_ci    def add_exception(self, future):
1317db96d56Sopenharmony_ci        super().add_exception(future)
1327db96d56Sopenharmony_ci        if self.stop_on_exception:
1337db96d56Sopenharmony_ci            self.event.set()
1347db96d56Sopenharmony_ci        else:
1357db96d56Sopenharmony_ci            self._decrement_pending_calls()
1367db96d56Sopenharmony_ci
1377db96d56Sopenharmony_ci    def add_cancelled(self, future):
1387db96d56Sopenharmony_ci        super().add_cancelled(future)
1397db96d56Sopenharmony_ci        self._decrement_pending_calls()
1407db96d56Sopenharmony_ci
1417db96d56Sopenharmony_ciclass _AcquireFutures(object):
1427db96d56Sopenharmony_ci    """A context manager that does an ordered acquire of Future conditions."""
1437db96d56Sopenharmony_ci
1447db96d56Sopenharmony_ci    def __init__(self, futures):
1457db96d56Sopenharmony_ci        self.futures = sorted(futures, key=id)
1467db96d56Sopenharmony_ci
1477db96d56Sopenharmony_ci    def __enter__(self):
1487db96d56Sopenharmony_ci        for future in self.futures:
1497db96d56Sopenharmony_ci            future._condition.acquire()
1507db96d56Sopenharmony_ci
1517db96d56Sopenharmony_ci    def __exit__(self, *args):
1527db96d56Sopenharmony_ci        for future in self.futures:
1537db96d56Sopenharmony_ci            future._condition.release()
1547db96d56Sopenharmony_ci
1557db96d56Sopenharmony_cidef _create_and_install_waiters(fs, return_when):
1567db96d56Sopenharmony_ci    if return_when == _AS_COMPLETED:
1577db96d56Sopenharmony_ci        waiter = _AsCompletedWaiter()
1587db96d56Sopenharmony_ci    elif return_when == FIRST_COMPLETED:
1597db96d56Sopenharmony_ci        waiter = _FirstCompletedWaiter()
1607db96d56Sopenharmony_ci    else:
1617db96d56Sopenharmony_ci        pending_count = sum(
1627db96d56Sopenharmony_ci                f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
1637db96d56Sopenharmony_ci
1647db96d56Sopenharmony_ci        if return_when == FIRST_EXCEPTION:
1657db96d56Sopenharmony_ci            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
1667db96d56Sopenharmony_ci        elif return_when == ALL_COMPLETED:
1677db96d56Sopenharmony_ci            waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
1687db96d56Sopenharmony_ci        else:
1697db96d56Sopenharmony_ci            raise ValueError("Invalid return condition: %r" % return_when)
1707db96d56Sopenharmony_ci
1717db96d56Sopenharmony_ci    for f in fs:
1727db96d56Sopenharmony_ci        f._waiters.append(waiter)
1737db96d56Sopenharmony_ci
1747db96d56Sopenharmony_ci    return waiter
1757db96d56Sopenharmony_ci
1767db96d56Sopenharmony_ci
1777db96d56Sopenharmony_cidef _yield_finished_futures(fs, waiter, ref_collect):
1787db96d56Sopenharmony_ci    """
1797db96d56Sopenharmony_ci    Iterate on the list *fs*, yielding finished futures one by one in
1807db96d56Sopenharmony_ci    reverse order.
1817db96d56Sopenharmony_ci    Before yielding a future, *waiter* is removed from its waiters
1827db96d56Sopenharmony_ci    and the future is removed from each set in the collection of sets
1837db96d56Sopenharmony_ci    *ref_collect*.
1847db96d56Sopenharmony_ci
1857db96d56Sopenharmony_ci    The aim of this function is to avoid keeping stale references after
1867db96d56Sopenharmony_ci    the future is yielded and before the iterator resumes.
1877db96d56Sopenharmony_ci    """
1887db96d56Sopenharmony_ci    while fs:
1897db96d56Sopenharmony_ci        f = fs[-1]
1907db96d56Sopenharmony_ci        for futures_set in ref_collect:
1917db96d56Sopenharmony_ci            futures_set.remove(f)
1927db96d56Sopenharmony_ci        with f._condition:
1937db96d56Sopenharmony_ci            f._waiters.remove(waiter)
1947db96d56Sopenharmony_ci        del f
1957db96d56Sopenharmony_ci        # Careful not to keep a reference to the popped value
1967db96d56Sopenharmony_ci        yield fs.pop()
1977db96d56Sopenharmony_ci
1987db96d56Sopenharmony_ci
1997db96d56Sopenharmony_cidef as_completed(fs, timeout=None):
2007db96d56Sopenharmony_ci    """An iterator over the given futures that yields each as it completes.
2017db96d56Sopenharmony_ci
2027db96d56Sopenharmony_ci    Args:
2037db96d56Sopenharmony_ci        fs: The sequence of Futures (possibly created by different Executors) to
2047db96d56Sopenharmony_ci            iterate over.
2057db96d56Sopenharmony_ci        timeout: The maximum number of seconds to wait. If None, then there
2067db96d56Sopenharmony_ci            is no limit on the wait time.
2077db96d56Sopenharmony_ci
2087db96d56Sopenharmony_ci    Returns:
2097db96d56Sopenharmony_ci        An iterator that yields the given Futures as they complete (finished or
2107db96d56Sopenharmony_ci        cancelled). If any given Futures are duplicated, they will be returned
2117db96d56Sopenharmony_ci        once.
2127db96d56Sopenharmony_ci
2137db96d56Sopenharmony_ci    Raises:
2147db96d56Sopenharmony_ci        TimeoutError: If the entire result iterator could not be generated
2157db96d56Sopenharmony_ci            before the given timeout.
2167db96d56Sopenharmony_ci    """
2177db96d56Sopenharmony_ci    if timeout is not None:
2187db96d56Sopenharmony_ci        end_time = timeout + time.monotonic()
2197db96d56Sopenharmony_ci
2207db96d56Sopenharmony_ci    fs = set(fs)
2217db96d56Sopenharmony_ci    total_futures = len(fs)
2227db96d56Sopenharmony_ci    with _AcquireFutures(fs):
2237db96d56Sopenharmony_ci        finished = set(
2247db96d56Sopenharmony_ci                f for f in fs
2257db96d56Sopenharmony_ci                if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
2267db96d56Sopenharmony_ci        pending = fs - finished
2277db96d56Sopenharmony_ci        waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
2287db96d56Sopenharmony_ci    finished = list(finished)
2297db96d56Sopenharmony_ci    try:
2307db96d56Sopenharmony_ci        yield from _yield_finished_futures(finished, waiter,
2317db96d56Sopenharmony_ci                                           ref_collect=(fs,))
2327db96d56Sopenharmony_ci
2337db96d56Sopenharmony_ci        while pending:
2347db96d56Sopenharmony_ci            if timeout is None:
2357db96d56Sopenharmony_ci                wait_timeout = None
2367db96d56Sopenharmony_ci            else:
2377db96d56Sopenharmony_ci                wait_timeout = end_time - time.monotonic()
2387db96d56Sopenharmony_ci                if wait_timeout < 0:
2397db96d56Sopenharmony_ci                    raise TimeoutError(
2407db96d56Sopenharmony_ci                            '%d (of %d) futures unfinished' % (
2417db96d56Sopenharmony_ci                            len(pending), total_futures))
2427db96d56Sopenharmony_ci
2437db96d56Sopenharmony_ci            waiter.event.wait(wait_timeout)
2447db96d56Sopenharmony_ci
2457db96d56Sopenharmony_ci            with waiter.lock:
2467db96d56Sopenharmony_ci                finished = waiter.finished_futures
2477db96d56Sopenharmony_ci                waiter.finished_futures = []
2487db96d56Sopenharmony_ci                waiter.event.clear()
2497db96d56Sopenharmony_ci
2507db96d56Sopenharmony_ci            # reverse to keep finishing order
2517db96d56Sopenharmony_ci            finished.reverse()
2527db96d56Sopenharmony_ci            yield from _yield_finished_futures(finished, waiter,
2537db96d56Sopenharmony_ci                                               ref_collect=(fs, pending))
2547db96d56Sopenharmony_ci
2557db96d56Sopenharmony_ci    finally:
2567db96d56Sopenharmony_ci        # Remove waiter from unfinished futures
2577db96d56Sopenharmony_ci        for f in fs:
2587db96d56Sopenharmony_ci            with f._condition:
2597db96d56Sopenharmony_ci                f._waiters.remove(waiter)
2607db96d56Sopenharmony_ci
2617db96d56Sopenharmony_ciDoneAndNotDoneFutures = collections.namedtuple(
2627db96d56Sopenharmony_ci        'DoneAndNotDoneFutures', 'done not_done')
2637db96d56Sopenharmony_cidef wait(fs, timeout=None, return_when=ALL_COMPLETED):
2647db96d56Sopenharmony_ci    """Wait for the futures in the given sequence to complete.
2657db96d56Sopenharmony_ci
2667db96d56Sopenharmony_ci    Args:
2677db96d56Sopenharmony_ci        fs: The sequence of Futures (possibly created by different Executors) to
2687db96d56Sopenharmony_ci            wait upon.
2697db96d56Sopenharmony_ci        timeout: The maximum number of seconds to wait. If None, then there
2707db96d56Sopenharmony_ci            is no limit on the wait time.
2717db96d56Sopenharmony_ci        return_when: Indicates when this function should return. The options
2727db96d56Sopenharmony_ci            are:
2737db96d56Sopenharmony_ci
2747db96d56Sopenharmony_ci            FIRST_COMPLETED - Return when any future finishes or is
2757db96d56Sopenharmony_ci                              cancelled.
2767db96d56Sopenharmony_ci            FIRST_EXCEPTION - Return when any future finishes by raising an
2777db96d56Sopenharmony_ci                              exception. If no future raises an exception
2787db96d56Sopenharmony_ci                              then it is equivalent to ALL_COMPLETED.
2797db96d56Sopenharmony_ci            ALL_COMPLETED -   Return when all futures finish or are cancelled.
2807db96d56Sopenharmony_ci
2817db96d56Sopenharmony_ci    Returns:
2827db96d56Sopenharmony_ci        A named 2-tuple of sets. The first set, named 'done', contains the
2837db96d56Sopenharmony_ci        futures that completed (is finished or cancelled) before the wait
2847db96d56Sopenharmony_ci        completed. The second set, named 'not_done', contains uncompleted
2857db96d56Sopenharmony_ci        futures. Duplicate futures given to *fs* are removed and will be
2867db96d56Sopenharmony_ci        returned only once.
2877db96d56Sopenharmony_ci    """
2887db96d56Sopenharmony_ci    fs = set(fs)
2897db96d56Sopenharmony_ci    with _AcquireFutures(fs):
2907db96d56Sopenharmony_ci        done = {f for f in fs
2917db96d56Sopenharmony_ci                   if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]}
2927db96d56Sopenharmony_ci        not_done = fs - done
2937db96d56Sopenharmony_ci        if (return_when == FIRST_COMPLETED) and done:
2947db96d56Sopenharmony_ci            return DoneAndNotDoneFutures(done, not_done)
2957db96d56Sopenharmony_ci        elif (return_when == FIRST_EXCEPTION) and done:
2967db96d56Sopenharmony_ci            if any(f for f in done
2977db96d56Sopenharmony_ci                   if not f.cancelled() and f.exception() is not None):
2987db96d56Sopenharmony_ci                return DoneAndNotDoneFutures(done, not_done)
2997db96d56Sopenharmony_ci
3007db96d56Sopenharmony_ci        if len(done) == len(fs):
3017db96d56Sopenharmony_ci            return DoneAndNotDoneFutures(done, not_done)
3027db96d56Sopenharmony_ci
3037db96d56Sopenharmony_ci        waiter = _create_and_install_waiters(fs, return_when)
3047db96d56Sopenharmony_ci
3057db96d56Sopenharmony_ci    waiter.event.wait(timeout)
3067db96d56Sopenharmony_ci    for f in fs:
3077db96d56Sopenharmony_ci        with f._condition:
3087db96d56Sopenharmony_ci            f._waiters.remove(waiter)
3097db96d56Sopenharmony_ci
3107db96d56Sopenharmony_ci    done.update(waiter.finished_futures)
3117db96d56Sopenharmony_ci    return DoneAndNotDoneFutures(done, fs - done)
3127db96d56Sopenharmony_ci
3137db96d56Sopenharmony_ci
3147db96d56Sopenharmony_cidef _result_or_cancel(fut, timeout=None):
3157db96d56Sopenharmony_ci    try:
3167db96d56Sopenharmony_ci        try:
3177db96d56Sopenharmony_ci            return fut.result(timeout)
3187db96d56Sopenharmony_ci        finally:
3197db96d56Sopenharmony_ci            fut.cancel()
3207db96d56Sopenharmony_ci    finally:
3217db96d56Sopenharmony_ci        # Break a reference cycle with the exception in self._exception
3227db96d56Sopenharmony_ci        del fut
3237db96d56Sopenharmony_ci
3247db96d56Sopenharmony_ci
3257db96d56Sopenharmony_ciclass Future(object):
3267db96d56Sopenharmony_ci    """Represents the result of an asynchronous computation."""
3277db96d56Sopenharmony_ci
3287db96d56Sopenharmony_ci    def __init__(self):
3297db96d56Sopenharmony_ci        """Initializes the future. Should not be called by clients."""
3307db96d56Sopenharmony_ci        self._condition = threading.Condition()
3317db96d56Sopenharmony_ci        self._state = PENDING
3327db96d56Sopenharmony_ci        self._result = None
3337db96d56Sopenharmony_ci        self._exception = None
3347db96d56Sopenharmony_ci        self._waiters = []
3357db96d56Sopenharmony_ci        self._done_callbacks = []
3367db96d56Sopenharmony_ci
3377db96d56Sopenharmony_ci    def _invoke_callbacks(self):
3387db96d56Sopenharmony_ci        for callback in self._done_callbacks:
3397db96d56Sopenharmony_ci            try:
3407db96d56Sopenharmony_ci                callback(self)
3417db96d56Sopenharmony_ci            except Exception:
3427db96d56Sopenharmony_ci                LOGGER.exception('exception calling callback for %r', self)
3437db96d56Sopenharmony_ci
3447db96d56Sopenharmony_ci    def __repr__(self):
3457db96d56Sopenharmony_ci        with self._condition:
3467db96d56Sopenharmony_ci            if self._state == FINISHED:
3477db96d56Sopenharmony_ci                if self._exception:
3487db96d56Sopenharmony_ci                    return '<%s at %#x state=%s raised %s>' % (
3497db96d56Sopenharmony_ci                        self.__class__.__name__,
3507db96d56Sopenharmony_ci                        id(self),
3517db96d56Sopenharmony_ci                        _STATE_TO_DESCRIPTION_MAP[self._state],
3527db96d56Sopenharmony_ci                        self._exception.__class__.__name__)
3537db96d56Sopenharmony_ci                else:
3547db96d56Sopenharmony_ci                    return '<%s at %#x state=%s returned %s>' % (
3557db96d56Sopenharmony_ci                        self.__class__.__name__,
3567db96d56Sopenharmony_ci                        id(self),
3577db96d56Sopenharmony_ci                        _STATE_TO_DESCRIPTION_MAP[self._state],
3587db96d56Sopenharmony_ci                        self._result.__class__.__name__)
3597db96d56Sopenharmony_ci            return '<%s at %#x state=%s>' % (
3607db96d56Sopenharmony_ci                    self.__class__.__name__,
3617db96d56Sopenharmony_ci                    id(self),
3627db96d56Sopenharmony_ci                   _STATE_TO_DESCRIPTION_MAP[self._state])
3637db96d56Sopenharmony_ci
3647db96d56Sopenharmony_ci    def cancel(self):
3657db96d56Sopenharmony_ci        """Cancel the future if possible.
3667db96d56Sopenharmony_ci
3677db96d56Sopenharmony_ci        Returns True if the future was cancelled, False otherwise. A future
3687db96d56Sopenharmony_ci        cannot be cancelled if it is running or has already completed.
3697db96d56Sopenharmony_ci        """
3707db96d56Sopenharmony_ci        with self._condition:
3717db96d56Sopenharmony_ci            if self._state in [RUNNING, FINISHED]:
3727db96d56Sopenharmony_ci                return False
3737db96d56Sopenharmony_ci
3747db96d56Sopenharmony_ci            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
3757db96d56Sopenharmony_ci                return True
3767db96d56Sopenharmony_ci
3777db96d56Sopenharmony_ci            self._state = CANCELLED
3787db96d56Sopenharmony_ci            self._condition.notify_all()
3797db96d56Sopenharmony_ci
3807db96d56Sopenharmony_ci        self._invoke_callbacks()
3817db96d56Sopenharmony_ci        return True
3827db96d56Sopenharmony_ci
3837db96d56Sopenharmony_ci    def cancelled(self):
3847db96d56Sopenharmony_ci        """Return True if the future was cancelled."""
3857db96d56Sopenharmony_ci        with self._condition:
3867db96d56Sopenharmony_ci            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
3877db96d56Sopenharmony_ci
3887db96d56Sopenharmony_ci    def running(self):
3897db96d56Sopenharmony_ci        """Return True if the future is currently executing."""
3907db96d56Sopenharmony_ci        with self._condition:
3917db96d56Sopenharmony_ci            return self._state == RUNNING
3927db96d56Sopenharmony_ci
3937db96d56Sopenharmony_ci    def done(self):
3947db96d56Sopenharmony_ci        """Return True if the future was cancelled or finished executing."""
3957db96d56Sopenharmony_ci        with self._condition:
3967db96d56Sopenharmony_ci            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
3977db96d56Sopenharmony_ci
3987db96d56Sopenharmony_ci    def __get_result(self):
3997db96d56Sopenharmony_ci        if self._exception:
4007db96d56Sopenharmony_ci            try:
4017db96d56Sopenharmony_ci                raise self._exception
4027db96d56Sopenharmony_ci            finally:
4037db96d56Sopenharmony_ci                # Break a reference cycle with the exception in self._exception
4047db96d56Sopenharmony_ci                self = None
4057db96d56Sopenharmony_ci        else:
4067db96d56Sopenharmony_ci            return self._result
4077db96d56Sopenharmony_ci
4087db96d56Sopenharmony_ci    def add_done_callback(self, fn):
4097db96d56Sopenharmony_ci        """Attaches a callable that will be called when the future finishes.
4107db96d56Sopenharmony_ci
4117db96d56Sopenharmony_ci        Args:
4127db96d56Sopenharmony_ci            fn: A callable that will be called with this future as its only
4137db96d56Sopenharmony_ci                argument when the future completes or is cancelled. The callable
4147db96d56Sopenharmony_ci                will always be called by a thread in the same process in which
4157db96d56Sopenharmony_ci                it was added. If the future has already completed or been
4167db96d56Sopenharmony_ci                cancelled then the callable will be called immediately. These
4177db96d56Sopenharmony_ci                callables are called in the order that they were added.
4187db96d56Sopenharmony_ci        """
4197db96d56Sopenharmony_ci        with self._condition:
4207db96d56Sopenharmony_ci            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
4217db96d56Sopenharmony_ci                self._done_callbacks.append(fn)
4227db96d56Sopenharmony_ci                return
4237db96d56Sopenharmony_ci        try:
4247db96d56Sopenharmony_ci            fn(self)
4257db96d56Sopenharmony_ci        except Exception:
4267db96d56Sopenharmony_ci            LOGGER.exception('exception calling callback for %r', self)
4277db96d56Sopenharmony_ci
4287db96d56Sopenharmony_ci    def result(self, timeout=None):
4297db96d56Sopenharmony_ci        """Return the result of the call that the future represents.
4307db96d56Sopenharmony_ci
4317db96d56Sopenharmony_ci        Args:
4327db96d56Sopenharmony_ci            timeout: The number of seconds to wait for the result if the future
4337db96d56Sopenharmony_ci                isn't done. If None, then there is no limit on the wait time.
4347db96d56Sopenharmony_ci
4357db96d56Sopenharmony_ci        Returns:
4367db96d56Sopenharmony_ci            The result of the call that the future represents.
4377db96d56Sopenharmony_ci
4387db96d56Sopenharmony_ci        Raises:
4397db96d56Sopenharmony_ci            CancelledError: If the future was cancelled.
4407db96d56Sopenharmony_ci            TimeoutError: If the future didn't finish executing before the given
4417db96d56Sopenharmony_ci                timeout.
4427db96d56Sopenharmony_ci            Exception: If the call raised then that exception will be raised.
4437db96d56Sopenharmony_ci        """
4447db96d56Sopenharmony_ci        try:
4457db96d56Sopenharmony_ci            with self._condition:
4467db96d56Sopenharmony_ci                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
4477db96d56Sopenharmony_ci                    raise CancelledError()
4487db96d56Sopenharmony_ci                elif self._state == FINISHED:
4497db96d56Sopenharmony_ci                    return self.__get_result()
4507db96d56Sopenharmony_ci
4517db96d56Sopenharmony_ci                self._condition.wait(timeout)
4527db96d56Sopenharmony_ci
4537db96d56Sopenharmony_ci                if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
4547db96d56Sopenharmony_ci                    raise CancelledError()
4557db96d56Sopenharmony_ci                elif self._state == FINISHED:
4567db96d56Sopenharmony_ci                    return self.__get_result()
4577db96d56Sopenharmony_ci                else:
4587db96d56Sopenharmony_ci                    raise TimeoutError()
4597db96d56Sopenharmony_ci        finally:
4607db96d56Sopenharmony_ci            # Break a reference cycle with the exception in self._exception
4617db96d56Sopenharmony_ci            self = None
4627db96d56Sopenharmony_ci
4637db96d56Sopenharmony_ci    def exception(self, timeout=None):
4647db96d56Sopenharmony_ci        """Return the exception raised by the call that the future represents.
4657db96d56Sopenharmony_ci
4667db96d56Sopenharmony_ci        Args:
4677db96d56Sopenharmony_ci            timeout: The number of seconds to wait for the exception if the
4687db96d56Sopenharmony_ci                future isn't done. If None, then there is no limit on the wait
4697db96d56Sopenharmony_ci                time.
4707db96d56Sopenharmony_ci
4717db96d56Sopenharmony_ci        Returns:
4727db96d56Sopenharmony_ci            The exception raised by the call that the future represents or None
4737db96d56Sopenharmony_ci            if the call completed without raising.
4747db96d56Sopenharmony_ci
4757db96d56Sopenharmony_ci        Raises:
4767db96d56Sopenharmony_ci            CancelledError: If the future was cancelled.
4777db96d56Sopenharmony_ci            TimeoutError: If the future didn't finish executing before the given
4787db96d56Sopenharmony_ci                timeout.
4797db96d56Sopenharmony_ci        """
4807db96d56Sopenharmony_ci
4817db96d56Sopenharmony_ci        with self._condition:
4827db96d56Sopenharmony_ci            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
4837db96d56Sopenharmony_ci                raise CancelledError()
4847db96d56Sopenharmony_ci            elif self._state == FINISHED:
4857db96d56Sopenharmony_ci                return self._exception
4867db96d56Sopenharmony_ci
4877db96d56Sopenharmony_ci            self._condition.wait(timeout)
4887db96d56Sopenharmony_ci
4897db96d56Sopenharmony_ci            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
4907db96d56Sopenharmony_ci                raise CancelledError()
4917db96d56Sopenharmony_ci            elif self._state == FINISHED:
4927db96d56Sopenharmony_ci                return self._exception
4937db96d56Sopenharmony_ci            else:
4947db96d56Sopenharmony_ci                raise TimeoutError()
4957db96d56Sopenharmony_ci
4967db96d56Sopenharmony_ci    # The following methods should only be used by Executors and in tests.
4977db96d56Sopenharmony_ci    def set_running_or_notify_cancel(self):
4987db96d56Sopenharmony_ci        """Mark the future as running or process any cancel notifications.
4997db96d56Sopenharmony_ci
5007db96d56Sopenharmony_ci        Should only be used by Executor implementations and unit tests.
5017db96d56Sopenharmony_ci
5027db96d56Sopenharmony_ci        If the future has been cancelled (cancel() was called and returned
5037db96d56Sopenharmony_ci        True) then any threads waiting on the future completing (though calls
5047db96d56Sopenharmony_ci        to as_completed() or wait()) are notified and False is returned.
5057db96d56Sopenharmony_ci
5067db96d56Sopenharmony_ci        If the future was not cancelled then it is put in the running state
5077db96d56Sopenharmony_ci        (future calls to running() will return True) and True is returned.
5087db96d56Sopenharmony_ci
5097db96d56Sopenharmony_ci        This method should be called by Executor implementations before
5107db96d56Sopenharmony_ci        executing the work associated with this future. If this method returns
5117db96d56Sopenharmony_ci        False then the work should not be executed.
5127db96d56Sopenharmony_ci
5137db96d56Sopenharmony_ci        Returns:
5147db96d56Sopenharmony_ci            False if the Future was cancelled, True otherwise.
5157db96d56Sopenharmony_ci
5167db96d56Sopenharmony_ci        Raises:
5177db96d56Sopenharmony_ci            RuntimeError: if this method was already called or if set_result()
5187db96d56Sopenharmony_ci                or set_exception() was called.
5197db96d56Sopenharmony_ci        """
5207db96d56Sopenharmony_ci        with self._condition:
5217db96d56Sopenharmony_ci            if self._state == CANCELLED:
5227db96d56Sopenharmony_ci                self._state = CANCELLED_AND_NOTIFIED
5237db96d56Sopenharmony_ci                for waiter in self._waiters:
5247db96d56Sopenharmony_ci                    waiter.add_cancelled(self)
5257db96d56Sopenharmony_ci                # self._condition.notify_all() is not necessary because
5267db96d56Sopenharmony_ci                # self.cancel() triggers a notification.
5277db96d56Sopenharmony_ci                return False
5287db96d56Sopenharmony_ci            elif self._state == PENDING:
5297db96d56Sopenharmony_ci                self._state = RUNNING
5307db96d56Sopenharmony_ci                return True
5317db96d56Sopenharmony_ci            else:
5327db96d56Sopenharmony_ci                LOGGER.critical('Future %s in unexpected state: %s',
5337db96d56Sopenharmony_ci                                id(self),
5347db96d56Sopenharmony_ci                                self._state)
5357db96d56Sopenharmony_ci                raise RuntimeError('Future in unexpected state')
5367db96d56Sopenharmony_ci
5377db96d56Sopenharmony_ci    def set_result(self, result):
5387db96d56Sopenharmony_ci        """Sets the return value of work associated with the future.
5397db96d56Sopenharmony_ci
5407db96d56Sopenharmony_ci        Should only be used by Executor implementations and unit tests.
5417db96d56Sopenharmony_ci        """
5427db96d56Sopenharmony_ci        with self._condition:
5437db96d56Sopenharmony_ci            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
5447db96d56Sopenharmony_ci                raise InvalidStateError('{}: {!r}'.format(self._state, self))
5457db96d56Sopenharmony_ci            self._result = result
5467db96d56Sopenharmony_ci            self._state = FINISHED
5477db96d56Sopenharmony_ci            for waiter in self._waiters:
5487db96d56Sopenharmony_ci                waiter.add_result(self)
5497db96d56Sopenharmony_ci            self._condition.notify_all()
5507db96d56Sopenharmony_ci        self._invoke_callbacks()
5517db96d56Sopenharmony_ci
5527db96d56Sopenharmony_ci    def set_exception(self, exception):
5537db96d56Sopenharmony_ci        """Sets the result of the future as being the given exception.
5547db96d56Sopenharmony_ci
5557db96d56Sopenharmony_ci        Should only be used by Executor implementations and unit tests.
5567db96d56Sopenharmony_ci        """
5577db96d56Sopenharmony_ci        with self._condition:
5587db96d56Sopenharmony_ci            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
5597db96d56Sopenharmony_ci                raise InvalidStateError('{}: {!r}'.format(self._state, self))
5607db96d56Sopenharmony_ci            self._exception = exception
5617db96d56Sopenharmony_ci            self._state = FINISHED
5627db96d56Sopenharmony_ci            for waiter in self._waiters:
5637db96d56Sopenharmony_ci                waiter.add_exception(self)
5647db96d56Sopenharmony_ci            self._condition.notify_all()
5657db96d56Sopenharmony_ci        self._invoke_callbacks()
5667db96d56Sopenharmony_ci
5677db96d56Sopenharmony_ci    __class_getitem__ = classmethod(types.GenericAlias)
5687db96d56Sopenharmony_ci
5697db96d56Sopenharmony_ciclass Executor(object):
5707db96d56Sopenharmony_ci    """This is an abstract base class for concrete asynchronous executors."""
5717db96d56Sopenharmony_ci
5727db96d56Sopenharmony_ci    def submit(self, fn, /, *args, **kwargs):
5737db96d56Sopenharmony_ci        """Submits a callable to be executed with the given arguments.
5747db96d56Sopenharmony_ci
5757db96d56Sopenharmony_ci        Schedules the callable to be executed as fn(*args, **kwargs) and returns
5767db96d56Sopenharmony_ci        a Future instance representing the execution of the callable.
5777db96d56Sopenharmony_ci
5787db96d56Sopenharmony_ci        Returns:
5797db96d56Sopenharmony_ci            A Future representing the given call.
5807db96d56Sopenharmony_ci        """
5817db96d56Sopenharmony_ci        raise NotImplementedError()
5827db96d56Sopenharmony_ci
5837db96d56Sopenharmony_ci    def map(self, fn, *iterables, timeout=None, chunksize=1):
5847db96d56Sopenharmony_ci        """Returns an iterator equivalent to map(fn, iter).
5857db96d56Sopenharmony_ci
5867db96d56Sopenharmony_ci        Args:
5877db96d56Sopenharmony_ci            fn: A callable that will take as many arguments as there are
5887db96d56Sopenharmony_ci                passed iterables.
5897db96d56Sopenharmony_ci            timeout: The maximum number of seconds to wait. If None, then there
5907db96d56Sopenharmony_ci                is no limit on the wait time.
5917db96d56Sopenharmony_ci            chunksize: The size of the chunks the iterable will be broken into
5927db96d56Sopenharmony_ci                before being passed to a child process. This argument is only
5937db96d56Sopenharmony_ci                used by ProcessPoolExecutor; it is ignored by
5947db96d56Sopenharmony_ci                ThreadPoolExecutor.
5957db96d56Sopenharmony_ci
5967db96d56Sopenharmony_ci        Returns:
5977db96d56Sopenharmony_ci            An iterator equivalent to: map(func, *iterables) but the calls may
5987db96d56Sopenharmony_ci            be evaluated out-of-order.
5997db96d56Sopenharmony_ci
6007db96d56Sopenharmony_ci        Raises:
6017db96d56Sopenharmony_ci            TimeoutError: If the entire result iterator could not be generated
6027db96d56Sopenharmony_ci                before the given timeout.
6037db96d56Sopenharmony_ci            Exception: If fn(*args) raises for any values.
6047db96d56Sopenharmony_ci        """
6057db96d56Sopenharmony_ci        if timeout is not None:
6067db96d56Sopenharmony_ci            end_time = timeout + time.monotonic()
6077db96d56Sopenharmony_ci
6087db96d56Sopenharmony_ci        fs = [self.submit(fn, *args) for args in zip(*iterables)]
6097db96d56Sopenharmony_ci
6107db96d56Sopenharmony_ci        # Yield must be hidden in closure so that the futures are submitted
6117db96d56Sopenharmony_ci        # before the first iterator value is required.
6127db96d56Sopenharmony_ci        def result_iterator():
6137db96d56Sopenharmony_ci            try:
6147db96d56Sopenharmony_ci                # reverse to keep finishing order
6157db96d56Sopenharmony_ci                fs.reverse()
6167db96d56Sopenharmony_ci                while fs:
6177db96d56Sopenharmony_ci                    # Careful not to keep a reference to the popped future
6187db96d56Sopenharmony_ci                    if timeout is None:
6197db96d56Sopenharmony_ci                        yield _result_or_cancel(fs.pop())
6207db96d56Sopenharmony_ci                    else:
6217db96d56Sopenharmony_ci                        yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
6227db96d56Sopenharmony_ci            finally:
6237db96d56Sopenharmony_ci                for future in fs:
6247db96d56Sopenharmony_ci                    future.cancel()
6257db96d56Sopenharmony_ci        return result_iterator()
6267db96d56Sopenharmony_ci
6277db96d56Sopenharmony_ci    def shutdown(self, wait=True, *, cancel_futures=False):
6287db96d56Sopenharmony_ci        """Clean-up the resources associated with the Executor.
6297db96d56Sopenharmony_ci
6307db96d56Sopenharmony_ci        It is safe to call this method several times. Otherwise, no other
6317db96d56Sopenharmony_ci        methods can be called after this one.
6327db96d56Sopenharmony_ci
6337db96d56Sopenharmony_ci        Args:
6347db96d56Sopenharmony_ci            wait: If True then shutdown will not return until all running
6357db96d56Sopenharmony_ci                futures have finished executing and the resources used by the
6367db96d56Sopenharmony_ci                executor have been reclaimed.
6377db96d56Sopenharmony_ci            cancel_futures: If True then shutdown will cancel all pending
6387db96d56Sopenharmony_ci                futures. Futures that are completed or running will not be
6397db96d56Sopenharmony_ci                cancelled.
6407db96d56Sopenharmony_ci        """
6417db96d56Sopenharmony_ci        pass
6427db96d56Sopenharmony_ci
6437db96d56Sopenharmony_ci    def __enter__(self):
6447db96d56Sopenharmony_ci        return self
6457db96d56Sopenharmony_ci
6467db96d56Sopenharmony_ci    def __exit__(self, exc_type, exc_val, exc_tb):
6477db96d56Sopenharmony_ci        self.shutdown(wait=True)
6487db96d56Sopenharmony_ci        return False
6497db96d56Sopenharmony_ci
6507db96d56Sopenharmony_ci
6517db96d56Sopenharmony_ciclass BrokenExecutor(RuntimeError):
6527db96d56Sopenharmony_ci    """
6537db96d56Sopenharmony_ci    Raised when a executor has become non-functional after a severe failure.
6547db96d56Sopenharmony_ci    """
655