__all__ = 'iscoroutinefunction', 'iscoroutine'

import collections.abc
import inspect
import os
import sys
import traceback
import types


def _is_debug_mode():
    """Return a truthy value when asyncio debug mode is requested.

    Debug mode is on when the interpreter runs in development mode, or
    when the PYTHONASYNCIODEBUG environment variable is set to a
    non-empty value and the environment is not being ignored.
    """
    # See: https://docs.python.org/3/library/asyncio-dev.html#asyncio-debug-mode.
    return sys.flags.dev_mode or (not sys.flags.ignore_environment and
                                  bool(os.environ.get('PYTHONASYNCIODEBUG')))


# A marker for iscoroutinefunction.
_is_coroutine = object()


def iscoroutinefunction(func):
    """Return True if func is a coroutine function.

    A function qualifies when inspect.iscoroutinefunction() accepts it,
    or when its ``_is_coroutine`` attribute is this module's private
    marker object.
    """
    return (inspect.iscoroutinefunction(func) or
            getattr(func, '_is_coroutine', None) is _is_coroutine)


# Prioritize native coroutine check to speed-up
# asyncio.iscoroutine.
_COROUTINE_TYPES = (types.CoroutineType, types.GeneratorType,
                    collections.abc.Coroutine)
# Exact types already confirmed by iscoroutine(); lets repeated calls
# skip the slower isinstance() check.
_iscoroutine_typecache = set()


def iscoroutine(obj):
    """Return True if obj is a coroutine object.

    Native coroutines, generator objects and instances of
    collections.abc.Coroutine are all accepted.
    """
    if type(obj) in _iscoroutine_typecache:
        return True

    if isinstance(obj, _COROUTINE_TYPES):
        # Just in case we don't want to cache more than 100
        # positive types.  That shouldn't ever happen, unless
        # someone stressing the system on purpose.
        if len(_iscoroutine_typecache) < 100:
            _iscoroutine_typecache.add(type(obj))
        return True
    else:
        return False


def _format_coroutine(coro):
    """Return a short human-readable description of the coroutine coro.

    The result is one of:
      * '<name>()' or '<name>() running' when no code object is
        available;
      * '<name>() running at <file>:<line>' when the coroutine still
        has a frame;
      * '<name>() done, defined at <file>:<line>' when it has a code
        object but no frame.

    coro must satisfy iscoroutine(); otherwise the assertion fails.
    """
    assert iscoroutine(coro)

    def get_name(coro):
        # Coroutines compiled with Cython sometimes don't have
        # proper __qualname__ or __name__.  While that is a bug
        # in Cython, asyncio shouldn't crash with an AttributeError
        # in its __repr__ functions.
        if hasattr(coro, '__qualname__') and coro.__qualname__:
            coro_name = coro.__qualname__
        elif hasattr(coro, '__name__') and coro.__name__:
            coro_name = coro.__name__
        else:
            # Stop masking Cython bugs, expose them in a friendly way.
            coro_name = f'<{type(coro).__name__} without __name__>'
        return f'{coro_name}()'

    def is_running(coro):
        # Try the native-coroutine attribute first, then the generator
        # one; objects exposing neither are reported as not running.
        try:
            return coro.cr_running
        except AttributeError:
            try:
                return coro.gi_running
            except AttributeError:
                return False

    coro_code = None
    if hasattr(coro, 'cr_code') and coro.cr_code:
        coro_code = coro.cr_code
    elif hasattr(coro, 'gi_code') and coro.gi_code:
        coro_code = coro.gi_code

    coro_name = get_name(coro)

    if not coro_code:
        # Built-in types might not have __qualname__ or __name__.
        if is_running(coro):
            return f'{coro_name} running'
        else:
            return coro_name

    coro_frame = None
    if hasattr(coro, 'gi_frame') and coro.gi_frame:
        coro_frame = coro.gi_frame
    elif hasattr(coro, 'cr_frame') and coro.cr_frame:
        coro_frame = coro.cr_frame

    # If Cython's coroutine has a fake code object without proper
    # co_filename -- expose that.
    filename = coro_code.co_filename or '<empty co_filename>'

    if coro_frame is not None:
        # A live frame knows the line currently being executed.
        lineno = coro_frame.f_lineno
        coro_repr = f'{coro_name} running at {filename}:{lineno}'

    else:
        # No frame is left, so fall back to where the code was defined.
        lineno = coro_code.co_firstlineno
        coro_repr = f'{coro_name} done, defined at {filename}:{lineno}'

    return coro_repr