__all__ = ('Runner', 'run')

import contextvars
import enum
import functools
import threading
import signal
import sys
from . import coroutines
from . import events
from . import exceptions
from . import tasks


class _State(enum.Enum):
    """Life-cycle states of a Runner."""

    # Constructed, but no event loop has been created yet.
    CREATED = "created"
    # _lazy_init() has created the loop and the default context.
    INITIALIZED = "initialized"
    # close() has finished; the runner cannot be used any more.
    CLOSED = "closed"


class Runner:
    """A context manager that controls event loop life cycle.

    The context manager always creates a new event loop,
    allows to run async functions inside it,
    and properly finalizes the loop at the context manager exit.

    If debug is True, the event loop will be run in debug mode.
    If loop_factory is passed, it is used for new event loop creation.

    asyncio.run(main(), debug=True)

    is a shortcut for

    with asyncio.Runner(debug=True) as runner:
        runner.run(main())

    The run() method can be called multiple times within the runner's context.

    This can be useful for interactive console (e.g. IPython),
    unittest runners, console tools, -- everywhere when async code
    is called from existing sync framework and where the preferred single
    asyncio.run() call doesn't work.

    """

    # Note: the class is final, it is not intended for inheritance.

    def __init__(self, *, debug=None, loop_factory=None):
        self._state = _State.CREATED
        self._debug = debug
        self._loop_factory = loop_factory
        self._loop = None
        self._context = None
        # Number of SIGINTs received during the current run() call.
        self._interrupt_count = 0
        # True once this runner has called events.set_event_loop().
        self._set_event_loop = False

    def __enter__(self):
        """Initialize the runner and return it."""
        self._lazy_init()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the runner; exceptions from the body are not suppressed."""
        self.close()

    def close(self):
        """Shutdown and close event loop.

        Does nothing unless the runner is in the INITIALIZED state, so
        calling it on a never-used or already closed runner is a no-op.
        """
        if self._state is not _State.INITIALIZED:
            return
        try:
            loop = self._loop
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
            loop.run_until_complete(loop.shutdown_default_executor())
        finally:
            # Release the loop even if one of the shutdown steps failed.
            if self._set_event_loop:
                events.set_event_loop(None)
            loop.close()
            self._loop = None
            self._state = _State.CLOSED

    def get_loop(self):
        """Return embedded event loop.

        The loop is created on first use; RuntimeError is raised if the
        runner is already closed.
        """
        self._lazy_init()
        return self._loop

    def run(self, coro, *, context=None):
        """Run a coroutine inside the embedded event loop.

        coro must be a coroutine object, otherwise ValueError is raised.
        If context is None, the context copied when the runner was
        initialized is used for the task.

        Returns the coroutine's result.  Raises RuntimeError when called
        from a running event loop or on a closed runner.
        """
        if not coroutines.iscoroutine(coro):
            raise ValueError("a coroutine was expected, got {!r}".format(coro))

        if events._get_running_loop() is not None:
            # fail fast with short traceback
            raise RuntimeError(
                "Runner.run() cannot be called from a running event loop")

        self._lazy_init()

        if context is None:
            context = self._context
        task = self._loop.create_task(coro, context=context)

        # Install our SIGINT handler only from the main thread and only
        # when the default handler is still in place.
        if (threading.current_thread() is threading.main_thread()
            and signal.getsignal(signal.SIGINT) is signal.default_int_handler
        ):
            sigint_handler = functools.partial(self._on_sigint, main_task=task)
            try:
                signal.signal(signal.SIGINT, sigint_handler)
            except ValueError:
                # `signal.signal` may throw if `threading.main_thread` does
                # not support signals (e.g. embedded interpreter with signals
                # not registered - see gh-91880)
                sigint_handler = None
        else:
            sigint_handler = None

        self._interrupt_count = 0
        try:
            return self._loop.run_until_complete(task)
        except exceptions.CancelledError:
            if self._interrupt_count > 0:
                # The cancellation was requested by _on_sigint(); report it
                # as KeyboardInterrupt when no other cancel request remains.
                uncancel = getattr(task, "uncancel", None)
                if uncancel is not None and uncancel() == 0:
                    raise KeyboardInterrupt()
            raise  # CancelledError
        finally:
            # Restore the default handler only if ours is still installed.
            if (sigint_handler is not None
                and signal.getsignal(signal.SIGINT) is sigint_handler
            ):
                signal.signal(signal.SIGINT, signal.default_int_handler)

    def _lazy_init(self):
        """Create the event loop and the default context on first use.

        Raises RuntimeError if the runner is closed; does nothing if it is
        already initialized.
        """
        if self._state is _State.CLOSED:
            raise RuntimeError("Runner is closed")
        if self._state is _State.INITIALIZED:
            return
        if self._loop_factory is None:
            self._loop = events.new_event_loop()
            if not self._set_event_loop:
                # Call set_event_loop only once to avoid calling
                # attach_loop multiple times on child watchers
                events.set_event_loop(self._loop)
                self._set_event_loop = True
        else:
            self._loop = self._loop_factory()
        if self._debug is not None:
            self._loop.set_debug(self._debug)
        self._context = contextvars.copy_context()
        self._state = _State.INITIALIZED

    def _on_sigint(self, signum, frame, main_task):
        """SIGINT handler installed by run().

        The first interrupt cancels main_task if it is not done yet;
        any other interrupt raises KeyboardInterrupt.
        """
        self._interrupt_count += 1
        if self._interrupt_count == 1 and not main_task.done():
            main_task.cancel()
            # wakeup loop if it is blocked by select() with long timeout
            self._loop.call_soon_threadsafe(lambda: None)
            return
        raise KeyboardInterrupt()


def run(main, *, debug=None, loop_factory=None):
    """Execute the coroutine and return the result.

    This function runs the passed coroutine, taking care of
    managing the asyncio event loop and finalizing asynchronous
    generators.

    This function cannot be called when another asyncio event loop is
    running in the same thread.

    If debug is True, the event loop will be run in debug mode.
    If loop_factory is passed, it is forwarded to Runner and used for
    new event loop creation.

    This function always creates a new event loop and closes it at the end.
    It should be used as a main entry point for asyncio programs, and should
    ideally only be called once.

    Example:

        async def main():
            await asyncio.sleep(1)
            print('hello')

        asyncio.run(main())
    """
    if events._get_running_loop() is not None:
        # fail fast with short traceback
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    with Runner(debug=debug, loop_factory=loop_factory) as runner:
        return runner.run(main)


def _cancel_all_tasks(loop):
    """Cancel every task of *loop* and wait until all of them finish.

    Exceptions raised by tasks that were not cancelled are passed to the
    loop's exception handler.
    """
    to_cancel = tasks.all_tasks(loop)
    if not to_cancel:
        return

    for task in to_cancel:
        task.cancel()

    # return_exceptions=True: a failing task must not abort the wait.
    loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))

    for task in to_cancel:
        if task.cancelled():
            continue
        exc = task.exception()
        if exc is not None:
            loop.call_exception_handler({
                'message': 'unhandled exception during asyncio.run() shutdown',
                'exception': exc,
                'task': task,
            })