"""Tests for tasks.py."""

import collections
import contextlib
import contextvars
import functools
import gc
import io
import random
import re
import sys
import textwrap
import traceback
import unittest
from unittest import mock
from types import GenericAlias

import asyncio
from asyncio import coroutines
from asyncio import futures
from asyncio import tasks
from test.test_asyncio import utils as test_utils
from test import support
from test.support.script_helper import assert_python_ok


def tearDownModule():
    # Reset the global event loop policy so this module does not leak a
    # policy into whatever runs after it.
    asyncio.set_event_loop_policy(None)


async def coroutine_function():
    # Trivial coroutine function; it does nothing when awaited.
    pass


def format_coroutine(qualname, state, src, source_traceback, generator=False):
    """Build the ``coro=<...>`` fragment expected inside a Task repr.

    *qualname*, *state* and *src* are interpolated as-is.  When *generator*
    is false, ``', defined'`` is appended to *state*.  When
    *source_traceback* is not None, its last entry supplies the
    ``created at <file>:<line>`` suffix (items 0 and 1 of that entry).
    """
    if generator:
        state = '%s' % state
    else:
        state = '%s, defined' % state
    if source_traceback is not None:
        frame = source_traceback[-1]
        return ('coro=<%s() %s at %s> created at %s:%s'
                % (qualname, state, src, frame[0], frame[1]))
    else:
        return 'coro=<%s() %s at %s>' % (qualname, state, src)


def get_innermost_context(exc):
    """
    Return information about the innermost exception context in the chain.

    The result is a ``(type, args, depth)`` tuple, where *depth* is the
    number of ``__context__`` links followed starting from *exc*.
    """
    depth = 0
    while True:
        context = exc.__context__
        if context is None:
            break

        exc = context
        depth += 1

    return (type(exc), exc.args, depth)


class Dummy:
    # Callable with a fixed repr; used as a done-callback so that the
    # callback list in a Task repr is predictable.

    def __repr__(self):
        return '<Dummy>'

    def __call__(self, *args):
        pass


class CoroLikeObject:
    # Minimal object exposing the coroutine protocol methods.  send()
    # finishes immediately by raising StopIteration(42).

    def send(self, v):
        raise StopIteration(42)

    def throw(self, *exc):
        pass

    def close(self):
        pass

    def __await__(self):
        return self


class BaseTaskTests:
    # Mixin with the tests shared by every Task/Future implementation pair.
    # Concrete subclasses are expected to set ``Task`` and ``Future``.
    # NOTE(review): helpers such as new_test_loop() and set_event_loop() are
    # not defined here; presumably they come from a test_utils.TestCase base
    # mixed in by the subclasses -- confirm against the rest of the file.

    Task = None
    Future = None

    def new_task(self, loop, coro, name='TestTask', context=None):
        # Create a task with the implementation under test.
        return self.__class__.Task(coro, loop=loop, name=name, context=context)

    def new_future(self, loop):
        # Create a future with the implementation under test.
        return self.__class__.Future(loop=loop)

    def setUp(self):
        super().setUp()
        self.loop = self.new_test_loop()
        # Route the loop's task and future creation through the classes
        # under test.
        self.loop.set_task_factory(self.new_task)
        self.loop.create_future = lambda: self.new_future(self.loop)

    def test_generic_alias(self):
        # Subscripting the Task class must produce a GenericAlias.
        task = self.__class__.Task[str]
        self.assertEqual(task.__args__, (str,))
        self.assertIsInstance(task, GenericAlias)

    def test_task_cancel_message_getter(self):
        # _cancel_message starts as None and reflects the cancel() argument,
        # which also ends up in the CancelledError args.
        async def coro():
            pass
        t = self.new_task(self.loop, coro())
        self.assertTrue(hasattr(t, '_cancel_message'))
        self.assertIsNone(t._cancel_message)

        t.cancel('my message')
        self.assertEqual(t._cancel_message, 'my message')

        with self.assertRaises(asyncio.CancelledError) as cm:
            self.loop.run_until_complete(t)

        self.assertEqual('my message', cm.exception.args[0])

    def test_task_cancel_message_setter(self):
        # Overwriting _cancel_message after cancel() changes the message
        # carried by the CancelledError.
        async def coro():
            pass
        t = self.new_task(self.loop, coro())
        t.cancel('my message')
        t._cancel_message = 'my new message'
        self.assertEqual(t._cancel_message, 'my new message')

        with self.assertRaises(asyncio.CancelledError) as cm:
            self.loop.run_until_complete(t)

        self.assertEqual('my new message', cm.exception.args[0])

    def test_task_del_collect(self):
        # Task results whose __del__ triggers a garbage collection must not
        # break anything; the test passes if nothing raises.
        class Evil:
            def __del__(self):
                gc.collect()

        async def run():
            return Evil()

        self.loop.run_until_complete(
            asyncio.gather(*[
                self.new_task(self.loop, run()) for _ in range(100)
            ]))

    def test_other_loop_future(self):
        # Awaiting a future that belongs to another loop is a RuntimeError.
        other_loop = asyncio.new_event_loop()
        fut = self.new_future(other_loop)

        async def run(fut):
            await fut

        try:
            with self.assertRaisesRegex(RuntimeError,
                                        r'Task .* got Future .* attached'):
                self.loop.run_until_complete(run(fut))
        finally:
            other_loop.close()

    def test_task_awaits_on_itself(self):
        # A task awaiting itself is a RuntimeError.

        async def test():
            await task

        task = asyncio.ensure_future(test(), loop=self.loop)

        with self.assertRaisesRegex(RuntimeError,
                                    'Task cannot await on itself'):
            self.loop.run_until_complete(task)

    def test_task_class(self):
        # Basic result/loop accessors, first on self.loop and then on a
        # freshly created loop.
        async def notmuch():
            return 'ok'
        t = self.new_task(self.loop, notmuch())
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')
        self.assertIs(t._loop, self.loop)
        self.assertIs(t.get_loop(), self.loop)

        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)
        t = self.new_task(loop, notmuch())
        self.assertIs(t._loop, loop)
        loop.run_until_complete(t)
        loop.close()

    def test_ensure_future_coroutine(self):
        # ensure_future() on a coroutine: explicit loop, no loop at all,
        # running loop, and current (set) loop.
        async def notmuch():
            return 'ok'
        t = asyncio.ensure_future(notmuch(), loop=self.loop)
        self.assertIs(t._loop, self.loop)
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')

        a = notmuch()
        self.addCleanup(a.close)
        with self.assertRaisesRegex(RuntimeError, 'no current event loop'):
            asyncio.ensure_future(a)

        async def test():
            return asyncio.ensure_future(notmuch())
        t = self.loop.run_until_complete(test())
        self.assertIs(t._loop, self.loop)
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')

        # Deprecated in 3.10, undeprecated in 3.11.1
        asyncio.set_event_loop(self.loop)
        self.addCleanup(asyncio.set_event_loop, None)
        t = asyncio.ensure_future(notmuch())
        self.assertIs(t._loop, self.loop)
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')

    def test_ensure_future_future(self):
        # ensure_future() returns the same future object, and rejects a
        # mismatching loop argument with ValueError.
        f_orig = self.new_future(self.loop)
        f_orig.set_result('ko')

        f = asyncio.ensure_future(f_orig)
        self.loop.run_until_complete(f)
        self.assertTrue(f.done())
        self.assertEqual(f.result(), 'ko')
        self.assertIs(f, f_orig)

        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        with self.assertRaises(ValueError):
            f = asyncio.ensure_future(f_orig, loop=loop)

        loop.close()

        f = asyncio.ensure_future(f_orig, loop=self.loop)
        self.assertIs(f, f_orig)

    def test_ensure_future_task(self):
        # Same as test_ensure_future_future, but for a task.
        async def notmuch():
            return 'ok'
        t_orig = self.new_task(self.loop, notmuch())
        t = asyncio.ensure_future(t_orig)
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')
        self.assertIs(t, t_orig)

        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        with self.assertRaises(ValueError):
            t = asyncio.ensure_future(t_orig, loop=loop)

        loop.close()

        t = asyncio.ensure_future(t_orig, loop=self.loop)
        self.assertIs(t, t_orig)

    def test_ensure_future_awaitable(self):
        # ensure_future() accepts any object implementing __await__.
        class Aw:
            def __init__(self, coro):
                self.coro = coro

            def __await__(self):
                return self.coro.__await__()

        async def coro():
            return 'ok'

        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)
        fut = asyncio.ensure_future(Aw(coro()), loop=loop)
        loop.run_until_complete(fut)
        self.assertEqual(fut.result(), 'ok')

    def test_ensure_future_neither(self):
        # A plain string is neither a future nor an awaitable.
        with self.assertRaises(TypeError):
            asyncio.ensure_future('ok')

    def test_ensure_future_error_msg(self):
        # Check the wording of the loop-mismatch ValueError.
        loop = asyncio.new_event_loop()
        # Registered as a cleanup so the extra loop is closed even when the
        # assertion below fails.
        self.addCleanup(loop.close)
        f = self.new_future(self.loop)
        with self.assertRaisesRegex(ValueError, 'The future belongs to a '
                                    'different loop than the one specified as '
                                    'the loop argument'):
            asyncio.ensure_future(f, loop=loop)

    def test_get_stack(self):
        # get_stack() and print_stack() called from inside the running task.
        T = None

        async def foo():
            await bar()

        async def bar():
            # test get_stack()
            f = T.get_stack(limit=1)
            try:
                self.assertEqual(f[0].f_code.co_name, 'foo')
            finally:
                # Drop the reference to the frame list.
                f = None

            # test print_stack()
            file = io.StringIO()
            T.print_stack(limit=1, file=file)
            file.seek(0)
            tb = file.read()
            self.assertRegex(tb, r'foo\(\) running')

        async def runner():
            nonlocal T
            T = asyncio.ensure_future(foo(), loop=self.loop)
            await T

        self.loop.run_until_complete(runner())

    def test_task_repr(self):
        # Task repr in the pending, cancelling, cancelled and finished
        # states.
        self.loop.set_debug(False)

        async def notmuch():
            return 'abc'

        # test coroutine function
        self.assertEqual(notmuch.__name__, 'notmuch')
        self.assertRegex(notmuch.__qualname__,
                         r'\w+.test_task_repr.<locals>.notmuch')
        self.assertEqual(notmuch.__module__, __name__)

        filename, lineno = test_utils.get_function_source(notmuch)
        src = "%s:%s" % (filename, lineno)

        # test coroutine object
        gen = notmuch()
        coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch'
        self.assertEqual(gen.__name__, 'notmuch')
        self.assertEqual(gen.__qualname__, coro_qualname)

        # test pending Task
        t = self.new_task(self.loop, gen)
        t.add_done_callback(Dummy())

        coro = format_coroutine(coro_qualname, 'running', src,
                                t._source_traceback, generator=True)
        self.assertEqual(repr(t),
                         "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)

        # test cancelling Task
        t.cancel()  # Does not take immediate effect!
        self.assertEqual(repr(t),
                         "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro)

        # test cancelled Task
        self.assertRaises(asyncio.CancelledError,
                          self.loop.run_until_complete, t)
        coro = format_coroutine(coro_qualname, 'done', src,
                                t._source_traceback)
        self.assertEqual(repr(t),
                         "<Task cancelled name='TestTask' %s>" % coro)

        # test finished Task
        t = self.new_task(self.loop, notmuch())
        self.loop.run_until_complete(t)
        coro = format_coroutine(coro_qualname, 'done', src,
                                t._source_traceback)
        self.assertEqual(repr(t),
                         "<Task finished name='TestTask' %s result='abc'>" % coro)

    def test_task_repr_autogenerated(self):
        # Tasks created with name=None get distinct 'Task-<n>' names.
        async def notmuch():
            return 123

        t1 = self.new_task(self.loop, notmuch(), None)
        t2 = self.new_task(self.loop, notmuch(), None)
        self.assertNotEqual(repr(t1), repr(t2))

        match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1))
        self.assertIsNotNone(match1)
        match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2))
        self.assertIsNotNone(match2)

        # Autogenerated task names should have monotonically increasing numbers
        self.assertLess(int(match1.group(1)), int(match2.group(1)))
        self.loop.run_until_complete(t1)
        self.loop.run_until_complete(t2)

    def test_task_repr_name_not_str(self):
        # A non-string name is reported by get_name() as its string form.
        async def notmuch():
            return 123

        t = self.new_task(self.loop, notmuch())
        t.set_name({6})
        self.assertEqual(t.get_name(), '{6}')
        self.loop.run_until_complete(t)

    def test_task_repr_wait_for(self):
        # While blocked on a future, the task repr includes wait_for=<fut>.
        self.loop.set_debug(False)

        async def wait_for(fut):
            return await fut

        fut = self.new_future(self.loop)
        task = self.new_task(self.loop, wait_for(fut))
        test_utils.run_briefly(self.loop)
        self.assertRegex(repr(task),
                         '<Task .* wait_for=%s>' % re.escape(repr(fut)))

        fut.set_result(None)
        self.loop.run_until_complete(task)

    def test_task_basics(self):
        # Results of awaited inner coroutines are combined by the outer one.

        async def outer():
            a = await inner1()
            b = await inner2()
            return a+b

        async def inner1():
            return 42

        async def inner2():
            return 1000

        t = outer()
        self.assertEqual(self.loop.run_until_complete(t), 1042)

    def test_exception_chaining_after_await(self):
        # Test that when awaiting on a task when an exception is already
        # active, if the task raises an exception it will be chained
        # with the original.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def raise_error():
            raise ValueError

        async def run():
            try:
                raise KeyError(3)
            except Exception:
                task = self.new_task(loop, raise_error())
                try:
                    await task
                except Exception as exc:
                    self.assertEqual(type(exc), ValueError)
                    chained = exc.__context__
                    self.assertEqual((type(chained), chained.args),
                                     (KeyError, (3,)))

        try:
            task = self.new_task(loop, run())
            loop.run_until_complete(task)
        finally:
            loop.close()

    def test_exception_chaining_after_await_with_context_cycle(self):
        # Check trying to create an exception context cycle:
        # https://bugs.python.org/issue40696
        has_cycle = None
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def process_exc(exc):
            raise exc

        async def run():
            nonlocal has_cycle
            try:
                raise KeyError('a')
            except Exception as exc:
                task = self.new_task(loop, process_exc(exc))
                try:
                    await task
                except BaseException as exc:
                    has_cycle = (exc is exc.__context__)
                    # Prevent a hang if has_cycle is True.
                    exc.__context__ = None

        try:
            task = self.new_task(loop, run())
            loop.run_until_complete(task)
        finally:
            loop.close()
        # This also distinguishes from the initial has_cycle=None.
        self.assertIs(has_cycle, False)

    def test_cancelling(self):
        # cancelling() and the ' cancelling ' repr marker before and after
        # cancel() requests on a pending task.
        loop = asyncio.new_event_loop()

        async def task():
            await asyncio.sleep(10)

        try:
            t = self.new_task(loop, task())
            self.assertFalse(t.cancelling())
            self.assertNotIn(" cancelling ", repr(t))
            self.assertTrue(t.cancel())
            self.assertTrue(t.cancelling())
            self.assertIn(" cancelling ", repr(t))

            # A second cancel() request on the still-pending task is
            # expected to be accepted as well, i.e. to return True again.
            self.assertTrue(t.cancel())

            with self.assertRaises(asyncio.CancelledError):
                loop.run_until_complete(t)
        finally:
            loop.close()

    def test_uncancel_basic(self):
        # A task that catches CancelledError and calls uncancel() keeps
        # running and can be cancelled a second time.
        loop = asyncio.new_event_loop()

        async def task():
            try:
                await asyncio.sleep(10)
            except asyncio.CancelledError:
                asyncio.current_task().uncancel()
                await asyncio.sleep(10)

        try:
            t = self.new_task(loop, task())
            loop.run_until_complete(asyncio.sleep(0.01))

            # Cancel first sleep
            self.assertTrue(t.cancel())
            self.assertIn(" cancelling ", repr(t))
            self.assertEqual(t.cancelling(), 1)
            self.assertFalse(t.cancelled())  # Task is still not complete
            loop.run_until_complete(asyncio.sleep(0.01))

            # after .uncancel()
            self.assertNotIn(" cancelling ", repr(t))
            self.assertEqual(t.cancelling(), 0)
            self.assertFalse(t.cancelled())  # Task is still not complete

            # Cancel second sleep
            self.assertTrue(t.cancel())
            self.assertEqual(t.cancelling(), 1)
            self.assertFalse(t.cancelled())  # Task is still not complete
            with self.assertRaises(asyncio.CancelledError):
                loop.run_until_complete(t)
            self.assertTrue(t.cancelled())  # Finally, task complete
            self.assertTrue(t.done())

            # uncancel is no longer effective after the task is complete
            t.uncancel()
            self.assertTrue(t.cancelled())
            self.assertTrue(t.done())
        finally:
            loop.close()

    def test_uncancel_structured_blocks(self):
        # This test recreates the following high-level structure using uncancel()::
        #
        #     async def make_request_with_timeout():
        #         try:
        #             async with asyncio.timeout(1):
        #                 # Structured block affected by the timeout:
        #                 await make_request()
        #                 await make_another_request()
        #         except TimeoutError:
        #             pass  # There was a timeout
        #         # Outer code not affected by the timeout:
        #         await unrelated_code()

        loop = asyncio.new_event_loop()

        async def make_request_with_timeout(*, sleep: float, timeout: float):
            task = asyncio.current_task()
            loop = task.get_loop()

            timed_out = False
            structured_block_finished = False
            outer_code_reached = False

            def on_timeout():
                nonlocal timed_out
                timed_out = True
                task.cancel()

            timeout_handle = loop.call_later(timeout, on_timeout)
            try:
                try:
                    # Structured block affected by the timeout
                    await asyncio.sleep(sleep)
                    structured_block_finished = True
                finally:
                    timeout_handle.cancel()
                    if (
                        timed_out
                        and task.uncancel() == 0
                        and sys.exc_info()[0] is asyncio.CancelledError
                    ):
                        # Note the five rules that are needed here to satisfy proper
                        # uncancellation:
                        #
                        # 1. handle uncancellation in a `finally:` block to allow for
                        #    plain returns;
                        # 2. our `timed_out` flag is set, meaning that it was our event
                        #    that triggered the need to uncancel the task, regardless of
                        #    what exception is raised;
                        # 3. we can call `uncancel()` because *we* called `cancel()`
                        #    before;
                        # 4. we call `uncancel()` but we only continue converting the
                        #    CancelledError to TimeoutError if `uncancel()` caused the
                        #    cancellation request count to go down to 0.  We need to look
                        #    at the counter vs having a simple boolean flag because our
                        #    code might have been nested (think multiple timeouts). See
                        #    commit 7fce1063b6e5a366f8504e039a8ccdd6944625cd for
                        #    details.
                        # 5. we only convert CancelledError to TimeoutError; for other
                        #    exceptions raised due to the cancellation (like
                        #    a ConnectionLostError from a database client), simply
                        #    propagate them.
                        #
                        # Those checks need to take place in this exact order to make
                        # sure the `cancelling()` counter always stays in sync.
                        #
                        # Additionally, the original stimulus to `cancel()` the task
                        # needs to be unscheduled to avoid re-cancelling the task later.
                        # Here we do it by cancelling `timeout_handle` in the `finally:`
                        # block.
                        raise TimeoutError
            except TimeoutError:
                self.assertTrue(timed_out)

            # Outer code not affected by the timeout:
            outer_code_reached = True
            await asyncio.sleep(0)
            return timed_out, structured_block_finished, outer_code_reached

        try:
            # Test which timed out.
            t1 = self.new_task(loop, make_request_with_timeout(sleep=10.0, timeout=0.1))
            timed_out, structured_block_finished, outer_code_reached = (
                loop.run_until_complete(t1)
            )
            self.assertTrue(timed_out)
            self.assertFalse(structured_block_finished)  # it was cancelled
            self.assertTrue(outer_code_reached)  # task got uncancelled after leaving
                                                 # the structured block and continued until
                                                 # completion
            self.assertEqual(t1.cancelling(), 0)  # no pending cancellation of the outer task

            # Test which did not time out.
            t2 = self.new_task(loop, make_request_with_timeout(sleep=0, timeout=10.0))
            timed_out, structured_block_finished, outer_code_reached = (
                loop.run_until_complete(t2)
            )
            self.assertFalse(timed_out)
            self.assertTrue(structured_block_finished)
            self.assertTrue(outer_code_reached)
            self.assertEqual(t2.cancelling(), 0)
        finally:
            loop.close()

    def test_cancel(self):
        # Cancelling a sleeping task: it ends up done and cancelled, and a
        # further cancel() returns False.

        def gen():
            # Time generator handed to the test loop; the first value it
            # receives is checked against the 10.0 second sleep below.
            when = yield
            self.assertAlmostEqual(10.0, when)
            yield 0

        loop = self.new_test_loop(gen)

        async def task():
            await asyncio.sleep(10.0)
            return 12

        t = self.new_task(loop, task())
        loop.call_soon(t.cancel)
        with self.assertRaises(asyncio.CancelledError):
            loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertTrue(t.cancelled())
        self.assertFalse(t.cancel())

    def test_cancel_with_message_then_future_result(self):
        # Test Future.result() after calling cancel() with a message.
        cases = [
            ((), ()),
            ((None,), ()),
            (('my message',), ('my message',)),
            # Non-string values should roundtrip.
            ((5,), (5,)),
        ]
        for cancel_args, expected_args in cases:
            with self.subTest(cancel_args=cancel_args):
                loop = asyncio.new_event_loop()
                self.set_event_loop(loop)

                async def sleep():
                    await asyncio.sleep(10)

                async def coro():
                    task = self.new_task(loop, sleep())
                    await asyncio.sleep(0)
                    task.cancel(*cancel_args)
                    # Only wait for the inner task to finish; the returned
                    # (done, pending) sets are not needed.
                    await asyncio.wait([task])
                    task.result()

                task = self.new_task(loop, coro())
                with self.assertRaises(asyncio.CancelledError) as cm:
                    loop.run_until_complete(task)
                exc = cm.exception
                self.assertEqual(exc.args, expected_args)
7137db96d56Sopenharmony_ci 7147db96d56Sopenharmony_ci actual = get_innermost_context(exc) 7157db96d56Sopenharmony_ci self.assertEqual(actual, 7167db96d56Sopenharmony_ci (asyncio.CancelledError, expected_args, 0)) 7177db96d56Sopenharmony_ci 7187db96d56Sopenharmony_ci def test_cancel_with_message_then_future_exception(self): 7197db96d56Sopenharmony_ci # Test Future.exception() after calling cancel() with a message. 7207db96d56Sopenharmony_ci cases = [ 7217db96d56Sopenharmony_ci ((), ()), 7227db96d56Sopenharmony_ci ((None,), ()), 7237db96d56Sopenharmony_ci (('my message',), ('my message',)), 7247db96d56Sopenharmony_ci # Non-string values should roundtrip. 7257db96d56Sopenharmony_ci ((5,), (5,)), 7267db96d56Sopenharmony_ci ] 7277db96d56Sopenharmony_ci for cancel_args, expected_args in cases: 7287db96d56Sopenharmony_ci with self.subTest(cancel_args=cancel_args): 7297db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 7307db96d56Sopenharmony_ci self.set_event_loop(loop) 7317db96d56Sopenharmony_ci 7327db96d56Sopenharmony_ci async def sleep(): 7337db96d56Sopenharmony_ci await asyncio.sleep(10) 7347db96d56Sopenharmony_ci 7357db96d56Sopenharmony_ci async def coro(): 7367db96d56Sopenharmony_ci task = self.new_task(loop, sleep()) 7377db96d56Sopenharmony_ci await asyncio.sleep(0) 7387db96d56Sopenharmony_ci task.cancel(*cancel_args) 7397db96d56Sopenharmony_ci done, pending = await asyncio.wait([task]) 7407db96d56Sopenharmony_ci task.exception() 7417db96d56Sopenharmony_ci 7427db96d56Sopenharmony_ci task = self.new_task(loop, coro()) 7437db96d56Sopenharmony_ci with self.assertRaises(asyncio.CancelledError) as cm: 7447db96d56Sopenharmony_ci loop.run_until_complete(task) 7457db96d56Sopenharmony_ci exc = cm.exception 7467db96d56Sopenharmony_ci self.assertEqual(exc.args, expected_args) 7477db96d56Sopenharmony_ci 7487db96d56Sopenharmony_ci actual = get_innermost_context(exc) 7497db96d56Sopenharmony_ci self.assertEqual(actual, 7507db96d56Sopenharmony_ci (asyncio.CancelledError, 
expected_args, 0)) 7517db96d56Sopenharmony_ci 7527db96d56Sopenharmony_ci def test_cancellation_exception_context(self): 7537db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 7547db96d56Sopenharmony_ci self.set_event_loop(loop) 7557db96d56Sopenharmony_ci fut = loop.create_future() 7567db96d56Sopenharmony_ci 7577db96d56Sopenharmony_ci async def sleep(): 7587db96d56Sopenharmony_ci fut.set_result(None) 7597db96d56Sopenharmony_ci await asyncio.sleep(10) 7607db96d56Sopenharmony_ci 7617db96d56Sopenharmony_ci async def coro(): 7627db96d56Sopenharmony_ci inner_task = self.new_task(loop, sleep()) 7637db96d56Sopenharmony_ci await fut 7647db96d56Sopenharmony_ci loop.call_soon(inner_task.cancel, 'msg') 7657db96d56Sopenharmony_ci try: 7667db96d56Sopenharmony_ci await inner_task 7677db96d56Sopenharmony_ci except asyncio.CancelledError as ex: 7687db96d56Sopenharmony_ci raise ValueError("cancelled") from ex 7697db96d56Sopenharmony_ci 7707db96d56Sopenharmony_ci task = self.new_task(loop, coro()) 7717db96d56Sopenharmony_ci with self.assertRaises(ValueError) as cm: 7727db96d56Sopenharmony_ci loop.run_until_complete(task) 7737db96d56Sopenharmony_ci exc = cm.exception 7747db96d56Sopenharmony_ci self.assertEqual(exc.args, ('cancelled',)) 7757db96d56Sopenharmony_ci 7767db96d56Sopenharmony_ci actual = get_innermost_context(exc) 7777db96d56Sopenharmony_ci self.assertEqual(actual, 7787db96d56Sopenharmony_ci (asyncio.CancelledError, ('msg',), 1)) 7797db96d56Sopenharmony_ci 7807db96d56Sopenharmony_ci def test_cancel_with_message_before_starting_task(self): 7817db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 7827db96d56Sopenharmony_ci self.set_event_loop(loop) 7837db96d56Sopenharmony_ci 7847db96d56Sopenharmony_ci async def sleep(): 7857db96d56Sopenharmony_ci await asyncio.sleep(10) 7867db96d56Sopenharmony_ci 7877db96d56Sopenharmony_ci async def coro(): 7887db96d56Sopenharmony_ci task = self.new_task(loop, sleep()) 7897db96d56Sopenharmony_ci # We deliberately leave out the sleep 
here. 7907db96d56Sopenharmony_ci task.cancel('my message') 7917db96d56Sopenharmony_ci done, pending = await asyncio.wait([task]) 7927db96d56Sopenharmony_ci task.exception() 7937db96d56Sopenharmony_ci 7947db96d56Sopenharmony_ci task = self.new_task(loop, coro()) 7957db96d56Sopenharmony_ci with self.assertRaises(asyncio.CancelledError) as cm: 7967db96d56Sopenharmony_ci loop.run_until_complete(task) 7977db96d56Sopenharmony_ci exc = cm.exception 7987db96d56Sopenharmony_ci self.assertEqual(exc.args, ('my message',)) 7997db96d56Sopenharmony_ci 8007db96d56Sopenharmony_ci actual = get_innermost_context(exc) 8017db96d56Sopenharmony_ci self.assertEqual(actual, 8027db96d56Sopenharmony_ci (asyncio.CancelledError, ('my message',), 0)) 8037db96d56Sopenharmony_ci 8047db96d56Sopenharmony_ci def test_cancel_yield(self): 8057db96d56Sopenharmony_ci async def task(): 8067db96d56Sopenharmony_ci await asyncio.sleep(0) 8077db96d56Sopenharmony_ci await asyncio.sleep(0) 8087db96d56Sopenharmony_ci return 12 8097db96d56Sopenharmony_ci 8107db96d56Sopenharmony_ci t = self.new_task(self.loop, task()) 8117db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) # start coro 8127db96d56Sopenharmony_ci t.cancel() 8137db96d56Sopenharmony_ci self.assertRaises( 8147db96d56Sopenharmony_ci asyncio.CancelledError, self.loop.run_until_complete, t) 8157db96d56Sopenharmony_ci self.assertTrue(t.done()) 8167db96d56Sopenharmony_ci self.assertTrue(t.cancelled()) 8177db96d56Sopenharmony_ci self.assertFalse(t.cancel()) 8187db96d56Sopenharmony_ci 8197db96d56Sopenharmony_ci def test_cancel_inner_future(self): 8207db96d56Sopenharmony_ci f = self.new_future(self.loop) 8217db96d56Sopenharmony_ci 8227db96d56Sopenharmony_ci async def task(): 8237db96d56Sopenharmony_ci await f 8247db96d56Sopenharmony_ci return 12 8257db96d56Sopenharmony_ci 8267db96d56Sopenharmony_ci t = self.new_task(self.loop, task()) 8277db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) # start task 8287db96d56Sopenharmony_ci f.cancel() 
8297db96d56Sopenharmony_ci with self.assertRaises(asyncio.CancelledError): 8307db96d56Sopenharmony_ci self.loop.run_until_complete(t) 8317db96d56Sopenharmony_ci self.assertTrue(f.cancelled()) 8327db96d56Sopenharmony_ci self.assertTrue(t.cancelled()) 8337db96d56Sopenharmony_ci 8347db96d56Sopenharmony_ci def test_cancel_both_task_and_inner_future(self): 8357db96d56Sopenharmony_ci f = self.new_future(self.loop) 8367db96d56Sopenharmony_ci 8377db96d56Sopenharmony_ci async def task(): 8387db96d56Sopenharmony_ci await f 8397db96d56Sopenharmony_ci return 12 8407db96d56Sopenharmony_ci 8417db96d56Sopenharmony_ci t = self.new_task(self.loop, task()) 8427db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 8437db96d56Sopenharmony_ci 8447db96d56Sopenharmony_ci f.cancel() 8457db96d56Sopenharmony_ci t.cancel() 8467db96d56Sopenharmony_ci 8477db96d56Sopenharmony_ci with self.assertRaises(asyncio.CancelledError): 8487db96d56Sopenharmony_ci self.loop.run_until_complete(t) 8497db96d56Sopenharmony_ci 8507db96d56Sopenharmony_ci self.assertTrue(t.done()) 8517db96d56Sopenharmony_ci self.assertTrue(f.cancelled()) 8527db96d56Sopenharmony_ci self.assertTrue(t.cancelled()) 8537db96d56Sopenharmony_ci 8547db96d56Sopenharmony_ci def test_cancel_task_catching(self): 8557db96d56Sopenharmony_ci fut1 = self.new_future(self.loop) 8567db96d56Sopenharmony_ci fut2 = self.new_future(self.loop) 8577db96d56Sopenharmony_ci 8587db96d56Sopenharmony_ci async def task(): 8597db96d56Sopenharmony_ci await fut1 8607db96d56Sopenharmony_ci try: 8617db96d56Sopenharmony_ci await fut2 8627db96d56Sopenharmony_ci except asyncio.CancelledError: 8637db96d56Sopenharmony_ci return 42 8647db96d56Sopenharmony_ci 8657db96d56Sopenharmony_ci t = self.new_task(self.loop, task()) 8667db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 8677db96d56Sopenharmony_ci self.assertIs(t._fut_waiter, fut1) # White-box test. 
8687db96d56Sopenharmony_ci fut1.set_result(None) 8697db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 8707db96d56Sopenharmony_ci self.assertIs(t._fut_waiter, fut2) # White-box test. 8717db96d56Sopenharmony_ci t.cancel() 8727db96d56Sopenharmony_ci self.assertTrue(fut2.cancelled()) 8737db96d56Sopenharmony_ci res = self.loop.run_until_complete(t) 8747db96d56Sopenharmony_ci self.assertEqual(res, 42) 8757db96d56Sopenharmony_ci self.assertFalse(t.cancelled()) 8767db96d56Sopenharmony_ci 8777db96d56Sopenharmony_ci def test_cancel_task_ignoring(self): 8787db96d56Sopenharmony_ci fut1 = self.new_future(self.loop) 8797db96d56Sopenharmony_ci fut2 = self.new_future(self.loop) 8807db96d56Sopenharmony_ci fut3 = self.new_future(self.loop) 8817db96d56Sopenharmony_ci 8827db96d56Sopenharmony_ci async def task(): 8837db96d56Sopenharmony_ci await fut1 8847db96d56Sopenharmony_ci try: 8857db96d56Sopenharmony_ci await fut2 8867db96d56Sopenharmony_ci except asyncio.CancelledError: 8877db96d56Sopenharmony_ci pass 8887db96d56Sopenharmony_ci res = await fut3 8897db96d56Sopenharmony_ci return res 8907db96d56Sopenharmony_ci 8917db96d56Sopenharmony_ci t = self.new_task(self.loop, task()) 8927db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 8937db96d56Sopenharmony_ci self.assertIs(t._fut_waiter, fut1) # White-box test. 8947db96d56Sopenharmony_ci fut1.set_result(None) 8957db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 8967db96d56Sopenharmony_ci self.assertIs(t._fut_waiter, fut2) # White-box test. 8977db96d56Sopenharmony_ci t.cancel() 8987db96d56Sopenharmony_ci self.assertTrue(fut2.cancelled()) 8997db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 9007db96d56Sopenharmony_ci self.assertIs(t._fut_waiter, fut3) # White-box test. 
9017db96d56Sopenharmony_ci fut3.set_result(42) 9027db96d56Sopenharmony_ci res = self.loop.run_until_complete(t) 9037db96d56Sopenharmony_ci self.assertEqual(res, 42) 9047db96d56Sopenharmony_ci self.assertFalse(fut3.cancelled()) 9057db96d56Sopenharmony_ci self.assertFalse(t.cancelled()) 9067db96d56Sopenharmony_ci 9077db96d56Sopenharmony_ci def test_cancel_current_task(self): 9087db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 9097db96d56Sopenharmony_ci self.set_event_loop(loop) 9107db96d56Sopenharmony_ci 9117db96d56Sopenharmony_ci async def task(): 9127db96d56Sopenharmony_ci t.cancel() 9137db96d56Sopenharmony_ci self.assertTrue(t._must_cancel) # White-box test. 9147db96d56Sopenharmony_ci # The sleep should be cancelled immediately. 9157db96d56Sopenharmony_ci await asyncio.sleep(100) 9167db96d56Sopenharmony_ci return 12 9177db96d56Sopenharmony_ci 9187db96d56Sopenharmony_ci t = self.new_task(loop, task()) 9197db96d56Sopenharmony_ci self.assertFalse(t.cancelled()) 9207db96d56Sopenharmony_ci self.assertRaises( 9217db96d56Sopenharmony_ci asyncio.CancelledError, loop.run_until_complete, t) 9227db96d56Sopenharmony_ci self.assertTrue(t.done()) 9237db96d56Sopenharmony_ci self.assertTrue(t.cancelled()) 9247db96d56Sopenharmony_ci self.assertFalse(t._must_cancel) # White-box test. 9257db96d56Sopenharmony_ci self.assertFalse(t.cancel()) 9267db96d56Sopenharmony_ci 9277db96d56Sopenharmony_ci def test_cancel_at_end(self): 9287db96d56Sopenharmony_ci """coroutine end right after task is cancelled""" 9297db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 9307db96d56Sopenharmony_ci self.set_event_loop(loop) 9317db96d56Sopenharmony_ci 9327db96d56Sopenharmony_ci async def task(): 9337db96d56Sopenharmony_ci t.cancel() 9347db96d56Sopenharmony_ci self.assertTrue(t._must_cancel) # White-box test. 
9357db96d56Sopenharmony_ci return 12 9367db96d56Sopenharmony_ci 9377db96d56Sopenharmony_ci t = self.new_task(loop, task()) 9387db96d56Sopenharmony_ci self.assertFalse(t.cancelled()) 9397db96d56Sopenharmony_ci self.assertRaises( 9407db96d56Sopenharmony_ci asyncio.CancelledError, loop.run_until_complete, t) 9417db96d56Sopenharmony_ci self.assertTrue(t.done()) 9427db96d56Sopenharmony_ci self.assertTrue(t.cancelled()) 9437db96d56Sopenharmony_ci self.assertFalse(t._must_cancel) # White-box test. 9447db96d56Sopenharmony_ci self.assertFalse(t.cancel()) 9457db96d56Sopenharmony_ci 9467db96d56Sopenharmony_ci def test_cancel_awaited_task(self): 9477db96d56Sopenharmony_ci # This tests for a relatively rare condition when 9487db96d56Sopenharmony_ci # a task cancellation is requested for a task which is not 9497db96d56Sopenharmony_ci # currently blocked, such as a task cancelling itself. 9507db96d56Sopenharmony_ci # In this situation we must ensure that whatever next future 9517db96d56Sopenharmony_ci # or task the cancelled task blocks on is cancelled correctly 9527db96d56Sopenharmony_ci # as well. See also bpo-34872. 9537db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 9547db96d56Sopenharmony_ci self.addCleanup(lambda: loop.close()) 9557db96d56Sopenharmony_ci 9567db96d56Sopenharmony_ci task = nested_task = None 9577db96d56Sopenharmony_ci fut = self.new_future(loop) 9587db96d56Sopenharmony_ci 9597db96d56Sopenharmony_ci async def nested(): 9607db96d56Sopenharmony_ci await fut 9617db96d56Sopenharmony_ci 9627db96d56Sopenharmony_ci async def coro(): 9637db96d56Sopenharmony_ci nonlocal nested_task 9647db96d56Sopenharmony_ci # Create a sub-task and wait for it to run. 9657db96d56Sopenharmony_ci nested_task = self.new_task(loop, nested()) 9667db96d56Sopenharmony_ci await asyncio.sleep(0) 9677db96d56Sopenharmony_ci 9687db96d56Sopenharmony_ci # Request the current task to be cancelled. 
9697db96d56Sopenharmony_ci task.cancel() 9707db96d56Sopenharmony_ci # Block on the nested task, which should be immediately 9717db96d56Sopenharmony_ci # cancelled. 9727db96d56Sopenharmony_ci await nested_task 9737db96d56Sopenharmony_ci 9747db96d56Sopenharmony_ci task = self.new_task(loop, coro()) 9757db96d56Sopenharmony_ci with self.assertRaises(asyncio.CancelledError): 9767db96d56Sopenharmony_ci loop.run_until_complete(task) 9777db96d56Sopenharmony_ci 9787db96d56Sopenharmony_ci self.assertTrue(task.cancelled()) 9797db96d56Sopenharmony_ci self.assertTrue(nested_task.cancelled()) 9807db96d56Sopenharmony_ci self.assertTrue(fut.cancelled()) 9817db96d56Sopenharmony_ci 9827db96d56Sopenharmony_ci def assert_text_contains(self, text, substr): 9837db96d56Sopenharmony_ci if substr not in text: 9847db96d56Sopenharmony_ci raise RuntimeError(f'text {substr!r} not found in:\n>>>{text}<<<') 9857db96d56Sopenharmony_ci 9867db96d56Sopenharmony_ci def test_cancel_traceback_for_future_result(self): 9877db96d56Sopenharmony_ci # When calling Future.result() on a cancelled task, check that the 9887db96d56Sopenharmony_ci # line of code that was interrupted is included in the traceback. 9897db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 9907db96d56Sopenharmony_ci self.set_event_loop(loop) 9917db96d56Sopenharmony_ci 9927db96d56Sopenharmony_ci async def nested(): 9937db96d56Sopenharmony_ci # This will get cancelled immediately. 
9947db96d56Sopenharmony_ci await asyncio.sleep(10) 9957db96d56Sopenharmony_ci 9967db96d56Sopenharmony_ci async def coro(): 9977db96d56Sopenharmony_ci task = self.new_task(loop, nested()) 9987db96d56Sopenharmony_ci await asyncio.sleep(0) 9997db96d56Sopenharmony_ci task.cancel() 10007db96d56Sopenharmony_ci await task # search target 10017db96d56Sopenharmony_ci 10027db96d56Sopenharmony_ci task = self.new_task(loop, coro()) 10037db96d56Sopenharmony_ci try: 10047db96d56Sopenharmony_ci loop.run_until_complete(task) 10057db96d56Sopenharmony_ci except asyncio.CancelledError: 10067db96d56Sopenharmony_ci tb = traceback.format_exc() 10077db96d56Sopenharmony_ci self.assert_text_contains(tb, "await asyncio.sleep(10)") 10087db96d56Sopenharmony_ci # The intermediate await should also be included. 10097db96d56Sopenharmony_ci self.assert_text_contains(tb, "await task # search target") 10107db96d56Sopenharmony_ci else: 10117db96d56Sopenharmony_ci self.fail('CancelledError did not occur') 10127db96d56Sopenharmony_ci 10137db96d56Sopenharmony_ci def test_cancel_traceback_for_future_exception(self): 10147db96d56Sopenharmony_ci # When calling Future.exception() on a cancelled task, check that the 10157db96d56Sopenharmony_ci # line of code that was interrupted is included in the traceback. 10167db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 10177db96d56Sopenharmony_ci self.set_event_loop(loop) 10187db96d56Sopenharmony_ci 10197db96d56Sopenharmony_ci async def nested(): 10207db96d56Sopenharmony_ci # This will get cancelled immediately. 
10217db96d56Sopenharmony_ci await asyncio.sleep(10) 10227db96d56Sopenharmony_ci 10237db96d56Sopenharmony_ci async def coro(): 10247db96d56Sopenharmony_ci task = self.new_task(loop, nested()) 10257db96d56Sopenharmony_ci await asyncio.sleep(0) 10267db96d56Sopenharmony_ci task.cancel() 10277db96d56Sopenharmony_ci done, pending = await asyncio.wait([task]) 10287db96d56Sopenharmony_ci task.exception() # search target 10297db96d56Sopenharmony_ci 10307db96d56Sopenharmony_ci task = self.new_task(loop, coro()) 10317db96d56Sopenharmony_ci try: 10327db96d56Sopenharmony_ci loop.run_until_complete(task) 10337db96d56Sopenharmony_ci except asyncio.CancelledError: 10347db96d56Sopenharmony_ci tb = traceback.format_exc() 10357db96d56Sopenharmony_ci self.assert_text_contains(tb, "await asyncio.sleep(10)") 10367db96d56Sopenharmony_ci # The intermediate await should also be included. 10377db96d56Sopenharmony_ci self.assert_text_contains(tb, 10387db96d56Sopenharmony_ci "task.exception() # search target") 10397db96d56Sopenharmony_ci else: 10407db96d56Sopenharmony_ci self.fail('CancelledError did not occur') 10417db96d56Sopenharmony_ci 10427db96d56Sopenharmony_ci def test_stop_while_run_in_complete(self): 10437db96d56Sopenharmony_ci 10447db96d56Sopenharmony_ci def gen(): 10457db96d56Sopenharmony_ci when = yield 10467db96d56Sopenharmony_ci self.assertAlmostEqual(0.1, when) 10477db96d56Sopenharmony_ci when = yield 0.1 10487db96d56Sopenharmony_ci self.assertAlmostEqual(0.2, when) 10497db96d56Sopenharmony_ci when = yield 0.1 10507db96d56Sopenharmony_ci self.assertAlmostEqual(0.3, when) 10517db96d56Sopenharmony_ci yield 0.1 10527db96d56Sopenharmony_ci 10537db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 10547db96d56Sopenharmony_ci 10557db96d56Sopenharmony_ci x = 0 10567db96d56Sopenharmony_ci 10577db96d56Sopenharmony_ci async def task(): 10587db96d56Sopenharmony_ci nonlocal x 10597db96d56Sopenharmony_ci while x < 10: 10607db96d56Sopenharmony_ci await asyncio.sleep(0.1) 
10617db96d56Sopenharmony_ci x += 1 10627db96d56Sopenharmony_ci if x == 2: 10637db96d56Sopenharmony_ci loop.stop() 10647db96d56Sopenharmony_ci 10657db96d56Sopenharmony_ci t = self.new_task(loop, task()) 10667db96d56Sopenharmony_ci with self.assertRaises(RuntimeError) as cm: 10677db96d56Sopenharmony_ci loop.run_until_complete(t) 10687db96d56Sopenharmony_ci self.assertEqual(str(cm.exception), 10697db96d56Sopenharmony_ci 'Event loop stopped before Future completed.') 10707db96d56Sopenharmony_ci self.assertFalse(t.done()) 10717db96d56Sopenharmony_ci self.assertEqual(x, 2) 10727db96d56Sopenharmony_ci self.assertAlmostEqual(0.3, loop.time()) 10737db96d56Sopenharmony_ci 10747db96d56Sopenharmony_ci t.cancel() 10757db96d56Sopenharmony_ci self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t) 10767db96d56Sopenharmony_ci 10777db96d56Sopenharmony_ci def test_log_traceback(self): 10787db96d56Sopenharmony_ci async def coro(): 10797db96d56Sopenharmony_ci pass 10807db96d56Sopenharmony_ci 10817db96d56Sopenharmony_ci task = self.new_task(self.loop, coro()) 10827db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, 'can only be set to False'): 10837db96d56Sopenharmony_ci task._log_traceback = True 10847db96d56Sopenharmony_ci self.loop.run_until_complete(task) 10857db96d56Sopenharmony_ci 10867db96d56Sopenharmony_ci def test_wait(self): 10877db96d56Sopenharmony_ci 10887db96d56Sopenharmony_ci def gen(): 10897db96d56Sopenharmony_ci when = yield 10907db96d56Sopenharmony_ci self.assertAlmostEqual(0.1, when) 10917db96d56Sopenharmony_ci when = yield 0 10927db96d56Sopenharmony_ci self.assertAlmostEqual(0.15, when) 10937db96d56Sopenharmony_ci yield 0.15 10947db96d56Sopenharmony_ci 10957db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 10967db96d56Sopenharmony_ci 10977db96d56Sopenharmony_ci a = self.new_task(loop, asyncio.sleep(0.1)) 10987db96d56Sopenharmony_ci b = self.new_task(loop, asyncio.sleep(0.15)) 10997db96d56Sopenharmony_ci 11007db96d56Sopenharmony_ci async 
def foo(): 11017db96d56Sopenharmony_ci done, pending = await asyncio.wait([b, a]) 11027db96d56Sopenharmony_ci self.assertEqual(done, set([a, b])) 11037db96d56Sopenharmony_ci self.assertEqual(pending, set()) 11047db96d56Sopenharmony_ci return 42 11057db96d56Sopenharmony_ci 11067db96d56Sopenharmony_ci res = loop.run_until_complete(self.new_task(loop, foo())) 11077db96d56Sopenharmony_ci self.assertEqual(res, 42) 11087db96d56Sopenharmony_ci self.assertAlmostEqual(0.15, loop.time()) 11097db96d56Sopenharmony_ci 11107db96d56Sopenharmony_ci # Doing it again should take no time and exercise a different path. 11117db96d56Sopenharmony_ci res = loop.run_until_complete(self.new_task(loop, foo())) 11127db96d56Sopenharmony_ci self.assertAlmostEqual(0.15, loop.time()) 11137db96d56Sopenharmony_ci self.assertEqual(res, 42) 11147db96d56Sopenharmony_ci 11157db96d56Sopenharmony_ci def test_wait_duplicate_coroutines(self): 11167db96d56Sopenharmony_ci 11177db96d56Sopenharmony_ci async def coro(s): 11187db96d56Sopenharmony_ci return s 11197db96d56Sopenharmony_ci c = self.loop.create_task(coro('test')) 11207db96d56Sopenharmony_ci task = self.new_task( 11217db96d56Sopenharmony_ci self.loop, 11227db96d56Sopenharmony_ci asyncio.wait([c, c, self.loop.create_task(coro('spam'))])) 11237db96d56Sopenharmony_ci 11247db96d56Sopenharmony_ci done, pending = self.loop.run_until_complete(task) 11257db96d56Sopenharmony_ci 11267db96d56Sopenharmony_ci self.assertFalse(pending) 11277db96d56Sopenharmony_ci self.assertEqual(set(f.result() for f in done), {'test', 'spam'}) 11287db96d56Sopenharmony_ci 11297db96d56Sopenharmony_ci def test_wait_errors(self): 11307db96d56Sopenharmony_ci self.assertRaises( 11317db96d56Sopenharmony_ci ValueError, self.loop.run_until_complete, 11327db96d56Sopenharmony_ci asyncio.wait(set())) 11337db96d56Sopenharmony_ci 11347db96d56Sopenharmony_ci # -1 is an invalid return_when value 11357db96d56Sopenharmony_ci sleep_coro = asyncio.sleep(10.0) 11367db96d56Sopenharmony_ci wait_coro = 
asyncio.wait([sleep_coro], return_when=-1) 11377db96d56Sopenharmony_ci self.assertRaises(ValueError, 11387db96d56Sopenharmony_ci self.loop.run_until_complete, wait_coro) 11397db96d56Sopenharmony_ci 11407db96d56Sopenharmony_ci sleep_coro.close() 11417db96d56Sopenharmony_ci 11427db96d56Sopenharmony_ci def test_wait_first_completed(self): 11437db96d56Sopenharmony_ci 11447db96d56Sopenharmony_ci def gen(): 11457db96d56Sopenharmony_ci when = yield 11467db96d56Sopenharmony_ci self.assertAlmostEqual(10.0, when) 11477db96d56Sopenharmony_ci when = yield 0 11487db96d56Sopenharmony_ci self.assertAlmostEqual(0.1, when) 11497db96d56Sopenharmony_ci yield 0.1 11507db96d56Sopenharmony_ci 11517db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 11527db96d56Sopenharmony_ci 11537db96d56Sopenharmony_ci a = self.new_task(loop, asyncio.sleep(10.0)) 11547db96d56Sopenharmony_ci b = self.new_task(loop, asyncio.sleep(0.1)) 11557db96d56Sopenharmony_ci task = self.new_task( 11567db96d56Sopenharmony_ci loop, 11577db96d56Sopenharmony_ci asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED)) 11587db96d56Sopenharmony_ci 11597db96d56Sopenharmony_ci done, pending = loop.run_until_complete(task) 11607db96d56Sopenharmony_ci self.assertEqual({b}, done) 11617db96d56Sopenharmony_ci self.assertEqual({a}, pending) 11627db96d56Sopenharmony_ci self.assertFalse(a.done()) 11637db96d56Sopenharmony_ci self.assertTrue(b.done()) 11647db96d56Sopenharmony_ci self.assertIsNone(b.result()) 11657db96d56Sopenharmony_ci self.assertAlmostEqual(0.1, loop.time()) 11667db96d56Sopenharmony_ci 11677db96d56Sopenharmony_ci # move forward to close generator 11687db96d56Sopenharmony_ci loop.advance_time(10) 11697db96d56Sopenharmony_ci loop.run_until_complete(asyncio.wait([a, b])) 11707db96d56Sopenharmony_ci 11717db96d56Sopenharmony_ci def test_wait_really_done(self): 11727db96d56Sopenharmony_ci # there is possibility that some tasks in the pending list 11737db96d56Sopenharmony_ci # became done but their callbacks haven't all 
been called yet 11747db96d56Sopenharmony_ci 11757db96d56Sopenharmony_ci async def coro1(): 11767db96d56Sopenharmony_ci await asyncio.sleep(0) 11777db96d56Sopenharmony_ci 11787db96d56Sopenharmony_ci async def coro2(): 11797db96d56Sopenharmony_ci await asyncio.sleep(0) 11807db96d56Sopenharmony_ci await asyncio.sleep(0) 11817db96d56Sopenharmony_ci 11827db96d56Sopenharmony_ci a = self.new_task(self.loop, coro1()) 11837db96d56Sopenharmony_ci b = self.new_task(self.loop, coro2()) 11847db96d56Sopenharmony_ci task = self.new_task( 11857db96d56Sopenharmony_ci self.loop, 11867db96d56Sopenharmony_ci asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED)) 11877db96d56Sopenharmony_ci 11887db96d56Sopenharmony_ci done, pending = self.loop.run_until_complete(task) 11897db96d56Sopenharmony_ci self.assertEqual({a, b}, done) 11907db96d56Sopenharmony_ci self.assertTrue(a.done()) 11917db96d56Sopenharmony_ci self.assertIsNone(a.result()) 11927db96d56Sopenharmony_ci self.assertTrue(b.done()) 11937db96d56Sopenharmony_ci self.assertIsNone(b.result()) 11947db96d56Sopenharmony_ci 11957db96d56Sopenharmony_ci def test_wait_first_exception(self): 11967db96d56Sopenharmony_ci 11977db96d56Sopenharmony_ci def gen(): 11987db96d56Sopenharmony_ci when = yield 11997db96d56Sopenharmony_ci self.assertAlmostEqual(10.0, when) 12007db96d56Sopenharmony_ci yield 0 12017db96d56Sopenharmony_ci 12027db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 12037db96d56Sopenharmony_ci 12047db96d56Sopenharmony_ci # first_exception, task already has exception 12057db96d56Sopenharmony_ci a = self.new_task(loop, asyncio.sleep(10.0)) 12067db96d56Sopenharmony_ci 12077db96d56Sopenharmony_ci async def exc(): 12087db96d56Sopenharmony_ci raise ZeroDivisionError('err') 12097db96d56Sopenharmony_ci 12107db96d56Sopenharmony_ci b = self.new_task(loop, exc()) 12117db96d56Sopenharmony_ci task = self.new_task( 12127db96d56Sopenharmony_ci loop, 12137db96d56Sopenharmony_ci asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)) 
12147db96d56Sopenharmony_ci 12157db96d56Sopenharmony_ci done, pending = loop.run_until_complete(task) 12167db96d56Sopenharmony_ci self.assertEqual({b}, done) 12177db96d56Sopenharmony_ci self.assertEqual({a}, pending) 12187db96d56Sopenharmony_ci self.assertAlmostEqual(0, loop.time()) 12197db96d56Sopenharmony_ci 12207db96d56Sopenharmony_ci # move forward to close generator 12217db96d56Sopenharmony_ci loop.advance_time(10) 12227db96d56Sopenharmony_ci loop.run_until_complete(asyncio.wait([a, b])) 12237db96d56Sopenharmony_ci 12247db96d56Sopenharmony_ci def test_wait_first_exception_in_wait(self): 12257db96d56Sopenharmony_ci 12267db96d56Sopenharmony_ci def gen(): 12277db96d56Sopenharmony_ci when = yield 12287db96d56Sopenharmony_ci self.assertAlmostEqual(10.0, when) 12297db96d56Sopenharmony_ci when = yield 0 12307db96d56Sopenharmony_ci self.assertAlmostEqual(0.01, when) 12317db96d56Sopenharmony_ci yield 0.01 12327db96d56Sopenharmony_ci 12337db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 12347db96d56Sopenharmony_ci 12357db96d56Sopenharmony_ci # first_exception, exception during waiting 12367db96d56Sopenharmony_ci a = self.new_task(loop, asyncio.sleep(10.0)) 12377db96d56Sopenharmony_ci 12387db96d56Sopenharmony_ci async def exc(): 12397db96d56Sopenharmony_ci await asyncio.sleep(0.01) 12407db96d56Sopenharmony_ci raise ZeroDivisionError('err') 12417db96d56Sopenharmony_ci 12427db96d56Sopenharmony_ci b = self.new_task(loop, exc()) 12437db96d56Sopenharmony_ci task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION) 12447db96d56Sopenharmony_ci 12457db96d56Sopenharmony_ci done, pending = loop.run_until_complete(task) 12467db96d56Sopenharmony_ci self.assertEqual({b}, done) 12477db96d56Sopenharmony_ci self.assertEqual({a}, pending) 12487db96d56Sopenharmony_ci self.assertAlmostEqual(0.01, loop.time()) 12497db96d56Sopenharmony_ci 12507db96d56Sopenharmony_ci # move forward to close generator 12517db96d56Sopenharmony_ci loop.advance_time(10) 12527db96d56Sopenharmony_ci 
loop.run_until_complete(asyncio.wait([a, b])) 12537db96d56Sopenharmony_ci 12547db96d56Sopenharmony_ci def test_wait_with_exception(self): 12557db96d56Sopenharmony_ci 12567db96d56Sopenharmony_ci def gen(): 12577db96d56Sopenharmony_ci when = yield 12587db96d56Sopenharmony_ci self.assertAlmostEqual(0.1, when) 12597db96d56Sopenharmony_ci when = yield 0 12607db96d56Sopenharmony_ci self.assertAlmostEqual(0.15, when) 12617db96d56Sopenharmony_ci yield 0.15 12627db96d56Sopenharmony_ci 12637db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 12647db96d56Sopenharmony_ci 12657db96d56Sopenharmony_ci a = self.new_task(loop, asyncio.sleep(0.1)) 12667db96d56Sopenharmony_ci 12677db96d56Sopenharmony_ci async def sleeper(): 12687db96d56Sopenharmony_ci await asyncio.sleep(0.15) 12697db96d56Sopenharmony_ci raise ZeroDivisionError('really') 12707db96d56Sopenharmony_ci 12717db96d56Sopenharmony_ci b = self.new_task(loop, sleeper()) 12727db96d56Sopenharmony_ci 12737db96d56Sopenharmony_ci async def foo(): 12747db96d56Sopenharmony_ci done, pending = await asyncio.wait([b, a]) 12757db96d56Sopenharmony_ci self.assertEqual(len(done), 2) 12767db96d56Sopenharmony_ci self.assertEqual(pending, set()) 12777db96d56Sopenharmony_ci errors = set(f for f in done if f.exception() is not None) 12787db96d56Sopenharmony_ci self.assertEqual(len(errors), 1) 12797db96d56Sopenharmony_ci 12807db96d56Sopenharmony_ci loop.run_until_complete(self.new_task(loop, foo())) 12817db96d56Sopenharmony_ci self.assertAlmostEqual(0.15, loop.time()) 12827db96d56Sopenharmony_ci 12837db96d56Sopenharmony_ci loop.run_until_complete(self.new_task(loop, foo())) 12847db96d56Sopenharmony_ci self.assertAlmostEqual(0.15, loop.time()) 12857db96d56Sopenharmony_ci 12867db96d56Sopenharmony_ci def test_wait_with_timeout(self): 12877db96d56Sopenharmony_ci 12887db96d56Sopenharmony_ci def gen(): 12897db96d56Sopenharmony_ci when = yield 12907db96d56Sopenharmony_ci self.assertAlmostEqual(0.1, when) 12917db96d56Sopenharmony_ci when = yield 0 
12927db96d56Sopenharmony_ci self.assertAlmostEqual(0.15, when) 12937db96d56Sopenharmony_ci when = yield 0 12947db96d56Sopenharmony_ci self.assertAlmostEqual(0.11, when) 12957db96d56Sopenharmony_ci yield 0.11 12967db96d56Sopenharmony_ci 12977db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 12987db96d56Sopenharmony_ci 12997db96d56Sopenharmony_ci a = self.new_task(loop, asyncio.sleep(0.1)) 13007db96d56Sopenharmony_ci b = self.new_task(loop, asyncio.sleep(0.15)) 13017db96d56Sopenharmony_ci 13027db96d56Sopenharmony_ci async def foo(): 13037db96d56Sopenharmony_ci done, pending = await asyncio.wait([b, a], timeout=0.11) 13047db96d56Sopenharmony_ci self.assertEqual(done, set([a])) 13057db96d56Sopenharmony_ci self.assertEqual(pending, set([b])) 13067db96d56Sopenharmony_ci 13077db96d56Sopenharmony_ci loop.run_until_complete(self.new_task(loop, foo())) 13087db96d56Sopenharmony_ci self.assertAlmostEqual(0.11, loop.time()) 13097db96d56Sopenharmony_ci 13107db96d56Sopenharmony_ci # move forward to close generator 13117db96d56Sopenharmony_ci loop.advance_time(10) 13127db96d56Sopenharmony_ci loop.run_until_complete(asyncio.wait([a, b])) 13137db96d56Sopenharmony_ci 13147db96d56Sopenharmony_ci def test_wait_concurrent_complete(self): 13157db96d56Sopenharmony_ci 13167db96d56Sopenharmony_ci def gen(): 13177db96d56Sopenharmony_ci when = yield 13187db96d56Sopenharmony_ci self.assertAlmostEqual(0.1, when) 13197db96d56Sopenharmony_ci when = yield 0 13207db96d56Sopenharmony_ci self.assertAlmostEqual(0.15, when) 13217db96d56Sopenharmony_ci when = yield 0 13227db96d56Sopenharmony_ci self.assertAlmostEqual(0.1, when) 13237db96d56Sopenharmony_ci yield 0.1 13247db96d56Sopenharmony_ci 13257db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 13267db96d56Sopenharmony_ci 13277db96d56Sopenharmony_ci a = self.new_task(loop, asyncio.sleep(0.1)) 13287db96d56Sopenharmony_ci b = self.new_task(loop, asyncio.sleep(0.15)) 13297db96d56Sopenharmony_ci 13307db96d56Sopenharmony_ci done, pending = 
loop.run_until_complete( 13317db96d56Sopenharmony_ci asyncio.wait([b, a], timeout=0.1)) 13327db96d56Sopenharmony_ci 13337db96d56Sopenharmony_ci self.assertEqual(done, set([a])) 13347db96d56Sopenharmony_ci self.assertEqual(pending, set([b])) 13357db96d56Sopenharmony_ci self.assertAlmostEqual(0.1, loop.time()) 13367db96d56Sopenharmony_ci 13377db96d56Sopenharmony_ci # move forward to close generator 13387db96d56Sopenharmony_ci loop.advance_time(10) 13397db96d56Sopenharmony_ci loop.run_until_complete(asyncio.wait([a, b])) 13407db96d56Sopenharmony_ci 13417db96d56Sopenharmony_ci def test_wait_with_iterator_of_tasks(self): 13427db96d56Sopenharmony_ci 13437db96d56Sopenharmony_ci def gen(): 13447db96d56Sopenharmony_ci when = yield 13457db96d56Sopenharmony_ci self.assertAlmostEqual(0.1, when) 13467db96d56Sopenharmony_ci when = yield 0 13477db96d56Sopenharmony_ci self.assertAlmostEqual(0.15, when) 13487db96d56Sopenharmony_ci yield 0.15 13497db96d56Sopenharmony_ci 13507db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 13517db96d56Sopenharmony_ci 13527db96d56Sopenharmony_ci a = self.new_task(loop, asyncio.sleep(0.1)) 13537db96d56Sopenharmony_ci b = self.new_task(loop, asyncio.sleep(0.15)) 13547db96d56Sopenharmony_ci 13557db96d56Sopenharmony_ci async def foo(): 13567db96d56Sopenharmony_ci done, pending = await asyncio.wait(iter([b, a])) 13577db96d56Sopenharmony_ci self.assertEqual(done, set([a, b])) 13587db96d56Sopenharmony_ci self.assertEqual(pending, set()) 13597db96d56Sopenharmony_ci return 42 13607db96d56Sopenharmony_ci 13617db96d56Sopenharmony_ci res = loop.run_until_complete(self.new_task(loop, foo())) 13627db96d56Sopenharmony_ci self.assertEqual(res, 42) 13637db96d56Sopenharmony_ci self.assertAlmostEqual(0.15, loop.time()) 13647db96d56Sopenharmony_ci 13657db96d56Sopenharmony_ci def test_as_completed(self): 13667db96d56Sopenharmony_ci 13677db96d56Sopenharmony_ci def gen(): 13687db96d56Sopenharmony_ci yield 0 13697db96d56Sopenharmony_ci yield 0 
13707db96d56Sopenharmony_ci yield 0.01 13717db96d56Sopenharmony_ci yield 0 13727db96d56Sopenharmony_ci 13737db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 13747db96d56Sopenharmony_ci # disable "slow callback" warning 13757db96d56Sopenharmony_ci loop.slow_callback_duration = 1.0 13767db96d56Sopenharmony_ci completed = set() 13777db96d56Sopenharmony_ci time_shifted = False 13787db96d56Sopenharmony_ci 13797db96d56Sopenharmony_ci async def sleeper(dt, x): 13807db96d56Sopenharmony_ci nonlocal time_shifted 13817db96d56Sopenharmony_ci await asyncio.sleep(dt) 13827db96d56Sopenharmony_ci completed.add(x) 13837db96d56Sopenharmony_ci if not time_shifted and 'a' in completed and 'b' in completed: 13847db96d56Sopenharmony_ci time_shifted = True 13857db96d56Sopenharmony_ci loop.advance_time(0.14) 13867db96d56Sopenharmony_ci return x 13877db96d56Sopenharmony_ci 13887db96d56Sopenharmony_ci a = sleeper(0.01, 'a') 13897db96d56Sopenharmony_ci b = sleeper(0.01, 'b') 13907db96d56Sopenharmony_ci c = sleeper(0.15, 'c') 13917db96d56Sopenharmony_ci 13927db96d56Sopenharmony_ci async def foo(): 13937db96d56Sopenharmony_ci values = [] 13947db96d56Sopenharmony_ci for f in asyncio.as_completed([b, c, a]): 13957db96d56Sopenharmony_ci values.append(await f) 13967db96d56Sopenharmony_ci return values 13977db96d56Sopenharmony_ci 13987db96d56Sopenharmony_ci res = loop.run_until_complete(self.new_task(loop, foo())) 13997db96d56Sopenharmony_ci self.assertAlmostEqual(0.15, loop.time()) 14007db96d56Sopenharmony_ci self.assertTrue('a' in res[:2]) 14017db96d56Sopenharmony_ci self.assertTrue('b' in res[:2]) 14027db96d56Sopenharmony_ci self.assertEqual(res[2], 'c') 14037db96d56Sopenharmony_ci 14047db96d56Sopenharmony_ci def test_as_completed_with_timeout(self): 14057db96d56Sopenharmony_ci 14067db96d56Sopenharmony_ci def gen(): 14077db96d56Sopenharmony_ci yield 14087db96d56Sopenharmony_ci yield 0 14097db96d56Sopenharmony_ci yield 0 14107db96d56Sopenharmony_ci yield 0.1 14117db96d56Sopenharmony_ci 
14127db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 14137db96d56Sopenharmony_ci 14147db96d56Sopenharmony_ci a = loop.create_task(asyncio.sleep(0.1, 'a')) 14157db96d56Sopenharmony_ci b = loop.create_task(asyncio.sleep(0.15, 'b')) 14167db96d56Sopenharmony_ci 14177db96d56Sopenharmony_ci async def foo(): 14187db96d56Sopenharmony_ci values = [] 14197db96d56Sopenharmony_ci for f in asyncio.as_completed([a, b], timeout=0.12): 14207db96d56Sopenharmony_ci if values: 14217db96d56Sopenharmony_ci loop.advance_time(0.02) 14227db96d56Sopenharmony_ci try: 14237db96d56Sopenharmony_ci v = await f 14247db96d56Sopenharmony_ci values.append((1, v)) 14257db96d56Sopenharmony_ci except asyncio.TimeoutError as exc: 14267db96d56Sopenharmony_ci values.append((2, exc)) 14277db96d56Sopenharmony_ci return values 14287db96d56Sopenharmony_ci 14297db96d56Sopenharmony_ci res = loop.run_until_complete(self.new_task(loop, foo())) 14307db96d56Sopenharmony_ci self.assertEqual(len(res), 2, res) 14317db96d56Sopenharmony_ci self.assertEqual(res[0], (1, 'a')) 14327db96d56Sopenharmony_ci self.assertEqual(res[1][0], 2) 14337db96d56Sopenharmony_ci self.assertIsInstance(res[1][1], asyncio.TimeoutError) 14347db96d56Sopenharmony_ci self.assertAlmostEqual(0.12, loop.time()) 14357db96d56Sopenharmony_ci 14367db96d56Sopenharmony_ci # move forward to close generator 14377db96d56Sopenharmony_ci loop.advance_time(10) 14387db96d56Sopenharmony_ci loop.run_until_complete(asyncio.wait([a, b])) 14397db96d56Sopenharmony_ci 14407db96d56Sopenharmony_ci def test_as_completed_with_unused_timeout(self): 14417db96d56Sopenharmony_ci 14427db96d56Sopenharmony_ci def gen(): 14437db96d56Sopenharmony_ci yield 14447db96d56Sopenharmony_ci yield 0 14457db96d56Sopenharmony_ci yield 0.01 14467db96d56Sopenharmony_ci 14477db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 14487db96d56Sopenharmony_ci 14497db96d56Sopenharmony_ci a = asyncio.sleep(0.01, 'a') 14507db96d56Sopenharmony_ci 14517db96d56Sopenharmony_ci async def foo(): 
14527db96d56Sopenharmony_ci for f in asyncio.as_completed([a], timeout=1): 14537db96d56Sopenharmony_ci v = await f 14547db96d56Sopenharmony_ci self.assertEqual(v, 'a') 14557db96d56Sopenharmony_ci 14567db96d56Sopenharmony_ci loop.run_until_complete(self.new_task(loop, foo())) 14577db96d56Sopenharmony_ci 14587db96d56Sopenharmony_ci def test_as_completed_reverse_wait(self): 14597db96d56Sopenharmony_ci 14607db96d56Sopenharmony_ci def gen(): 14617db96d56Sopenharmony_ci yield 0 14627db96d56Sopenharmony_ci yield 0.05 14637db96d56Sopenharmony_ci yield 0 14647db96d56Sopenharmony_ci 14657db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 14667db96d56Sopenharmony_ci 14677db96d56Sopenharmony_ci a = asyncio.sleep(0.05, 'a') 14687db96d56Sopenharmony_ci b = asyncio.sleep(0.10, 'b') 14697db96d56Sopenharmony_ci fs = {a, b} 14707db96d56Sopenharmony_ci 14717db96d56Sopenharmony_ci async def test(): 14727db96d56Sopenharmony_ci futs = list(asyncio.as_completed(fs)) 14737db96d56Sopenharmony_ci self.assertEqual(len(futs), 2) 14747db96d56Sopenharmony_ci 14757db96d56Sopenharmony_ci x = await futs[1] 14767db96d56Sopenharmony_ci self.assertEqual(x, 'a') 14777db96d56Sopenharmony_ci self.assertAlmostEqual(0.05, loop.time()) 14787db96d56Sopenharmony_ci loop.advance_time(0.05) 14797db96d56Sopenharmony_ci y = await futs[0] 14807db96d56Sopenharmony_ci self.assertEqual(y, 'b') 14817db96d56Sopenharmony_ci self.assertAlmostEqual(0.10, loop.time()) 14827db96d56Sopenharmony_ci 14837db96d56Sopenharmony_ci loop.run_until_complete(test()) 14847db96d56Sopenharmony_ci 14857db96d56Sopenharmony_ci def test_as_completed_concurrent(self): 14867db96d56Sopenharmony_ci 14877db96d56Sopenharmony_ci def gen(): 14887db96d56Sopenharmony_ci when = yield 14897db96d56Sopenharmony_ci self.assertAlmostEqual(0.05, when) 14907db96d56Sopenharmony_ci when = yield 0 14917db96d56Sopenharmony_ci self.assertAlmostEqual(0.05, when) 14927db96d56Sopenharmony_ci yield 0.05 14937db96d56Sopenharmony_ci 14947db96d56Sopenharmony_ci a = 
asyncio.sleep(0.05, 'a') 14957db96d56Sopenharmony_ci b = asyncio.sleep(0.05, 'b') 14967db96d56Sopenharmony_ci fs = {a, b} 14977db96d56Sopenharmony_ci 14987db96d56Sopenharmony_ci async def test(): 14997db96d56Sopenharmony_ci futs = list(asyncio.as_completed(fs)) 15007db96d56Sopenharmony_ci self.assertEqual(len(futs), 2) 15017db96d56Sopenharmony_ci done, pending = await asyncio.wait( 15027db96d56Sopenharmony_ci [asyncio.ensure_future(fut) for fut in futs] 15037db96d56Sopenharmony_ci ) 15047db96d56Sopenharmony_ci self.assertEqual(set(f.result() for f in done), {'a', 'b'}) 15057db96d56Sopenharmony_ci 15067db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 15077db96d56Sopenharmony_ci loop.run_until_complete(test()) 15087db96d56Sopenharmony_ci 15097db96d56Sopenharmony_ci def test_as_completed_duplicate_coroutines(self): 15107db96d56Sopenharmony_ci 15117db96d56Sopenharmony_ci async def coro(s): 15127db96d56Sopenharmony_ci return s 15137db96d56Sopenharmony_ci 15147db96d56Sopenharmony_ci async def runner(): 15157db96d56Sopenharmony_ci result = [] 15167db96d56Sopenharmony_ci c = coro('ham') 15177db96d56Sopenharmony_ci for f in asyncio.as_completed([c, c, coro('spam')]): 15187db96d56Sopenharmony_ci result.append(await f) 15197db96d56Sopenharmony_ci return result 15207db96d56Sopenharmony_ci 15217db96d56Sopenharmony_ci fut = self.new_task(self.loop, runner()) 15227db96d56Sopenharmony_ci self.loop.run_until_complete(fut) 15237db96d56Sopenharmony_ci result = fut.result() 15247db96d56Sopenharmony_ci self.assertEqual(set(result), {'ham', 'spam'}) 15257db96d56Sopenharmony_ci self.assertEqual(len(result), 2) 15267db96d56Sopenharmony_ci 15277db96d56Sopenharmony_ci def test_as_completed_coroutine_without_loop(self): 15287db96d56Sopenharmony_ci async def coro(): 15297db96d56Sopenharmony_ci return 42 15307db96d56Sopenharmony_ci 15317db96d56Sopenharmony_ci a = coro() 15327db96d56Sopenharmony_ci self.addCleanup(a.close) 15337db96d56Sopenharmony_ci 15347db96d56Sopenharmony_ci futs = 
asyncio.as_completed([a]) 15357db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, 'no current event loop'): 15367db96d56Sopenharmony_ci list(futs) 15377db96d56Sopenharmony_ci 15387db96d56Sopenharmony_ci def test_as_completed_coroutine_use_running_loop(self): 15397db96d56Sopenharmony_ci loop = self.new_test_loop() 15407db96d56Sopenharmony_ci 15417db96d56Sopenharmony_ci async def coro(): 15427db96d56Sopenharmony_ci return 42 15437db96d56Sopenharmony_ci 15447db96d56Sopenharmony_ci async def test(): 15457db96d56Sopenharmony_ci futs = list(asyncio.as_completed([coro()])) 15467db96d56Sopenharmony_ci self.assertEqual(len(futs), 1) 15477db96d56Sopenharmony_ci self.assertEqual(await futs[0], 42) 15487db96d56Sopenharmony_ci 15497db96d56Sopenharmony_ci loop.run_until_complete(test()) 15507db96d56Sopenharmony_ci 15517db96d56Sopenharmony_ci def test_sleep(self): 15527db96d56Sopenharmony_ci 15537db96d56Sopenharmony_ci def gen(): 15547db96d56Sopenharmony_ci when = yield 15557db96d56Sopenharmony_ci self.assertAlmostEqual(0.05, when) 15567db96d56Sopenharmony_ci when = yield 0.05 15577db96d56Sopenharmony_ci self.assertAlmostEqual(0.1, when) 15587db96d56Sopenharmony_ci yield 0.05 15597db96d56Sopenharmony_ci 15607db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 15617db96d56Sopenharmony_ci 15627db96d56Sopenharmony_ci async def sleeper(dt, arg): 15637db96d56Sopenharmony_ci await asyncio.sleep(dt/2) 15647db96d56Sopenharmony_ci res = await asyncio.sleep(dt/2, arg) 15657db96d56Sopenharmony_ci return res 15667db96d56Sopenharmony_ci 15677db96d56Sopenharmony_ci t = self.new_task(loop, sleeper(0.1, 'yeah')) 15687db96d56Sopenharmony_ci loop.run_until_complete(t) 15697db96d56Sopenharmony_ci self.assertTrue(t.done()) 15707db96d56Sopenharmony_ci self.assertEqual(t.result(), 'yeah') 15717db96d56Sopenharmony_ci self.assertAlmostEqual(0.1, loop.time()) 15727db96d56Sopenharmony_ci 15737db96d56Sopenharmony_ci def test_sleep_cancel(self): 15747db96d56Sopenharmony_ci 
15757db96d56Sopenharmony_ci def gen(): 15767db96d56Sopenharmony_ci when = yield 15777db96d56Sopenharmony_ci self.assertAlmostEqual(10.0, when) 15787db96d56Sopenharmony_ci yield 0 15797db96d56Sopenharmony_ci 15807db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 15817db96d56Sopenharmony_ci 15827db96d56Sopenharmony_ci t = self.new_task(loop, asyncio.sleep(10.0, 'yeah')) 15837db96d56Sopenharmony_ci 15847db96d56Sopenharmony_ci handle = None 15857db96d56Sopenharmony_ci orig_call_later = loop.call_later 15867db96d56Sopenharmony_ci 15877db96d56Sopenharmony_ci def call_later(delay, callback, *args): 15887db96d56Sopenharmony_ci nonlocal handle 15897db96d56Sopenharmony_ci handle = orig_call_later(delay, callback, *args) 15907db96d56Sopenharmony_ci return handle 15917db96d56Sopenharmony_ci 15927db96d56Sopenharmony_ci loop.call_later = call_later 15937db96d56Sopenharmony_ci test_utils.run_briefly(loop) 15947db96d56Sopenharmony_ci 15957db96d56Sopenharmony_ci self.assertFalse(handle._cancelled) 15967db96d56Sopenharmony_ci 15977db96d56Sopenharmony_ci t.cancel() 15987db96d56Sopenharmony_ci test_utils.run_briefly(loop) 15997db96d56Sopenharmony_ci self.assertTrue(handle._cancelled) 16007db96d56Sopenharmony_ci 16017db96d56Sopenharmony_ci def test_task_cancel_sleeping_task(self): 16027db96d56Sopenharmony_ci 16037db96d56Sopenharmony_ci def gen(): 16047db96d56Sopenharmony_ci when = yield 16057db96d56Sopenharmony_ci self.assertAlmostEqual(0.1, when) 16067db96d56Sopenharmony_ci when = yield 0 16077db96d56Sopenharmony_ci self.assertAlmostEqual(5000, when) 16087db96d56Sopenharmony_ci yield 0.1 16097db96d56Sopenharmony_ci 16107db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 16117db96d56Sopenharmony_ci 16127db96d56Sopenharmony_ci async def sleep(dt): 16137db96d56Sopenharmony_ci await asyncio.sleep(dt) 16147db96d56Sopenharmony_ci 16157db96d56Sopenharmony_ci async def doit(): 16167db96d56Sopenharmony_ci sleeper = self.new_task(loop, sleep(5000)) 16177db96d56Sopenharmony_ci 
loop.call_later(0.1, sleeper.cancel) 16187db96d56Sopenharmony_ci try: 16197db96d56Sopenharmony_ci await sleeper 16207db96d56Sopenharmony_ci except asyncio.CancelledError: 16217db96d56Sopenharmony_ci return 'cancelled' 16227db96d56Sopenharmony_ci else: 16237db96d56Sopenharmony_ci return 'slept in' 16247db96d56Sopenharmony_ci 16257db96d56Sopenharmony_ci doer = doit() 16267db96d56Sopenharmony_ci self.assertEqual(loop.run_until_complete(doer), 'cancelled') 16277db96d56Sopenharmony_ci self.assertAlmostEqual(0.1, loop.time()) 16287db96d56Sopenharmony_ci 16297db96d56Sopenharmony_ci def test_task_cancel_waiter_future(self): 16307db96d56Sopenharmony_ci fut = self.new_future(self.loop) 16317db96d56Sopenharmony_ci 16327db96d56Sopenharmony_ci async def coro(): 16337db96d56Sopenharmony_ci await fut 16347db96d56Sopenharmony_ci 16357db96d56Sopenharmony_ci task = self.new_task(self.loop, coro()) 16367db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 16377db96d56Sopenharmony_ci self.assertIs(task._fut_waiter, fut) 16387db96d56Sopenharmony_ci 16397db96d56Sopenharmony_ci task.cancel() 16407db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 16417db96d56Sopenharmony_ci self.assertRaises( 16427db96d56Sopenharmony_ci asyncio.CancelledError, self.loop.run_until_complete, task) 16437db96d56Sopenharmony_ci self.assertIsNone(task._fut_waiter) 16447db96d56Sopenharmony_ci self.assertTrue(fut.cancelled()) 16457db96d56Sopenharmony_ci 16467db96d56Sopenharmony_ci def test_task_set_methods(self): 16477db96d56Sopenharmony_ci async def notmuch(): 16487db96d56Sopenharmony_ci return 'ko' 16497db96d56Sopenharmony_ci 16507db96d56Sopenharmony_ci gen = notmuch() 16517db96d56Sopenharmony_ci task = self.new_task(self.loop, gen) 16527db96d56Sopenharmony_ci 16537db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, 'not support set_result'): 16547db96d56Sopenharmony_ci task.set_result('ok') 16557db96d56Sopenharmony_ci 16567db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, 
'not support set_exception'): 16577db96d56Sopenharmony_ci task.set_exception(ValueError()) 16587db96d56Sopenharmony_ci 16597db96d56Sopenharmony_ci self.assertEqual( 16607db96d56Sopenharmony_ci self.loop.run_until_complete(task), 16617db96d56Sopenharmony_ci 'ko') 16627db96d56Sopenharmony_ci 16637db96d56Sopenharmony_ci def test_step_result_future(self): 16647db96d56Sopenharmony_ci # If coroutine returns future, task waits on this future. 16657db96d56Sopenharmony_ci 16667db96d56Sopenharmony_ci class Fut(asyncio.Future): 16677db96d56Sopenharmony_ci def __init__(self, *args, **kwds): 16687db96d56Sopenharmony_ci self.cb_added = False 16697db96d56Sopenharmony_ci super().__init__(*args, **kwds) 16707db96d56Sopenharmony_ci 16717db96d56Sopenharmony_ci def add_done_callback(self, *args, **kwargs): 16727db96d56Sopenharmony_ci self.cb_added = True 16737db96d56Sopenharmony_ci super().add_done_callback(*args, **kwargs) 16747db96d56Sopenharmony_ci 16757db96d56Sopenharmony_ci fut = Fut(loop=self.loop) 16767db96d56Sopenharmony_ci result = None 16777db96d56Sopenharmony_ci 16787db96d56Sopenharmony_ci async def wait_for_future(): 16797db96d56Sopenharmony_ci nonlocal result 16807db96d56Sopenharmony_ci result = await fut 16817db96d56Sopenharmony_ci 16827db96d56Sopenharmony_ci t = self.new_task(self.loop, wait_for_future()) 16837db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 16847db96d56Sopenharmony_ci self.assertTrue(fut.cb_added) 16857db96d56Sopenharmony_ci 16867db96d56Sopenharmony_ci res = object() 16877db96d56Sopenharmony_ci fut.set_result(res) 16887db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 16897db96d56Sopenharmony_ci self.assertIs(res, result) 16907db96d56Sopenharmony_ci self.assertTrue(t.done()) 16917db96d56Sopenharmony_ci self.assertIsNone(t.result()) 16927db96d56Sopenharmony_ci 16937db96d56Sopenharmony_ci def test_baseexception_during_cancel(self): 16947db96d56Sopenharmony_ci 16957db96d56Sopenharmony_ci def gen(): 16967db96d56Sopenharmony_ci when = yield 
16977db96d56Sopenharmony_ci self.assertAlmostEqual(10.0, when) 16987db96d56Sopenharmony_ci yield 0 16997db96d56Sopenharmony_ci 17007db96d56Sopenharmony_ci loop = self.new_test_loop(gen) 17017db96d56Sopenharmony_ci 17027db96d56Sopenharmony_ci async def sleeper(): 17037db96d56Sopenharmony_ci await asyncio.sleep(10) 17047db96d56Sopenharmony_ci 17057db96d56Sopenharmony_ci base_exc = SystemExit() 17067db96d56Sopenharmony_ci 17077db96d56Sopenharmony_ci async def notmutch(): 17087db96d56Sopenharmony_ci try: 17097db96d56Sopenharmony_ci await sleeper() 17107db96d56Sopenharmony_ci except asyncio.CancelledError: 17117db96d56Sopenharmony_ci raise base_exc 17127db96d56Sopenharmony_ci 17137db96d56Sopenharmony_ci task = self.new_task(loop, notmutch()) 17147db96d56Sopenharmony_ci test_utils.run_briefly(loop) 17157db96d56Sopenharmony_ci 17167db96d56Sopenharmony_ci task.cancel() 17177db96d56Sopenharmony_ci self.assertFalse(task.done()) 17187db96d56Sopenharmony_ci 17197db96d56Sopenharmony_ci self.assertRaises(SystemExit, test_utils.run_briefly, loop) 17207db96d56Sopenharmony_ci 17217db96d56Sopenharmony_ci self.assertTrue(task.done()) 17227db96d56Sopenharmony_ci self.assertFalse(task.cancelled()) 17237db96d56Sopenharmony_ci self.assertIs(task.exception(), base_exc) 17247db96d56Sopenharmony_ci 17257db96d56Sopenharmony_ci def test_iscoroutinefunction(self): 17267db96d56Sopenharmony_ci def fn(): 17277db96d56Sopenharmony_ci pass 17287db96d56Sopenharmony_ci 17297db96d56Sopenharmony_ci self.assertFalse(asyncio.iscoroutinefunction(fn)) 17307db96d56Sopenharmony_ci 17317db96d56Sopenharmony_ci def fn1(): 17327db96d56Sopenharmony_ci yield 17337db96d56Sopenharmony_ci self.assertFalse(asyncio.iscoroutinefunction(fn1)) 17347db96d56Sopenharmony_ci 17357db96d56Sopenharmony_ci async def fn2(): 17367db96d56Sopenharmony_ci pass 17377db96d56Sopenharmony_ci self.assertTrue(asyncio.iscoroutinefunction(fn2)) 17387db96d56Sopenharmony_ci 17397db96d56Sopenharmony_ci 
self.assertFalse(asyncio.iscoroutinefunction(mock.Mock())) 17407db96d56Sopenharmony_ci self.assertTrue(asyncio.iscoroutinefunction(mock.AsyncMock())) 17417db96d56Sopenharmony_ci 17427db96d56Sopenharmony_ci def test_coroutine_non_gen_function(self): 17437db96d56Sopenharmony_ci async def func(): 17447db96d56Sopenharmony_ci return 'test' 17457db96d56Sopenharmony_ci 17467db96d56Sopenharmony_ci self.assertTrue(asyncio.iscoroutinefunction(func)) 17477db96d56Sopenharmony_ci 17487db96d56Sopenharmony_ci coro = func() 17497db96d56Sopenharmony_ci self.assertTrue(asyncio.iscoroutine(coro)) 17507db96d56Sopenharmony_ci 17517db96d56Sopenharmony_ci res = self.loop.run_until_complete(coro) 17527db96d56Sopenharmony_ci self.assertEqual(res, 'test') 17537db96d56Sopenharmony_ci 17547db96d56Sopenharmony_ci def test_coroutine_non_gen_function_return_future(self): 17557db96d56Sopenharmony_ci fut = self.new_future(self.loop) 17567db96d56Sopenharmony_ci 17577db96d56Sopenharmony_ci async def func(): 17587db96d56Sopenharmony_ci return fut 17597db96d56Sopenharmony_ci 17607db96d56Sopenharmony_ci async def coro(): 17617db96d56Sopenharmony_ci fut.set_result('test') 17627db96d56Sopenharmony_ci 17637db96d56Sopenharmony_ci t1 = self.new_task(self.loop, func()) 17647db96d56Sopenharmony_ci t2 = self.new_task(self.loop, coro()) 17657db96d56Sopenharmony_ci res = self.loop.run_until_complete(t1) 17667db96d56Sopenharmony_ci self.assertEqual(res, fut) 17677db96d56Sopenharmony_ci self.assertIsNone(t2.result()) 17687db96d56Sopenharmony_ci 17697db96d56Sopenharmony_ci def test_current_task(self): 17707db96d56Sopenharmony_ci self.assertIsNone(asyncio.current_task(loop=self.loop)) 17717db96d56Sopenharmony_ci 17727db96d56Sopenharmony_ci async def coro(loop): 17737db96d56Sopenharmony_ci self.assertIs(asyncio.current_task(), task) 17747db96d56Sopenharmony_ci 17757db96d56Sopenharmony_ci self.assertIs(asyncio.current_task(None), task) 17767db96d56Sopenharmony_ci self.assertIs(asyncio.current_task(), task) 
17777db96d56Sopenharmony_ci 17787db96d56Sopenharmony_ci task = self.new_task(self.loop, coro(self.loop)) 17797db96d56Sopenharmony_ci self.loop.run_until_complete(task) 17807db96d56Sopenharmony_ci self.assertIsNone(asyncio.current_task(loop=self.loop)) 17817db96d56Sopenharmony_ci 17827db96d56Sopenharmony_ci def test_current_task_with_interleaving_tasks(self): 17837db96d56Sopenharmony_ci self.assertIsNone(asyncio.current_task(loop=self.loop)) 17847db96d56Sopenharmony_ci 17857db96d56Sopenharmony_ci fut1 = self.new_future(self.loop) 17867db96d56Sopenharmony_ci fut2 = self.new_future(self.loop) 17877db96d56Sopenharmony_ci 17887db96d56Sopenharmony_ci async def coro1(loop): 17897db96d56Sopenharmony_ci self.assertTrue(asyncio.current_task() is task1) 17907db96d56Sopenharmony_ci await fut1 17917db96d56Sopenharmony_ci self.assertTrue(asyncio.current_task() is task1) 17927db96d56Sopenharmony_ci fut2.set_result(True) 17937db96d56Sopenharmony_ci 17947db96d56Sopenharmony_ci async def coro2(loop): 17957db96d56Sopenharmony_ci self.assertTrue(asyncio.current_task() is task2) 17967db96d56Sopenharmony_ci fut1.set_result(True) 17977db96d56Sopenharmony_ci await fut2 17987db96d56Sopenharmony_ci self.assertTrue(asyncio.current_task() is task2) 17997db96d56Sopenharmony_ci 18007db96d56Sopenharmony_ci task1 = self.new_task(self.loop, coro1(self.loop)) 18017db96d56Sopenharmony_ci task2 = self.new_task(self.loop, coro2(self.loop)) 18027db96d56Sopenharmony_ci 18037db96d56Sopenharmony_ci self.loop.run_until_complete(asyncio.wait((task1, task2))) 18047db96d56Sopenharmony_ci self.assertIsNone(asyncio.current_task(loop=self.loop)) 18057db96d56Sopenharmony_ci 18067db96d56Sopenharmony_ci # Some thorough tests for cancellation propagation through 18077db96d56Sopenharmony_ci # coroutines, tasks and wait(). 18087db96d56Sopenharmony_ci 18097db96d56Sopenharmony_ci def test_yield_future_passes_cancel(self): 18107db96d56Sopenharmony_ci # Cancelling outer() cancels inner() cancels waiter. 
18117db96d56Sopenharmony_ci proof = 0 18127db96d56Sopenharmony_ci waiter = self.new_future(self.loop) 18137db96d56Sopenharmony_ci 18147db96d56Sopenharmony_ci async def inner(): 18157db96d56Sopenharmony_ci nonlocal proof 18167db96d56Sopenharmony_ci try: 18177db96d56Sopenharmony_ci await waiter 18187db96d56Sopenharmony_ci except asyncio.CancelledError: 18197db96d56Sopenharmony_ci proof += 1 18207db96d56Sopenharmony_ci raise 18217db96d56Sopenharmony_ci else: 18227db96d56Sopenharmony_ci self.fail('got past sleep() in inner()') 18237db96d56Sopenharmony_ci 18247db96d56Sopenharmony_ci async def outer(): 18257db96d56Sopenharmony_ci nonlocal proof 18267db96d56Sopenharmony_ci try: 18277db96d56Sopenharmony_ci await inner() 18287db96d56Sopenharmony_ci except asyncio.CancelledError: 18297db96d56Sopenharmony_ci proof += 100 # Expect this path. 18307db96d56Sopenharmony_ci else: 18317db96d56Sopenharmony_ci proof += 10 18327db96d56Sopenharmony_ci 18337db96d56Sopenharmony_ci f = asyncio.ensure_future(outer(), loop=self.loop) 18347db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 18357db96d56Sopenharmony_ci f.cancel() 18367db96d56Sopenharmony_ci self.loop.run_until_complete(f) 18377db96d56Sopenharmony_ci self.assertEqual(proof, 101) 18387db96d56Sopenharmony_ci self.assertTrue(waiter.cancelled()) 18397db96d56Sopenharmony_ci 18407db96d56Sopenharmony_ci def test_yield_wait_does_not_shield_cancel(self): 18417db96d56Sopenharmony_ci # Cancelling outer() makes wait() return early, leaves inner() 18427db96d56Sopenharmony_ci # running. 
18437db96d56Sopenharmony_ci proof = 0 18447db96d56Sopenharmony_ci waiter = self.new_future(self.loop) 18457db96d56Sopenharmony_ci 18467db96d56Sopenharmony_ci async def inner(): 18477db96d56Sopenharmony_ci nonlocal proof 18487db96d56Sopenharmony_ci await waiter 18497db96d56Sopenharmony_ci proof += 1 18507db96d56Sopenharmony_ci 18517db96d56Sopenharmony_ci async def outer(): 18527db96d56Sopenharmony_ci nonlocal proof 18537db96d56Sopenharmony_ci with self.assertWarns(DeprecationWarning): 18547db96d56Sopenharmony_ci d, p = await asyncio.wait([asyncio.create_task(inner())]) 18557db96d56Sopenharmony_ci proof += 100 18567db96d56Sopenharmony_ci 18577db96d56Sopenharmony_ci f = asyncio.ensure_future(outer(), loop=self.loop) 18587db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 18597db96d56Sopenharmony_ci f.cancel() 18607db96d56Sopenharmony_ci self.assertRaises( 18617db96d56Sopenharmony_ci asyncio.CancelledError, self.loop.run_until_complete, f) 18627db96d56Sopenharmony_ci waiter.set_result(None) 18637db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 18647db96d56Sopenharmony_ci self.assertEqual(proof, 1) 18657db96d56Sopenharmony_ci 18667db96d56Sopenharmony_ci def test_shield_result(self): 18677db96d56Sopenharmony_ci inner = self.new_future(self.loop) 18687db96d56Sopenharmony_ci outer = asyncio.shield(inner) 18697db96d56Sopenharmony_ci inner.set_result(42) 18707db96d56Sopenharmony_ci res = self.loop.run_until_complete(outer) 18717db96d56Sopenharmony_ci self.assertEqual(res, 42) 18727db96d56Sopenharmony_ci 18737db96d56Sopenharmony_ci def test_shield_exception(self): 18747db96d56Sopenharmony_ci inner = self.new_future(self.loop) 18757db96d56Sopenharmony_ci outer = asyncio.shield(inner) 18767db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 18777db96d56Sopenharmony_ci exc = RuntimeError('expected') 18787db96d56Sopenharmony_ci inner.set_exception(exc) 18797db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 18807db96d56Sopenharmony_ci 
self.assertIs(outer.exception(), exc) 18817db96d56Sopenharmony_ci 18827db96d56Sopenharmony_ci def test_shield_cancel_inner(self): 18837db96d56Sopenharmony_ci inner = self.new_future(self.loop) 18847db96d56Sopenharmony_ci outer = asyncio.shield(inner) 18857db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 18867db96d56Sopenharmony_ci inner.cancel() 18877db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 18887db96d56Sopenharmony_ci self.assertTrue(outer.cancelled()) 18897db96d56Sopenharmony_ci 18907db96d56Sopenharmony_ci def test_shield_cancel_outer(self): 18917db96d56Sopenharmony_ci inner = self.new_future(self.loop) 18927db96d56Sopenharmony_ci outer = asyncio.shield(inner) 18937db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 18947db96d56Sopenharmony_ci outer.cancel() 18957db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 18967db96d56Sopenharmony_ci self.assertTrue(outer.cancelled()) 18977db96d56Sopenharmony_ci self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks)) 18987db96d56Sopenharmony_ci 18997db96d56Sopenharmony_ci def test_shield_shortcut(self): 19007db96d56Sopenharmony_ci fut = self.new_future(self.loop) 19017db96d56Sopenharmony_ci fut.set_result(42) 19027db96d56Sopenharmony_ci res = self.loop.run_until_complete(asyncio.shield(fut)) 19037db96d56Sopenharmony_ci self.assertEqual(res, 42) 19047db96d56Sopenharmony_ci 19057db96d56Sopenharmony_ci def test_shield_effect(self): 19067db96d56Sopenharmony_ci # Cancelling outer() does not affect inner(). 
19077db96d56Sopenharmony_ci proof = 0 19087db96d56Sopenharmony_ci waiter = self.new_future(self.loop) 19097db96d56Sopenharmony_ci 19107db96d56Sopenharmony_ci async def inner(): 19117db96d56Sopenharmony_ci nonlocal proof 19127db96d56Sopenharmony_ci await waiter 19137db96d56Sopenharmony_ci proof += 1 19147db96d56Sopenharmony_ci 19157db96d56Sopenharmony_ci async def outer(): 19167db96d56Sopenharmony_ci nonlocal proof 19177db96d56Sopenharmony_ci await asyncio.shield(inner()) 19187db96d56Sopenharmony_ci proof += 100 19197db96d56Sopenharmony_ci 19207db96d56Sopenharmony_ci f = asyncio.ensure_future(outer(), loop=self.loop) 19217db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 19227db96d56Sopenharmony_ci f.cancel() 19237db96d56Sopenharmony_ci with self.assertRaises(asyncio.CancelledError): 19247db96d56Sopenharmony_ci self.loop.run_until_complete(f) 19257db96d56Sopenharmony_ci waiter.set_result(None) 19267db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 19277db96d56Sopenharmony_ci self.assertEqual(proof, 1) 19287db96d56Sopenharmony_ci 19297db96d56Sopenharmony_ci def test_shield_gather(self): 19307db96d56Sopenharmony_ci child1 = self.new_future(self.loop) 19317db96d56Sopenharmony_ci child2 = self.new_future(self.loop) 19327db96d56Sopenharmony_ci parent = asyncio.gather(child1, child2) 19337db96d56Sopenharmony_ci outer = asyncio.shield(parent) 19347db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 19357db96d56Sopenharmony_ci outer.cancel() 19367db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 19377db96d56Sopenharmony_ci self.assertTrue(outer.cancelled()) 19387db96d56Sopenharmony_ci child1.set_result(1) 19397db96d56Sopenharmony_ci child2.set_result(2) 19407db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 19417db96d56Sopenharmony_ci self.assertEqual(parent.result(), [1, 2]) 19427db96d56Sopenharmony_ci 19437db96d56Sopenharmony_ci def test_gather_shield(self): 19447db96d56Sopenharmony_ci child1 = self.new_future(self.loop) 
19457db96d56Sopenharmony_ci child2 = self.new_future(self.loop) 19467db96d56Sopenharmony_ci inner1 = asyncio.shield(child1) 19477db96d56Sopenharmony_ci inner2 = asyncio.shield(child2) 19487db96d56Sopenharmony_ci parent = asyncio.gather(inner1, inner2) 19497db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 19507db96d56Sopenharmony_ci parent.cancel() 19517db96d56Sopenharmony_ci # This should cancel inner1 and inner2 but bot child1 and child2. 19527db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 19537db96d56Sopenharmony_ci self.assertIsInstance(parent.exception(), asyncio.CancelledError) 19547db96d56Sopenharmony_ci self.assertTrue(inner1.cancelled()) 19557db96d56Sopenharmony_ci self.assertTrue(inner2.cancelled()) 19567db96d56Sopenharmony_ci child1.set_result(1) 19577db96d56Sopenharmony_ci child2.set_result(2) 19587db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 19597db96d56Sopenharmony_ci 19607db96d56Sopenharmony_ci def test_shield_coroutine_without_loop(self): 19617db96d56Sopenharmony_ci async def coro(): 19627db96d56Sopenharmony_ci return 42 19637db96d56Sopenharmony_ci 19647db96d56Sopenharmony_ci inner = coro() 19657db96d56Sopenharmony_ci self.addCleanup(inner.close) 19667db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, 'no current event loop'): 19677db96d56Sopenharmony_ci asyncio.shield(inner) 19687db96d56Sopenharmony_ci 19697db96d56Sopenharmony_ci def test_shield_coroutine_use_running_loop(self): 19707db96d56Sopenharmony_ci async def coro(): 19717db96d56Sopenharmony_ci return 42 19727db96d56Sopenharmony_ci 19737db96d56Sopenharmony_ci async def test(): 19747db96d56Sopenharmony_ci return asyncio.shield(coro()) 19757db96d56Sopenharmony_ci outer = self.loop.run_until_complete(test()) 19767db96d56Sopenharmony_ci self.assertEqual(outer._loop, self.loop) 19777db96d56Sopenharmony_ci res = self.loop.run_until_complete(outer) 19787db96d56Sopenharmony_ci self.assertEqual(res, 42) 19797db96d56Sopenharmony_ci 19807db96d56Sopenharmony_ci 
def test_shield_coroutine_use_global_loop(self): 19817db96d56Sopenharmony_ci # Deprecated in 3.10, undeprecated in 3.11.1 19827db96d56Sopenharmony_ci async def coro(): 19837db96d56Sopenharmony_ci return 42 19847db96d56Sopenharmony_ci 19857db96d56Sopenharmony_ci asyncio.set_event_loop(self.loop) 19867db96d56Sopenharmony_ci self.addCleanup(asyncio.set_event_loop, None) 19877db96d56Sopenharmony_ci outer = asyncio.shield(coro()) 19887db96d56Sopenharmony_ci self.assertEqual(outer._loop, self.loop) 19897db96d56Sopenharmony_ci res = self.loop.run_until_complete(outer) 19907db96d56Sopenharmony_ci self.assertEqual(res, 42) 19917db96d56Sopenharmony_ci 19927db96d56Sopenharmony_ci def test_as_completed_invalid_args(self): 19937db96d56Sopenharmony_ci fut = self.new_future(self.loop) 19947db96d56Sopenharmony_ci 19957db96d56Sopenharmony_ci # as_completed() expects a list of futures, not a future instance 19967db96d56Sopenharmony_ci self.assertRaises(TypeError, self.loop.run_until_complete, 19977db96d56Sopenharmony_ci asyncio.as_completed(fut)) 19987db96d56Sopenharmony_ci coro = coroutine_function() 19997db96d56Sopenharmony_ci self.assertRaises(TypeError, self.loop.run_until_complete, 20007db96d56Sopenharmony_ci asyncio.as_completed(coro)) 20017db96d56Sopenharmony_ci coro.close() 20027db96d56Sopenharmony_ci 20037db96d56Sopenharmony_ci def test_wait_invalid_args(self): 20047db96d56Sopenharmony_ci fut = self.new_future(self.loop) 20057db96d56Sopenharmony_ci 20067db96d56Sopenharmony_ci # wait() expects a list of futures, not a future instance 20077db96d56Sopenharmony_ci self.assertRaises(TypeError, self.loop.run_until_complete, 20087db96d56Sopenharmony_ci asyncio.wait(fut)) 20097db96d56Sopenharmony_ci coro = coroutine_function() 20107db96d56Sopenharmony_ci self.assertRaises(TypeError, self.loop.run_until_complete, 20117db96d56Sopenharmony_ci asyncio.wait(coro)) 20127db96d56Sopenharmony_ci coro.close() 20137db96d56Sopenharmony_ci 20147db96d56Sopenharmony_ci # wait() expects at least a 
future 20157db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, 20167db96d56Sopenharmony_ci asyncio.wait([])) 20177db96d56Sopenharmony_ci 20187db96d56Sopenharmony_ci def test_log_destroyed_pending_task(self): 20197db96d56Sopenharmony_ci Task = self.__class__.Task 20207db96d56Sopenharmony_ci 20217db96d56Sopenharmony_ci async def kill_me(loop): 20227db96d56Sopenharmony_ci future = self.new_future(loop) 20237db96d56Sopenharmony_ci await future 20247db96d56Sopenharmony_ci # at this point, the only reference to kill_me() task is 20257db96d56Sopenharmony_ci # the Task._wakeup() method in future._callbacks 20267db96d56Sopenharmony_ci raise Exception("code never reached") 20277db96d56Sopenharmony_ci 20287db96d56Sopenharmony_ci mock_handler = mock.Mock() 20297db96d56Sopenharmony_ci self.loop.set_debug(True) 20307db96d56Sopenharmony_ci self.loop.set_exception_handler(mock_handler) 20317db96d56Sopenharmony_ci 20327db96d56Sopenharmony_ci # schedule the task 20337db96d56Sopenharmony_ci coro = kill_me(self.loop) 20347db96d56Sopenharmony_ci task = asyncio.ensure_future(coro, loop=self.loop) 20357db96d56Sopenharmony_ci 20367db96d56Sopenharmony_ci self.assertEqual(asyncio.all_tasks(loop=self.loop), {task}) 20377db96d56Sopenharmony_ci 20387db96d56Sopenharmony_ci asyncio.set_event_loop(None) 20397db96d56Sopenharmony_ci 20407db96d56Sopenharmony_ci # execute the task so it waits for future 20417db96d56Sopenharmony_ci self.loop._run_once() 20427db96d56Sopenharmony_ci self.assertEqual(len(self.loop._ready), 0) 20437db96d56Sopenharmony_ci 20447db96d56Sopenharmony_ci coro = None 20457db96d56Sopenharmony_ci source_traceback = task._source_traceback 20467db96d56Sopenharmony_ci task = None 20477db96d56Sopenharmony_ci 20487db96d56Sopenharmony_ci # no more reference to kill_me() task: the task is destroyed by the GC 20497db96d56Sopenharmony_ci support.gc_collect() 20507db96d56Sopenharmony_ci 20517db96d56Sopenharmony_ci 
self.assertEqual(asyncio.all_tasks(loop=self.loop), set()) 20527db96d56Sopenharmony_ci 20537db96d56Sopenharmony_ci mock_handler.assert_called_with(self.loop, { 20547db96d56Sopenharmony_ci 'message': 'Task was destroyed but it is pending!', 20557db96d56Sopenharmony_ci 'task': mock.ANY, 20567db96d56Sopenharmony_ci 'source_traceback': source_traceback, 20577db96d56Sopenharmony_ci }) 20587db96d56Sopenharmony_ci mock_handler.reset_mock() 20597db96d56Sopenharmony_ci 20607db96d56Sopenharmony_ci @mock.patch('asyncio.base_events.logger') 20617db96d56Sopenharmony_ci def test_tb_logger_not_called_after_cancel(self, m_log): 20627db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 20637db96d56Sopenharmony_ci self.set_event_loop(loop) 20647db96d56Sopenharmony_ci 20657db96d56Sopenharmony_ci async def coro(): 20667db96d56Sopenharmony_ci raise TypeError 20677db96d56Sopenharmony_ci 20687db96d56Sopenharmony_ci async def runner(): 20697db96d56Sopenharmony_ci task = self.new_task(loop, coro()) 20707db96d56Sopenharmony_ci await asyncio.sleep(0.05) 20717db96d56Sopenharmony_ci task.cancel() 20727db96d56Sopenharmony_ci task = None 20737db96d56Sopenharmony_ci 20747db96d56Sopenharmony_ci loop.run_until_complete(runner()) 20757db96d56Sopenharmony_ci self.assertFalse(m_log.error.called) 20767db96d56Sopenharmony_ci 20777db96d56Sopenharmony_ci def test_task_source_traceback(self): 20787db96d56Sopenharmony_ci self.loop.set_debug(True) 20797db96d56Sopenharmony_ci 20807db96d56Sopenharmony_ci task = self.new_task(self.loop, coroutine_function()) 20817db96d56Sopenharmony_ci lineno = sys._getframe().f_lineno - 1 20827db96d56Sopenharmony_ci self.assertIsInstance(task._source_traceback, list) 20837db96d56Sopenharmony_ci self.assertEqual(task._source_traceback[-2][:3], 20847db96d56Sopenharmony_ci (__file__, 20857db96d56Sopenharmony_ci lineno, 20867db96d56Sopenharmony_ci 'test_task_source_traceback')) 20877db96d56Sopenharmony_ci self.loop.run_until_complete(task) 20887db96d56Sopenharmony_ci 
20897db96d56Sopenharmony_ci def test_cancel_gather_1(self): 20907db96d56Sopenharmony_ci """Ensure that a gathering future refuses to be cancelled once all 20917db96d56Sopenharmony_ci children are done""" 20927db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 20937db96d56Sopenharmony_ci self.addCleanup(loop.close) 20947db96d56Sopenharmony_ci 20957db96d56Sopenharmony_ci fut = self.new_future(loop) 20967db96d56Sopenharmony_ci async def create(): 20977db96d56Sopenharmony_ci # The indirection fut->child_coro is needed since otherwise the 20987db96d56Sopenharmony_ci # gathering task is done at the same time as the child future 20997db96d56Sopenharmony_ci def child_coro(): 21007db96d56Sopenharmony_ci return (yield from fut) 21017db96d56Sopenharmony_ci gather_future = asyncio.gather(child_coro()) 21027db96d56Sopenharmony_ci return asyncio.ensure_future(gather_future) 21037db96d56Sopenharmony_ci gather_task = loop.run_until_complete(create()) 21047db96d56Sopenharmony_ci 21057db96d56Sopenharmony_ci cancel_result = None 21067db96d56Sopenharmony_ci def cancelling_callback(_): 21077db96d56Sopenharmony_ci nonlocal cancel_result 21087db96d56Sopenharmony_ci cancel_result = gather_task.cancel() 21097db96d56Sopenharmony_ci fut.add_done_callback(cancelling_callback) 21107db96d56Sopenharmony_ci 21117db96d56Sopenharmony_ci fut.set_result(42) # calls the cancelling_callback after fut is done() 21127db96d56Sopenharmony_ci 21137db96d56Sopenharmony_ci # At this point the task should complete. 
21147db96d56Sopenharmony_ci loop.run_until_complete(gather_task) 21157db96d56Sopenharmony_ci 21167db96d56Sopenharmony_ci # Python issue #26923: asyncio.gather drops cancellation 21177db96d56Sopenharmony_ci self.assertEqual(cancel_result, False) 21187db96d56Sopenharmony_ci self.assertFalse(gather_task.cancelled()) 21197db96d56Sopenharmony_ci self.assertEqual(gather_task.result(), [42]) 21207db96d56Sopenharmony_ci 21217db96d56Sopenharmony_ci def test_cancel_gather_2(self): 21227db96d56Sopenharmony_ci cases = [ 21237db96d56Sopenharmony_ci ((), ()), 21247db96d56Sopenharmony_ci ((None,), ()), 21257db96d56Sopenharmony_ci (('my message',), ('my message',)), 21267db96d56Sopenharmony_ci # Non-string values should roundtrip. 21277db96d56Sopenharmony_ci ((5,), (5,)), 21287db96d56Sopenharmony_ci ] 21297db96d56Sopenharmony_ci for cancel_args, expected_args in cases: 21307db96d56Sopenharmony_ci with self.subTest(cancel_args=cancel_args): 21317db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 21327db96d56Sopenharmony_ci self.addCleanup(loop.close) 21337db96d56Sopenharmony_ci 21347db96d56Sopenharmony_ci async def test(): 21357db96d56Sopenharmony_ci time = 0 21367db96d56Sopenharmony_ci while True: 21377db96d56Sopenharmony_ci time += 0.05 21387db96d56Sopenharmony_ci await asyncio.gather(asyncio.sleep(0.05), 21397db96d56Sopenharmony_ci return_exceptions=True) 21407db96d56Sopenharmony_ci if time > 1: 21417db96d56Sopenharmony_ci return 21427db96d56Sopenharmony_ci 21437db96d56Sopenharmony_ci async def main(): 21447db96d56Sopenharmony_ci qwe = self.new_task(loop, test()) 21457db96d56Sopenharmony_ci await asyncio.sleep(0.2) 21467db96d56Sopenharmony_ci qwe.cancel(*cancel_args) 21477db96d56Sopenharmony_ci await qwe 21487db96d56Sopenharmony_ci 21497db96d56Sopenharmony_ci try: 21507db96d56Sopenharmony_ci loop.run_until_complete(main()) 21517db96d56Sopenharmony_ci except asyncio.CancelledError as exc: 21527db96d56Sopenharmony_ci self.assertEqual(exc.args, expected_args) 
21537db96d56Sopenharmony_ci actual = get_innermost_context(exc) 21547db96d56Sopenharmony_ci self.assertEqual( 21557db96d56Sopenharmony_ci actual, 21567db96d56Sopenharmony_ci (asyncio.CancelledError, expected_args, 0), 21577db96d56Sopenharmony_ci ) 21587db96d56Sopenharmony_ci else: 21597db96d56Sopenharmony_ci self.fail( 21607db96d56Sopenharmony_ci 'gather() does not propagate CancelledError ' 21617db96d56Sopenharmony_ci 'raised by inner task to the gather() caller.' 21627db96d56Sopenharmony_ci ) 21637db96d56Sopenharmony_ci 21647db96d56Sopenharmony_ci def test_exception_traceback(self): 21657db96d56Sopenharmony_ci # See http://bugs.python.org/issue28843 21667db96d56Sopenharmony_ci 21677db96d56Sopenharmony_ci async def foo(): 21687db96d56Sopenharmony_ci 1 / 0 21697db96d56Sopenharmony_ci 21707db96d56Sopenharmony_ci async def main(): 21717db96d56Sopenharmony_ci task = self.new_task(self.loop, foo()) 21727db96d56Sopenharmony_ci await asyncio.sleep(0) # skip one loop iteration 21737db96d56Sopenharmony_ci self.assertIsNotNone(task.exception().__traceback__) 21747db96d56Sopenharmony_ci 21757db96d56Sopenharmony_ci self.loop.run_until_complete(main()) 21767db96d56Sopenharmony_ci 21777db96d56Sopenharmony_ci @mock.patch('asyncio.base_events.logger') 21787db96d56Sopenharmony_ci def test_error_in_call_soon(self, m_log): 21797db96d56Sopenharmony_ci def call_soon(callback, *args, **kwargs): 21807db96d56Sopenharmony_ci raise ValueError 21817db96d56Sopenharmony_ci self.loop.call_soon = call_soon 21827db96d56Sopenharmony_ci 21837db96d56Sopenharmony_ci async def coro(): 21847db96d56Sopenharmony_ci pass 21857db96d56Sopenharmony_ci 21867db96d56Sopenharmony_ci self.assertFalse(m_log.error.called) 21877db96d56Sopenharmony_ci 21887db96d56Sopenharmony_ci with self.assertRaises(ValueError): 21897db96d56Sopenharmony_ci gen = coro() 21907db96d56Sopenharmony_ci try: 21917db96d56Sopenharmony_ci self.new_task(self.loop, gen) 21927db96d56Sopenharmony_ci finally: 21937db96d56Sopenharmony_ci 
gen.close() 21947db96d56Sopenharmony_ci gc.collect() # For PyPy or other GCs. 21957db96d56Sopenharmony_ci 21967db96d56Sopenharmony_ci self.assertTrue(m_log.error.called) 21977db96d56Sopenharmony_ci message = m_log.error.call_args[0][0] 21987db96d56Sopenharmony_ci self.assertIn('Task was destroyed but it is pending', message) 21997db96d56Sopenharmony_ci 22007db96d56Sopenharmony_ci self.assertEqual(asyncio.all_tasks(self.loop), set()) 22017db96d56Sopenharmony_ci 22027db96d56Sopenharmony_ci def test_create_task_with_noncoroutine(self): 22037db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, 22047db96d56Sopenharmony_ci "a coroutine was expected, got 123"): 22057db96d56Sopenharmony_ci self.new_task(self.loop, 123) 22067db96d56Sopenharmony_ci 22077db96d56Sopenharmony_ci # test it for the second time to ensure that caching 22087db96d56Sopenharmony_ci # in asyncio.iscoroutine() doesn't break things. 22097db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, 22107db96d56Sopenharmony_ci "a coroutine was expected, got 123"): 22117db96d56Sopenharmony_ci self.new_task(self.loop, 123) 22127db96d56Sopenharmony_ci 22137db96d56Sopenharmony_ci def test_create_task_with_async_function(self): 22147db96d56Sopenharmony_ci 22157db96d56Sopenharmony_ci async def coro(): 22167db96d56Sopenharmony_ci pass 22177db96d56Sopenharmony_ci 22187db96d56Sopenharmony_ci task = self.new_task(self.loop, coro()) 22197db96d56Sopenharmony_ci self.assertIsInstance(task, self.Task) 22207db96d56Sopenharmony_ci self.loop.run_until_complete(task) 22217db96d56Sopenharmony_ci 22227db96d56Sopenharmony_ci # test it for the second time to ensure that caching 22237db96d56Sopenharmony_ci # in asyncio.iscoroutine() doesn't break things. 
22247db96d56Sopenharmony_ci task = self.new_task(self.loop, coro()) 22257db96d56Sopenharmony_ci self.assertIsInstance(task, self.Task) 22267db96d56Sopenharmony_ci self.loop.run_until_complete(task) 22277db96d56Sopenharmony_ci 22287db96d56Sopenharmony_ci def test_create_task_with_asynclike_function(self): 22297db96d56Sopenharmony_ci task = self.new_task(self.loop, CoroLikeObject()) 22307db96d56Sopenharmony_ci self.assertIsInstance(task, self.Task) 22317db96d56Sopenharmony_ci self.assertEqual(self.loop.run_until_complete(task), 42) 22327db96d56Sopenharmony_ci 22337db96d56Sopenharmony_ci # test it for the second time to ensure that caching 22347db96d56Sopenharmony_ci # in asyncio.iscoroutine() doesn't break things. 22357db96d56Sopenharmony_ci task = self.new_task(self.loop, CoroLikeObject()) 22367db96d56Sopenharmony_ci self.assertIsInstance(task, self.Task) 22377db96d56Sopenharmony_ci self.assertEqual(self.loop.run_until_complete(task), 42) 22387db96d56Sopenharmony_ci 22397db96d56Sopenharmony_ci def test_bare_create_task(self): 22407db96d56Sopenharmony_ci 22417db96d56Sopenharmony_ci async def inner(): 22427db96d56Sopenharmony_ci return 1 22437db96d56Sopenharmony_ci 22447db96d56Sopenharmony_ci async def coro(): 22457db96d56Sopenharmony_ci task = asyncio.create_task(inner()) 22467db96d56Sopenharmony_ci self.assertIsInstance(task, self.Task) 22477db96d56Sopenharmony_ci ret = await task 22487db96d56Sopenharmony_ci self.assertEqual(1, ret) 22497db96d56Sopenharmony_ci 22507db96d56Sopenharmony_ci self.loop.run_until_complete(coro()) 22517db96d56Sopenharmony_ci 22527db96d56Sopenharmony_ci def test_bare_create_named_task(self): 22537db96d56Sopenharmony_ci 22547db96d56Sopenharmony_ci async def coro_noop(): 22557db96d56Sopenharmony_ci pass 22567db96d56Sopenharmony_ci 22577db96d56Sopenharmony_ci async def coro(): 22587db96d56Sopenharmony_ci task = asyncio.create_task(coro_noop(), name='No-op') 22597db96d56Sopenharmony_ci self.assertEqual(task.get_name(), 'No-op') 
22607db96d56Sopenharmony_ci await task 22617db96d56Sopenharmony_ci 22627db96d56Sopenharmony_ci self.loop.run_until_complete(coro()) 22637db96d56Sopenharmony_ci 22647db96d56Sopenharmony_ci def test_context_1(self): 22657db96d56Sopenharmony_ci cvar = contextvars.ContextVar('cvar', default='nope') 22667db96d56Sopenharmony_ci 22677db96d56Sopenharmony_ci async def sub(): 22687db96d56Sopenharmony_ci await asyncio.sleep(0.01) 22697db96d56Sopenharmony_ci self.assertEqual(cvar.get(), 'nope') 22707db96d56Sopenharmony_ci cvar.set('something else') 22717db96d56Sopenharmony_ci 22727db96d56Sopenharmony_ci async def main(): 22737db96d56Sopenharmony_ci self.assertEqual(cvar.get(), 'nope') 22747db96d56Sopenharmony_ci subtask = self.new_task(loop, sub()) 22757db96d56Sopenharmony_ci cvar.set('yes') 22767db96d56Sopenharmony_ci self.assertEqual(cvar.get(), 'yes') 22777db96d56Sopenharmony_ci await subtask 22787db96d56Sopenharmony_ci self.assertEqual(cvar.get(), 'yes') 22797db96d56Sopenharmony_ci 22807db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 22817db96d56Sopenharmony_ci try: 22827db96d56Sopenharmony_ci task = self.new_task(loop, main()) 22837db96d56Sopenharmony_ci loop.run_until_complete(task) 22847db96d56Sopenharmony_ci finally: 22857db96d56Sopenharmony_ci loop.close() 22867db96d56Sopenharmony_ci 22877db96d56Sopenharmony_ci def test_context_2(self): 22887db96d56Sopenharmony_ci cvar = contextvars.ContextVar('cvar', default='nope') 22897db96d56Sopenharmony_ci 22907db96d56Sopenharmony_ci async def main(): 22917db96d56Sopenharmony_ci def fut_on_done(fut): 22927db96d56Sopenharmony_ci # This change must not pollute the context 22937db96d56Sopenharmony_ci # of the "main()" task. 
22947db96d56Sopenharmony_ci cvar.set('something else') 22957db96d56Sopenharmony_ci 22967db96d56Sopenharmony_ci self.assertEqual(cvar.get(), 'nope') 22977db96d56Sopenharmony_ci 22987db96d56Sopenharmony_ci for j in range(2): 22997db96d56Sopenharmony_ci fut = self.new_future(loop) 23007db96d56Sopenharmony_ci fut.add_done_callback(fut_on_done) 23017db96d56Sopenharmony_ci cvar.set(f'yes{j}') 23027db96d56Sopenharmony_ci loop.call_soon(fut.set_result, None) 23037db96d56Sopenharmony_ci await fut 23047db96d56Sopenharmony_ci self.assertEqual(cvar.get(), f'yes{j}') 23057db96d56Sopenharmony_ci 23067db96d56Sopenharmony_ci for i in range(3): 23077db96d56Sopenharmony_ci # Test that task passed its context to add_done_callback: 23087db96d56Sopenharmony_ci cvar.set(f'yes{i}-{j}') 23097db96d56Sopenharmony_ci await asyncio.sleep(0.001) 23107db96d56Sopenharmony_ci self.assertEqual(cvar.get(), f'yes{i}-{j}') 23117db96d56Sopenharmony_ci 23127db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 23137db96d56Sopenharmony_ci try: 23147db96d56Sopenharmony_ci task = self.new_task(loop, main()) 23157db96d56Sopenharmony_ci loop.run_until_complete(task) 23167db96d56Sopenharmony_ci finally: 23177db96d56Sopenharmony_ci loop.close() 23187db96d56Sopenharmony_ci 23197db96d56Sopenharmony_ci self.assertEqual(cvar.get(), 'nope') 23207db96d56Sopenharmony_ci 23217db96d56Sopenharmony_ci def test_context_3(self): 23227db96d56Sopenharmony_ci # Run 100 Tasks in parallel, each modifying cvar. 
23237db96d56Sopenharmony_ci 23247db96d56Sopenharmony_ci cvar = contextvars.ContextVar('cvar', default=-1) 23257db96d56Sopenharmony_ci 23267db96d56Sopenharmony_ci async def sub(num): 23277db96d56Sopenharmony_ci for i in range(10): 23287db96d56Sopenharmony_ci cvar.set(num + i) 23297db96d56Sopenharmony_ci await asyncio.sleep(random.uniform(0.001, 0.05)) 23307db96d56Sopenharmony_ci self.assertEqual(cvar.get(), num + i) 23317db96d56Sopenharmony_ci 23327db96d56Sopenharmony_ci async def main(): 23337db96d56Sopenharmony_ci tasks = [] 23347db96d56Sopenharmony_ci for i in range(100): 23357db96d56Sopenharmony_ci task = loop.create_task(sub(random.randint(0, 10))) 23367db96d56Sopenharmony_ci tasks.append(task) 23377db96d56Sopenharmony_ci 23387db96d56Sopenharmony_ci await asyncio.gather(*tasks) 23397db96d56Sopenharmony_ci 23407db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 23417db96d56Sopenharmony_ci try: 23427db96d56Sopenharmony_ci loop.run_until_complete(main()) 23437db96d56Sopenharmony_ci finally: 23447db96d56Sopenharmony_ci loop.close() 23457db96d56Sopenharmony_ci 23467db96d56Sopenharmony_ci self.assertEqual(cvar.get(), -1) 23477db96d56Sopenharmony_ci 23487db96d56Sopenharmony_ci def test_context_4(self): 23497db96d56Sopenharmony_ci cvar = contextvars.ContextVar('cvar') 23507db96d56Sopenharmony_ci 23517db96d56Sopenharmony_ci async def coro(val): 23527db96d56Sopenharmony_ci await asyncio.sleep(0) 23537db96d56Sopenharmony_ci cvar.set(val) 23547db96d56Sopenharmony_ci 23557db96d56Sopenharmony_ci async def main(): 23567db96d56Sopenharmony_ci ret = [] 23577db96d56Sopenharmony_ci ctx = contextvars.copy_context() 23587db96d56Sopenharmony_ci ret.append(ctx.get(cvar)) 23597db96d56Sopenharmony_ci t1 = self.new_task(loop, coro(1), context=ctx) 23607db96d56Sopenharmony_ci await t1 23617db96d56Sopenharmony_ci ret.append(ctx.get(cvar)) 23627db96d56Sopenharmony_ci t2 = self.new_task(loop, coro(2), context=ctx) 23637db96d56Sopenharmony_ci await t2 23647db96d56Sopenharmony_ci 
ret.append(ctx.get(cvar)) 23657db96d56Sopenharmony_ci return ret 23667db96d56Sopenharmony_ci 23677db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 23687db96d56Sopenharmony_ci try: 23697db96d56Sopenharmony_ci task = self.new_task(loop, main()) 23707db96d56Sopenharmony_ci ret = loop.run_until_complete(task) 23717db96d56Sopenharmony_ci finally: 23727db96d56Sopenharmony_ci loop.close() 23737db96d56Sopenharmony_ci 23747db96d56Sopenharmony_ci self.assertEqual([None, 1, 2], ret) 23757db96d56Sopenharmony_ci 23767db96d56Sopenharmony_ci def test_context_5(self): 23777db96d56Sopenharmony_ci cvar = contextvars.ContextVar('cvar') 23787db96d56Sopenharmony_ci 23797db96d56Sopenharmony_ci async def coro(val): 23807db96d56Sopenharmony_ci await asyncio.sleep(0) 23817db96d56Sopenharmony_ci cvar.set(val) 23827db96d56Sopenharmony_ci 23837db96d56Sopenharmony_ci async def main(): 23847db96d56Sopenharmony_ci ret = [] 23857db96d56Sopenharmony_ci ctx = contextvars.copy_context() 23867db96d56Sopenharmony_ci ret.append(ctx.get(cvar)) 23877db96d56Sopenharmony_ci t1 = asyncio.create_task(coro(1), context=ctx) 23887db96d56Sopenharmony_ci await t1 23897db96d56Sopenharmony_ci ret.append(ctx.get(cvar)) 23907db96d56Sopenharmony_ci t2 = asyncio.create_task(coro(2), context=ctx) 23917db96d56Sopenharmony_ci await t2 23927db96d56Sopenharmony_ci ret.append(ctx.get(cvar)) 23937db96d56Sopenharmony_ci return ret 23947db96d56Sopenharmony_ci 23957db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 23967db96d56Sopenharmony_ci try: 23977db96d56Sopenharmony_ci task = self.new_task(loop, main()) 23987db96d56Sopenharmony_ci ret = loop.run_until_complete(task) 23997db96d56Sopenharmony_ci finally: 24007db96d56Sopenharmony_ci loop.close() 24017db96d56Sopenharmony_ci 24027db96d56Sopenharmony_ci self.assertEqual([None, 1, 2], ret) 24037db96d56Sopenharmony_ci 24047db96d56Sopenharmony_ci def test_context_6(self): 24057db96d56Sopenharmony_ci cvar = contextvars.ContextVar('cvar') 24067db96d56Sopenharmony_ci 
24077db96d56Sopenharmony_ci async def coro(val): 24087db96d56Sopenharmony_ci await asyncio.sleep(0) 24097db96d56Sopenharmony_ci cvar.set(val) 24107db96d56Sopenharmony_ci 24117db96d56Sopenharmony_ci async def main(): 24127db96d56Sopenharmony_ci ret = [] 24137db96d56Sopenharmony_ci ctx = contextvars.copy_context() 24147db96d56Sopenharmony_ci ret.append(ctx.get(cvar)) 24157db96d56Sopenharmony_ci t1 = loop.create_task(coro(1), context=ctx) 24167db96d56Sopenharmony_ci await t1 24177db96d56Sopenharmony_ci ret.append(ctx.get(cvar)) 24187db96d56Sopenharmony_ci t2 = loop.create_task(coro(2), context=ctx) 24197db96d56Sopenharmony_ci await t2 24207db96d56Sopenharmony_ci ret.append(ctx.get(cvar)) 24217db96d56Sopenharmony_ci return ret 24227db96d56Sopenharmony_ci 24237db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 24247db96d56Sopenharmony_ci try: 24257db96d56Sopenharmony_ci task = loop.create_task(main()) 24267db96d56Sopenharmony_ci ret = loop.run_until_complete(task) 24277db96d56Sopenharmony_ci finally: 24287db96d56Sopenharmony_ci loop.close() 24297db96d56Sopenharmony_ci 24307db96d56Sopenharmony_ci self.assertEqual([None, 1, 2], ret) 24317db96d56Sopenharmony_ci 24327db96d56Sopenharmony_ci def test_get_coro(self): 24337db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 24347db96d56Sopenharmony_ci coro = coroutine_function() 24357db96d56Sopenharmony_ci try: 24367db96d56Sopenharmony_ci task = self.new_task(loop, coro) 24377db96d56Sopenharmony_ci loop.run_until_complete(task) 24387db96d56Sopenharmony_ci self.assertIs(task.get_coro(), coro) 24397db96d56Sopenharmony_ci finally: 24407db96d56Sopenharmony_ci loop.close() 24417db96d56Sopenharmony_ci 24427db96d56Sopenharmony_ci 24437db96d56Sopenharmony_cidef add_subclass_tests(cls): 24447db96d56Sopenharmony_ci BaseTask = cls.Task 24457db96d56Sopenharmony_ci BaseFuture = cls.Future 24467db96d56Sopenharmony_ci 24477db96d56Sopenharmony_ci if BaseTask is None or BaseFuture is None: 24487db96d56Sopenharmony_ci return cls 
24497db96d56Sopenharmony_ci 24507db96d56Sopenharmony_ci class CommonFuture: 24517db96d56Sopenharmony_ci def __init__(self, *args, **kwargs): 24527db96d56Sopenharmony_ci self.calls = collections.defaultdict(lambda: 0) 24537db96d56Sopenharmony_ci super().__init__(*args, **kwargs) 24547db96d56Sopenharmony_ci 24557db96d56Sopenharmony_ci def add_done_callback(self, *args, **kwargs): 24567db96d56Sopenharmony_ci self.calls['add_done_callback'] += 1 24577db96d56Sopenharmony_ci return super().add_done_callback(*args, **kwargs) 24587db96d56Sopenharmony_ci 24597db96d56Sopenharmony_ci class Task(CommonFuture, BaseTask): 24607db96d56Sopenharmony_ci pass 24617db96d56Sopenharmony_ci 24627db96d56Sopenharmony_ci class Future(CommonFuture, BaseFuture): 24637db96d56Sopenharmony_ci pass 24647db96d56Sopenharmony_ci 24657db96d56Sopenharmony_ci def test_subclasses_ctask_cfuture(self): 24667db96d56Sopenharmony_ci fut = self.Future(loop=self.loop) 24677db96d56Sopenharmony_ci 24687db96d56Sopenharmony_ci async def func(): 24697db96d56Sopenharmony_ci self.loop.call_soon(lambda: fut.set_result('spam')) 24707db96d56Sopenharmony_ci return await fut 24717db96d56Sopenharmony_ci 24727db96d56Sopenharmony_ci task = self.Task(func(), loop=self.loop) 24737db96d56Sopenharmony_ci 24747db96d56Sopenharmony_ci result = self.loop.run_until_complete(task) 24757db96d56Sopenharmony_ci 24767db96d56Sopenharmony_ci self.assertEqual(result, 'spam') 24777db96d56Sopenharmony_ci 24787db96d56Sopenharmony_ci self.assertEqual( 24797db96d56Sopenharmony_ci dict(task.calls), 24807db96d56Sopenharmony_ci {'add_done_callback': 1}) 24817db96d56Sopenharmony_ci 24827db96d56Sopenharmony_ci self.assertEqual( 24837db96d56Sopenharmony_ci dict(fut.calls), 24847db96d56Sopenharmony_ci {'add_done_callback': 1}) 24857db96d56Sopenharmony_ci 24867db96d56Sopenharmony_ci # Add patched Task & Future back to the test case 24877db96d56Sopenharmony_ci cls.Task = Task 24887db96d56Sopenharmony_ci cls.Future = Future 24897db96d56Sopenharmony_ci 
24907db96d56Sopenharmony_ci # Add an extra unit-test 24917db96d56Sopenharmony_ci cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture 24927db96d56Sopenharmony_ci 24937db96d56Sopenharmony_ci # Disable the "test_task_source_traceback" test 24947db96d56Sopenharmony_ci # (the test is hardcoded for a particular call stack, which 24957db96d56Sopenharmony_ci # is slightly different for Task subclasses) 24967db96d56Sopenharmony_ci cls.test_task_source_traceback = None 24977db96d56Sopenharmony_ci 24987db96d56Sopenharmony_ci return cls 24997db96d56Sopenharmony_ci 25007db96d56Sopenharmony_ci 25017db96d56Sopenharmony_ciclass SetMethodsTest: 25027db96d56Sopenharmony_ci 25037db96d56Sopenharmony_ci def test_set_result_causes_invalid_state(self): 25047db96d56Sopenharmony_ci Future = type(self).Future 25057db96d56Sopenharmony_ci self.loop.call_exception_handler = exc_handler = mock.Mock() 25067db96d56Sopenharmony_ci 25077db96d56Sopenharmony_ci async def foo(): 25087db96d56Sopenharmony_ci await asyncio.sleep(0.1) 25097db96d56Sopenharmony_ci return 10 25107db96d56Sopenharmony_ci 25117db96d56Sopenharmony_ci coro = foo() 25127db96d56Sopenharmony_ci task = self.new_task(self.loop, coro) 25137db96d56Sopenharmony_ci Future.set_result(task, 'spam') 25147db96d56Sopenharmony_ci 25157db96d56Sopenharmony_ci self.assertEqual( 25167db96d56Sopenharmony_ci self.loop.run_until_complete(task), 25177db96d56Sopenharmony_ci 'spam') 25187db96d56Sopenharmony_ci 25197db96d56Sopenharmony_ci exc_handler.assert_called_once() 25207db96d56Sopenharmony_ci exc = exc_handler.call_args[0][0]['exception'] 25217db96d56Sopenharmony_ci with self.assertRaisesRegex(asyncio.InvalidStateError, 25227db96d56Sopenharmony_ci r'step\(\): already done'): 25237db96d56Sopenharmony_ci raise exc 25247db96d56Sopenharmony_ci 25257db96d56Sopenharmony_ci coro.close() 25267db96d56Sopenharmony_ci 25277db96d56Sopenharmony_ci def test_set_exception_causes_invalid_state(self): 25287db96d56Sopenharmony_ci class MyExc(Exception): 
25297db96d56Sopenharmony_ci pass 25307db96d56Sopenharmony_ci 25317db96d56Sopenharmony_ci Future = type(self).Future 25327db96d56Sopenharmony_ci self.loop.call_exception_handler = exc_handler = mock.Mock() 25337db96d56Sopenharmony_ci 25347db96d56Sopenharmony_ci async def foo(): 25357db96d56Sopenharmony_ci await asyncio.sleep(0.1) 25367db96d56Sopenharmony_ci return 10 25377db96d56Sopenharmony_ci 25387db96d56Sopenharmony_ci coro = foo() 25397db96d56Sopenharmony_ci task = self.new_task(self.loop, coro) 25407db96d56Sopenharmony_ci Future.set_exception(task, MyExc()) 25417db96d56Sopenharmony_ci 25427db96d56Sopenharmony_ci with self.assertRaises(MyExc): 25437db96d56Sopenharmony_ci self.loop.run_until_complete(task) 25447db96d56Sopenharmony_ci 25457db96d56Sopenharmony_ci exc_handler.assert_called_once() 25467db96d56Sopenharmony_ci exc = exc_handler.call_args[0][0]['exception'] 25477db96d56Sopenharmony_ci with self.assertRaisesRegex(asyncio.InvalidStateError, 25487db96d56Sopenharmony_ci r'step\(\): already done'): 25497db96d56Sopenharmony_ci raise exc 25507db96d56Sopenharmony_ci 25517db96d56Sopenharmony_ci coro.close() 25527db96d56Sopenharmony_ci 25537db96d56Sopenharmony_ci 25547db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(futures, '_CFuture') and 25557db96d56Sopenharmony_ci hasattr(tasks, '_CTask'), 25567db96d56Sopenharmony_ci 'requires the C _asyncio module') 25577db96d56Sopenharmony_ciclass CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest, 25587db96d56Sopenharmony_ci test_utils.TestCase): 25597db96d56Sopenharmony_ci 25607db96d56Sopenharmony_ci Task = getattr(tasks, '_CTask', None) 25617db96d56Sopenharmony_ci Future = getattr(futures, '_CFuture', None) 25627db96d56Sopenharmony_ci 25637db96d56Sopenharmony_ci @support.refcount_test 25647db96d56Sopenharmony_ci def test_refleaks_in_task___init__(self): 25657db96d56Sopenharmony_ci gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount') 25667db96d56Sopenharmony_ci async def coro(): 25677db96d56Sopenharmony_ci 
pass 25687db96d56Sopenharmony_ci task = self.new_task(self.loop, coro()) 25697db96d56Sopenharmony_ci self.loop.run_until_complete(task) 25707db96d56Sopenharmony_ci refs_before = gettotalrefcount() 25717db96d56Sopenharmony_ci for i in range(100): 25727db96d56Sopenharmony_ci task.__init__(coro(), loop=self.loop) 25737db96d56Sopenharmony_ci self.loop.run_until_complete(task) 25747db96d56Sopenharmony_ci self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10) 25757db96d56Sopenharmony_ci 25767db96d56Sopenharmony_ci def test_del__log_destroy_pending_segfault(self): 25777db96d56Sopenharmony_ci async def coro(): 25787db96d56Sopenharmony_ci pass 25797db96d56Sopenharmony_ci task = self.new_task(self.loop, coro()) 25807db96d56Sopenharmony_ci self.loop.run_until_complete(task) 25817db96d56Sopenharmony_ci with self.assertRaises(AttributeError): 25827db96d56Sopenharmony_ci del task._log_destroy_pending 25837db96d56Sopenharmony_ci 25847db96d56Sopenharmony_ci 25857db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(futures, '_CFuture') and 25867db96d56Sopenharmony_ci hasattr(tasks, '_CTask'), 25877db96d56Sopenharmony_ci 'requires the C _asyncio module') 25887db96d56Sopenharmony_ci@add_subclass_tests 25897db96d56Sopenharmony_ciclass CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): 25907db96d56Sopenharmony_ci 25917db96d56Sopenharmony_ci Task = getattr(tasks, '_CTask', None) 25927db96d56Sopenharmony_ci Future = getattr(futures, '_CFuture', None) 25937db96d56Sopenharmony_ci 25947db96d56Sopenharmony_ci 25957db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(tasks, '_CTask'), 25967db96d56Sopenharmony_ci 'requires the C _asyncio module') 25977db96d56Sopenharmony_ci@add_subclass_tests 25987db96d56Sopenharmony_ciclass CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): 25997db96d56Sopenharmony_ci 26007db96d56Sopenharmony_ci Task = getattr(tasks, '_CTask', None) 26017db96d56Sopenharmony_ci Future = futures._PyFuture 26027db96d56Sopenharmony_ci 


@unittest.skipUnless(
    hasattr(futures, '_CFuture'),
    'requires the C _asyncio module',
)
@add_subclass_tests
class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):
    """Shared task tests for the Python Task / C Future pair, via add_subclass_tests."""

    # getattr() with a default keeps this class body importable even when
    # the C accelerator is absent; the skip decorator disables it then.
    Task = tasks._PyTask
    Future = getattr(futures, '_CFuture', None)


@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module',
)
class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    """Shared task tests run with the C Task and the Python Future."""

    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)


@unittest.skipUnless(
    hasattr(futures, '_CFuture'),
    'requires the C _asyncio module',
)
class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):
    """Shared task tests run with the Python Task and the C Future."""

    Future = getattr(futures, '_CFuture', None)
    Task = tasks._PyTask


class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
                            test_utils.TestCase):
    """Shared task tests plus the set-method tests, all pure Python."""

    Future = futures._PyFuture
    Task = tasks._PyTask


@add_subclass_tests
class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    """Shared task tests for the pure-Python pair, via add_subclass_tests."""

    Future = futures._PyFuture
    Task = tasks._PyTask

26417db96d56Sopenharmony_ci 26427db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(tasks, '_CTask'), 26437db96d56Sopenharmony_ci 'requires the C _asyncio module') 26447db96d56Sopenharmony_ciclass CTask_Future_Tests(test_utils.TestCase): 26457db96d56Sopenharmony_ci 26467db96d56Sopenharmony_ci def test_foobar(self): 26477db96d56Sopenharmony_ci class Fut(asyncio.Future): 26487db96d56Sopenharmony_ci @property 26497db96d56Sopenharmony_ci def get_loop(self): 26507db96d56Sopenharmony_ci raise AttributeError 26517db96d56Sopenharmony_ci 26527db96d56Sopenharmony_ci async def coro(): 26537db96d56Sopenharmony_ci await fut 26547db96d56Sopenharmony_ci return 'spam' 26557db96d56Sopenharmony_ci 26567db96d56Sopenharmony_ci self.loop = asyncio.new_event_loop() 26577db96d56Sopenharmony_ci try: 26587db96d56Sopenharmony_ci fut = Fut(loop=self.loop) 26597db96d56Sopenharmony_ci self.loop.call_later(0.1, fut.set_result, 1) 26607db96d56Sopenharmony_ci task = self.loop.create_task(coro()) 26617db96d56Sopenharmony_ci res = self.loop.run_until_complete(task) 26627db96d56Sopenharmony_ci finally: 26637db96d56Sopenharmony_ci self.loop.close() 26647db96d56Sopenharmony_ci 26657db96d56Sopenharmony_ci self.assertEqual(res, 'spam') 26667db96d56Sopenharmony_ci 26677db96d56Sopenharmony_ci 26687db96d56Sopenharmony_ciclass BaseTaskIntrospectionTests: 26697db96d56Sopenharmony_ci _register_task = None 26707db96d56Sopenharmony_ci _unregister_task = None 26717db96d56Sopenharmony_ci _enter_task = None 26727db96d56Sopenharmony_ci _leave_task = None 26737db96d56Sopenharmony_ci 26747db96d56Sopenharmony_ci def test__register_task_1(self): 26757db96d56Sopenharmony_ci class TaskLike: 26767db96d56Sopenharmony_ci @property 26777db96d56Sopenharmony_ci def _loop(self): 26787db96d56Sopenharmony_ci return loop 26797db96d56Sopenharmony_ci 26807db96d56Sopenharmony_ci def done(self): 26817db96d56Sopenharmony_ci return False 26827db96d56Sopenharmony_ci 26837db96d56Sopenharmony_ci task = TaskLike() 
26847db96d56Sopenharmony_ci loop = mock.Mock() 26857db96d56Sopenharmony_ci 26867db96d56Sopenharmony_ci self.assertEqual(asyncio.all_tasks(loop), set()) 26877db96d56Sopenharmony_ci self._register_task(task) 26887db96d56Sopenharmony_ci self.assertEqual(asyncio.all_tasks(loop), {task}) 26897db96d56Sopenharmony_ci self._unregister_task(task) 26907db96d56Sopenharmony_ci 26917db96d56Sopenharmony_ci def test__register_task_2(self): 26927db96d56Sopenharmony_ci class TaskLike: 26937db96d56Sopenharmony_ci def get_loop(self): 26947db96d56Sopenharmony_ci return loop 26957db96d56Sopenharmony_ci 26967db96d56Sopenharmony_ci def done(self): 26977db96d56Sopenharmony_ci return False 26987db96d56Sopenharmony_ci 26997db96d56Sopenharmony_ci task = TaskLike() 27007db96d56Sopenharmony_ci loop = mock.Mock() 27017db96d56Sopenharmony_ci 27027db96d56Sopenharmony_ci self.assertEqual(asyncio.all_tasks(loop), set()) 27037db96d56Sopenharmony_ci self._register_task(task) 27047db96d56Sopenharmony_ci self.assertEqual(asyncio.all_tasks(loop), {task}) 27057db96d56Sopenharmony_ci self._unregister_task(task) 27067db96d56Sopenharmony_ci 27077db96d56Sopenharmony_ci def test__register_task_3(self): 27087db96d56Sopenharmony_ci class TaskLike: 27097db96d56Sopenharmony_ci def get_loop(self): 27107db96d56Sopenharmony_ci return loop 27117db96d56Sopenharmony_ci 27127db96d56Sopenharmony_ci def done(self): 27137db96d56Sopenharmony_ci return True 27147db96d56Sopenharmony_ci 27157db96d56Sopenharmony_ci task = TaskLike() 27167db96d56Sopenharmony_ci loop = mock.Mock() 27177db96d56Sopenharmony_ci 27187db96d56Sopenharmony_ci self.assertEqual(asyncio.all_tasks(loop), set()) 27197db96d56Sopenharmony_ci self._register_task(task) 27207db96d56Sopenharmony_ci self.assertEqual(asyncio.all_tasks(loop), set()) 27217db96d56Sopenharmony_ci self._unregister_task(task) 27227db96d56Sopenharmony_ci 27237db96d56Sopenharmony_ci def test__enter_task(self): 27247db96d56Sopenharmony_ci task = mock.Mock() 27257db96d56Sopenharmony_ci loop = 
mock.Mock() 27267db96d56Sopenharmony_ci self.assertIsNone(asyncio.current_task(loop)) 27277db96d56Sopenharmony_ci self._enter_task(loop, task) 27287db96d56Sopenharmony_ci self.assertIs(asyncio.current_task(loop), task) 27297db96d56Sopenharmony_ci self._leave_task(loop, task) 27307db96d56Sopenharmony_ci 27317db96d56Sopenharmony_ci def test__enter_task_failure(self): 27327db96d56Sopenharmony_ci task1 = mock.Mock() 27337db96d56Sopenharmony_ci task2 = mock.Mock() 27347db96d56Sopenharmony_ci loop = mock.Mock() 27357db96d56Sopenharmony_ci self._enter_task(loop, task1) 27367db96d56Sopenharmony_ci with self.assertRaises(RuntimeError): 27377db96d56Sopenharmony_ci self._enter_task(loop, task2) 27387db96d56Sopenharmony_ci self.assertIs(asyncio.current_task(loop), task1) 27397db96d56Sopenharmony_ci self._leave_task(loop, task1) 27407db96d56Sopenharmony_ci 27417db96d56Sopenharmony_ci def test__leave_task(self): 27427db96d56Sopenharmony_ci task = mock.Mock() 27437db96d56Sopenharmony_ci loop = mock.Mock() 27447db96d56Sopenharmony_ci self._enter_task(loop, task) 27457db96d56Sopenharmony_ci self._leave_task(loop, task) 27467db96d56Sopenharmony_ci self.assertIsNone(asyncio.current_task(loop)) 27477db96d56Sopenharmony_ci 27487db96d56Sopenharmony_ci def test__leave_task_failure1(self): 27497db96d56Sopenharmony_ci task1 = mock.Mock() 27507db96d56Sopenharmony_ci task2 = mock.Mock() 27517db96d56Sopenharmony_ci loop = mock.Mock() 27527db96d56Sopenharmony_ci self._enter_task(loop, task1) 27537db96d56Sopenharmony_ci with self.assertRaises(RuntimeError): 27547db96d56Sopenharmony_ci self._leave_task(loop, task2) 27557db96d56Sopenharmony_ci self.assertIs(asyncio.current_task(loop), task1) 27567db96d56Sopenharmony_ci self._leave_task(loop, task1) 27577db96d56Sopenharmony_ci 27587db96d56Sopenharmony_ci def test__leave_task_failure2(self): 27597db96d56Sopenharmony_ci task = mock.Mock() 27607db96d56Sopenharmony_ci loop = mock.Mock() 27617db96d56Sopenharmony_ci with self.assertRaises(RuntimeError): 
27627db96d56Sopenharmony_ci self._leave_task(loop, task) 27637db96d56Sopenharmony_ci self.assertIsNone(asyncio.current_task(loop)) 27647db96d56Sopenharmony_ci 27657db96d56Sopenharmony_ci def test__unregister_task(self): 27667db96d56Sopenharmony_ci task = mock.Mock() 27677db96d56Sopenharmony_ci loop = mock.Mock() 27687db96d56Sopenharmony_ci task.get_loop = lambda: loop 27697db96d56Sopenharmony_ci self._register_task(task) 27707db96d56Sopenharmony_ci self._unregister_task(task) 27717db96d56Sopenharmony_ci self.assertEqual(asyncio.all_tasks(loop), set()) 27727db96d56Sopenharmony_ci 27737db96d56Sopenharmony_ci def test__unregister_task_not_registered(self): 27747db96d56Sopenharmony_ci task = mock.Mock() 27757db96d56Sopenharmony_ci loop = mock.Mock() 27767db96d56Sopenharmony_ci self._unregister_task(task) 27777db96d56Sopenharmony_ci self.assertEqual(asyncio.all_tasks(loop), set()) 27787db96d56Sopenharmony_ci 27797db96d56Sopenharmony_ci 27807db96d56Sopenharmony_ciclass PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests): 27817db96d56Sopenharmony_ci _register_task = staticmethod(tasks._py_register_task) 27827db96d56Sopenharmony_ci _unregister_task = staticmethod(tasks._py_unregister_task) 27837db96d56Sopenharmony_ci _enter_task = staticmethod(tasks._py_enter_task) 27847db96d56Sopenharmony_ci _leave_task = staticmethod(tasks._py_leave_task) 27857db96d56Sopenharmony_ci 27867db96d56Sopenharmony_ci 27877db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(tasks, '_c_register_task'), 27887db96d56Sopenharmony_ci 'requires the C _asyncio module') 27897db96d56Sopenharmony_ciclass CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests): 27907db96d56Sopenharmony_ci if hasattr(tasks, '_c_register_task'): 27917db96d56Sopenharmony_ci _register_task = staticmethod(tasks._c_register_task) 27927db96d56Sopenharmony_ci _unregister_task = staticmethod(tasks._c_unregister_task) 27937db96d56Sopenharmony_ci _enter_task = staticmethod(tasks._c_enter_task) 
27947db96d56Sopenharmony_ci _leave_task = staticmethod(tasks._c_leave_task) 27957db96d56Sopenharmony_ci else: 27967db96d56Sopenharmony_ci _register_task = _unregister_task = _enter_task = _leave_task = None 27977db96d56Sopenharmony_ci 27987db96d56Sopenharmony_ci 27997db96d56Sopenharmony_ciclass BaseCurrentLoopTests: 28007db96d56Sopenharmony_ci 28017db96d56Sopenharmony_ci def setUp(self): 28027db96d56Sopenharmony_ci super().setUp() 28037db96d56Sopenharmony_ci self.loop = asyncio.new_event_loop() 28047db96d56Sopenharmony_ci self.set_event_loop(self.loop) 28057db96d56Sopenharmony_ci 28067db96d56Sopenharmony_ci def new_task(self, coro): 28077db96d56Sopenharmony_ci raise NotImplementedError 28087db96d56Sopenharmony_ci 28097db96d56Sopenharmony_ci def test_current_task_no_running_loop(self): 28107db96d56Sopenharmony_ci self.assertIsNone(asyncio.current_task(loop=self.loop)) 28117db96d56Sopenharmony_ci 28127db96d56Sopenharmony_ci def test_current_task_no_running_loop_implicit(self): 28137db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, 'no running event loop'): 28147db96d56Sopenharmony_ci asyncio.current_task() 28157db96d56Sopenharmony_ci 28167db96d56Sopenharmony_ci def test_current_task_with_implicit_loop(self): 28177db96d56Sopenharmony_ci async def coro(): 28187db96d56Sopenharmony_ci self.assertIs(asyncio.current_task(loop=self.loop), task) 28197db96d56Sopenharmony_ci 28207db96d56Sopenharmony_ci self.assertIs(asyncio.current_task(None), task) 28217db96d56Sopenharmony_ci self.assertIs(asyncio.current_task(), task) 28227db96d56Sopenharmony_ci 28237db96d56Sopenharmony_ci task = self.new_task(coro()) 28247db96d56Sopenharmony_ci self.loop.run_until_complete(task) 28257db96d56Sopenharmony_ci self.assertIsNone(asyncio.current_task(loop=self.loop)) 28267db96d56Sopenharmony_ci 28277db96d56Sopenharmony_ci 28287db96d56Sopenharmony_ciclass PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase): 28297db96d56Sopenharmony_ci 28307db96d56Sopenharmony_ci def 
new_task(self, coro): 28317db96d56Sopenharmony_ci return tasks._PyTask(coro, loop=self.loop) 28327db96d56Sopenharmony_ci 28337db96d56Sopenharmony_ci 28347db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(tasks, '_CTask'), 28357db96d56Sopenharmony_ci 'requires the C _asyncio module') 28367db96d56Sopenharmony_ciclass CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase): 28377db96d56Sopenharmony_ci 28387db96d56Sopenharmony_ci def new_task(self, coro): 28397db96d56Sopenharmony_ci return getattr(tasks, '_CTask')(coro, loop=self.loop) 28407db96d56Sopenharmony_ci 28417db96d56Sopenharmony_ci 28427db96d56Sopenharmony_ciclass GenericTaskTests(test_utils.TestCase): 28437db96d56Sopenharmony_ci 28447db96d56Sopenharmony_ci def test_future_subclass(self): 28457db96d56Sopenharmony_ci self.assertTrue(issubclass(asyncio.Task, asyncio.Future)) 28467db96d56Sopenharmony_ci 28477db96d56Sopenharmony_ci @support.cpython_only 28487db96d56Sopenharmony_ci def test_asyncio_module_compiled(self): 28497db96d56Sopenharmony_ci # Because of circular imports it's easy to make _asyncio 28507db96d56Sopenharmony_ci # module non-importable. This is a simple test that will 28517db96d56Sopenharmony_ci # fail on systems where C modules were successfully compiled 28527db96d56Sopenharmony_ci # (hence the test for _functools etc), but _asyncio somehow didn't. 
28537db96d56Sopenharmony_ci try: 28547db96d56Sopenharmony_ci import _functools 28557db96d56Sopenharmony_ci import _json 28567db96d56Sopenharmony_ci import _pickle 28577db96d56Sopenharmony_ci except ImportError: 28587db96d56Sopenharmony_ci self.skipTest('C modules are not available') 28597db96d56Sopenharmony_ci else: 28607db96d56Sopenharmony_ci try: 28617db96d56Sopenharmony_ci import _asyncio 28627db96d56Sopenharmony_ci except ImportError: 28637db96d56Sopenharmony_ci self.fail('_asyncio module is missing') 28647db96d56Sopenharmony_ci 28657db96d56Sopenharmony_ci 28667db96d56Sopenharmony_ciclass GatherTestsBase: 28677db96d56Sopenharmony_ci 28687db96d56Sopenharmony_ci def setUp(self): 28697db96d56Sopenharmony_ci super().setUp() 28707db96d56Sopenharmony_ci self.one_loop = self.new_test_loop() 28717db96d56Sopenharmony_ci self.other_loop = self.new_test_loop() 28727db96d56Sopenharmony_ci self.set_event_loop(self.one_loop, cleanup=False) 28737db96d56Sopenharmony_ci 28747db96d56Sopenharmony_ci def _run_loop(self, loop): 28757db96d56Sopenharmony_ci while loop._ready: 28767db96d56Sopenharmony_ci test_utils.run_briefly(loop) 28777db96d56Sopenharmony_ci 28787db96d56Sopenharmony_ci def _check_success(self, **kwargs): 28797db96d56Sopenharmony_ci a, b, c = [self.one_loop.create_future() for i in range(3)] 28807db96d56Sopenharmony_ci fut = self._gather(*self.wrap_futures(a, b, c), **kwargs) 28817db96d56Sopenharmony_ci cb = test_utils.MockCallback() 28827db96d56Sopenharmony_ci fut.add_done_callback(cb) 28837db96d56Sopenharmony_ci b.set_result(1) 28847db96d56Sopenharmony_ci a.set_result(2) 28857db96d56Sopenharmony_ci self._run_loop(self.one_loop) 28867db96d56Sopenharmony_ci self.assertEqual(cb.called, False) 28877db96d56Sopenharmony_ci self.assertFalse(fut.done()) 28887db96d56Sopenharmony_ci c.set_result(3) 28897db96d56Sopenharmony_ci self._run_loop(self.one_loop) 28907db96d56Sopenharmony_ci cb.assert_called_once_with(fut) 28917db96d56Sopenharmony_ci self.assertEqual(fut.result(), 
[2, 1, 3]) 28927db96d56Sopenharmony_ci 28937db96d56Sopenharmony_ci def test_success(self): 28947db96d56Sopenharmony_ci self._check_success() 28957db96d56Sopenharmony_ci self._check_success(return_exceptions=False) 28967db96d56Sopenharmony_ci 28977db96d56Sopenharmony_ci def test_result_exception_success(self): 28987db96d56Sopenharmony_ci self._check_success(return_exceptions=True) 28997db96d56Sopenharmony_ci 29007db96d56Sopenharmony_ci def test_one_exception(self): 29017db96d56Sopenharmony_ci a, b, c, d, e = [self.one_loop.create_future() for i in range(5)] 29027db96d56Sopenharmony_ci fut = self._gather(*self.wrap_futures(a, b, c, d, e)) 29037db96d56Sopenharmony_ci cb = test_utils.MockCallback() 29047db96d56Sopenharmony_ci fut.add_done_callback(cb) 29057db96d56Sopenharmony_ci exc = ZeroDivisionError() 29067db96d56Sopenharmony_ci a.set_result(1) 29077db96d56Sopenharmony_ci b.set_exception(exc) 29087db96d56Sopenharmony_ci self._run_loop(self.one_loop) 29097db96d56Sopenharmony_ci self.assertTrue(fut.done()) 29107db96d56Sopenharmony_ci cb.assert_called_once_with(fut) 29117db96d56Sopenharmony_ci self.assertIs(fut.exception(), exc) 29127db96d56Sopenharmony_ci # Does nothing 29137db96d56Sopenharmony_ci c.set_result(3) 29147db96d56Sopenharmony_ci d.cancel() 29157db96d56Sopenharmony_ci e.set_exception(RuntimeError()) 29167db96d56Sopenharmony_ci e.exception() 29177db96d56Sopenharmony_ci 29187db96d56Sopenharmony_ci def test_return_exceptions(self): 29197db96d56Sopenharmony_ci a, b, c, d = [self.one_loop.create_future() for i in range(4)] 29207db96d56Sopenharmony_ci fut = self._gather(*self.wrap_futures(a, b, c, d), 29217db96d56Sopenharmony_ci return_exceptions=True) 29227db96d56Sopenharmony_ci cb = test_utils.MockCallback() 29237db96d56Sopenharmony_ci fut.add_done_callback(cb) 29247db96d56Sopenharmony_ci exc = ZeroDivisionError() 29257db96d56Sopenharmony_ci exc2 = RuntimeError() 29267db96d56Sopenharmony_ci b.set_result(1) 29277db96d56Sopenharmony_ci c.set_exception(exc) 
29287db96d56Sopenharmony_ci a.set_result(3) 29297db96d56Sopenharmony_ci self._run_loop(self.one_loop) 29307db96d56Sopenharmony_ci self.assertFalse(fut.done()) 29317db96d56Sopenharmony_ci d.set_exception(exc2) 29327db96d56Sopenharmony_ci self._run_loop(self.one_loop) 29337db96d56Sopenharmony_ci self.assertTrue(fut.done()) 29347db96d56Sopenharmony_ci cb.assert_called_once_with(fut) 29357db96d56Sopenharmony_ci self.assertEqual(fut.result(), [3, 1, exc, exc2]) 29367db96d56Sopenharmony_ci 29377db96d56Sopenharmony_ci def test_env_var_debug(self): 29387db96d56Sopenharmony_ci code = '\n'.join(( 29397db96d56Sopenharmony_ci 'import asyncio.coroutines', 29407db96d56Sopenharmony_ci 'print(asyncio.coroutines._is_debug_mode())')) 29417db96d56Sopenharmony_ci 29427db96d56Sopenharmony_ci # Test with -E to not fail if the unit test was run with 29437db96d56Sopenharmony_ci # PYTHONASYNCIODEBUG set to a non-empty string 29447db96d56Sopenharmony_ci sts, stdout, stderr = assert_python_ok('-E', '-c', code) 29457db96d56Sopenharmony_ci self.assertEqual(stdout.rstrip(), b'False') 29467db96d56Sopenharmony_ci 29477db96d56Sopenharmony_ci sts, stdout, stderr = assert_python_ok('-c', code, 29487db96d56Sopenharmony_ci PYTHONASYNCIODEBUG='', 29497db96d56Sopenharmony_ci PYTHONDEVMODE='') 29507db96d56Sopenharmony_ci self.assertEqual(stdout.rstrip(), b'False') 29517db96d56Sopenharmony_ci 29527db96d56Sopenharmony_ci sts, stdout, stderr = assert_python_ok('-c', code, 29537db96d56Sopenharmony_ci PYTHONASYNCIODEBUG='1', 29547db96d56Sopenharmony_ci PYTHONDEVMODE='') 29557db96d56Sopenharmony_ci self.assertEqual(stdout.rstrip(), b'True') 29567db96d56Sopenharmony_ci 29577db96d56Sopenharmony_ci sts, stdout, stderr = assert_python_ok('-E', '-c', code, 29587db96d56Sopenharmony_ci PYTHONASYNCIODEBUG='1', 29597db96d56Sopenharmony_ci PYTHONDEVMODE='') 29607db96d56Sopenharmony_ci self.assertEqual(stdout.rstrip(), b'False') 29617db96d56Sopenharmony_ci 29627db96d56Sopenharmony_ci # -X dev 29637db96d56Sopenharmony_ci 
sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev', 29647db96d56Sopenharmony_ci '-c', code) 29657db96d56Sopenharmony_ci self.assertEqual(stdout.rstrip(), b'True') 29667db96d56Sopenharmony_ci 29677db96d56Sopenharmony_ci 29687db96d56Sopenharmony_ciclass FutureGatherTests(GatherTestsBase, test_utils.TestCase): 29697db96d56Sopenharmony_ci 29707db96d56Sopenharmony_ci def wrap_futures(self, *futures): 29717db96d56Sopenharmony_ci return futures 29727db96d56Sopenharmony_ci 29737db96d56Sopenharmony_ci def _gather(self, *args, **kwargs): 29747db96d56Sopenharmony_ci return asyncio.gather(*args, **kwargs) 29757db96d56Sopenharmony_ci 29767db96d56Sopenharmony_ci def test_constructor_empty_sequence_without_loop(self): 29777db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, 'no current event loop'): 29787db96d56Sopenharmony_ci asyncio.gather() 29797db96d56Sopenharmony_ci 29807db96d56Sopenharmony_ci def test_constructor_empty_sequence_use_running_loop(self): 29817db96d56Sopenharmony_ci async def gather(): 29827db96d56Sopenharmony_ci return asyncio.gather() 29837db96d56Sopenharmony_ci fut = self.one_loop.run_until_complete(gather()) 29847db96d56Sopenharmony_ci self.assertIsInstance(fut, asyncio.Future) 29857db96d56Sopenharmony_ci self.assertIs(fut._loop, self.one_loop) 29867db96d56Sopenharmony_ci self._run_loop(self.one_loop) 29877db96d56Sopenharmony_ci self.assertTrue(fut.done()) 29887db96d56Sopenharmony_ci self.assertEqual(fut.result(), []) 29897db96d56Sopenharmony_ci 29907db96d56Sopenharmony_ci def test_constructor_empty_sequence_use_global_loop(self): 29917db96d56Sopenharmony_ci # Deprecated in 3.10, undeprecated in 3.11.1 29927db96d56Sopenharmony_ci asyncio.set_event_loop(self.one_loop) 29937db96d56Sopenharmony_ci self.addCleanup(asyncio.set_event_loop, None) 29947db96d56Sopenharmony_ci fut = asyncio.gather() 29957db96d56Sopenharmony_ci self.assertIsInstance(fut, asyncio.Future) 29967db96d56Sopenharmony_ci self.assertIs(fut._loop, self.one_loop) 
29977db96d56Sopenharmony_ci self._run_loop(self.one_loop) 29987db96d56Sopenharmony_ci self.assertTrue(fut.done()) 29997db96d56Sopenharmony_ci self.assertEqual(fut.result(), []) 30007db96d56Sopenharmony_ci 30017db96d56Sopenharmony_ci def test_constructor_heterogenous_futures(self): 30027db96d56Sopenharmony_ci fut1 = self.one_loop.create_future() 30037db96d56Sopenharmony_ci fut2 = self.other_loop.create_future() 30047db96d56Sopenharmony_ci with self.assertRaises(ValueError): 30057db96d56Sopenharmony_ci asyncio.gather(fut1, fut2) 30067db96d56Sopenharmony_ci 30077db96d56Sopenharmony_ci def test_constructor_homogenous_futures(self): 30087db96d56Sopenharmony_ci children = [self.other_loop.create_future() for i in range(3)] 30097db96d56Sopenharmony_ci fut = asyncio.gather(*children) 30107db96d56Sopenharmony_ci self.assertIs(fut._loop, self.other_loop) 30117db96d56Sopenharmony_ci self._run_loop(self.other_loop) 30127db96d56Sopenharmony_ci self.assertFalse(fut.done()) 30137db96d56Sopenharmony_ci fut = asyncio.gather(*children) 30147db96d56Sopenharmony_ci self.assertIs(fut._loop, self.other_loop) 30157db96d56Sopenharmony_ci self._run_loop(self.other_loop) 30167db96d56Sopenharmony_ci self.assertFalse(fut.done()) 30177db96d56Sopenharmony_ci 30187db96d56Sopenharmony_ci def test_one_cancellation(self): 30197db96d56Sopenharmony_ci a, b, c, d, e = [self.one_loop.create_future() for i in range(5)] 30207db96d56Sopenharmony_ci fut = asyncio.gather(a, b, c, d, e) 30217db96d56Sopenharmony_ci cb = test_utils.MockCallback() 30227db96d56Sopenharmony_ci fut.add_done_callback(cb) 30237db96d56Sopenharmony_ci a.set_result(1) 30247db96d56Sopenharmony_ci b.cancel() 30257db96d56Sopenharmony_ci self._run_loop(self.one_loop) 30267db96d56Sopenharmony_ci self.assertTrue(fut.done()) 30277db96d56Sopenharmony_ci cb.assert_called_once_with(fut) 30287db96d56Sopenharmony_ci self.assertFalse(fut.cancelled()) 30297db96d56Sopenharmony_ci self.assertIsInstance(fut.exception(), asyncio.CancelledError) 
30307db96d56Sopenharmony_ci # Does nothing 30317db96d56Sopenharmony_ci c.set_result(3) 30327db96d56Sopenharmony_ci d.cancel() 30337db96d56Sopenharmony_ci e.set_exception(RuntimeError()) 30347db96d56Sopenharmony_ci e.exception() 30357db96d56Sopenharmony_ci 30367db96d56Sopenharmony_ci def test_result_exception_one_cancellation(self): 30377db96d56Sopenharmony_ci a, b, c, d, e, f = [self.one_loop.create_future() 30387db96d56Sopenharmony_ci for i in range(6)] 30397db96d56Sopenharmony_ci fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True) 30407db96d56Sopenharmony_ci cb = test_utils.MockCallback() 30417db96d56Sopenharmony_ci fut.add_done_callback(cb) 30427db96d56Sopenharmony_ci a.set_result(1) 30437db96d56Sopenharmony_ci zde = ZeroDivisionError() 30447db96d56Sopenharmony_ci b.set_exception(zde) 30457db96d56Sopenharmony_ci c.cancel() 30467db96d56Sopenharmony_ci self._run_loop(self.one_loop) 30477db96d56Sopenharmony_ci self.assertFalse(fut.done()) 30487db96d56Sopenharmony_ci d.set_result(3) 30497db96d56Sopenharmony_ci e.cancel() 30507db96d56Sopenharmony_ci rte = RuntimeError() 30517db96d56Sopenharmony_ci f.set_exception(rte) 30527db96d56Sopenharmony_ci res = self.one_loop.run_until_complete(fut) 30537db96d56Sopenharmony_ci self.assertIsInstance(res[2], asyncio.CancelledError) 30547db96d56Sopenharmony_ci self.assertIsInstance(res[4], asyncio.CancelledError) 30557db96d56Sopenharmony_ci res[2] = res[4] = None 30567db96d56Sopenharmony_ci self.assertEqual(res, [1, zde, None, 3, None, rte]) 30577db96d56Sopenharmony_ci cb.assert_called_once_with(fut) 30587db96d56Sopenharmony_ci 30597db96d56Sopenharmony_ci 30607db96d56Sopenharmony_ciclass CoroutineGatherTests(GatherTestsBase, test_utils.TestCase): 30617db96d56Sopenharmony_ci 30627db96d56Sopenharmony_ci def wrap_futures(self, *futures): 30637db96d56Sopenharmony_ci coros = [] 30647db96d56Sopenharmony_ci for fut in futures: 30657db96d56Sopenharmony_ci async def coro(fut=fut): 30667db96d56Sopenharmony_ci return await fut 
30677db96d56Sopenharmony_ci coros.append(coro()) 30687db96d56Sopenharmony_ci return coros 30697db96d56Sopenharmony_ci 30707db96d56Sopenharmony_ci def _gather(self, *args, **kwargs): 30717db96d56Sopenharmony_ci async def coro(): 30727db96d56Sopenharmony_ci return asyncio.gather(*args, **kwargs) 30737db96d56Sopenharmony_ci return self.one_loop.run_until_complete(coro()) 30747db96d56Sopenharmony_ci 30757db96d56Sopenharmony_ci def test_constructor_without_loop(self): 30767db96d56Sopenharmony_ci async def coro(): 30777db96d56Sopenharmony_ci return 'abc' 30787db96d56Sopenharmony_ci gen1 = coro() 30797db96d56Sopenharmony_ci self.addCleanup(gen1.close) 30807db96d56Sopenharmony_ci gen2 = coro() 30817db96d56Sopenharmony_ci self.addCleanup(gen2.close) 30827db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, 'no current event loop'): 30837db96d56Sopenharmony_ci asyncio.gather(gen1, gen2) 30847db96d56Sopenharmony_ci 30857db96d56Sopenharmony_ci def test_constructor_use_running_loop(self): 30867db96d56Sopenharmony_ci async def coro(): 30877db96d56Sopenharmony_ci return 'abc' 30887db96d56Sopenharmony_ci gen1 = coro() 30897db96d56Sopenharmony_ci gen2 = coro() 30907db96d56Sopenharmony_ci async def gather(): 30917db96d56Sopenharmony_ci return asyncio.gather(gen1, gen2) 30927db96d56Sopenharmony_ci fut = self.one_loop.run_until_complete(gather()) 30937db96d56Sopenharmony_ci self.assertIs(fut._loop, self.one_loop) 30947db96d56Sopenharmony_ci self.one_loop.run_until_complete(fut) 30957db96d56Sopenharmony_ci 30967db96d56Sopenharmony_ci def test_constructor_use_global_loop(self): 30977db96d56Sopenharmony_ci # Deprecated in 3.10, undeprecated in 3.11.1 30987db96d56Sopenharmony_ci async def coro(): 30997db96d56Sopenharmony_ci return 'abc' 31007db96d56Sopenharmony_ci asyncio.set_event_loop(self.other_loop) 31017db96d56Sopenharmony_ci self.addCleanup(asyncio.set_event_loop, None) 31027db96d56Sopenharmony_ci gen1 = coro() 31037db96d56Sopenharmony_ci gen2 = coro() 
31047db96d56Sopenharmony_ci fut = asyncio.gather(gen1, gen2) 31057db96d56Sopenharmony_ci self.assertIs(fut._loop, self.other_loop) 31067db96d56Sopenharmony_ci self.other_loop.run_until_complete(fut) 31077db96d56Sopenharmony_ci 31087db96d56Sopenharmony_ci def test_duplicate_coroutines(self): 31097db96d56Sopenharmony_ci async def coro(s): 31107db96d56Sopenharmony_ci return s 31117db96d56Sopenharmony_ci c = coro('abc') 31127db96d56Sopenharmony_ci fut = self._gather(c, c, coro('def'), c) 31137db96d56Sopenharmony_ci self._run_loop(self.one_loop) 31147db96d56Sopenharmony_ci self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc']) 31157db96d56Sopenharmony_ci 31167db96d56Sopenharmony_ci def test_cancellation_broadcast(self): 31177db96d56Sopenharmony_ci # Cancelling outer() cancels all children. 31187db96d56Sopenharmony_ci proof = 0 31197db96d56Sopenharmony_ci waiter = self.one_loop.create_future() 31207db96d56Sopenharmony_ci 31217db96d56Sopenharmony_ci async def inner(): 31227db96d56Sopenharmony_ci nonlocal proof 31237db96d56Sopenharmony_ci await waiter 31247db96d56Sopenharmony_ci proof += 1 31257db96d56Sopenharmony_ci 31267db96d56Sopenharmony_ci child1 = asyncio.ensure_future(inner(), loop=self.one_loop) 31277db96d56Sopenharmony_ci child2 = asyncio.ensure_future(inner(), loop=self.one_loop) 31287db96d56Sopenharmony_ci gatherer = None 31297db96d56Sopenharmony_ci 31307db96d56Sopenharmony_ci async def outer(): 31317db96d56Sopenharmony_ci nonlocal proof, gatherer 31327db96d56Sopenharmony_ci gatherer = asyncio.gather(child1, child2) 31337db96d56Sopenharmony_ci await gatherer 31347db96d56Sopenharmony_ci proof += 100 31357db96d56Sopenharmony_ci 31367db96d56Sopenharmony_ci f = asyncio.ensure_future(outer(), loop=self.one_loop) 31377db96d56Sopenharmony_ci test_utils.run_briefly(self.one_loop) 31387db96d56Sopenharmony_ci self.assertTrue(f.cancel()) 31397db96d56Sopenharmony_ci with self.assertRaises(asyncio.CancelledError): 31407db96d56Sopenharmony_ci 
self.one_loop.run_until_complete(f) 31417db96d56Sopenharmony_ci self.assertFalse(gatherer.cancel()) 31427db96d56Sopenharmony_ci self.assertTrue(waiter.cancelled()) 31437db96d56Sopenharmony_ci self.assertTrue(child1.cancelled()) 31447db96d56Sopenharmony_ci self.assertTrue(child2.cancelled()) 31457db96d56Sopenharmony_ci test_utils.run_briefly(self.one_loop) 31467db96d56Sopenharmony_ci self.assertEqual(proof, 0) 31477db96d56Sopenharmony_ci 31487db96d56Sopenharmony_ci def test_exception_marking(self): 31497db96d56Sopenharmony_ci # Test for the first line marked "Mark exception retrieved." 31507db96d56Sopenharmony_ci 31517db96d56Sopenharmony_ci async def inner(f): 31527db96d56Sopenharmony_ci await f 31537db96d56Sopenharmony_ci raise RuntimeError('should not be ignored') 31547db96d56Sopenharmony_ci 31557db96d56Sopenharmony_ci a = self.one_loop.create_future() 31567db96d56Sopenharmony_ci b = self.one_loop.create_future() 31577db96d56Sopenharmony_ci 31587db96d56Sopenharmony_ci async def outer(): 31597db96d56Sopenharmony_ci await asyncio.gather(inner(a), inner(b)) 31607db96d56Sopenharmony_ci 31617db96d56Sopenharmony_ci f = asyncio.ensure_future(outer(), loop=self.one_loop) 31627db96d56Sopenharmony_ci test_utils.run_briefly(self.one_loop) 31637db96d56Sopenharmony_ci a.set_result(None) 31647db96d56Sopenharmony_ci test_utils.run_briefly(self.one_loop) 31657db96d56Sopenharmony_ci b.set_result(None) 31667db96d56Sopenharmony_ci test_utils.run_briefly(self.one_loop) 31677db96d56Sopenharmony_ci self.assertIsInstance(f.exception(), RuntimeError) 31687db96d56Sopenharmony_ci 31697db96d56Sopenharmony_ci def test_issue46672(self): 31707db96d56Sopenharmony_ci with mock.patch( 31717db96d56Sopenharmony_ci 'asyncio.base_events.BaseEventLoop.call_exception_handler', 31727db96d56Sopenharmony_ci ): 31737db96d56Sopenharmony_ci async def coro(s): 31747db96d56Sopenharmony_ci return s 31757db96d56Sopenharmony_ci c = coro('abc') 31767db96d56Sopenharmony_ci 31777db96d56Sopenharmony_ci with 
self.assertRaises(TypeError): 31787db96d56Sopenharmony_ci self._gather(c, {}) 31797db96d56Sopenharmony_ci self._run_loop(self.one_loop) 31807db96d56Sopenharmony_ci # NameError should not happen: 31817db96d56Sopenharmony_ci self.one_loop.call_exception_handler.assert_not_called() 31827db96d56Sopenharmony_ci 31837db96d56Sopenharmony_ci 31847db96d56Sopenharmony_ciclass RunCoroutineThreadsafeTests(test_utils.TestCase): 31857db96d56Sopenharmony_ci """Test case for asyncio.run_coroutine_threadsafe.""" 31867db96d56Sopenharmony_ci 31877db96d56Sopenharmony_ci def setUp(self): 31887db96d56Sopenharmony_ci super().setUp() 31897db96d56Sopenharmony_ci self.loop = asyncio.new_event_loop() 31907db96d56Sopenharmony_ci self.set_event_loop(self.loop) # Will cleanup properly 31917db96d56Sopenharmony_ci 31927db96d56Sopenharmony_ci async def add(self, a, b, fail=False, cancel=False): 31937db96d56Sopenharmony_ci """Wait 0.05 second and return a + b.""" 31947db96d56Sopenharmony_ci await asyncio.sleep(0.05) 31957db96d56Sopenharmony_ci if fail: 31967db96d56Sopenharmony_ci raise RuntimeError("Fail!") 31977db96d56Sopenharmony_ci if cancel: 31987db96d56Sopenharmony_ci asyncio.current_task(self.loop).cancel() 31997db96d56Sopenharmony_ci await asyncio.sleep(0) 32007db96d56Sopenharmony_ci return a + b 32017db96d56Sopenharmony_ci 32027db96d56Sopenharmony_ci def target(self, fail=False, cancel=False, timeout=None, 32037db96d56Sopenharmony_ci advance_coro=False): 32047db96d56Sopenharmony_ci """Run add coroutine in the event loop.""" 32057db96d56Sopenharmony_ci coro = self.add(1, 2, fail=fail, cancel=cancel) 32067db96d56Sopenharmony_ci future = asyncio.run_coroutine_threadsafe(coro, self.loop) 32077db96d56Sopenharmony_ci if advance_coro: 32087db96d56Sopenharmony_ci # this is for test_run_coroutine_threadsafe_task_factory_exception; 32097db96d56Sopenharmony_ci # otherwise it spills errors and breaks **other** unittests, since 32107db96d56Sopenharmony_ci # 'target' is interacting with threads. 
32117db96d56Sopenharmony_ci 32127db96d56Sopenharmony_ci # With this call, `coro` will be advanced. 32137db96d56Sopenharmony_ci self.loop.call_soon_threadsafe(coro.send, None) 32147db96d56Sopenharmony_ci try: 32157db96d56Sopenharmony_ci return future.result(timeout) 32167db96d56Sopenharmony_ci finally: 32177db96d56Sopenharmony_ci future.done() or future.cancel() 32187db96d56Sopenharmony_ci 32197db96d56Sopenharmony_ci def test_run_coroutine_threadsafe(self): 32207db96d56Sopenharmony_ci """Test coroutine submission from a thread to an event loop.""" 32217db96d56Sopenharmony_ci future = self.loop.run_in_executor(None, self.target) 32227db96d56Sopenharmony_ci result = self.loop.run_until_complete(future) 32237db96d56Sopenharmony_ci self.assertEqual(result, 3) 32247db96d56Sopenharmony_ci 32257db96d56Sopenharmony_ci def test_run_coroutine_threadsafe_with_exception(self): 32267db96d56Sopenharmony_ci """Test coroutine submission from a thread to an event loop 32277db96d56Sopenharmony_ci when an exception is raised.""" 32287db96d56Sopenharmony_ci future = self.loop.run_in_executor(None, self.target, True) 32297db96d56Sopenharmony_ci with self.assertRaises(RuntimeError) as exc_context: 32307db96d56Sopenharmony_ci self.loop.run_until_complete(future) 32317db96d56Sopenharmony_ci self.assertIn("Fail!", exc_context.exception.args) 32327db96d56Sopenharmony_ci 32337db96d56Sopenharmony_ci def test_run_coroutine_threadsafe_with_timeout(self): 32347db96d56Sopenharmony_ci """Test coroutine submission from a thread to an event loop 32357db96d56Sopenharmony_ci when a timeout is raised.""" 32367db96d56Sopenharmony_ci callback = lambda: self.target(timeout=0) 32377db96d56Sopenharmony_ci future = self.loop.run_in_executor(None, callback) 32387db96d56Sopenharmony_ci with self.assertRaises(asyncio.TimeoutError): 32397db96d56Sopenharmony_ci self.loop.run_until_complete(future) 32407db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 32417db96d56Sopenharmony_ci # Check that there's no 
pending task (add has been cancelled) 32427db96d56Sopenharmony_ci for task in asyncio.all_tasks(self.loop): 32437db96d56Sopenharmony_ci self.assertTrue(task.done()) 32447db96d56Sopenharmony_ci 32457db96d56Sopenharmony_ci def test_run_coroutine_threadsafe_task_cancelled(self): 32467db96d56Sopenharmony_ci """Test coroutine submission from a thread to an event loop 32477db96d56Sopenharmony_ci when the task is cancelled.""" 32487db96d56Sopenharmony_ci callback = lambda: self.target(cancel=True) 32497db96d56Sopenharmony_ci future = self.loop.run_in_executor(None, callback) 32507db96d56Sopenharmony_ci with self.assertRaises(asyncio.CancelledError): 32517db96d56Sopenharmony_ci self.loop.run_until_complete(future) 32527db96d56Sopenharmony_ci 32537db96d56Sopenharmony_ci def test_run_coroutine_threadsafe_task_factory_exception(self): 32547db96d56Sopenharmony_ci """Test coroutine submission from a thread to an event loop 32557db96d56Sopenharmony_ci when the task factory raise an exception.""" 32567db96d56Sopenharmony_ci 32577db96d56Sopenharmony_ci def task_factory(loop, coro): 32587db96d56Sopenharmony_ci raise NameError 32597db96d56Sopenharmony_ci 32607db96d56Sopenharmony_ci run = self.loop.run_in_executor( 32617db96d56Sopenharmony_ci None, lambda: self.target(advance_coro=True)) 32627db96d56Sopenharmony_ci 32637db96d56Sopenharmony_ci # Set exception handler 32647db96d56Sopenharmony_ci callback = test_utils.MockCallback() 32657db96d56Sopenharmony_ci self.loop.set_exception_handler(callback) 32667db96d56Sopenharmony_ci 32677db96d56Sopenharmony_ci # Set corrupted task factory 32687db96d56Sopenharmony_ci self.addCleanup(self.loop.set_task_factory, 32697db96d56Sopenharmony_ci self.loop.get_task_factory()) 32707db96d56Sopenharmony_ci self.loop.set_task_factory(task_factory) 32717db96d56Sopenharmony_ci 32727db96d56Sopenharmony_ci # Run event loop 32737db96d56Sopenharmony_ci with self.assertRaises(NameError) as exc_context: 32747db96d56Sopenharmony_ci 
self.loop.run_until_complete(run) 32757db96d56Sopenharmony_ci 32767db96d56Sopenharmony_ci # Check exceptions 32777db96d56Sopenharmony_ci self.assertEqual(len(callback.call_args_list), 1) 32787db96d56Sopenharmony_ci (loop, context), kwargs = callback.call_args 32797db96d56Sopenharmony_ci self.assertEqual(context['exception'], exc_context.exception) 32807db96d56Sopenharmony_ci 32817db96d56Sopenharmony_ci 32827db96d56Sopenharmony_ciclass SleepTests(test_utils.TestCase): 32837db96d56Sopenharmony_ci def setUp(self): 32847db96d56Sopenharmony_ci super().setUp() 32857db96d56Sopenharmony_ci self.loop = asyncio.new_event_loop() 32867db96d56Sopenharmony_ci self.set_event_loop(self.loop) 32877db96d56Sopenharmony_ci 32887db96d56Sopenharmony_ci def tearDown(self): 32897db96d56Sopenharmony_ci self.loop.close() 32907db96d56Sopenharmony_ci self.loop = None 32917db96d56Sopenharmony_ci super().tearDown() 32927db96d56Sopenharmony_ci 32937db96d56Sopenharmony_ci def test_sleep_zero(self): 32947db96d56Sopenharmony_ci result = 0 32957db96d56Sopenharmony_ci 32967db96d56Sopenharmony_ci def inc_result(num): 32977db96d56Sopenharmony_ci nonlocal result 32987db96d56Sopenharmony_ci result += num 32997db96d56Sopenharmony_ci 33007db96d56Sopenharmony_ci async def coro(): 33017db96d56Sopenharmony_ci self.loop.call_soon(inc_result, 1) 33027db96d56Sopenharmony_ci self.assertEqual(result, 0) 33037db96d56Sopenharmony_ci num = await asyncio.sleep(0, result=10) 33047db96d56Sopenharmony_ci self.assertEqual(result, 1) # inc'ed by call_soon 33057db96d56Sopenharmony_ci inc_result(num) # num should be 11 33067db96d56Sopenharmony_ci 33077db96d56Sopenharmony_ci self.loop.run_until_complete(coro()) 33087db96d56Sopenharmony_ci self.assertEqual(result, 11) 33097db96d56Sopenharmony_ci 33107db96d56Sopenharmony_ci 33117db96d56Sopenharmony_ciclass CompatibilityTests(test_utils.TestCase): 33127db96d56Sopenharmony_ci # Tests for checking a bridge between old-styled coroutines 33137db96d56Sopenharmony_ci # and async/await 
syntax 33147db96d56Sopenharmony_ci 33157db96d56Sopenharmony_ci def setUp(self): 33167db96d56Sopenharmony_ci super().setUp() 33177db96d56Sopenharmony_ci self.loop = asyncio.new_event_loop() 33187db96d56Sopenharmony_ci self.set_event_loop(self.loop) 33197db96d56Sopenharmony_ci 33207db96d56Sopenharmony_ci def tearDown(self): 33217db96d56Sopenharmony_ci self.loop.close() 33227db96d56Sopenharmony_ci self.loop = None 33237db96d56Sopenharmony_ci super().tearDown() 33247db96d56Sopenharmony_ci 33257db96d56Sopenharmony_ci 33267db96d56Sopenharmony_ciif __name__ == '__main__': 33277db96d56Sopenharmony_ci unittest.main() 3328