"""Tests support for new syntax introduced by PEP 492."""

import sys
import types
import unittest

from unittest import mock

import asyncio
from test.test_asyncio import utils as test_utils


def tearDownModule():
    """Reset the asyncio event loop policy after this module's tests ran."""
    asyncio.set_event_loop_policy(None)


# Test that asyncio.iscoroutine() uses collections.abc.Coroutine
class FakeCoro:
    """Object that only provides send/throw/close/__await__.

    It is not created by an ``async def`` function; the coroutine tests
    pass an instance to asyncio.iscoroutine().
    """

    def send(self, value):
        """Accept *value* and do nothing."""

    def throw(self, typ, val=None, tb=None):
        """Accept an exception triple and do nothing."""

    def close(self):
        """Do nothing."""

    def __await__(self):
        # A bare ``yield`` makes this method a generator function.
        yield


class BaseTest(test_utils.TestCase):
    """Common fixture: a BaseEventLoop whose selector machinery is mocked."""

    def setUp(self):
        super().setUp()

        # select() on the mocked selector always reports "no events".
        selector = mock.Mock()
        selector.select.return_value = ()

        loop = asyncio.BaseEventLoop()
        loop._process_events = mock.Mock()
        loop._selector = selector

        self.loop = loop
        self.set_event_loop(loop)


class LockTests(BaseTest):
    """Checks how the synchronization primitives behave as context managers."""

    @staticmethod
    def _new_primitives():
        # One fresh instance of every primitive exercised by the tests below.
        return [
            asyncio.Lock(),
            asyncio.Condition(),
            asyncio.Semaphore(),
            asyncio.BoundedSemaphore(),
        ]

    def test_context_manager_async_with(self):
        # ``async with`` acquires the primitive, binds None to the target
        # and releases the primitive again when the body is left.
        async def exercise(prim):
            await asyncio.sleep(0.01)
            self.assertFalse(prim.locked())
            async with prim as bound:
                self.assertIs(bound, None)
                self.assertTrue(prim.locked())
                await asyncio.sleep(0.01)
                self.assertTrue(prim.locked())
            self.assertFalse(prim.locked())

        for prim in self._new_primitives():
            self.loop.run_until_complete(exercise(prim))
            self.assertFalse(prim.locked())

    def test_context_manager_with_await(self):
        # ``with await lock:`` must be rejected with a TypeError and must
        # leave the primitive unlocked.
        async def exercise(prim):
            await asyncio.sleep(0.01)
            self.assertFalse(prim.locked())
            rejects_await = self.assertRaisesRegex(
                TypeError,
                "can't be used in 'await' expression"
            )
            with rejects_await:
                with await prim:
                    pass

        for prim in self._new_primitives():
            self.loop.run_until_complete(exercise(prim))
            self.assertFalse(prim.locked())


class StreamReaderTests(BaseTest):
    # Exercises ``async for`` iteration over an asyncio.StreamReader.

    def test_readline(self):
        # Each iteration yields one line with its newline kept; the last,
        # unterminated line is yielded once EOF has been fed.
        DATA = b'line1\nline2\nline3'

        stream = asyncio.StreamReader(loop=self.loop)
        stream.feed_data(DATA)
        stream.feed_eof()

        async def reader():
            data = []
            async for line in stream:
                data.append(line)
            return data

        data = self.loop.run_until_complete(reader())
        self.assertEqual(data, [b'line1\n', b'line2\n', b'line3'])


class CoroutineTests(BaseTest):
    # Exercises asyncio's handling of ``async def`` coroutines and of
    # generator-based coroutines created with types.coroutine().

    def test_iscoroutine(self):
        async def foo(): pass

        f = foo()
        try:
            self.assertTrue(asyncio.iscoroutine(f))
        finally:
            f.close()  # silence warning

        # An object that merely provides the coroutine methods also counts.
        self.assertTrue(asyncio.iscoroutine(FakeCoro()))

    def test_iscoroutinefunction(self):
        async def foo(): pass
        self.assertTrue(asyncio.iscoroutinefunction(foo))

    def test_async_def_coroutines(self):
        async def bar():
            return 'spam'

        async def foo():
            return await bar()

        # production mode
        data = self.loop.run_until_complete(foo())
        self.assertEqual(data, 'spam')

        # debug mode
        self.loop.set_debug(True)
        data = self.loop.run_until_complete(foo())
        self.assertEqual(data, 'spam')

    def test_debug_mode_manages_coroutine_origin_tracking(self):
        # The tracking depth is 0 before and after the run, and positive
        # while a debug-mode loop is executing the coroutine.
        async def start():
            # assertGreater reports the actual depth if the check fails.
            self.assertGreater(sys.get_coroutine_origin_tracking_depth(), 0)

        self.assertEqual(sys.get_coroutine_origin_tracking_depth(), 0)
        self.loop.set_debug(True)
        self.loop.run_until_complete(start())
        self.assertEqual(sys.get_coroutine_origin_tracking_depth(), 0)

    def test_types_coroutine(self):
        def gen():
            yield from ()
            return 'spam'

        # func() is a plain function returning a generator, so the
        # decorator hands back a wrapper object that can be awaited.
        @types.coroutine
        def func():
            return gen()

        async def coro():
            wrapper = func()
            self.assertIsInstance(wrapper, types._GeneratorWrapper)
            return await wrapper

        data = self.loop.run_until_complete(coro())
        self.assertEqual(data, 'spam')

    def test_task_print_stack(self):
        T = None

        async def foo():
            # T is the task running this coroutine (bound by runner()).
            f = T.get_stack(limit=1)
            try:
                self.assertEqual(f[0].f_code.co_name, 'foo')
            finally:
                # Drop the frame list so this frame does not keep
                # referencing itself through the local variable.
                f = None

        async def runner():
            nonlocal T
            T = asyncio.ensure_future(foo(), loop=self.loop)
            await T

        self.loop.run_until_complete(runner())

    def test_double_await(self):
        async def afunc():
            await asyncio.sleep(0.1)

        async def runner():
            coro = afunc()
            t = self.loop.create_task(coro)
            try:
                # Give the task one loop iteration to start running coro.
                await asyncio.sleep(0)
                # coro is now owned by the task; awaiting it a second
                # time is expected to raise RuntimeError.
                await coro
            finally:
                t.cancel()

        self.loop.set_debug(True)
        # assertRaises() only uses ``msg`` as the failure message, so it
        # never checked the error text.  assertRaisesRegex() does.
        with self.assertRaisesRegex(
                RuntimeError,
                'coroutine is being awaited already'):
            self.loop.run_until_complete(runner())


if __name__ == '__main__':
    unittest.main()