# Names exported by ``from <this module> import *``.
__all__ = ('iscoroutinefunction', 'iscoroutine')
27db96d56Sopenharmony_ci
37db96d56Sopenharmony_ciimport collections.abc
47db96d56Sopenharmony_ciimport inspect
57db96d56Sopenharmony_ciimport os
67db96d56Sopenharmony_ciimport sys
77db96d56Sopenharmony_ciimport traceback
87db96d56Sopenharmony_ciimport types
97db96d56Sopenharmony_ci
107db96d56Sopenharmony_ci
def _is_debug_mode():
    """Tell whether asyncio debug mode is requested for this process.

    The result is truthy when the interpreter runs with
    ``sys.flags.dev_mode`` set, or when the ``PYTHONASYNCIODEBUG``
    environment variable holds a non-empty value and the environment is
    not being ignored (``sys.flags.ignore_environment``).

    See: https://docs.python.org/3/library/asyncio-dev.html#asyncio-debug-mode.
    """
    flags = sys.flags
    if flags.dev_mode:
        return flags.dev_mode
    if flags.ignore_environment:
        # Environment variables must not be consulted at all.
        return False
    return bool(os.environ.get('PYTHONASYNCIODEBUG'))
157db96d56Sopenharmony_ci
167db96d56Sopenharmony_ci
# Sentinel used by iscoroutinefunction(): a callable whose
# ``_is_coroutine`` attribute is this exact object is reported as a
# coroutine function.
_is_coroutine = object()


def iscoroutinefunction(func):
    """Return True if func is a coroutine function.

    That is the case when ``inspect.iscoroutinefunction`` accepts it, or
    when its ``_is_coroutine`` attribute is this module's marker object.
    """
    if inspect.iscoroutinefunction(func):
        return True
    # Identity comparison: only the module's own sentinel qualifies.
    marker = getattr(func, '_is_coroutine', None)
    return marker is _is_coroutine
257db96d56Sopenharmony_ci
267db96d56Sopenharmony_ci
# Types whose instances count as coroutine objects.  The native
# coroutine type is listed first to speed up asyncio.iscoroutine.
_COROUTINE_TYPES = (
    types.CoroutineType,
    types.GeneratorType,
    collections.abc.Coroutine,
)
# Exact types already confirmed as coroutine types, so repeat calls can
# skip the isinstance() check.
_iscoroutine_typecache = set()


def iscoroutine(obj):
    """Return True if obj is a coroutine object."""
    obj_type = type(obj)
    if obj_type in _iscoroutine_typecache:
        return True

    if not isinstance(obj, _COROUTINE_TYPES):
        return False

    # Never remember more than 100 positive types.  Reaching that
    # limit shouldn't happen unless someone is stressing the system
    # on purpose.
    if len(_iscoroutine_typecache) < 100:
        _iscoroutine_typecache.add(obj_type)
    return True
487db96d56Sopenharmony_ci
497db96d56Sopenharmony_ci
def _format_coroutine(coro):
    """Build a short description of a coroutine object for use in reprs.

    The result takes one of these forms:

    * ``name()`` or ``name() running`` when no code object is available;
    * ``name() running at <filename>:<lineno>`` when a frame is available;
    * ``name() done, defined at <filename>:<lineno>`` otherwise.
    """
    assert iscoroutine(coro)

    def first_truthy(*attr_names):
        # Return the first attribute of coro that exists and is truthy,
        # trying the names in order; None if there is no such attribute.
        for attr_name in attr_names:
            value = getattr(coro, attr_name, None)
            if value:
                return value
        return None

    def running_flag():
        # Prefer cr_running, fall back to gi_running, and report False
        # when the object exposes neither attribute.
        for attr_name in ('cr_running', 'gi_running'):
            try:
                return getattr(coro, attr_name)
            except AttributeError:
                continue
        return False

    # Coroutines compiled with Cython sometimes lack a proper
    # __qualname__ or __name__.  That is a Cython bug, but a repr helper
    # must not fail with AttributeError because of it, so the problem is
    # surfaced in the generated name instead of being hidden.
    base_name = first_truthy('__qualname__', '__name__')
    if not base_name:
        base_name = f'<{type(coro).__name__} without __name__>'
    label = f'{base_name}()'

    code = first_truthy('cr_code', 'gi_code')
    if not code:
        # No code object to take a location from; report the name only.
        return f'{label} running' if running_flag() else label

    frame = first_truthy('gi_frame', 'cr_frame')

    # A fake Cython code object may carry an empty co_filename; make
    # that visible rather than printing nothing.
    filename = code.co_filename or '<empty co_filename>'

    if frame is None:
        # No frame: point at the line where the coroutine was defined.
        return f'{label} done, defined at {filename}:{code.co_firstlineno}'
    return f'{label} running at {filename}:{frame.f_lineno}'
112