17db96d56Sopenharmony_ci__all__ = ('Runner', 'run')
27db96d56Sopenharmony_ci
37db96d56Sopenharmony_ciimport contextvars
47db96d56Sopenharmony_ciimport enum
57db96d56Sopenharmony_ciimport functools
67db96d56Sopenharmony_ciimport threading
77db96d56Sopenharmony_ciimport signal
87db96d56Sopenharmony_ciimport sys
97db96d56Sopenharmony_cifrom . import coroutines
107db96d56Sopenharmony_cifrom . import events
117db96d56Sopenharmony_cifrom . import exceptions
127db96d56Sopenharmony_cifrom . import tasks
137db96d56Sopenharmony_ci
147db96d56Sopenharmony_ci
157db96d56Sopenharmony_ciclass _State(enum.Enum):
167db96d56Sopenharmony_ci    CREATED = "created"
177db96d56Sopenharmony_ci    INITIALIZED = "initialized"
187db96d56Sopenharmony_ci    CLOSED = "closed"
197db96d56Sopenharmony_ci
207db96d56Sopenharmony_ci
217db96d56Sopenharmony_ciclass Runner:
227db96d56Sopenharmony_ci    """A context manager that controls event loop life cycle.
237db96d56Sopenharmony_ci
247db96d56Sopenharmony_ci    The context manager always creates a new event loop,
257db96d56Sopenharmony_ci    allows to run async functions inside it,
267db96d56Sopenharmony_ci    and properly finalizes the loop at the context manager exit.
277db96d56Sopenharmony_ci
287db96d56Sopenharmony_ci    If debug is True, the event loop will be run in debug mode.
297db96d56Sopenharmony_ci    If loop_factory is passed, it is used for new event loop creation.
307db96d56Sopenharmony_ci
317db96d56Sopenharmony_ci    asyncio.run(main(), debug=True)
327db96d56Sopenharmony_ci
337db96d56Sopenharmony_ci    is a shortcut for
347db96d56Sopenharmony_ci
357db96d56Sopenharmony_ci    with asyncio.Runner(debug=True) as runner:
367db96d56Sopenharmony_ci        runner.run(main())
377db96d56Sopenharmony_ci
387db96d56Sopenharmony_ci    The run() method can be called multiple times within the runner's context.
397db96d56Sopenharmony_ci
407db96d56Sopenharmony_ci    This can be useful for interactive console (e.g. IPython),
417db96d56Sopenharmony_ci    unittest runners, console tools, -- everywhere when async code
427db96d56Sopenharmony_ci    is called from existing sync framework and where the preferred single
437db96d56Sopenharmony_ci    asyncio.run() call doesn't work.
447db96d56Sopenharmony_ci
457db96d56Sopenharmony_ci    """
467db96d56Sopenharmony_ci
477db96d56Sopenharmony_ci    # Note: the class is final, it is not intended for inheritance.
487db96d56Sopenharmony_ci
497db96d56Sopenharmony_ci    def __init__(self, *, debug=None, loop_factory=None):
507db96d56Sopenharmony_ci        self._state = _State.CREATED
517db96d56Sopenharmony_ci        self._debug = debug
527db96d56Sopenharmony_ci        self._loop_factory = loop_factory
537db96d56Sopenharmony_ci        self._loop = None
547db96d56Sopenharmony_ci        self._context = None
557db96d56Sopenharmony_ci        self._interrupt_count = 0
567db96d56Sopenharmony_ci        self._set_event_loop = False
577db96d56Sopenharmony_ci
587db96d56Sopenharmony_ci    def __enter__(self):
597db96d56Sopenharmony_ci        self._lazy_init()
607db96d56Sopenharmony_ci        return self
617db96d56Sopenharmony_ci
627db96d56Sopenharmony_ci    def __exit__(self, exc_type, exc_val, exc_tb):
637db96d56Sopenharmony_ci        self.close()
647db96d56Sopenharmony_ci
657db96d56Sopenharmony_ci    def close(self):
667db96d56Sopenharmony_ci        """Shutdown and close event loop."""
677db96d56Sopenharmony_ci        if self._state is not _State.INITIALIZED:
687db96d56Sopenharmony_ci            return
697db96d56Sopenharmony_ci        try:
707db96d56Sopenharmony_ci            loop = self._loop
717db96d56Sopenharmony_ci            _cancel_all_tasks(loop)
727db96d56Sopenharmony_ci            loop.run_until_complete(loop.shutdown_asyncgens())
737db96d56Sopenharmony_ci            loop.run_until_complete(loop.shutdown_default_executor())
747db96d56Sopenharmony_ci        finally:
757db96d56Sopenharmony_ci            if self._set_event_loop:
767db96d56Sopenharmony_ci                events.set_event_loop(None)
777db96d56Sopenharmony_ci            loop.close()
787db96d56Sopenharmony_ci            self._loop = None
797db96d56Sopenharmony_ci            self._state = _State.CLOSED
807db96d56Sopenharmony_ci
817db96d56Sopenharmony_ci    def get_loop(self):
827db96d56Sopenharmony_ci        """Return embedded event loop."""
837db96d56Sopenharmony_ci        self._lazy_init()
847db96d56Sopenharmony_ci        return self._loop
857db96d56Sopenharmony_ci
867db96d56Sopenharmony_ci    def run(self, coro, *, context=None):
877db96d56Sopenharmony_ci        """Run a coroutine inside the embedded event loop."""
887db96d56Sopenharmony_ci        if not coroutines.iscoroutine(coro):
897db96d56Sopenharmony_ci            raise ValueError("a coroutine was expected, got {!r}".format(coro))
907db96d56Sopenharmony_ci
917db96d56Sopenharmony_ci        if events._get_running_loop() is not None:
927db96d56Sopenharmony_ci            # fail fast with short traceback
937db96d56Sopenharmony_ci            raise RuntimeError(
947db96d56Sopenharmony_ci                "Runner.run() cannot be called from a running event loop")
957db96d56Sopenharmony_ci
967db96d56Sopenharmony_ci        self._lazy_init()
977db96d56Sopenharmony_ci
987db96d56Sopenharmony_ci        if context is None:
997db96d56Sopenharmony_ci            context = self._context
1007db96d56Sopenharmony_ci        task = self._loop.create_task(coro, context=context)
1017db96d56Sopenharmony_ci
1027db96d56Sopenharmony_ci        if (threading.current_thread() is threading.main_thread()
1037db96d56Sopenharmony_ci            and signal.getsignal(signal.SIGINT) is signal.default_int_handler
1047db96d56Sopenharmony_ci        ):
1057db96d56Sopenharmony_ci            sigint_handler = functools.partial(self._on_sigint, main_task=task)
1067db96d56Sopenharmony_ci            try:
1077db96d56Sopenharmony_ci                signal.signal(signal.SIGINT, sigint_handler)
1087db96d56Sopenharmony_ci            except ValueError:
1097db96d56Sopenharmony_ci                # `signal.signal` may throw if `threading.main_thread` does
1107db96d56Sopenharmony_ci                # not support signals (e.g. embedded interpreter with signals
1117db96d56Sopenharmony_ci                # not registered - see gh-91880)
1127db96d56Sopenharmony_ci                sigint_handler = None
1137db96d56Sopenharmony_ci        else:
1147db96d56Sopenharmony_ci            sigint_handler = None
1157db96d56Sopenharmony_ci
1167db96d56Sopenharmony_ci        self._interrupt_count = 0
1177db96d56Sopenharmony_ci        try:
1187db96d56Sopenharmony_ci            return self._loop.run_until_complete(task)
1197db96d56Sopenharmony_ci        except exceptions.CancelledError:
1207db96d56Sopenharmony_ci            if self._interrupt_count > 0:
1217db96d56Sopenharmony_ci                uncancel = getattr(task, "uncancel", None)
1227db96d56Sopenharmony_ci                if uncancel is not None and uncancel() == 0:
1237db96d56Sopenharmony_ci                    raise KeyboardInterrupt()
1247db96d56Sopenharmony_ci            raise  # CancelledError
1257db96d56Sopenharmony_ci        finally:
1267db96d56Sopenharmony_ci            if (sigint_handler is not None
1277db96d56Sopenharmony_ci                and signal.getsignal(signal.SIGINT) is sigint_handler
1287db96d56Sopenharmony_ci            ):
1297db96d56Sopenharmony_ci                signal.signal(signal.SIGINT, signal.default_int_handler)
1307db96d56Sopenharmony_ci
1317db96d56Sopenharmony_ci    def _lazy_init(self):
1327db96d56Sopenharmony_ci        if self._state is _State.CLOSED:
1337db96d56Sopenharmony_ci            raise RuntimeError("Runner is closed")
1347db96d56Sopenharmony_ci        if self._state is _State.INITIALIZED:
1357db96d56Sopenharmony_ci            return
1367db96d56Sopenharmony_ci        if self._loop_factory is None:
1377db96d56Sopenharmony_ci            self._loop = events.new_event_loop()
1387db96d56Sopenharmony_ci            if not self._set_event_loop:
1397db96d56Sopenharmony_ci                # Call set_event_loop only once to avoid calling
1407db96d56Sopenharmony_ci                # attach_loop multiple times on child watchers
1417db96d56Sopenharmony_ci                events.set_event_loop(self._loop)
1427db96d56Sopenharmony_ci                self._set_event_loop = True
1437db96d56Sopenharmony_ci        else:
1447db96d56Sopenharmony_ci            self._loop = self._loop_factory()
1457db96d56Sopenharmony_ci        if self._debug is not None:
1467db96d56Sopenharmony_ci            self._loop.set_debug(self._debug)
1477db96d56Sopenharmony_ci        self._context = contextvars.copy_context()
1487db96d56Sopenharmony_ci        self._state = _State.INITIALIZED
1497db96d56Sopenharmony_ci
1507db96d56Sopenharmony_ci    def _on_sigint(self, signum, frame, main_task):
1517db96d56Sopenharmony_ci        self._interrupt_count += 1
1527db96d56Sopenharmony_ci        if self._interrupt_count == 1 and not main_task.done():
1537db96d56Sopenharmony_ci            main_task.cancel()
1547db96d56Sopenharmony_ci            # wakeup loop if it is blocked by select() with long timeout
1557db96d56Sopenharmony_ci            self._loop.call_soon_threadsafe(lambda: None)
1567db96d56Sopenharmony_ci            return
1577db96d56Sopenharmony_ci        raise KeyboardInterrupt()
1587db96d56Sopenharmony_ci
1597db96d56Sopenharmony_ci
1607db96d56Sopenharmony_cidef run(main, *, debug=None):
1617db96d56Sopenharmony_ci    """Execute the coroutine and return the result.
1627db96d56Sopenharmony_ci
1637db96d56Sopenharmony_ci    This function runs the passed coroutine, taking care of
1647db96d56Sopenharmony_ci    managing the asyncio event loop and finalizing asynchronous
1657db96d56Sopenharmony_ci    generators.
1667db96d56Sopenharmony_ci
1677db96d56Sopenharmony_ci    This function cannot be called when another asyncio event loop is
1687db96d56Sopenharmony_ci    running in the same thread.
1697db96d56Sopenharmony_ci
1707db96d56Sopenharmony_ci    If debug is True, the event loop will be run in debug mode.
1717db96d56Sopenharmony_ci
1727db96d56Sopenharmony_ci    This function always creates a new event loop and closes it at the end.
1737db96d56Sopenharmony_ci    It should be used as a main entry point for asyncio programs, and should
1747db96d56Sopenharmony_ci    ideally only be called once.
1757db96d56Sopenharmony_ci
1767db96d56Sopenharmony_ci    Example:
1777db96d56Sopenharmony_ci
1787db96d56Sopenharmony_ci        async def main():
1797db96d56Sopenharmony_ci            await asyncio.sleep(1)
1807db96d56Sopenharmony_ci            print('hello')
1817db96d56Sopenharmony_ci
1827db96d56Sopenharmony_ci        asyncio.run(main())
1837db96d56Sopenharmony_ci    """
1847db96d56Sopenharmony_ci    if events._get_running_loop() is not None:
1857db96d56Sopenharmony_ci        # fail fast with short traceback
1867db96d56Sopenharmony_ci        raise RuntimeError(
1877db96d56Sopenharmony_ci            "asyncio.run() cannot be called from a running event loop")
1887db96d56Sopenharmony_ci
1897db96d56Sopenharmony_ci    with Runner(debug=debug) as runner:
1907db96d56Sopenharmony_ci        return runner.run(main)
1917db96d56Sopenharmony_ci
1927db96d56Sopenharmony_ci
1937db96d56Sopenharmony_cidef _cancel_all_tasks(loop):
1947db96d56Sopenharmony_ci    to_cancel = tasks.all_tasks(loop)
1957db96d56Sopenharmony_ci    if not to_cancel:
1967db96d56Sopenharmony_ci        return
1977db96d56Sopenharmony_ci
1987db96d56Sopenharmony_ci    for task in to_cancel:
1997db96d56Sopenharmony_ci        task.cancel()
2007db96d56Sopenharmony_ci
2017db96d56Sopenharmony_ci    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
2027db96d56Sopenharmony_ci
2037db96d56Sopenharmony_ci    for task in to_cancel:
2047db96d56Sopenharmony_ci        if task.cancelled():
2057db96d56Sopenharmony_ci            continue
2067db96d56Sopenharmony_ci        if task.exception() is not None:
2077db96d56Sopenharmony_ci            loop.call_exception_handler({
2087db96d56Sopenharmony_ci                'message': 'unhandled exception during asyncio.run() shutdown',
2097db96d56Sopenharmony_ci                'exception': task.exception(),
2107db96d56Sopenharmony_ci                'task': task,
2117db96d56Sopenharmony_ci            })
212