17db96d56Sopenharmony_ci"""Tests support for new syntax introduced by PEP 492."""
27db96d56Sopenharmony_ci
37db96d56Sopenharmony_ciimport sys
47db96d56Sopenharmony_ciimport types
57db96d56Sopenharmony_ciimport unittest
67db96d56Sopenharmony_ci
77db96d56Sopenharmony_cifrom unittest import mock
87db96d56Sopenharmony_ci
97db96d56Sopenharmony_ciimport asyncio
107db96d56Sopenharmony_cifrom test.test_asyncio import utils as test_utils
117db96d56Sopenharmony_ci
127db96d56Sopenharmony_ci
137db96d56Sopenharmony_cidef tearDownModule():
147db96d56Sopenharmony_ci    asyncio.set_event_loop_policy(None)
157db96d56Sopenharmony_ci
167db96d56Sopenharmony_ci
177db96d56Sopenharmony_ci# Test that asyncio.iscoroutine() uses collections.abc.Coroutine
187db96d56Sopenharmony_ciclass FakeCoro:
197db96d56Sopenharmony_ci    def send(self, value):
207db96d56Sopenharmony_ci        pass
217db96d56Sopenharmony_ci
227db96d56Sopenharmony_ci    def throw(self, typ, val=None, tb=None):
237db96d56Sopenharmony_ci        pass
247db96d56Sopenharmony_ci
257db96d56Sopenharmony_ci    def close(self):
267db96d56Sopenharmony_ci        pass
277db96d56Sopenharmony_ci
287db96d56Sopenharmony_ci    def __await__(self):
297db96d56Sopenharmony_ci        yield
307db96d56Sopenharmony_ci
317db96d56Sopenharmony_ci
327db96d56Sopenharmony_ciclass BaseTest(test_utils.TestCase):
337db96d56Sopenharmony_ci
347db96d56Sopenharmony_ci    def setUp(self):
357db96d56Sopenharmony_ci        super().setUp()
367db96d56Sopenharmony_ci        self.loop = asyncio.BaseEventLoop()
377db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
387db96d56Sopenharmony_ci        self.loop._selector = mock.Mock()
397db96d56Sopenharmony_ci        self.loop._selector.select.return_value = ()
407db96d56Sopenharmony_ci        self.set_event_loop(self.loop)
417db96d56Sopenharmony_ci
427db96d56Sopenharmony_ci
437db96d56Sopenharmony_ciclass LockTests(BaseTest):
447db96d56Sopenharmony_ci
457db96d56Sopenharmony_ci    def test_context_manager_async_with(self):
467db96d56Sopenharmony_ci        primitives = [
477db96d56Sopenharmony_ci            asyncio.Lock(),
487db96d56Sopenharmony_ci            asyncio.Condition(),
497db96d56Sopenharmony_ci            asyncio.Semaphore(),
507db96d56Sopenharmony_ci            asyncio.BoundedSemaphore(),
517db96d56Sopenharmony_ci        ]
527db96d56Sopenharmony_ci
537db96d56Sopenharmony_ci        async def test(lock):
547db96d56Sopenharmony_ci            await asyncio.sleep(0.01)
557db96d56Sopenharmony_ci            self.assertFalse(lock.locked())
567db96d56Sopenharmony_ci            async with lock as _lock:
577db96d56Sopenharmony_ci                self.assertIs(_lock, None)
587db96d56Sopenharmony_ci                self.assertTrue(lock.locked())
597db96d56Sopenharmony_ci                await asyncio.sleep(0.01)
607db96d56Sopenharmony_ci                self.assertTrue(lock.locked())
617db96d56Sopenharmony_ci            self.assertFalse(lock.locked())
627db96d56Sopenharmony_ci
637db96d56Sopenharmony_ci        for primitive in primitives:
647db96d56Sopenharmony_ci            self.loop.run_until_complete(test(primitive))
657db96d56Sopenharmony_ci            self.assertFalse(primitive.locked())
667db96d56Sopenharmony_ci
677db96d56Sopenharmony_ci    def test_context_manager_with_await(self):
687db96d56Sopenharmony_ci        primitives = [
697db96d56Sopenharmony_ci            asyncio.Lock(),
707db96d56Sopenharmony_ci            asyncio.Condition(),
717db96d56Sopenharmony_ci            asyncio.Semaphore(),
727db96d56Sopenharmony_ci            asyncio.BoundedSemaphore(),
737db96d56Sopenharmony_ci        ]
747db96d56Sopenharmony_ci
757db96d56Sopenharmony_ci        async def test(lock):
767db96d56Sopenharmony_ci            await asyncio.sleep(0.01)
777db96d56Sopenharmony_ci            self.assertFalse(lock.locked())
787db96d56Sopenharmony_ci            with self.assertRaisesRegex(
797db96d56Sopenharmony_ci                TypeError,
807db96d56Sopenharmony_ci                "can't be used in 'await' expression"
817db96d56Sopenharmony_ci            ):
827db96d56Sopenharmony_ci                with await lock:
837db96d56Sopenharmony_ci                    pass
847db96d56Sopenharmony_ci
857db96d56Sopenharmony_ci        for primitive in primitives:
867db96d56Sopenharmony_ci            self.loop.run_until_complete(test(primitive))
877db96d56Sopenharmony_ci            self.assertFalse(primitive.locked())
887db96d56Sopenharmony_ci
897db96d56Sopenharmony_ci
907db96d56Sopenharmony_ciclass StreamReaderTests(BaseTest):
917db96d56Sopenharmony_ci
927db96d56Sopenharmony_ci    def test_readline(self):
937db96d56Sopenharmony_ci        DATA = b'line1\nline2\nline3'
947db96d56Sopenharmony_ci
957db96d56Sopenharmony_ci        stream = asyncio.StreamReader(loop=self.loop)
967db96d56Sopenharmony_ci        stream.feed_data(DATA)
977db96d56Sopenharmony_ci        stream.feed_eof()
987db96d56Sopenharmony_ci
997db96d56Sopenharmony_ci        async def reader():
1007db96d56Sopenharmony_ci            data = []
1017db96d56Sopenharmony_ci            async for line in stream:
1027db96d56Sopenharmony_ci                data.append(line)
1037db96d56Sopenharmony_ci            return data
1047db96d56Sopenharmony_ci
1057db96d56Sopenharmony_ci        data = self.loop.run_until_complete(reader())
1067db96d56Sopenharmony_ci        self.assertEqual(data, [b'line1\n', b'line2\n', b'line3'])
1077db96d56Sopenharmony_ci
1087db96d56Sopenharmony_ci
1097db96d56Sopenharmony_ciclass CoroutineTests(BaseTest):
1107db96d56Sopenharmony_ci
1117db96d56Sopenharmony_ci    def test_iscoroutine(self):
1127db96d56Sopenharmony_ci        async def foo(): pass
1137db96d56Sopenharmony_ci
1147db96d56Sopenharmony_ci        f = foo()
1157db96d56Sopenharmony_ci        try:
1167db96d56Sopenharmony_ci            self.assertTrue(asyncio.iscoroutine(f))
1177db96d56Sopenharmony_ci        finally:
1187db96d56Sopenharmony_ci            f.close() # silence warning
1197db96d56Sopenharmony_ci
1207db96d56Sopenharmony_ci        self.assertTrue(asyncio.iscoroutine(FakeCoro()))
1217db96d56Sopenharmony_ci
1227db96d56Sopenharmony_ci    def test_iscoroutinefunction(self):
1237db96d56Sopenharmony_ci        async def foo(): pass
1247db96d56Sopenharmony_ci        self.assertTrue(asyncio.iscoroutinefunction(foo))
1257db96d56Sopenharmony_ci
1267db96d56Sopenharmony_ci    def test_async_def_coroutines(self):
1277db96d56Sopenharmony_ci        async def bar():
1287db96d56Sopenharmony_ci            return 'spam'
1297db96d56Sopenharmony_ci        async def foo():
1307db96d56Sopenharmony_ci            return await bar()
1317db96d56Sopenharmony_ci
1327db96d56Sopenharmony_ci        # production mode
1337db96d56Sopenharmony_ci        data = self.loop.run_until_complete(foo())
1347db96d56Sopenharmony_ci        self.assertEqual(data, 'spam')
1357db96d56Sopenharmony_ci
1367db96d56Sopenharmony_ci        # debug mode
1377db96d56Sopenharmony_ci        self.loop.set_debug(True)
1387db96d56Sopenharmony_ci        data = self.loop.run_until_complete(foo())
1397db96d56Sopenharmony_ci        self.assertEqual(data, 'spam')
1407db96d56Sopenharmony_ci
1417db96d56Sopenharmony_ci    def test_debug_mode_manages_coroutine_origin_tracking(self):
1427db96d56Sopenharmony_ci        async def start():
1437db96d56Sopenharmony_ci            self.assertTrue(sys.get_coroutine_origin_tracking_depth() > 0)
1447db96d56Sopenharmony_ci
1457db96d56Sopenharmony_ci        self.assertEqual(sys.get_coroutine_origin_tracking_depth(), 0)
1467db96d56Sopenharmony_ci        self.loop.set_debug(True)
1477db96d56Sopenharmony_ci        self.loop.run_until_complete(start())
1487db96d56Sopenharmony_ci        self.assertEqual(sys.get_coroutine_origin_tracking_depth(), 0)
1497db96d56Sopenharmony_ci
1507db96d56Sopenharmony_ci    def test_types_coroutine(self):
1517db96d56Sopenharmony_ci        def gen():
1527db96d56Sopenharmony_ci            yield from ()
1537db96d56Sopenharmony_ci            return 'spam'
1547db96d56Sopenharmony_ci
1557db96d56Sopenharmony_ci        @types.coroutine
1567db96d56Sopenharmony_ci        def func():
1577db96d56Sopenharmony_ci            return gen()
1587db96d56Sopenharmony_ci
1597db96d56Sopenharmony_ci        async def coro():
1607db96d56Sopenharmony_ci            wrapper = func()
1617db96d56Sopenharmony_ci            self.assertIsInstance(wrapper, types._GeneratorWrapper)
1627db96d56Sopenharmony_ci            return await wrapper
1637db96d56Sopenharmony_ci
1647db96d56Sopenharmony_ci        data = self.loop.run_until_complete(coro())
1657db96d56Sopenharmony_ci        self.assertEqual(data, 'spam')
1667db96d56Sopenharmony_ci
1677db96d56Sopenharmony_ci    def test_task_print_stack(self):
1687db96d56Sopenharmony_ci        T = None
1697db96d56Sopenharmony_ci
1707db96d56Sopenharmony_ci        async def foo():
1717db96d56Sopenharmony_ci            f = T.get_stack(limit=1)
1727db96d56Sopenharmony_ci            try:
1737db96d56Sopenharmony_ci                self.assertEqual(f[0].f_code.co_name, 'foo')
1747db96d56Sopenharmony_ci            finally:
1757db96d56Sopenharmony_ci                f = None
1767db96d56Sopenharmony_ci
1777db96d56Sopenharmony_ci        async def runner():
1787db96d56Sopenharmony_ci            nonlocal T
1797db96d56Sopenharmony_ci            T = asyncio.ensure_future(foo(), loop=self.loop)
1807db96d56Sopenharmony_ci            await T
1817db96d56Sopenharmony_ci
1827db96d56Sopenharmony_ci        self.loop.run_until_complete(runner())
1837db96d56Sopenharmony_ci
1847db96d56Sopenharmony_ci    def test_double_await(self):
1857db96d56Sopenharmony_ci        async def afunc():
1867db96d56Sopenharmony_ci            await asyncio.sleep(0.1)
1877db96d56Sopenharmony_ci
1887db96d56Sopenharmony_ci        async def runner():
1897db96d56Sopenharmony_ci            coro = afunc()
1907db96d56Sopenharmony_ci            t = self.loop.create_task(coro)
1917db96d56Sopenharmony_ci            try:
1927db96d56Sopenharmony_ci                await asyncio.sleep(0)
1937db96d56Sopenharmony_ci                await coro
1947db96d56Sopenharmony_ci            finally:
1957db96d56Sopenharmony_ci                t.cancel()
1967db96d56Sopenharmony_ci
1977db96d56Sopenharmony_ci        self.loop.set_debug(True)
1987db96d56Sopenharmony_ci        with self.assertRaises(
1997db96d56Sopenharmony_ci                RuntimeError,
2007db96d56Sopenharmony_ci                msg='coroutine is being awaited already'):
2017db96d56Sopenharmony_ci
2027db96d56Sopenharmony_ci            self.loop.run_until_complete(runner())
2037db96d56Sopenharmony_ci
2047db96d56Sopenharmony_ci
2057db96d56Sopenharmony_ciif __name__ == '__main__':
2067db96d56Sopenharmony_ci    unittest.main()
207