17db96d56Sopenharmony_ci"""Tests for events.py.""" 27db96d56Sopenharmony_ci 37db96d56Sopenharmony_ciimport collections.abc 47db96d56Sopenharmony_ciimport concurrent.futures 57db96d56Sopenharmony_ciimport functools 67db96d56Sopenharmony_ciimport io 77db96d56Sopenharmony_ciimport os 87db96d56Sopenharmony_ciimport platform 97db96d56Sopenharmony_ciimport re 107db96d56Sopenharmony_ciimport signal 117db96d56Sopenharmony_ciimport socket 127db96d56Sopenharmony_citry: 137db96d56Sopenharmony_ci import ssl 147db96d56Sopenharmony_ciexcept ImportError: 157db96d56Sopenharmony_ci ssl = None 167db96d56Sopenharmony_ciimport subprocess 177db96d56Sopenharmony_ciimport sys 187db96d56Sopenharmony_ciimport threading 197db96d56Sopenharmony_ciimport time 207db96d56Sopenharmony_ciimport types 217db96d56Sopenharmony_ciimport errno 227db96d56Sopenharmony_ciimport unittest 237db96d56Sopenharmony_cifrom unittest import mock 247db96d56Sopenharmony_ciimport weakref 257db96d56Sopenharmony_ci 267db96d56Sopenharmony_ciif sys.platform not in ('win32', 'vxworks'): 277db96d56Sopenharmony_ci import tty 287db96d56Sopenharmony_ci 297db96d56Sopenharmony_ciimport asyncio 307db96d56Sopenharmony_cifrom asyncio import coroutines 317db96d56Sopenharmony_cifrom asyncio import events 327db96d56Sopenharmony_cifrom asyncio import proactor_events 337db96d56Sopenharmony_cifrom asyncio import selector_events 347db96d56Sopenharmony_cifrom test.test_asyncio import utils as test_utils 357db96d56Sopenharmony_cifrom test import support 367db96d56Sopenharmony_cifrom test.support import socket_helper 377db96d56Sopenharmony_cifrom test.support import threading_helper 387db96d56Sopenharmony_cifrom test.support import ALWAYS_EQ, LARGEST, SMALLEST 397db96d56Sopenharmony_ci 407db96d56Sopenharmony_ci 417db96d56Sopenharmony_cidef tearDownModule(): 427db96d56Sopenharmony_ci asyncio.set_event_loop_policy(None) 437db96d56Sopenharmony_ci 447db96d56Sopenharmony_ci 457db96d56Sopenharmony_cidef broken_unix_getsockname(): 
467db96d56Sopenharmony_ci """Return True if the platform is Mac OS 10.4 or older.""" 477db96d56Sopenharmony_ci if sys.platform.startswith("aix"): 487db96d56Sopenharmony_ci return True 497db96d56Sopenharmony_ci elif sys.platform != 'darwin': 507db96d56Sopenharmony_ci return False 517db96d56Sopenharmony_ci version = platform.mac_ver()[0] 527db96d56Sopenharmony_ci version = tuple(map(int, version.split('.'))) 537db96d56Sopenharmony_ci return version < (10, 5) 547db96d56Sopenharmony_ci 557db96d56Sopenharmony_ci 567db96d56Sopenharmony_cidef _test_get_event_loop_new_process__sub_proc(): 577db96d56Sopenharmony_ci async def doit(): 587db96d56Sopenharmony_ci return 'hello' 597db96d56Sopenharmony_ci 607db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 617db96d56Sopenharmony_ci asyncio.set_event_loop(loop) 627db96d56Sopenharmony_ci return loop.run_until_complete(doit()) 637db96d56Sopenharmony_ci 647db96d56Sopenharmony_ci 657db96d56Sopenharmony_ciclass CoroLike: 667db96d56Sopenharmony_ci def send(self, v): 677db96d56Sopenharmony_ci pass 687db96d56Sopenharmony_ci 697db96d56Sopenharmony_ci def throw(self, *exc): 707db96d56Sopenharmony_ci pass 717db96d56Sopenharmony_ci 727db96d56Sopenharmony_ci def close(self): 737db96d56Sopenharmony_ci pass 747db96d56Sopenharmony_ci 757db96d56Sopenharmony_ci def __await__(self): 767db96d56Sopenharmony_ci pass 777db96d56Sopenharmony_ci 787db96d56Sopenharmony_ci 797db96d56Sopenharmony_ciclass MyBaseProto(asyncio.Protocol): 807db96d56Sopenharmony_ci connected = None 817db96d56Sopenharmony_ci done = None 827db96d56Sopenharmony_ci 837db96d56Sopenharmony_ci def __init__(self, loop=None): 847db96d56Sopenharmony_ci self.transport = None 857db96d56Sopenharmony_ci self.state = 'INITIAL' 867db96d56Sopenharmony_ci self.nbytes = 0 877db96d56Sopenharmony_ci if loop is not None: 887db96d56Sopenharmony_ci self.connected = loop.create_future() 897db96d56Sopenharmony_ci self.done = loop.create_future() 907db96d56Sopenharmony_ci 917db96d56Sopenharmony_ci def 
_assert_state(self, *expected): 927db96d56Sopenharmony_ci if self.state not in expected: 937db96d56Sopenharmony_ci raise AssertionError(f'state: {self.state!r}, expected: {expected!r}') 947db96d56Sopenharmony_ci 957db96d56Sopenharmony_ci def connection_made(self, transport): 967db96d56Sopenharmony_ci self.transport = transport 977db96d56Sopenharmony_ci self._assert_state('INITIAL') 987db96d56Sopenharmony_ci self.state = 'CONNECTED' 997db96d56Sopenharmony_ci if self.connected: 1007db96d56Sopenharmony_ci self.connected.set_result(None) 1017db96d56Sopenharmony_ci 1027db96d56Sopenharmony_ci def data_received(self, data): 1037db96d56Sopenharmony_ci self._assert_state('CONNECTED') 1047db96d56Sopenharmony_ci self.nbytes += len(data) 1057db96d56Sopenharmony_ci 1067db96d56Sopenharmony_ci def eof_received(self): 1077db96d56Sopenharmony_ci self._assert_state('CONNECTED') 1087db96d56Sopenharmony_ci self.state = 'EOF' 1097db96d56Sopenharmony_ci 1107db96d56Sopenharmony_ci def connection_lost(self, exc): 1117db96d56Sopenharmony_ci self._assert_state('CONNECTED', 'EOF') 1127db96d56Sopenharmony_ci self.state = 'CLOSED' 1137db96d56Sopenharmony_ci if self.done: 1147db96d56Sopenharmony_ci self.done.set_result(None) 1157db96d56Sopenharmony_ci 1167db96d56Sopenharmony_ci 1177db96d56Sopenharmony_ciclass MyProto(MyBaseProto): 1187db96d56Sopenharmony_ci def connection_made(self, transport): 1197db96d56Sopenharmony_ci super().connection_made(transport) 1207db96d56Sopenharmony_ci transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n') 1217db96d56Sopenharmony_ci 1227db96d56Sopenharmony_ci 1237db96d56Sopenharmony_ciclass MyDatagramProto(asyncio.DatagramProtocol): 1247db96d56Sopenharmony_ci done = None 1257db96d56Sopenharmony_ci 1267db96d56Sopenharmony_ci def __init__(self, loop=None): 1277db96d56Sopenharmony_ci self.state = 'INITIAL' 1287db96d56Sopenharmony_ci self.nbytes = 0 1297db96d56Sopenharmony_ci if loop is not None: 1307db96d56Sopenharmony_ci self.done = loop.create_future() 
1317db96d56Sopenharmony_ci 1327db96d56Sopenharmony_ci def _assert_state(self, expected): 1337db96d56Sopenharmony_ci if self.state != expected: 1347db96d56Sopenharmony_ci raise AssertionError(f'state: {self.state!r}, expected: {expected!r}') 1357db96d56Sopenharmony_ci 1367db96d56Sopenharmony_ci def connection_made(self, transport): 1377db96d56Sopenharmony_ci self.transport = transport 1387db96d56Sopenharmony_ci self._assert_state('INITIAL') 1397db96d56Sopenharmony_ci self.state = 'INITIALIZED' 1407db96d56Sopenharmony_ci 1417db96d56Sopenharmony_ci def datagram_received(self, data, addr): 1427db96d56Sopenharmony_ci self._assert_state('INITIALIZED') 1437db96d56Sopenharmony_ci self.nbytes += len(data) 1447db96d56Sopenharmony_ci 1457db96d56Sopenharmony_ci def error_received(self, exc): 1467db96d56Sopenharmony_ci self._assert_state('INITIALIZED') 1477db96d56Sopenharmony_ci 1487db96d56Sopenharmony_ci def connection_lost(self, exc): 1497db96d56Sopenharmony_ci self._assert_state('INITIALIZED') 1507db96d56Sopenharmony_ci self.state = 'CLOSED' 1517db96d56Sopenharmony_ci if self.done: 1527db96d56Sopenharmony_ci self.done.set_result(None) 1537db96d56Sopenharmony_ci 1547db96d56Sopenharmony_ci 1557db96d56Sopenharmony_ciclass MyReadPipeProto(asyncio.Protocol): 1567db96d56Sopenharmony_ci done = None 1577db96d56Sopenharmony_ci 1587db96d56Sopenharmony_ci def __init__(self, loop=None): 1597db96d56Sopenharmony_ci self.state = ['INITIAL'] 1607db96d56Sopenharmony_ci self.nbytes = 0 1617db96d56Sopenharmony_ci self.transport = None 1627db96d56Sopenharmony_ci if loop is not None: 1637db96d56Sopenharmony_ci self.done = loop.create_future() 1647db96d56Sopenharmony_ci 1657db96d56Sopenharmony_ci def _assert_state(self, expected): 1667db96d56Sopenharmony_ci if self.state != expected: 1677db96d56Sopenharmony_ci raise AssertionError(f'state: {self.state!r}, expected: {expected!r}') 1687db96d56Sopenharmony_ci 1697db96d56Sopenharmony_ci def connection_made(self, transport): 1707db96d56Sopenharmony_ci 
self.transport = transport 1717db96d56Sopenharmony_ci self._assert_state(['INITIAL']) 1727db96d56Sopenharmony_ci self.state.append('CONNECTED') 1737db96d56Sopenharmony_ci 1747db96d56Sopenharmony_ci def data_received(self, data): 1757db96d56Sopenharmony_ci self._assert_state(['INITIAL', 'CONNECTED']) 1767db96d56Sopenharmony_ci self.nbytes += len(data) 1777db96d56Sopenharmony_ci 1787db96d56Sopenharmony_ci def eof_received(self): 1797db96d56Sopenharmony_ci self._assert_state(['INITIAL', 'CONNECTED']) 1807db96d56Sopenharmony_ci self.state.append('EOF') 1817db96d56Sopenharmony_ci 1827db96d56Sopenharmony_ci def connection_lost(self, exc): 1837db96d56Sopenharmony_ci if 'EOF' not in self.state: 1847db96d56Sopenharmony_ci self.state.append('EOF') # It is okay if EOF is missed. 1857db96d56Sopenharmony_ci self._assert_state(['INITIAL', 'CONNECTED', 'EOF']) 1867db96d56Sopenharmony_ci self.state.append('CLOSED') 1877db96d56Sopenharmony_ci if self.done: 1887db96d56Sopenharmony_ci self.done.set_result(None) 1897db96d56Sopenharmony_ci 1907db96d56Sopenharmony_ci 1917db96d56Sopenharmony_ciclass MyWritePipeProto(asyncio.BaseProtocol): 1927db96d56Sopenharmony_ci done = None 1937db96d56Sopenharmony_ci 1947db96d56Sopenharmony_ci def __init__(self, loop=None): 1957db96d56Sopenharmony_ci self.state = 'INITIAL' 1967db96d56Sopenharmony_ci self.transport = None 1977db96d56Sopenharmony_ci if loop is not None: 1987db96d56Sopenharmony_ci self.done = loop.create_future() 1997db96d56Sopenharmony_ci 2007db96d56Sopenharmony_ci def _assert_state(self, expected): 2017db96d56Sopenharmony_ci if self.state != expected: 2027db96d56Sopenharmony_ci raise AssertionError(f'state: {self.state!r}, expected: {expected!r}') 2037db96d56Sopenharmony_ci 2047db96d56Sopenharmony_ci def connection_made(self, transport): 2057db96d56Sopenharmony_ci self.transport = transport 2067db96d56Sopenharmony_ci self._assert_state('INITIAL') 2077db96d56Sopenharmony_ci self.state = 'CONNECTED' 2087db96d56Sopenharmony_ci 
2097db96d56Sopenharmony_ci def connection_lost(self, exc): 2107db96d56Sopenharmony_ci self._assert_state('CONNECTED') 2117db96d56Sopenharmony_ci self.state = 'CLOSED' 2127db96d56Sopenharmony_ci if self.done: 2137db96d56Sopenharmony_ci self.done.set_result(None) 2147db96d56Sopenharmony_ci 2157db96d56Sopenharmony_ci 2167db96d56Sopenharmony_ciclass MySubprocessProtocol(asyncio.SubprocessProtocol): 2177db96d56Sopenharmony_ci 2187db96d56Sopenharmony_ci def __init__(self, loop): 2197db96d56Sopenharmony_ci self.state = 'INITIAL' 2207db96d56Sopenharmony_ci self.transport = None 2217db96d56Sopenharmony_ci self.connected = loop.create_future() 2227db96d56Sopenharmony_ci self.completed = loop.create_future() 2237db96d56Sopenharmony_ci self.disconnects = {fd: loop.create_future() for fd in range(3)} 2247db96d56Sopenharmony_ci self.data = {1: b'', 2: b''} 2257db96d56Sopenharmony_ci self.returncode = None 2267db96d56Sopenharmony_ci self.got_data = {1: asyncio.Event(), 2277db96d56Sopenharmony_ci 2: asyncio.Event()} 2287db96d56Sopenharmony_ci 2297db96d56Sopenharmony_ci def _assert_state(self, expected): 2307db96d56Sopenharmony_ci if self.state != expected: 2317db96d56Sopenharmony_ci raise AssertionError(f'state: {self.state!r}, expected: {expected!r}') 2327db96d56Sopenharmony_ci 2337db96d56Sopenharmony_ci def connection_made(self, transport): 2347db96d56Sopenharmony_ci self.transport = transport 2357db96d56Sopenharmony_ci self._assert_state('INITIAL') 2367db96d56Sopenharmony_ci self.state = 'CONNECTED' 2377db96d56Sopenharmony_ci self.connected.set_result(None) 2387db96d56Sopenharmony_ci 2397db96d56Sopenharmony_ci def connection_lost(self, exc): 2407db96d56Sopenharmony_ci self._assert_state('CONNECTED') 2417db96d56Sopenharmony_ci self.state = 'CLOSED' 2427db96d56Sopenharmony_ci self.completed.set_result(None) 2437db96d56Sopenharmony_ci 2447db96d56Sopenharmony_ci def pipe_data_received(self, fd, data): 2457db96d56Sopenharmony_ci self._assert_state('CONNECTED') 
2467db96d56Sopenharmony_ci self.data[fd] += data 2477db96d56Sopenharmony_ci self.got_data[fd].set() 2487db96d56Sopenharmony_ci 2497db96d56Sopenharmony_ci def pipe_connection_lost(self, fd, exc): 2507db96d56Sopenharmony_ci self._assert_state('CONNECTED') 2517db96d56Sopenharmony_ci if exc: 2527db96d56Sopenharmony_ci self.disconnects[fd].set_exception(exc) 2537db96d56Sopenharmony_ci else: 2547db96d56Sopenharmony_ci self.disconnects[fd].set_result(exc) 2557db96d56Sopenharmony_ci 2567db96d56Sopenharmony_ci def process_exited(self): 2577db96d56Sopenharmony_ci self._assert_state('CONNECTED') 2587db96d56Sopenharmony_ci self.returncode = self.transport.get_returncode() 2597db96d56Sopenharmony_ci 2607db96d56Sopenharmony_ci 2617db96d56Sopenharmony_ciclass EventLoopTestsMixin: 2627db96d56Sopenharmony_ci 2637db96d56Sopenharmony_ci def setUp(self): 2647db96d56Sopenharmony_ci super().setUp() 2657db96d56Sopenharmony_ci self.loop = self.create_event_loop() 2667db96d56Sopenharmony_ci self.set_event_loop(self.loop) 2677db96d56Sopenharmony_ci 2687db96d56Sopenharmony_ci def tearDown(self): 2697db96d56Sopenharmony_ci # just in case if we have transport close callbacks 2707db96d56Sopenharmony_ci if not self.loop.is_closed(): 2717db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 2727db96d56Sopenharmony_ci 2737db96d56Sopenharmony_ci self.doCleanups() 2747db96d56Sopenharmony_ci support.gc_collect() 2757db96d56Sopenharmony_ci super().tearDown() 2767db96d56Sopenharmony_ci 2777db96d56Sopenharmony_ci def test_run_until_complete_nesting(self): 2787db96d56Sopenharmony_ci async def coro1(): 2797db96d56Sopenharmony_ci await asyncio.sleep(0) 2807db96d56Sopenharmony_ci 2817db96d56Sopenharmony_ci async def coro2(): 2827db96d56Sopenharmony_ci self.assertTrue(self.loop.is_running()) 2837db96d56Sopenharmony_ci self.loop.run_until_complete(coro1()) 2847db96d56Sopenharmony_ci 2857db96d56Sopenharmony_ci with self.assertWarnsRegex( 2867db96d56Sopenharmony_ci RuntimeWarning, 2877db96d56Sopenharmony_ci 
r"coroutine \S+ was never awaited" 2887db96d56Sopenharmony_ci ): 2897db96d56Sopenharmony_ci self.assertRaises( 2907db96d56Sopenharmony_ci RuntimeError, self.loop.run_until_complete, coro2()) 2917db96d56Sopenharmony_ci 2927db96d56Sopenharmony_ci # Note: because of the default Windows timing granularity of 2937db96d56Sopenharmony_ci # 15.6 msec, we use fairly long sleep times here (~100 msec). 2947db96d56Sopenharmony_ci 2957db96d56Sopenharmony_ci def test_run_until_complete(self): 2967db96d56Sopenharmony_ci t0 = self.loop.time() 2977db96d56Sopenharmony_ci self.loop.run_until_complete(asyncio.sleep(0.1)) 2987db96d56Sopenharmony_ci t1 = self.loop.time() 2997db96d56Sopenharmony_ci self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0) 3007db96d56Sopenharmony_ci 3017db96d56Sopenharmony_ci def test_run_until_complete_stopped(self): 3027db96d56Sopenharmony_ci 3037db96d56Sopenharmony_ci async def cb(): 3047db96d56Sopenharmony_ci self.loop.stop() 3057db96d56Sopenharmony_ci await asyncio.sleep(0.1) 3067db96d56Sopenharmony_ci task = cb() 3077db96d56Sopenharmony_ci self.assertRaises(RuntimeError, 3087db96d56Sopenharmony_ci self.loop.run_until_complete, task) 3097db96d56Sopenharmony_ci 3107db96d56Sopenharmony_ci def test_call_later(self): 3117db96d56Sopenharmony_ci results = [] 3127db96d56Sopenharmony_ci 3137db96d56Sopenharmony_ci def callback(arg): 3147db96d56Sopenharmony_ci results.append(arg) 3157db96d56Sopenharmony_ci self.loop.stop() 3167db96d56Sopenharmony_ci 3177db96d56Sopenharmony_ci self.loop.call_later(0.1, callback, 'hello world') 3187db96d56Sopenharmony_ci self.loop.run_forever() 3197db96d56Sopenharmony_ci self.assertEqual(results, ['hello world']) 3207db96d56Sopenharmony_ci 3217db96d56Sopenharmony_ci def test_call_soon(self): 3227db96d56Sopenharmony_ci results = [] 3237db96d56Sopenharmony_ci 3247db96d56Sopenharmony_ci def callback(arg1, arg2): 3257db96d56Sopenharmony_ci results.append((arg1, arg2)) 3267db96d56Sopenharmony_ci self.loop.stop() 3277db96d56Sopenharmony_ci 
3287db96d56Sopenharmony_ci self.loop.call_soon(callback, 'hello', 'world') 3297db96d56Sopenharmony_ci self.loop.run_forever() 3307db96d56Sopenharmony_ci self.assertEqual(results, [('hello', 'world')]) 3317db96d56Sopenharmony_ci 3327db96d56Sopenharmony_ci def test_call_soon_threadsafe(self): 3337db96d56Sopenharmony_ci results = [] 3347db96d56Sopenharmony_ci lock = threading.Lock() 3357db96d56Sopenharmony_ci 3367db96d56Sopenharmony_ci def callback(arg): 3377db96d56Sopenharmony_ci results.append(arg) 3387db96d56Sopenharmony_ci if len(results) >= 2: 3397db96d56Sopenharmony_ci self.loop.stop() 3407db96d56Sopenharmony_ci 3417db96d56Sopenharmony_ci def run_in_thread(): 3427db96d56Sopenharmony_ci self.loop.call_soon_threadsafe(callback, 'hello') 3437db96d56Sopenharmony_ci lock.release() 3447db96d56Sopenharmony_ci 3457db96d56Sopenharmony_ci lock.acquire() 3467db96d56Sopenharmony_ci t = threading.Thread(target=run_in_thread) 3477db96d56Sopenharmony_ci t.start() 3487db96d56Sopenharmony_ci 3497db96d56Sopenharmony_ci with lock: 3507db96d56Sopenharmony_ci self.loop.call_soon(callback, 'world') 3517db96d56Sopenharmony_ci self.loop.run_forever() 3527db96d56Sopenharmony_ci t.join() 3537db96d56Sopenharmony_ci self.assertEqual(results, ['hello', 'world']) 3547db96d56Sopenharmony_ci 3557db96d56Sopenharmony_ci def test_call_soon_threadsafe_same_thread(self): 3567db96d56Sopenharmony_ci results = [] 3577db96d56Sopenharmony_ci 3587db96d56Sopenharmony_ci def callback(arg): 3597db96d56Sopenharmony_ci results.append(arg) 3607db96d56Sopenharmony_ci if len(results) >= 2: 3617db96d56Sopenharmony_ci self.loop.stop() 3627db96d56Sopenharmony_ci 3637db96d56Sopenharmony_ci self.loop.call_soon_threadsafe(callback, 'hello') 3647db96d56Sopenharmony_ci self.loop.call_soon(callback, 'world') 3657db96d56Sopenharmony_ci self.loop.run_forever() 3667db96d56Sopenharmony_ci self.assertEqual(results, ['hello', 'world']) 3677db96d56Sopenharmony_ci 3687db96d56Sopenharmony_ci def test_run_in_executor(self): 
3697db96d56Sopenharmony_ci def run(arg): 3707db96d56Sopenharmony_ci return (arg, threading.get_ident()) 3717db96d56Sopenharmony_ci f2 = self.loop.run_in_executor(None, run, 'yo') 3727db96d56Sopenharmony_ci res, thread_id = self.loop.run_until_complete(f2) 3737db96d56Sopenharmony_ci self.assertEqual(res, 'yo') 3747db96d56Sopenharmony_ci self.assertNotEqual(thread_id, threading.get_ident()) 3757db96d56Sopenharmony_ci 3767db96d56Sopenharmony_ci def test_run_in_executor_cancel(self): 3777db96d56Sopenharmony_ci called = False 3787db96d56Sopenharmony_ci 3797db96d56Sopenharmony_ci def patched_call_soon(*args): 3807db96d56Sopenharmony_ci nonlocal called 3817db96d56Sopenharmony_ci called = True 3827db96d56Sopenharmony_ci 3837db96d56Sopenharmony_ci def run(): 3847db96d56Sopenharmony_ci time.sleep(0.05) 3857db96d56Sopenharmony_ci 3867db96d56Sopenharmony_ci f2 = self.loop.run_in_executor(None, run) 3877db96d56Sopenharmony_ci f2.cancel() 3887db96d56Sopenharmony_ci self.loop.run_until_complete( 3897db96d56Sopenharmony_ci self.loop.shutdown_default_executor()) 3907db96d56Sopenharmony_ci self.loop.close() 3917db96d56Sopenharmony_ci self.loop.call_soon = patched_call_soon 3927db96d56Sopenharmony_ci self.loop.call_soon_threadsafe = patched_call_soon 3937db96d56Sopenharmony_ci time.sleep(0.4) 3947db96d56Sopenharmony_ci self.assertFalse(called) 3957db96d56Sopenharmony_ci 3967db96d56Sopenharmony_ci def test_reader_callback(self): 3977db96d56Sopenharmony_ci r, w = socket.socketpair() 3987db96d56Sopenharmony_ci r.setblocking(False) 3997db96d56Sopenharmony_ci bytes_read = bytearray() 4007db96d56Sopenharmony_ci 4017db96d56Sopenharmony_ci def reader(): 4027db96d56Sopenharmony_ci try: 4037db96d56Sopenharmony_ci data = r.recv(1024) 4047db96d56Sopenharmony_ci except BlockingIOError: 4057db96d56Sopenharmony_ci # Spurious readiness notifications are possible 4067db96d56Sopenharmony_ci # at least on Linux -- see man select. 
4077db96d56Sopenharmony_ci return 4087db96d56Sopenharmony_ci if data: 4097db96d56Sopenharmony_ci bytes_read.extend(data) 4107db96d56Sopenharmony_ci else: 4117db96d56Sopenharmony_ci self.assertTrue(self.loop.remove_reader(r.fileno())) 4127db96d56Sopenharmony_ci r.close() 4137db96d56Sopenharmony_ci 4147db96d56Sopenharmony_ci self.loop.add_reader(r.fileno(), reader) 4157db96d56Sopenharmony_ci self.loop.call_soon(w.send, b'abc') 4167db96d56Sopenharmony_ci test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3) 4177db96d56Sopenharmony_ci self.loop.call_soon(w.send, b'def') 4187db96d56Sopenharmony_ci test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6) 4197db96d56Sopenharmony_ci self.loop.call_soon(w.close) 4207db96d56Sopenharmony_ci self.loop.call_soon(self.loop.stop) 4217db96d56Sopenharmony_ci self.loop.run_forever() 4227db96d56Sopenharmony_ci self.assertEqual(bytes_read, b'abcdef') 4237db96d56Sopenharmony_ci 4247db96d56Sopenharmony_ci def test_writer_callback(self): 4257db96d56Sopenharmony_ci r, w = socket.socketpair() 4267db96d56Sopenharmony_ci w.setblocking(False) 4277db96d56Sopenharmony_ci 4287db96d56Sopenharmony_ci def writer(data): 4297db96d56Sopenharmony_ci w.send(data) 4307db96d56Sopenharmony_ci self.loop.stop() 4317db96d56Sopenharmony_ci 4327db96d56Sopenharmony_ci data = b'x' * 1024 4337db96d56Sopenharmony_ci self.loop.add_writer(w.fileno(), writer, data) 4347db96d56Sopenharmony_ci self.loop.run_forever() 4357db96d56Sopenharmony_ci 4367db96d56Sopenharmony_ci self.assertTrue(self.loop.remove_writer(w.fileno())) 4377db96d56Sopenharmony_ci self.assertFalse(self.loop.remove_writer(w.fileno())) 4387db96d56Sopenharmony_ci 4397db96d56Sopenharmony_ci w.close() 4407db96d56Sopenharmony_ci read = r.recv(len(data) * 2) 4417db96d56Sopenharmony_ci r.close() 4427db96d56Sopenharmony_ci self.assertEqual(read, data) 4437db96d56Sopenharmony_ci 4447db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL') 4457db96d56Sopenharmony_ci def 
test_add_signal_handler(self): 4467db96d56Sopenharmony_ci caught = 0 4477db96d56Sopenharmony_ci 4487db96d56Sopenharmony_ci def my_handler(): 4497db96d56Sopenharmony_ci nonlocal caught 4507db96d56Sopenharmony_ci caught += 1 4517db96d56Sopenharmony_ci 4527db96d56Sopenharmony_ci # Check error behavior first. 4537db96d56Sopenharmony_ci self.assertRaises( 4547db96d56Sopenharmony_ci TypeError, self.loop.add_signal_handler, 'boom', my_handler) 4557db96d56Sopenharmony_ci self.assertRaises( 4567db96d56Sopenharmony_ci TypeError, self.loop.remove_signal_handler, 'boom') 4577db96d56Sopenharmony_ci self.assertRaises( 4587db96d56Sopenharmony_ci ValueError, self.loop.add_signal_handler, signal.NSIG+1, 4597db96d56Sopenharmony_ci my_handler) 4607db96d56Sopenharmony_ci self.assertRaises( 4617db96d56Sopenharmony_ci ValueError, self.loop.remove_signal_handler, signal.NSIG+1) 4627db96d56Sopenharmony_ci self.assertRaises( 4637db96d56Sopenharmony_ci ValueError, self.loop.add_signal_handler, 0, my_handler) 4647db96d56Sopenharmony_ci self.assertRaises( 4657db96d56Sopenharmony_ci ValueError, self.loop.remove_signal_handler, 0) 4667db96d56Sopenharmony_ci self.assertRaises( 4677db96d56Sopenharmony_ci ValueError, self.loop.add_signal_handler, -1, my_handler) 4687db96d56Sopenharmony_ci self.assertRaises( 4697db96d56Sopenharmony_ci ValueError, self.loop.remove_signal_handler, -1) 4707db96d56Sopenharmony_ci self.assertRaises( 4717db96d56Sopenharmony_ci RuntimeError, self.loop.add_signal_handler, signal.SIGKILL, 4727db96d56Sopenharmony_ci my_handler) 4737db96d56Sopenharmony_ci # Removing SIGKILL doesn't raise, since we don't call signal(). 4747db96d56Sopenharmony_ci self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL)) 4757db96d56Sopenharmony_ci # Now set a handler and handle it. 
4767db96d56Sopenharmony_ci self.loop.add_signal_handler(signal.SIGINT, my_handler) 4777db96d56Sopenharmony_ci 4787db96d56Sopenharmony_ci os.kill(os.getpid(), signal.SIGINT) 4797db96d56Sopenharmony_ci test_utils.run_until(self.loop, lambda: caught) 4807db96d56Sopenharmony_ci 4817db96d56Sopenharmony_ci # Removing it should restore the default handler. 4827db96d56Sopenharmony_ci self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT)) 4837db96d56Sopenharmony_ci self.assertEqual(signal.getsignal(signal.SIGINT), 4847db96d56Sopenharmony_ci signal.default_int_handler) 4857db96d56Sopenharmony_ci # Removing again returns False. 4867db96d56Sopenharmony_ci self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT)) 4877db96d56Sopenharmony_ci 4887db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM') 4897db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'setitimer'), 4907db96d56Sopenharmony_ci 'need signal.setitimer()') 4917db96d56Sopenharmony_ci def test_signal_handling_while_selecting(self): 4927db96d56Sopenharmony_ci # Test with a signal actually arriving during a select() call. 4937db96d56Sopenharmony_ci caught = 0 4947db96d56Sopenharmony_ci 4957db96d56Sopenharmony_ci def my_handler(): 4967db96d56Sopenharmony_ci nonlocal caught 4977db96d56Sopenharmony_ci caught += 1 4987db96d56Sopenharmony_ci self.loop.stop() 4997db96d56Sopenharmony_ci 5007db96d56Sopenharmony_ci self.loop.add_signal_handler(signal.SIGALRM, my_handler) 5017db96d56Sopenharmony_ci 5027db96d56Sopenharmony_ci signal.setitimer(signal.ITIMER_REAL, 0.01, 0) # Send SIGALRM once. 
5037db96d56Sopenharmony_ci self.loop.call_later(60, self.loop.stop) 5047db96d56Sopenharmony_ci self.loop.run_forever() 5057db96d56Sopenharmony_ci self.assertEqual(caught, 1) 5067db96d56Sopenharmony_ci 5077db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM') 5087db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'setitimer'), 5097db96d56Sopenharmony_ci 'need signal.setitimer()') 5107db96d56Sopenharmony_ci def test_signal_handling_args(self): 5117db96d56Sopenharmony_ci some_args = (42,) 5127db96d56Sopenharmony_ci caught = 0 5137db96d56Sopenharmony_ci 5147db96d56Sopenharmony_ci def my_handler(*args): 5157db96d56Sopenharmony_ci nonlocal caught 5167db96d56Sopenharmony_ci caught += 1 5177db96d56Sopenharmony_ci self.assertEqual(args, some_args) 5187db96d56Sopenharmony_ci self.loop.stop() 5197db96d56Sopenharmony_ci 5207db96d56Sopenharmony_ci self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args) 5217db96d56Sopenharmony_ci 5227db96d56Sopenharmony_ci signal.setitimer(signal.ITIMER_REAL, 0.1, 0) # Send SIGALRM once. 
5237db96d56Sopenharmony_ci self.loop.call_later(60, self.loop.stop) 5247db96d56Sopenharmony_ci self.loop.run_forever() 5257db96d56Sopenharmony_ci self.assertEqual(caught, 1) 5267db96d56Sopenharmony_ci 5277db96d56Sopenharmony_ci def _basetest_create_connection(self, connection_fut, check_sockname=True): 5287db96d56Sopenharmony_ci tr, pr = self.loop.run_until_complete(connection_fut) 5297db96d56Sopenharmony_ci self.assertIsInstance(tr, asyncio.Transport) 5307db96d56Sopenharmony_ci self.assertIsInstance(pr, asyncio.Protocol) 5317db96d56Sopenharmony_ci self.assertIs(pr.transport, tr) 5327db96d56Sopenharmony_ci if check_sockname: 5337db96d56Sopenharmony_ci self.assertIsNotNone(tr.get_extra_info('sockname')) 5347db96d56Sopenharmony_ci self.loop.run_until_complete(pr.done) 5357db96d56Sopenharmony_ci self.assertGreater(pr.nbytes, 0) 5367db96d56Sopenharmony_ci tr.close() 5377db96d56Sopenharmony_ci 5387db96d56Sopenharmony_ci def test_create_connection(self): 5397db96d56Sopenharmony_ci with test_utils.run_test_server() as httpd: 5407db96d56Sopenharmony_ci conn_fut = self.loop.create_connection( 5417db96d56Sopenharmony_ci lambda: MyProto(loop=self.loop), *httpd.address) 5427db96d56Sopenharmony_ci self._basetest_create_connection(conn_fut) 5437db96d56Sopenharmony_ci 5447db96d56Sopenharmony_ci @socket_helper.skip_unless_bind_unix_socket 5457db96d56Sopenharmony_ci def test_create_unix_connection(self): 5467db96d56Sopenharmony_ci # Issue #20682: On Mac OS X Tiger, getsockname() returns a 5477db96d56Sopenharmony_ci # zero-length address for UNIX socket. 
5487db96d56Sopenharmony_ci check_sockname = not broken_unix_getsockname() 5497db96d56Sopenharmony_ci 5507db96d56Sopenharmony_ci with test_utils.run_test_unix_server() as httpd: 5517db96d56Sopenharmony_ci conn_fut = self.loop.create_unix_connection( 5527db96d56Sopenharmony_ci lambda: MyProto(loop=self.loop), httpd.address) 5537db96d56Sopenharmony_ci self._basetest_create_connection(conn_fut, check_sockname) 5547db96d56Sopenharmony_ci 5557db96d56Sopenharmony_ci def check_ssl_extra_info(self, client, check_sockname=True, 5567db96d56Sopenharmony_ci peername=None, peercert={}): 5577db96d56Sopenharmony_ci if check_sockname: 5587db96d56Sopenharmony_ci self.assertIsNotNone(client.get_extra_info('sockname')) 5597db96d56Sopenharmony_ci if peername: 5607db96d56Sopenharmony_ci self.assertEqual(peername, 5617db96d56Sopenharmony_ci client.get_extra_info('peername')) 5627db96d56Sopenharmony_ci else: 5637db96d56Sopenharmony_ci self.assertIsNotNone(client.get_extra_info('peername')) 5647db96d56Sopenharmony_ci self.assertEqual(peercert, 5657db96d56Sopenharmony_ci client.get_extra_info('peercert')) 5667db96d56Sopenharmony_ci 5677db96d56Sopenharmony_ci # test SSL cipher 5687db96d56Sopenharmony_ci cipher = client.get_extra_info('cipher') 5697db96d56Sopenharmony_ci self.assertIsInstance(cipher, tuple) 5707db96d56Sopenharmony_ci self.assertEqual(len(cipher), 3, cipher) 5717db96d56Sopenharmony_ci self.assertIsInstance(cipher[0], str) 5727db96d56Sopenharmony_ci self.assertIsInstance(cipher[1], str) 5737db96d56Sopenharmony_ci self.assertIsInstance(cipher[2], int) 5747db96d56Sopenharmony_ci 5757db96d56Sopenharmony_ci # test SSL object 5767db96d56Sopenharmony_ci sslobj = client.get_extra_info('ssl_object') 5777db96d56Sopenharmony_ci self.assertIsNotNone(sslobj) 5787db96d56Sopenharmony_ci self.assertEqual(sslobj.compression(), 5797db96d56Sopenharmony_ci client.get_extra_info('compression')) 5807db96d56Sopenharmony_ci self.assertEqual(sslobj.cipher(), 5817db96d56Sopenharmony_ci 
client.get_extra_info('cipher')) 5827db96d56Sopenharmony_ci self.assertEqual(sslobj.getpeercert(), 5837db96d56Sopenharmony_ci client.get_extra_info('peercert')) 5847db96d56Sopenharmony_ci self.assertEqual(sslobj.compression(), 5857db96d56Sopenharmony_ci client.get_extra_info('compression')) 5867db96d56Sopenharmony_ci 5877db96d56Sopenharmony_ci def _basetest_create_ssl_connection(self, connection_fut, 5887db96d56Sopenharmony_ci check_sockname=True, 5897db96d56Sopenharmony_ci peername=None): 5907db96d56Sopenharmony_ci tr, pr = self.loop.run_until_complete(connection_fut) 5917db96d56Sopenharmony_ci self.assertIsInstance(tr, asyncio.Transport) 5927db96d56Sopenharmony_ci self.assertIsInstance(pr, asyncio.Protocol) 5937db96d56Sopenharmony_ci self.assertTrue('ssl' in tr.__class__.__name__.lower()) 5947db96d56Sopenharmony_ci self.check_ssl_extra_info(tr, check_sockname, peername) 5957db96d56Sopenharmony_ci self.loop.run_until_complete(pr.done) 5967db96d56Sopenharmony_ci self.assertGreater(pr.nbytes, 0) 5977db96d56Sopenharmony_ci tr.close() 5987db96d56Sopenharmony_ci 5997db96d56Sopenharmony_ci def _test_create_ssl_connection(self, httpd, create_connection, 6007db96d56Sopenharmony_ci check_sockname=True, peername=None): 6017db96d56Sopenharmony_ci conn_fut = create_connection(ssl=test_utils.dummy_ssl_context()) 6027db96d56Sopenharmony_ci self._basetest_create_ssl_connection(conn_fut, check_sockname, 6037db96d56Sopenharmony_ci peername) 6047db96d56Sopenharmony_ci 6057db96d56Sopenharmony_ci # ssl.Purpose was introduced in Python 3.4 6067db96d56Sopenharmony_ci if hasattr(ssl, 'Purpose'): 6077db96d56Sopenharmony_ci def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *, 6087db96d56Sopenharmony_ci cafile=None, capath=None, 6097db96d56Sopenharmony_ci cadata=None): 6107db96d56Sopenharmony_ci """ 6117db96d56Sopenharmony_ci A ssl.create_default_context() replacement that doesn't enable 6127db96d56Sopenharmony_ci cert validation. 
6137db96d56Sopenharmony_ci """ 6147db96d56Sopenharmony_ci self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH) 6157db96d56Sopenharmony_ci return test_utils.dummy_ssl_context() 6167db96d56Sopenharmony_ci 6177db96d56Sopenharmony_ci # With ssl=True, ssl.create_default_context() should be called 6187db96d56Sopenharmony_ci with mock.patch('ssl.create_default_context', 6197db96d56Sopenharmony_ci side_effect=_dummy_ssl_create_context) as m: 6207db96d56Sopenharmony_ci conn_fut = create_connection(ssl=True) 6217db96d56Sopenharmony_ci self._basetest_create_ssl_connection(conn_fut, check_sockname, 6227db96d56Sopenharmony_ci peername) 6237db96d56Sopenharmony_ci self.assertEqual(m.call_count, 1) 6247db96d56Sopenharmony_ci 6257db96d56Sopenharmony_ci # With the real ssl.create_default_context(), certificate 6267db96d56Sopenharmony_ci # validation will fail 6277db96d56Sopenharmony_ci with self.assertRaises(ssl.SSLError) as cm: 6287db96d56Sopenharmony_ci conn_fut = create_connection(ssl=True) 6297db96d56Sopenharmony_ci # Ignore the "SSL handshake failed" log in debug mode 6307db96d56Sopenharmony_ci with test_utils.disable_logger(): 6317db96d56Sopenharmony_ci self._basetest_create_ssl_connection(conn_fut, check_sockname, 6327db96d56Sopenharmony_ci peername) 6337db96d56Sopenharmony_ci 6347db96d56Sopenharmony_ci self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED') 6357db96d56Sopenharmony_ci 6367db96d56Sopenharmony_ci @unittest.skipIf(ssl is None, 'No ssl module') 6377db96d56Sopenharmony_ci def test_create_ssl_connection(self): 6387db96d56Sopenharmony_ci with test_utils.run_test_server(use_ssl=True) as httpd: 6397db96d56Sopenharmony_ci create_connection = functools.partial( 6407db96d56Sopenharmony_ci self.loop.create_connection, 6417db96d56Sopenharmony_ci lambda: MyProto(loop=self.loop), 6427db96d56Sopenharmony_ci *httpd.address) 6437db96d56Sopenharmony_ci self._test_create_ssl_connection(httpd, create_connection, 6447db96d56Sopenharmony_ci peername=httpd.address) 
6457db96d56Sopenharmony_ci 6467db96d56Sopenharmony_ci @socket_helper.skip_unless_bind_unix_socket 6477db96d56Sopenharmony_ci @unittest.skipIf(ssl is None, 'No ssl module') 6487db96d56Sopenharmony_ci def test_create_ssl_unix_connection(self): 6497db96d56Sopenharmony_ci # Issue #20682: On Mac OS X Tiger, getsockname() returns a 6507db96d56Sopenharmony_ci # zero-length address for UNIX socket. 6517db96d56Sopenharmony_ci check_sockname = not broken_unix_getsockname() 6527db96d56Sopenharmony_ci 6537db96d56Sopenharmony_ci with test_utils.run_test_unix_server(use_ssl=True) as httpd: 6547db96d56Sopenharmony_ci create_connection = functools.partial( 6557db96d56Sopenharmony_ci self.loop.create_unix_connection, 6567db96d56Sopenharmony_ci lambda: MyProto(loop=self.loop), httpd.address, 6577db96d56Sopenharmony_ci server_hostname='127.0.0.1') 6587db96d56Sopenharmony_ci 6597db96d56Sopenharmony_ci self._test_create_ssl_connection(httpd, create_connection, 6607db96d56Sopenharmony_ci check_sockname, 6617db96d56Sopenharmony_ci peername=httpd.address) 6627db96d56Sopenharmony_ci 6637db96d56Sopenharmony_ci def test_create_connection_local_addr(self): 6647db96d56Sopenharmony_ci with test_utils.run_test_server() as httpd: 6657db96d56Sopenharmony_ci port = socket_helper.find_unused_port() 6667db96d56Sopenharmony_ci f = self.loop.create_connection( 6677db96d56Sopenharmony_ci lambda: MyProto(loop=self.loop), 6687db96d56Sopenharmony_ci *httpd.address, local_addr=(httpd.address[0], port)) 6697db96d56Sopenharmony_ci tr, pr = self.loop.run_until_complete(f) 6707db96d56Sopenharmony_ci expected = pr.transport.get_extra_info('sockname')[1] 6717db96d56Sopenharmony_ci self.assertEqual(port, expected) 6727db96d56Sopenharmony_ci tr.close() 6737db96d56Sopenharmony_ci 6747db96d56Sopenharmony_ci def test_create_connection_local_addr_skip_different_family(self): 6757db96d56Sopenharmony_ci # See https://github.com/python/cpython/issues/86508 6767db96d56Sopenharmony_ci port1 = 
socket_helper.find_unused_port() 6777db96d56Sopenharmony_ci port2 = socket_helper.find_unused_port() 6787db96d56Sopenharmony_ci getaddrinfo_orig = self.loop.getaddrinfo 6797db96d56Sopenharmony_ci 6807db96d56Sopenharmony_ci async def getaddrinfo(host, port, *args, **kwargs): 6817db96d56Sopenharmony_ci if port == port2: 6827db96d56Sopenharmony_ci return [(socket.AF_INET6, socket.SOCK_STREAM, 0, '', ('::1', 0, 0, 0)), 6837db96d56Sopenharmony_ci (socket.AF_INET, socket.SOCK_STREAM, 0, '', ('127.0.0.1', 0))] 6847db96d56Sopenharmony_ci return await getaddrinfo_orig(host, port, *args, **kwargs) 6857db96d56Sopenharmony_ci 6867db96d56Sopenharmony_ci self.loop.getaddrinfo = getaddrinfo 6877db96d56Sopenharmony_ci 6887db96d56Sopenharmony_ci f = self.loop.create_connection( 6897db96d56Sopenharmony_ci lambda: MyProto(loop=self.loop), 6907db96d56Sopenharmony_ci 'localhost', port1, local_addr=('localhost', port2)) 6917db96d56Sopenharmony_ci 6927db96d56Sopenharmony_ci with self.assertRaises(OSError): 6937db96d56Sopenharmony_ci self.loop.run_until_complete(f) 6947db96d56Sopenharmony_ci 6957db96d56Sopenharmony_ci def test_create_connection_local_addr_nomatch_family(self): 6967db96d56Sopenharmony_ci # See https://github.com/python/cpython/issues/86508 6977db96d56Sopenharmony_ci port1 = socket_helper.find_unused_port() 6987db96d56Sopenharmony_ci port2 = socket_helper.find_unused_port() 6997db96d56Sopenharmony_ci getaddrinfo_orig = self.loop.getaddrinfo 7007db96d56Sopenharmony_ci 7017db96d56Sopenharmony_ci async def getaddrinfo(host, port, *args, **kwargs): 7027db96d56Sopenharmony_ci if port == port2: 7037db96d56Sopenharmony_ci return [(socket.AF_INET6, socket.SOCK_STREAM, 0, '', ('::1', 0, 0, 0))] 7047db96d56Sopenharmony_ci return await getaddrinfo_orig(host, port, *args, **kwargs) 7057db96d56Sopenharmony_ci 7067db96d56Sopenharmony_ci self.loop.getaddrinfo = getaddrinfo 7077db96d56Sopenharmony_ci 7087db96d56Sopenharmony_ci f = self.loop.create_connection( 7097db96d56Sopenharmony_ci 
lambda: MyProto(loop=self.loop), 7107db96d56Sopenharmony_ci 'localhost', port1, local_addr=('localhost', port2)) 7117db96d56Sopenharmony_ci 7127db96d56Sopenharmony_ci with self.assertRaises(OSError): 7137db96d56Sopenharmony_ci self.loop.run_until_complete(f) 7147db96d56Sopenharmony_ci 7157db96d56Sopenharmony_ci def test_create_connection_local_addr_in_use(self): 7167db96d56Sopenharmony_ci with test_utils.run_test_server() as httpd: 7177db96d56Sopenharmony_ci f = self.loop.create_connection( 7187db96d56Sopenharmony_ci lambda: MyProto(loop=self.loop), 7197db96d56Sopenharmony_ci *httpd.address, local_addr=httpd.address) 7207db96d56Sopenharmony_ci with self.assertRaises(OSError) as cm: 7217db96d56Sopenharmony_ci self.loop.run_until_complete(f) 7227db96d56Sopenharmony_ci self.assertEqual(cm.exception.errno, errno.EADDRINUSE) 7237db96d56Sopenharmony_ci self.assertIn(str(httpd.address), cm.exception.strerror) 7247db96d56Sopenharmony_ci 7257db96d56Sopenharmony_ci def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None): 7267db96d56Sopenharmony_ci loop = self.loop 7277db96d56Sopenharmony_ci 7287db96d56Sopenharmony_ci class MyProto(MyBaseProto): 7297db96d56Sopenharmony_ci 7307db96d56Sopenharmony_ci def connection_lost(self, exc): 7317db96d56Sopenharmony_ci super().connection_lost(exc) 7327db96d56Sopenharmony_ci loop.call_soon(loop.stop) 7337db96d56Sopenharmony_ci 7347db96d56Sopenharmony_ci def data_received(self, data): 7357db96d56Sopenharmony_ci super().data_received(data) 7367db96d56Sopenharmony_ci self.transport.write(expected_response) 7377db96d56Sopenharmony_ci 7387db96d56Sopenharmony_ci lsock = socket.create_server(('127.0.0.1', 0), backlog=1) 7397db96d56Sopenharmony_ci addr = lsock.getsockname() 7407db96d56Sopenharmony_ci 7417db96d56Sopenharmony_ci message = b'test data' 7427db96d56Sopenharmony_ci response = None 7437db96d56Sopenharmony_ci expected_response = b'roger' 7447db96d56Sopenharmony_ci 7457db96d56Sopenharmony_ci def client(): 
7467db96d56Sopenharmony_ci nonlocal response 7477db96d56Sopenharmony_ci try: 7487db96d56Sopenharmony_ci csock = socket.socket() 7497db96d56Sopenharmony_ci if client_ssl is not None: 7507db96d56Sopenharmony_ci csock = client_ssl.wrap_socket(csock) 7517db96d56Sopenharmony_ci csock.connect(addr) 7527db96d56Sopenharmony_ci csock.sendall(message) 7537db96d56Sopenharmony_ci response = csock.recv(99) 7547db96d56Sopenharmony_ci csock.close() 7557db96d56Sopenharmony_ci except Exception as exc: 7567db96d56Sopenharmony_ci print( 7577db96d56Sopenharmony_ci "Failure in client thread in test_connect_accepted_socket", 7587db96d56Sopenharmony_ci exc) 7597db96d56Sopenharmony_ci 7607db96d56Sopenharmony_ci thread = threading.Thread(target=client, daemon=True) 7617db96d56Sopenharmony_ci thread.start() 7627db96d56Sopenharmony_ci 7637db96d56Sopenharmony_ci conn, _ = lsock.accept() 7647db96d56Sopenharmony_ci proto = MyProto(loop=loop) 7657db96d56Sopenharmony_ci proto.loop = loop 7667db96d56Sopenharmony_ci loop.run_until_complete( 7677db96d56Sopenharmony_ci loop.connect_accepted_socket( 7687db96d56Sopenharmony_ci (lambda: proto), conn, ssl=server_ssl)) 7697db96d56Sopenharmony_ci loop.run_forever() 7707db96d56Sopenharmony_ci proto.transport.close() 7717db96d56Sopenharmony_ci lsock.close() 7727db96d56Sopenharmony_ci 7737db96d56Sopenharmony_ci threading_helper.join_thread(thread) 7747db96d56Sopenharmony_ci self.assertFalse(thread.is_alive()) 7757db96d56Sopenharmony_ci self.assertEqual(proto.state, 'CLOSED') 7767db96d56Sopenharmony_ci self.assertEqual(proto.nbytes, len(message)) 7777db96d56Sopenharmony_ci self.assertEqual(response, expected_response) 7787db96d56Sopenharmony_ci 7797db96d56Sopenharmony_ci @unittest.skipIf(ssl is None, 'No ssl module') 7807db96d56Sopenharmony_ci def test_ssl_connect_accepted_socket(self): 7817db96d56Sopenharmony_ci server_context = test_utils.simple_server_sslcontext() 7827db96d56Sopenharmony_ci client_context = test_utils.simple_client_sslcontext() 
7837db96d56Sopenharmony_ci 7847db96d56Sopenharmony_ci self.test_connect_accepted_socket(server_context, client_context) 7857db96d56Sopenharmony_ci 7867db96d56Sopenharmony_ci def test_connect_accepted_socket_ssl_timeout_for_plain_socket(self): 7877db96d56Sopenharmony_ci sock = socket.socket() 7887db96d56Sopenharmony_ci self.addCleanup(sock.close) 7897db96d56Sopenharmony_ci coro = self.loop.connect_accepted_socket( 7907db96d56Sopenharmony_ci MyProto, sock, ssl_handshake_timeout=support.LOOPBACK_TIMEOUT) 7917db96d56Sopenharmony_ci with self.assertRaisesRegex( 7927db96d56Sopenharmony_ci ValueError, 7937db96d56Sopenharmony_ci 'ssl_handshake_timeout is only meaningful with ssl'): 7947db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 7957db96d56Sopenharmony_ci 7967db96d56Sopenharmony_ci @mock.patch('asyncio.base_events.socket') 7977db96d56Sopenharmony_ci def create_server_multiple_hosts(self, family, hosts, mock_sock): 7987db96d56Sopenharmony_ci async def getaddrinfo(host, port, *args, **kw): 7997db96d56Sopenharmony_ci if family == socket.AF_INET: 8007db96d56Sopenharmony_ci return [(family, socket.SOCK_STREAM, 6, '', (host, port))] 8017db96d56Sopenharmony_ci else: 8027db96d56Sopenharmony_ci return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))] 8037db96d56Sopenharmony_ci 8047db96d56Sopenharmony_ci def getaddrinfo_task(*args, **kwds): 8057db96d56Sopenharmony_ci return self.loop.create_task(getaddrinfo(*args, **kwds)) 8067db96d56Sopenharmony_ci 8077db96d56Sopenharmony_ci unique_hosts = set(hosts) 8087db96d56Sopenharmony_ci 8097db96d56Sopenharmony_ci if family == socket.AF_INET: 8107db96d56Sopenharmony_ci mock_sock.socket().getsockbyname.side_effect = [ 8117db96d56Sopenharmony_ci (host, 80) for host in unique_hosts] 8127db96d56Sopenharmony_ci else: 8137db96d56Sopenharmony_ci mock_sock.socket().getsockbyname.side_effect = [ 8147db96d56Sopenharmony_ci (host, 80, 0, 0) for host in unique_hosts] 8157db96d56Sopenharmony_ci self.loop.getaddrinfo = 
getaddrinfo_task 8167db96d56Sopenharmony_ci self.loop._start_serving = mock.Mock() 8177db96d56Sopenharmony_ci self.loop._stop_serving = mock.Mock() 8187db96d56Sopenharmony_ci f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80) 8197db96d56Sopenharmony_ci server = self.loop.run_until_complete(f) 8207db96d56Sopenharmony_ci self.addCleanup(server.close) 8217db96d56Sopenharmony_ci server_hosts = {sock.getsockbyname()[0] for sock in server.sockets} 8227db96d56Sopenharmony_ci self.assertEqual(server_hosts, unique_hosts) 8237db96d56Sopenharmony_ci 8247db96d56Sopenharmony_ci def test_create_server_multiple_hosts_ipv4(self): 8257db96d56Sopenharmony_ci self.create_server_multiple_hosts(socket.AF_INET, 8267db96d56Sopenharmony_ci ['1.2.3.4', '5.6.7.8', '1.2.3.4']) 8277db96d56Sopenharmony_ci 8287db96d56Sopenharmony_ci def test_create_server_multiple_hosts_ipv6(self): 8297db96d56Sopenharmony_ci self.create_server_multiple_hosts(socket.AF_INET6, 8307db96d56Sopenharmony_ci ['::1', '::2', '::1']) 8317db96d56Sopenharmony_ci 8327db96d56Sopenharmony_ci def test_create_server(self): 8337db96d56Sopenharmony_ci proto = MyProto(self.loop) 8347db96d56Sopenharmony_ci f = self.loop.create_server(lambda: proto, '0.0.0.0', 0) 8357db96d56Sopenharmony_ci server = self.loop.run_until_complete(f) 8367db96d56Sopenharmony_ci self.assertEqual(len(server.sockets), 1) 8377db96d56Sopenharmony_ci sock = server.sockets[0] 8387db96d56Sopenharmony_ci host, port = sock.getsockname() 8397db96d56Sopenharmony_ci self.assertEqual(host, '0.0.0.0') 8407db96d56Sopenharmony_ci client = socket.socket() 8417db96d56Sopenharmony_ci client.connect(('127.0.0.1', port)) 8427db96d56Sopenharmony_ci client.sendall(b'xxx') 8437db96d56Sopenharmony_ci 8447db96d56Sopenharmony_ci self.loop.run_until_complete(proto.connected) 8457db96d56Sopenharmony_ci self.assertEqual('CONNECTED', proto.state) 8467db96d56Sopenharmony_ci 8477db96d56Sopenharmony_ci test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 
8487db96d56Sopenharmony_ci self.assertEqual(3, proto.nbytes) 8497db96d56Sopenharmony_ci 8507db96d56Sopenharmony_ci # extra info is available 8517db96d56Sopenharmony_ci self.assertIsNotNone(proto.transport.get_extra_info('sockname')) 8527db96d56Sopenharmony_ci self.assertEqual('127.0.0.1', 8537db96d56Sopenharmony_ci proto.transport.get_extra_info('peername')[0]) 8547db96d56Sopenharmony_ci 8557db96d56Sopenharmony_ci # close connection 8567db96d56Sopenharmony_ci proto.transport.close() 8577db96d56Sopenharmony_ci self.loop.run_until_complete(proto.done) 8587db96d56Sopenharmony_ci 8597db96d56Sopenharmony_ci self.assertEqual('CLOSED', proto.state) 8607db96d56Sopenharmony_ci 8617db96d56Sopenharmony_ci # the client socket must be closed after to avoid ECONNRESET upon 8627db96d56Sopenharmony_ci # recv()/send() on the serving socket 8637db96d56Sopenharmony_ci client.close() 8647db96d56Sopenharmony_ci 8657db96d56Sopenharmony_ci # close server 8667db96d56Sopenharmony_ci server.close() 8677db96d56Sopenharmony_ci 8687db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT') 8697db96d56Sopenharmony_ci def test_create_server_reuse_port(self): 8707db96d56Sopenharmony_ci proto = MyProto(self.loop) 8717db96d56Sopenharmony_ci f = self.loop.create_server( 8727db96d56Sopenharmony_ci lambda: proto, '0.0.0.0', 0) 8737db96d56Sopenharmony_ci server = self.loop.run_until_complete(f) 8747db96d56Sopenharmony_ci self.assertEqual(len(server.sockets), 1) 8757db96d56Sopenharmony_ci sock = server.sockets[0] 8767db96d56Sopenharmony_ci self.assertFalse( 8777db96d56Sopenharmony_ci sock.getsockopt( 8787db96d56Sopenharmony_ci socket.SOL_SOCKET, socket.SO_REUSEPORT)) 8797db96d56Sopenharmony_ci server.close() 8807db96d56Sopenharmony_ci 8817db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 8827db96d56Sopenharmony_ci 8837db96d56Sopenharmony_ci proto = MyProto(self.loop) 8847db96d56Sopenharmony_ci f = self.loop.create_server( 8857db96d56Sopenharmony_ci lambda: 
proto, '0.0.0.0', 0, reuse_port=True) 8867db96d56Sopenharmony_ci server = self.loop.run_until_complete(f) 8877db96d56Sopenharmony_ci self.assertEqual(len(server.sockets), 1) 8887db96d56Sopenharmony_ci sock = server.sockets[0] 8897db96d56Sopenharmony_ci self.assertTrue( 8907db96d56Sopenharmony_ci sock.getsockopt( 8917db96d56Sopenharmony_ci socket.SOL_SOCKET, socket.SO_REUSEPORT)) 8927db96d56Sopenharmony_ci server.close() 8937db96d56Sopenharmony_ci 8947db96d56Sopenharmony_ci def _make_unix_server(self, factory, **kwargs): 8957db96d56Sopenharmony_ci path = test_utils.gen_unix_socket_path() 8967db96d56Sopenharmony_ci self.addCleanup(lambda: os.path.exists(path) and os.unlink(path)) 8977db96d56Sopenharmony_ci 8987db96d56Sopenharmony_ci f = self.loop.create_unix_server(factory, path, **kwargs) 8997db96d56Sopenharmony_ci server = self.loop.run_until_complete(f) 9007db96d56Sopenharmony_ci 9017db96d56Sopenharmony_ci return server, path 9027db96d56Sopenharmony_ci 9037db96d56Sopenharmony_ci @socket_helper.skip_unless_bind_unix_socket 9047db96d56Sopenharmony_ci def test_create_unix_server(self): 9057db96d56Sopenharmony_ci proto = MyProto(loop=self.loop) 9067db96d56Sopenharmony_ci server, path = self._make_unix_server(lambda: proto) 9077db96d56Sopenharmony_ci self.assertEqual(len(server.sockets), 1) 9087db96d56Sopenharmony_ci 9097db96d56Sopenharmony_ci client = socket.socket(socket.AF_UNIX) 9107db96d56Sopenharmony_ci client.connect(path) 9117db96d56Sopenharmony_ci client.sendall(b'xxx') 9127db96d56Sopenharmony_ci 9137db96d56Sopenharmony_ci self.loop.run_until_complete(proto.connected) 9147db96d56Sopenharmony_ci self.assertEqual('CONNECTED', proto.state) 9157db96d56Sopenharmony_ci test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 9167db96d56Sopenharmony_ci self.assertEqual(3, proto.nbytes) 9177db96d56Sopenharmony_ci 9187db96d56Sopenharmony_ci # close connection 9197db96d56Sopenharmony_ci proto.transport.close() 9207db96d56Sopenharmony_ci 
self.loop.run_until_complete(proto.done) 9217db96d56Sopenharmony_ci 9227db96d56Sopenharmony_ci self.assertEqual('CLOSED', proto.state) 9237db96d56Sopenharmony_ci 9247db96d56Sopenharmony_ci # the client socket must be closed after to avoid ECONNRESET upon 9257db96d56Sopenharmony_ci # recv()/send() on the serving socket 9267db96d56Sopenharmony_ci client.close() 9277db96d56Sopenharmony_ci 9287db96d56Sopenharmony_ci # close server 9297db96d56Sopenharmony_ci server.close() 9307db96d56Sopenharmony_ci 9317db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') 9327db96d56Sopenharmony_ci def test_create_unix_server_path_socket_error(self): 9337db96d56Sopenharmony_ci proto = MyProto(loop=self.loop) 9347db96d56Sopenharmony_ci sock = socket.socket() 9357db96d56Sopenharmony_ci with sock: 9367db96d56Sopenharmony_ci f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock) 9377db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, 9387db96d56Sopenharmony_ci 'path and sock can not be specified ' 9397db96d56Sopenharmony_ci 'at the same time'): 9407db96d56Sopenharmony_ci self.loop.run_until_complete(f) 9417db96d56Sopenharmony_ci 9427db96d56Sopenharmony_ci def _create_ssl_context(self, certfile, keyfile=None): 9437db96d56Sopenharmony_ci sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) 9447db96d56Sopenharmony_ci sslcontext.options |= ssl.OP_NO_SSLv2 9457db96d56Sopenharmony_ci sslcontext.load_cert_chain(certfile, keyfile) 9467db96d56Sopenharmony_ci return sslcontext 9477db96d56Sopenharmony_ci 9487db96d56Sopenharmony_ci def _make_ssl_server(self, factory, certfile, keyfile=None): 9497db96d56Sopenharmony_ci sslcontext = self._create_ssl_context(certfile, keyfile) 9507db96d56Sopenharmony_ci 9517db96d56Sopenharmony_ci f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext) 9527db96d56Sopenharmony_ci server = self.loop.run_until_complete(f) 9537db96d56Sopenharmony_ci 9547db96d56Sopenharmony_ci sock = server.sockets[0] 
9557db96d56Sopenharmony_ci host, port = sock.getsockname() 9567db96d56Sopenharmony_ci self.assertEqual(host, '127.0.0.1') 9577db96d56Sopenharmony_ci return server, host, port 9587db96d56Sopenharmony_ci 9597db96d56Sopenharmony_ci def _make_ssl_unix_server(self, factory, certfile, keyfile=None): 9607db96d56Sopenharmony_ci sslcontext = self._create_ssl_context(certfile, keyfile) 9617db96d56Sopenharmony_ci return self._make_unix_server(factory, ssl=sslcontext) 9627db96d56Sopenharmony_ci 9637db96d56Sopenharmony_ci @unittest.skipIf(ssl is None, 'No ssl module') 9647db96d56Sopenharmony_ci def test_create_server_ssl(self): 9657db96d56Sopenharmony_ci proto = MyProto(loop=self.loop) 9667db96d56Sopenharmony_ci server, host, port = self._make_ssl_server( 9677db96d56Sopenharmony_ci lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY) 9687db96d56Sopenharmony_ci 9697db96d56Sopenharmony_ci f_c = self.loop.create_connection(MyBaseProto, host, port, 9707db96d56Sopenharmony_ci ssl=test_utils.dummy_ssl_context()) 9717db96d56Sopenharmony_ci client, pr = self.loop.run_until_complete(f_c) 9727db96d56Sopenharmony_ci 9737db96d56Sopenharmony_ci client.write(b'xxx') 9747db96d56Sopenharmony_ci self.loop.run_until_complete(proto.connected) 9757db96d56Sopenharmony_ci self.assertEqual('CONNECTED', proto.state) 9767db96d56Sopenharmony_ci 9777db96d56Sopenharmony_ci test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 9787db96d56Sopenharmony_ci self.assertEqual(3, proto.nbytes) 9797db96d56Sopenharmony_ci 9807db96d56Sopenharmony_ci # extra info is available 9817db96d56Sopenharmony_ci self.check_ssl_extra_info(client, peername=(host, port)) 9827db96d56Sopenharmony_ci 9837db96d56Sopenharmony_ci # close connection 9847db96d56Sopenharmony_ci proto.transport.close() 9857db96d56Sopenharmony_ci self.loop.run_until_complete(proto.done) 9867db96d56Sopenharmony_ci self.assertEqual('CLOSED', proto.state) 9877db96d56Sopenharmony_ci 9887db96d56Sopenharmony_ci # the client socket must be closed after to 
avoid ECONNRESET upon 9897db96d56Sopenharmony_ci # recv()/send() on the serving socket 9907db96d56Sopenharmony_ci client.close() 9917db96d56Sopenharmony_ci 9927db96d56Sopenharmony_ci # stop serving 9937db96d56Sopenharmony_ci server.close() 9947db96d56Sopenharmony_ci 9957db96d56Sopenharmony_ci @socket_helper.skip_unless_bind_unix_socket 9967db96d56Sopenharmony_ci @unittest.skipIf(ssl is None, 'No ssl module') 9977db96d56Sopenharmony_ci def test_create_unix_server_ssl(self): 9987db96d56Sopenharmony_ci proto = MyProto(loop=self.loop) 9997db96d56Sopenharmony_ci server, path = self._make_ssl_unix_server( 10007db96d56Sopenharmony_ci lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY) 10017db96d56Sopenharmony_ci 10027db96d56Sopenharmony_ci f_c = self.loop.create_unix_connection( 10037db96d56Sopenharmony_ci MyBaseProto, path, ssl=test_utils.dummy_ssl_context(), 10047db96d56Sopenharmony_ci server_hostname='') 10057db96d56Sopenharmony_ci 10067db96d56Sopenharmony_ci client, pr = self.loop.run_until_complete(f_c) 10077db96d56Sopenharmony_ci 10087db96d56Sopenharmony_ci client.write(b'xxx') 10097db96d56Sopenharmony_ci self.loop.run_until_complete(proto.connected) 10107db96d56Sopenharmony_ci self.assertEqual('CONNECTED', proto.state) 10117db96d56Sopenharmony_ci test_utils.run_until(self.loop, lambda: proto.nbytes > 0) 10127db96d56Sopenharmony_ci self.assertEqual(3, proto.nbytes) 10137db96d56Sopenharmony_ci 10147db96d56Sopenharmony_ci # close connection 10157db96d56Sopenharmony_ci proto.transport.close() 10167db96d56Sopenharmony_ci self.loop.run_until_complete(proto.done) 10177db96d56Sopenharmony_ci self.assertEqual('CLOSED', proto.state) 10187db96d56Sopenharmony_ci 10197db96d56Sopenharmony_ci # the client socket must be closed after to avoid ECONNRESET upon 10207db96d56Sopenharmony_ci # recv()/send() on the serving socket 10217db96d56Sopenharmony_ci client.close() 10227db96d56Sopenharmony_ci 10237db96d56Sopenharmony_ci # stop serving 10247db96d56Sopenharmony_ci server.close() 
10257db96d56Sopenharmony_ci 10267db96d56Sopenharmony_ci @unittest.skipIf(ssl is None, 'No ssl module') 10277db96d56Sopenharmony_ci def test_create_server_ssl_verify_failed(self): 10287db96d56Sopenharmony_ci proto = MyProto(loop=self.loop) 10297db96d56Sopenharmony_ci server, host, port = self._make_ssl_server( 10307db96d56Sopenharmony_ci lambda: proto, test_utils.SIGNED_CERTFILE) 10317db96d56Sopenharmony_ci 10327db96d56Sopenharmony_ci sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 10337db96d56Sopenharmony_ci sslcontext_client.options |= ssl.OP_NO_SSLv2 10347db96d56Sopenharmony_ci sslcontext_client.verify_mode = ssl.CERT_REQUIRED 10357db96d56Sopenharmony_ci if hasattr(sslcontext_client, 'check_hostname'): 10367db96d56Sopenharmony_ci sslcontext_client.check_hostname = True 10377db96d56Sopenharmony_ci 10387db96d56Sopenharmony_ci 10397db96d56Sopenharmony_ci # no CA loaded 10407db96d56Sopenharmony_ci f_c = self.loop.create_connection(MyProto, host, port, 10417db96d56Sopenharmony_ci ssl=sslcontext_client) 10427db96d56Sopenharmony_ci with mock.patch.object(self.loop, 'call_exception_handler'): 10437db96d56Sopenharmony_ci with test_utils.disable_logger(): 10447db96d56Sopenharmony_ci with self.assertRaisesRegex(ssl.SSLError, 10457db96d56Sopenharmony_ci '(?i)certificate.verify.failed'): 10467db96d56Sopenharmony_ci self.loop.run_until_complete(f_c) 10477db96d56Sopenharmony_ci 10487db96d56Sopenharmony_ci # execute the loop to log the connection error 10497db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 10507db96d56Sopenharmony_ci 10517db96d56Sopenharmony_ci # close connection 10527db96d56Sopenharmony_ci self.assertIsNone(proto.transport) 10537db96d56Sopenharmony_ci server.close() 10547db96d56Sopenharmony_ci 10557db96d56Sopenharmony_ci @socket_helper.skip_unless_bind_unix_socket 10567db96d56Sopenharmony_ci @unittest.skipIf(ssl is None, 'No ssl module') 10577db96d56Sopenharmony_ci def test_create_unix_server_ssl_verify_failed(self): 
10587db96d56Sopenharmony_ci proto = MyProto(loop=self.loop) 10597db96d56Sopenharmony_ci server, path = self._make_ssl_unix_server( 10607db96d56Sopenharmony_ci lambda: proto, test_utils.SIGNED_CERTFILE) 10617db96d56Sopenharmony_ci 10627db96d56Sopenharmony_ci sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 10637db96d56Sopenharmony_ci sslcontext_client.options |= ssl.OP_NO_SSLv2 10647db96d56Sopenharmony_ci sslcontext_client.verify_mode = ssl.CERT_REQUIRED 10657db96d56Sopenharmony_ci if hasattr(sslcontext_client, 'check_hostname'): 10667db96d56Sopenharmony_ci sslcontext_client.check_hostname = True 10677db96d56Sopenharmony_ci 10687db96d56Sopenharmony_ci # no CA loaded 10697db96d56Sopenharmony_ci f_c = self.loop.create_unix_connection(MyProto, path, 10707db96d56Sopenharmony_ci ssl=sslcontext_client, 10717db96d56Sopenharmony_ci server_hostname='invalid') 10727db96d56Sopenharmony_ci with mock.patch.object(self.loop, 'call_exception_handler'): 10737db96d56Sopenharmony_ci with test_utils.disable_logger(): 10747db96d56Sopenharmony_ci with self.assertRaisesRegex(ssl.SSLError, 10757db96d56Sopenharmony_ci '(?i)certificate.verify.failed'): 10767db96d56Sopenharmony_ci self.loop.run_until_complete(f_c) 10777db96d56Sopenharmony_ci 10787db96d56Sopenharmony_ci # execute the loop to log the connection error 10797db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 10807db96d56Sopenharmony_ci 10817db96d56Sopenharmony_ci # close connection 10827db96d56Sopenharmony_ci self.assertIsNone(proto.transport) 10837db96d56Sopenharmony_ci server.close() 10847db96d56Sopenharmony_ci 10857db96d56Sopenharmony_ci @unittest.skipIf(ssl is None, 'No ssl module') 10867db96d56Sopenharmony_ci def test_create_server_ssl_match_failed(self): 10877db96d56Sopenharmony_ci proto = MyProto(loop=self.loop) 10887db96d56Sopenharmony_ci server, host, port = self._make_ssl_server( 10897db96d56Sopenharmony_ci lambda: proto, test_utils.SIGNED_CERTFILE) 10907db96d56Sopenharmony_ci 
10917db96d56Sopenharmony_ci sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 10927db96d56Sopenharmony_ci sslcontext_client.options |= ssl.OP_NO_SSLv2 10937db96d56Sopenharmony_ci sslcontext_client.verify_mode = ssl.CERT_REQUIRED 10947db96d56Sopenharmony_ci sslcontext_client.load_verify_locations( 10957db96d56Sopenharmony_ci cafile=test_utils.SIGNING_CA) 10967db96d56Sopenharmony_ci if hasattr(sslcontext_client, 'check_hostname'): 10977db96d56Sopenharmony_ci sslcontext_client.check_hostname = True 10987db96d56Sopenharmony_ci 10997db96d56Sopenharmony_ci # incorrect server_hostname 11007db96d56Sopenharmony_ci f_c = self.loop.create_connection(MyProto, host, port, 11017db96d56Sopenharmony_ci ssl=sslcontext_client) 11027db96d56Sopenharmony_ci with mock.patch.object(self.loop, 'call_exception_handler'): 11037db96d56Sopenharmony_ci with test_utils.disable_logger(): 11047db96d56Sopenharmony_ci with self.assertRaisesRegex( 11057db96d56Sopenharmony_ci ssl.CertificateError, 11067db96d56Sopenharmony_ci "IP address mismatch, certificate is not valid for " 11077db96d56Sopenharmony_ci "'127.0.0.1'"): 11087db96d56Sopenharmony_ci self.loop.run_until_complete(f_c) 11097db96d56Sopenharmony_ci 11107db96d56Sopenharmony_ci # close connection 11117db96d56Sopenharmony_ci # transport is None because TLS ALERT aborted the handshake 11127db96d56Sopenharmony_ci self.assertIsNone(proto.transport) 11137db96d56Sopenharmony_ci server.close() 11147db96d56Sopenharmony_ci 11157db96d56Sopenharmony_ci @socket_helper.skip_unless_bind_unix_socket 11167db96d56Sopenharmony_ci @unittest.skipIf(ssl is None, 'No ssl module') 11177db96d56Sopenharmony_ci def test_create_unix_server_ssl_verified(self): 11187db96d56Sopenharmony_ci proto = MyProto(loop=self.loop) 11197db96d56Sopenharmony_ci server, path = self._make_ssl_unix_server( 11207db96d56Sopenharmony_ci lambda: proto, test_utils.SIGNED_CERTFILE) 11217db96d56Sopenharmony_ci 11227db96d56Sopenharmony_ci sslcontext_client = 
ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 11237db96d56Sopenharmony_ci sslcontext_client.options |= ssl.OP_NO_SSLv2 11247db96d56Sopenharmony_ci sslcontext_client.verify_mode = ssl.CERT_REQUIRED 11257db96d56Sopenharmony_ci sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA) 11267db96d56Sopenharmony_ci if hasattr(sslcontext_client, 'check_hostname'): 11277db96d56Sopenharmony_ci sslcontext_client.check_hostname = True 11287db96d56Sopenharmony_ci 11297db96d56Sopenharmony_ci # Connection succeeds with correct CA and server hostname. 11307db96d56Sopenharmony_ci f_c = self.loop.create_unix_connection(MyProto, path, 11317db96d56Sopenharmony_ci ssl=sslcontext_client, 11327db96d56Sopenharmony_ci server_hostname='localhost') 11337db96d56Sopenharmony_ci client, pr = self.loop.run_until_complete(f_c) 11347db96d56Sopenharmony_ci self.loop.run_until_complete(proto.connected) 11357db96d56Sopenharmony_ci 11367db96d56Sopenharmony_ci # close connection 11377db96d56Sopenharmony_ci proto.transport.close() 11387db96d56Sopenharmony_ci client.close() 11397db96d56Sopenharmony_ci server.close() 11407db96d56Sopenharmony_ci self.loop.run_until_complete(proto.done) 11417db96d56Sopenharmony_ci 11427db96d56Sopenharmony_ci @unittest.skipIf(ssl is None, 'No ssl module') 11437db96d56Sopenharmony_ci def test_create_server_ssl_verified(self): 11447db96d56Sopenharmony_ci proto = MyProto(loop=self.loop) 11457db96d56Sopenharmony_ci server, host, port = self._make_ssl_server( 11467db96d56Sopenharmony_ci lambda: proto, test_utils.SIGNED_CERTFILE) 11477db96d56Sopenharmony_ci 11487db96d56Sopenharmony_ci sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 11497db96d56Sopenharmony_ci sslcontext_client.options |= ssl.OP_NO_SSLv2 11507db96d56Sopenharmony_ci sslcontext_client.verify_mode = ssl.CERT_REQUIRED 11517db96d56Sopenharmony_ci sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA) 11527db96d56Sopenharmony_ci if hasattr(sslcontext_client, 'check_hostname'): 
11537db96d56Sopenharmony_ci sslcontext_client.check_hostname = True 11547db96d56Sopenharmony_ci 11557db96d56Sopenharmony_ci # Connection succeeds with correct CA and server hostname. 11567db96d56Sopenharmony_ci f_c = self.loop.create_connection(MyProto, host, port, 11577db96d56Sopenharmony_ci ssl=sslcontext_client, 11587db96d56Sopenharmony_ci server_hostname='localhost') 11597db96d56Sopenharmony_ci client, pr = self.loop.run_until_complete(f_c) 11607db96d56Sopenharmony_ci self.loop.run_until_complete(proto.connected) 11617db96d56Sopenharmony_ci 11627db96d56Sopenharmony_ci # extra info is available 11637db96d56Sopenharmony_ci self.check_ssl_extra_info(client, peername=(host, port), 11647db96d56Sopenharmony_ci peercert=test_utils.PEERCERT) 11657db96d56Sopenharmony_ci 11667db96d56Sopenharmony_ci # close connection 11677db96d56Sopenharmony_ci proto.transport.close() 11687db96d56Sopenharmony_ci client.close() 11697db96d56Sopenharmony_ci server.close() 11707db96d56Sopenharmony_ci self.loop.run_until_complete(proto.done) 11717db96d56Sopenharmony_ci 11727db96d56Sopenharmony_ci def test_create_server_sock(self): 11737db96d56Sopenharmony_ci proto = self.loop.create_future() 11747db96d56Sopenharmony_ci 11757db96d56Sopenharmony_ci class TestMyProto(MyProto): 11767db96d56Sopenharmony_ci def connection_made(self, transport): 11777db96d56Sopenharmony_ci super().connection_made(transport) 11787db96d56Sopenharmony_ci proto.set_result(self) 11797db96d56Sopenharmony_ci 11807db96d56Sopenharmony_ci sock_ob = socket.create_server(('0.0.0.0', 0)) 11817db96d56Sopenharmony_ci 11827db96d56Sopenharmony_ci f = self.loop.create_server(TestMyProto, sock=sock_ob) 11837db96d56Sopenharmony_ci server = self.loop.run_until_complete(f) 11847db96d56Sopenharmony_ci sock = server.sockets[0] 11857db96d56Sopenharmony_ci self.assertEqual(sock.fileno(), sock_ob.fileno()) 11867db96d56Sopenharmony_ci 11877db96d56Sopenharmony_ci host, port = sock.getsockname() 11887db96d56Sopenharmony_ci self.assertEqual(host, 
'0.0.0.0') 11897db96d56Sopenharmony_ci client = socket.socket() 11907db96d56Sopenharmony_ci client.connect(('127.0.0.1', port)) 11917db96d56Sopenharmony_ci client.send(b'xxx') 11927db96d56Sopenharmony_ci client.close() 11937db96d56Sopenharmony_ci server.close() 11947db96d56Sopenharmony_ci 11957db96d56Sopenharmony_ci def test_create_server_addr_in_use(self): 11967db96d56Sopenharmony_ci sock_ob = socket.create_server(('0.0.0.0', 0)) 11977db96d56Sopenharmony_ci 11987db96d56Sopenharmony_ci f = self.loop.create_server(MyProto, sock=sock_ob) 11997db96d56Sopenharmony_ci server = self.loop.run_until_complete(f) 12007db96d56Sopenharmony_ci sock = server.sockets[0] 12017db96d56Sopenharmony_ci host, port = sock.getsockname() 12027db96d56Sopenharmony_ci 12037db96d56Sopenharmony_ci f = self.loop.create_server(MyProto, host=host, port=port) 12047db96d56Sopenharmony_ci with self.assertRaises(OSError) as cm: 12057db96d56Sopenharmony_ci self.loop.run_until_complete(f) 12067db96d56Sopenharmony_ci self.assertEqual(cm.exception.errno, errno.EADDRINUSE) 12077db96d56Sopenharmony_ci 12087db96d56Sopenharmony_ci server.close() 12097db96d56Sopenharmony_ci 12107db96d56Sopenharmony_ci @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled') 12117db96d56Sopenharmony_ci def test_create_server_dual_stack(self): 12127db96d56Sopenharmony_ci f_proto = self.loop.create_future() 12137db96d56Sopenharmony_ci 12147db96d56Sopenharmony_ci class TestMyProto(MyProto): 12157db96d56Sopenharmony_ci def connection_made(self, transport): 12167db96d56Sopenharmony_ci super().connection_made(transport) 12177db96d56Sopenharmony_ci f_proto.set_result(self) 12187db96d56Sopenharmony_ci 12197db96d56Sopenharmony_ci try_count = 0 12207db96d56Sopenharmony_ci while True: 12217db96d56Sopenharmony_ci try: 12227db96d56Sopenharmony_ci port = socket_helper.find_unused_port() 12237db96d56Sopenharmony_ci f = self.loop.create_server(TestMyProto, host=None, port=port) 12247db96d56Sopenharmony_ci server = 
self.loop.run_until_complete(f) 12257db96d56Sopenharmony_ci except OSError as ex: 12267db96d56Sopenharmony_ci if ex.errno == errno.EADDRINUSE: 12277db96d56Sopenharmony_ci try_count += 1 12287db96d56Sopenharmony_ci self.assertGreaterEqual(5, try_count) 12297db96d56Sopenharmony_ci continue 12307db96d56Sopenharmony_ci else: 12317db96d56Sopenharmony_ci raise 12327db96d56Sopenharmony_ci else: 12337db96d56Sopenharmony_ci break 12347db96d56Sopenharmony_ci client = socket.socket() 12357db96d56Sopenharmony_ci client.connect(('127.0.0.1', port)) 12367db96d56Sopenharmony_ci client.send(b'xxx') 12377db96d56Sopenharmony_ci proto = self.loop.run_until_complete(f_proto) 12387db96d56Sopenharmony_ci proto.transport.close() 12397db96d56Sopenharmony_ci client.close() 12407db96d56Sopenharmony_ci 12417db96d56Sopenharmony_ci f_proto = self.loop.create_future() 12427db96d56Sopenharmony_ci client = socket.socket(socket.AF_INET6) 12437db96d56Sopenharmony_ci client.connect(('::1', port)) 12447db96d56Sopenharmony_ci client.send(b'xxx') 12457db96d56Sopenharmony_ci proto = self.loop.run_until_complete(f_proto) 12467db96d56Sopenharmony_ci proto.transport.close() 12477db96d56Sopenharmony_ci client.close() 12487db96d56Sopenharmony_ci 12497db96d56Sopenharmony_ci server.close() 12507db96d56Sopenharmony_ci 12517db96d56Sopenharmony_ci def test_server_close(self): 12527db96d56Sopenharmony_ci f = self.loop.create_server(MyProto, '0.0.0.0', 0) 12537db96d56Sopenharmony_ci server = self.loop.run_until_complete(f) 12547db96d56Sopenharmony_ci sock = server.sockets[0] 12557db96d56Sopenharmony_ci host, port = sock.getsockname() 12567db96d56Sopenharmony_ci 12577db96d56Sopenharmony_ci client = socket.socket() 12587db96d56Sopenharmony_ci client.connect(('127.0.0.1', port)) 12597db96d56Sopenharmony_ci client.send(b'xxx') 12607db96d56Sopenharmony_ci client.close() 12617db96d56Sopenharmony_ci 12627db96d56Sopenharmony_ci server.close() 12637db96d56Sopenharmony_ci 12647db96d56Sopenharmony_ci client = socket.socket() 
12657db96d56Sopenharmony_ci self.assertRaises( 12667db96d56Sopenharmony_ci ConnectionRefusedError, client.connect, ('127.0.0.1', port)) 12677db96d56Sopenharmony_ci client.close() 12687db96d56Sopenharmony_ci 12697db96d56Sopenharmony_ci def _test_create_datagram_endpoint(self, local_addr, family): 12707db96d56Sopenharmony_ci class TestMyDatagramProto(MyDatagramProto): 12717db96d56Sopenharmony_ci def __init__(inner_self): 12727db96d56Sopenharmony_ci super().__init__(loop=self.loop) 12737db96d56Sopenharmony_ci 12747db96d56Sopenharmony_ci def datagram_received(self, data, addr): 12757db96d56Sopenharmony_ci super().datagram_received(data, addr) 12767db96d56Sopenharmony_ci self.transport.sendto(b'resp:'+data, addr) 12777db96d56Sopenharmony_ci 12787db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 12797db96d56Sopenharmony_ci TestMyDatagramProto, local_addr=local_addr, family=family) 12807db96d56Sopenharmony_ci s_transport, server = self.loop.run_until_complete(coro) 12817db96d56Sopenharmony_ci sockname = s_transport.get_extra_info('sockname') 12827db96d56Sopenharmony_ci host, port = socket.getnameinfo( 12837db96d56Sopenharmony_ci sockname, socket.NI_NUMERICHOST|socket.NI_NUMERICSERV) 12847db96d56Sopenharmony_ci 12857db96d56Sopenharmony_ci self.assertIsInstance(s_transport, asyncio.Transport) 12867db96d56Sopenharmony_ci self.assertIsInstance(server, TestMyDatagramProto) 12877db96d56Sopenharmony_ci self.assertEqual('INITIALIZED', server.state) 12887db96d56Sopenharmony_ci self.assertIs(server.transport, s_transport) 12897db96d56Sopenharmony_ci 12907db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 12917db96d56Sopenharmony_ci lambda: MyDatagramProto(loop=self.loop), 12927db96d56Sopenharmony_ci remote_addr=(host, port)) 12937db96d56Sopenharmony_ci transport, client = self.loop.run_until_complete(coro) 12947db96d56Sopenharmony_ci 12957db96d56Sopenharmony_ci self.assertIsInstance(transport, asyncio.Transport) 12967db96d56Sopenharmony_ci 
self.assertIsInstance(client, MyDatagramProto) 12977db96d56Sopenharmony_ci self.assertEqual('INITIALIZED', client.state) 12987db96d56Sopenharmony_ci self.assertIs(client.transport, transport) 12997db96d56Sopenharmony_ci 13007db96d56Sopenharmony_ci transport.sendto(b'xxx') 13017db96d56Sopenharmony_ci test_utils.run_until(self.loop, lambda: server.nbytes) 13027db96d56Sopenharmony_ci self.assertEqual(3, server.nbytes) 13037db96d56Sopenharmony_ci test_utils.run_until(self.loop, lambda: client.nbytes) 13047db96d56Sopenharmony_ci 13057db96d56Sopenharmony_ci # received 13067db96d56Sopenharmony_ci self.assertEqual(8, client.nbytes) 13077db96d56Sopenharmony_ci 13087db96d56Sopenharmony_ci # extra info is available 13097db96d56Sopenharmony_ci self.assertIsNotNone(transport.get_extra_info('sockname')) 13107db96d56Sopenharmony_ci 13117db96d56Sopenharmony_ci # close connection 13127db96d56Sopenharmony_ci transport.close() 13137db96d56Sopenharmony_ci self.loop.run_until_complete(client.done) 13147db96d56Sopenharmony_ci self.assertEqual('CLOSED', client.state) 13157db96d56Sopenharmony_ci server.transport.close() 13167db96d56Sopenharmony_ci 13177db96d56Sopenharmony_ci def test_create_datagram_endpoint(self): 13187db96d56Sopenharmony_ci self._test_create_datagram_endpoint(('127.0.0.1', 0), socket.AF_INET) 13197db96d56Sopenharmony_ci 13207db96d56Sopenharmony_ci @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled') 13217db96d56Sopenharmony_ci def test_create_datagram_endpoint_ipv6(self): 13227db96d56Sopenharmony_ci self._test_create_datagram_endpoint(('::1', 0), socket.AF_INET6) 13237db96d56Sopenharmony_ci 13247db96d56Sopenharmony_ci def test_create_datagram_endpoint_sock(self): 13257db96d56Sopenharmony_ci sock = None 13267db96d56Sopenharmony_ci local_address = ('127.0.0.1', 0) 13277db96d56Sopenharmony_ci infos = self.loop.run_until_complete( 13287db96d56Sopenharmony_ci self.loop.getaddrinfo( 13297db96d56Sopenharmony_ci *local_address, 
type=socket.SOCK_DGRAM)) 13307db96d56Sopenharmony_ci for family, type, proto, cname, address in infos: 13317db96d56Sopenharmony_ci try: 13327db96d56Sopenharmony_ci sock = socket.socket(family=family, type=type, proto=proto) 13337db96d56Sopenharmony_ci sock.setblocking(False) 13347db96d56Sopenharmony_ci sock.bind(address) 13357db96d56Sopenharmony_ci except: 13367db96d56Sopenharmony_ci pass 13377db96d56Sopenharmony_ci else: 13387db96d56Sopenharmony_ci break 13397db96d56Sopenharmony_ci else: 13407db96d56Sopenharmony_ci self.fail('Can not create socket.') 13417db96d56Sopenharmony_ci 13427db96d56Sopenharmony_ci f = self.loop.create_datagram_endpoint( 13437db96d56Sopenharmony_ci lambda: MyDatagramProto(loop=self.loop), sock=sock) 13447db96d56Sopenharmony_ci tr, pr = self.loop.run_until_complete(f) 13457db96d56Sopenharmony_ci self.assertIsInstance(tr, asyncio.Transport) 13467db96d56Sopenharmony_ci self.assertIsInstance(pr, MyDatagramProto) 13477db96d56Sopenharmony_ci tr.close() 13487db96d56Sopenharmony_ci self.loop.run_until_complete(pr.done) 13497db96d56Sopenharmony_ci 13507db96d56Sopenharmony_ci def test_internal_fds(self): 13517db96d56Sopenharmony_ci loop = self.create_event_loop() 13527db96d56Sopenharmony_ci if not isinstance(loop, selector_events.BaseSelectorEventLoop): 13537db96d56Sopenharmony_ci loop.close() 13547db96d56Sopenharmony_ci self.skipTest('loop is not a BaseSelectorEventLoop') 13557db96d56Sopenharmony_ci 13567db96d56Sopenharmony_ci self.assertEqual(1, loop._internal_fds) 13577db96d56Sopenharmony_ci loop.close() 13587db96d56Sopenharmony_ci self.assertEqual(0, loop._internal_fds) 13597db96d56Sopenharmony_ci self.assertIsNone(loop._csock) 13607db96d56Sopenharmony_ci self.assertIsNone(loop._ssock) 13617db96d56Sopenharmony_ci 13627db96d56Sopenharmony_ci @unittest.skipUnless(sys.platform != 'win32', 13637db96d56Sopenharmony_ci "Don't support pipes for Windows") 13647db96d56Sopenharmony_ci def test_read_pipe(self): 13657db96d56Sopenharmony_ci proto = 
MyReadPipeProto(loop=self.loop) 13667db96d56Sopenharmony_ci 13677db96d56Sopenharmony_ci rpipe, wpipe = os.pipe() 13687db96d56Sopenharmony_ci pipeobj = io.open(rpipe, 'rb', 1024) 13697db96d56Sopenharmony_ci 13707db96d56Sopenharmony_ci async def connect(): 13717db96d56Sopenharmony_ci t, p = await self.loop.connect_read_pipe( 13727db96d56Sopenharmony_ci lambda: proto, pipeobj) 13737db96d56Sopenharmony_ci self.assertIs(p, proto) 13747db96d56Sopenharmony_ci self.assertIs(t, proto.transport) 13757db96d56Sopenharmony_ci self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 13767db96d56Sopenharmony_ci self.assertEqual(0, proto.nbytes) 13777db96d56Sopenharmony_ci 13787db96d56Sopenharmony_ci self.loop.run_until_complete(connect()) 13797db96d56Sopenharmony_ci 13807db96d56Sopenharmony_ci os.write(wpipe, b'1') 13817db96d56Sopenharmony_ci test_utils.run_until(self.loop, lambda: proto.nbytes >= 1) 13827db96d56Sopenharmony_ci self.assertEqual(1, proto.nbytes) 13837db96d56Sopenharmony_ci 13847db96d56Sopenharmony_ci os.write(wpipe, b'2345') 13857db96d56Sopenharmony_ci test_utils.run_until(self.loop, lambda: proto.nbytes >= 5) 13867db96d56Sopenharmony_ci self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 13877db96d56Sopenharmony_ci self.assertEqual(5, proto.nbytes) 13887db96d56Sopenharmony_ci 13897db96d56Sopenharmony_ci os.close(wpipe) 13907db96d56Sopenharmony_ci self.loop.run_until_complete(proto.done) 13917db96d56Sopenharmony_ci self.assertEqual( 13927db96d56Sopenharmony_ci ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state) 13937db96d56Sopenharmony_ci # extra info is available 13947db96d56Sopenharmony_ci self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 13957db96d56Sopenharmony_ci 13967db96d56Sopenharmony_ci @unittest.skipUnless(sys.platform != 'win32', 13977db96d56Sopenharmony_ci "Don't support pipes for Windows") 13987db96d56Sopenharmony_ci def test_unclosed_pipe_transport(self): 13997db96d56Sopenharmony_ci # This test reproduces the issue #314 on GitHub 
14007db96d56Sopenharmony_ci loop = self.create_event_loop() 14017db96d56Sopenharmony_ci read_proto = MyReadPipeProto(loop=loop) 14027db96d56Sopenharmony_ci write_proto = MyWritePipeProto(loop=loop) 14037db96d56Sopenharmony_ci 14047db96d56Sopenharmony_ci rpipe, wpipe = os.pipe() 14057db96d56Sopenharmony_ci rpipeobj = io.open(rpipe, 'rb', 1024) 14067db96d56Sopenharmony_ci wpipeobj = io.open(wpipe, 'w', 1024, encoding="utf-8") 14077db96d56Sopenharmony_ci 14087db96d56Sopenharmony_ci async def connect(): 14097db96d56Sopenharmony_ci read_transport, _ = await loop.connect_read_pipe( 14107db96d56Sopenharmony_ci lambda: read_proto, rpipeobj) 14117db96d56Sopenharmony_ci write_transport, _ = await loop.connect_write_pipe( 14127db96d56Sopenharmony_ci lambda: write_proto, wpipeobj) 14137db96d56Sopenharmony_ci return read_transport, write_transport 14147db96d56Sopenharmony_ci 14157db96d56Sopenharmony_ci # Run and close the loop without closing the transports 14167db96d56Sopenharmony_ci read_transport, write_transport = loop.run_until_complete(connect()) 14177db96d56Sopenharmony_ci loop.close() 14187db96d56Sopenharmony_ci 14197db96d56Sopenharmony_ci # These 'repr' calls used to raise an AttributeError 14207db96d56Sopenharmony_ci # See Issue #314 on GitHub 14217db96d56Sopenharmony_ci self.assertIn('open', repr(read_transport)) 14227db96d56Sopenharmony_ci self.assertIn('open', repr(write_transport)) 14237db96d56Sopenharmony_ci 14247db96d56Sopenharmony_ci # Clean up (avoid ResourceWarning) 14257db96d56Sopenharmony_ci rpipeobj.close() 14267db96d56Sopenharmony_ci wpipeobj.close() 14277db96d56Sopenharmony_ci read_transport._pipe = None 14287db96d56Sopenharmony_ci write_transport._pipe = None 14297db96d56Sopenharmony_ci 14307db96d56Sopenharmony_ci @unittest.skipUnless(sys.platform != 'win32', 14317db96d56Sopenharmony_ci "Don't support pipes for Windows") 14327db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()') 14337db96d56Sopenharmony_ci def 
test_read_pty_output(self): 14347db96d56Sopenharmony_ci proto = MyReadPipeProto(loop=self.loop) 14357db96d56Sopenharmony_ci 14367db96d56Sopenharmony_ci master, slave = os.openpty() 14377db96d56Sopenharmony_ci master_read_obj = io.open(master, 'rb', 0) 14387db96d56Sopenharmony_ci 14397db96d56Sopenharmony_ci async def connect(): 14407db96d56Sopenharmony_ci t, p = await self.loop.connect_read_pipe(lambda: proto, 14417db96d56Sopenharmony_ci master_read_obj) 14427db96d56Sopenharmony_ci self.assertIs(p, proto) 14437db96d56Sopenharmony_ci self.assertIs(t, proto.transport) 14447db96d56Sopenharmony_ci self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 14457db96d56Sopenharmony_ci self.assertEqual(0, proto.nbytes) 14467db96d56Sopenharmony_ci 14477db96d56Sopenharmony_ci self.loop.run_until_complete(connect()) 14487db96d56Sopenharmony_ci 14497db96d56Sopenharmony_ci os.write(slave, b'1') 14507db96d56Sopenharmony_ci test_utils.run_until(self.loop, lambda: proto.nbytes) 14517db96d56Sopenharmony_ci self.assertEqual(1, proto.nbytes) 14527db96d56Sopenharmony_ci 14537db96d56Sopenharmony_ci os.write(slave, b'2345') 14547db96d56Sopenharmony_ci test_utils.run_until(self.loop, lambda: proto.nbytes >= 5) 14557db96d56Sopenharmony_ci self.assertEqual(['INITIAL', 'CONNECTED'], proto.state) 14567db96d56Sopenharmony_ci self.assertEqual(5, proto.nbytes) 14577db96d56Sopenharmony_ci 14587db96d56Sopenharmony_ci os.close(slave) 14597db96d56Sopenharmony_ci proto.transport.close() 14607db96d56Sopenharmony_ci self.loop.run_until_complete(proto.done) 14617db96d56Sopenharmony_ci self.assertEqual( 14627db96d56Sopenharmony_ci ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state) 14637db96d56Sopenharmony_ci # extra info is available 14647db96d56Sopenharmony_ci self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 14657db96d56Sopenharmony_ci 14667db96d56Sopenharmony_ci @unittest.skipUnless(sys.platform != 'win32', 14677db96d56Sopenharmony_ci "Don't support pipes for Windows") 
14687db96d56Sopenharmony_ci def test_write_pipe(self): 14697db96d56Sopenharmony_ci rpipe, wpipe = os.pipe() 14707db96d56Sopenharmony_ci pipeobj = io.open(wpipe, 'wb', 1024) 14717db96d56Sopenharmony_ci 14727db96d56Sopenharmony_ci proto = MyWritePipeProto(loop=self.loop) 14737db96d56Sopenharmony_ci connect = self.loop.connect_write_pipe(lambda: proto, pipeobj) 14747db96d56Sopenharmony_ci transport, p = self.loop.run_until_complete(connect) 14757db96d56Sopenharmony_ci self.assertIs(p, proto) 14767db96d56Sopenharmony_ci self.assertIs(transport, proto.transport) 14777db96d56Sopenharmony_ci self.assertEqual('CONNECTED', proto.state) 14787db96d56Sopenharmony_ci 14797db96d56Sopenharmony_ci transport.write(b'1') 14807db96d56Sopenharmony_ci 14817db96d56Sopenharmony_ci data = bytearray() 14827db96d56Sopenharmony_ci def reader(data): 14837db96d56Sopenharmony_ci chunk = os.read(rpipe, 1024) 14847db96d56Sopenharmony_ci data += chunk 14857db96d56Sopenharmony_ci return len(data) 14867db96d56Sopenharmony_ci 14877db96d56Sopenharmony_ci test_utils.run_until(self.loop, lambda: reader(data) >= 1) 14887db96d56Sopenharmony_ci self.assertEqual(b'1', data) 14897db96d56Sopenharmony_ci 14907db96d56Sopenharmony_ci transport.write(b'2345') 14917db96d56Sopenharmony_ci test_utils.run_until(self.loop, lambda: reader(data) >= 5) 14927db96d56Sopenharmony_ci self.assertEqual(b'12345', data) 14937db96d56Sopenharmony_ci self.assertEqual('CONNECTED', proto.state) 14947db96d56Sopenharmony_ci 14957db96d56Sopenharmony_ci os.close(rpipe) 14967db96d56Sopenharmony_ci 14977db96d56Sopenharmony_ci # extra info is available 14987db96d56Sopenharmony_ci self.assertIsNotNone(proto.transport.get_extra_info('pipe')) 14997db96d56Sopenharmony_ci 15007db96d56Sopenharmony_ci # close connection 15017db96d56Sopenharmony_ci proto.transport.close() 15027db96d56Sopenharmony_ci self.loop.run_until_complete(proto.done) 15037db96d56Sopenharmony_ci self.assertEqual('CLOSED', proto.state) 15047db96d56Sopenharmony_ci 
@unittest.skipUnless(sys.platform != 'win32',
                     "Don't support pipes for Windows")
def test_write_pipe_disconnect_on_close(self):
    """Check that a write-pipe transport closes once its peer goes away.

    One end of a socket pair is wrapped in a binary file object and
    connected through ``connect_write_pipe``.  After one byte has been
    delivered the other end is closed, and the protocol is expected to
    finish in the 'CLOSED' state.
    """
    loop = self.loop
    peer_sock, pipe_sock = socket.socketpair()
    peer_sock.setblocking(False)
    # detach() hands the raw descriptor over to the file object.
    write_file = io.open(pipe_sock.detach(), 'wb', 1024)

    write_proto = MyWritePipeProto(loop=loop)
    transport, returned_proto = loop.run_until_complete(
        loop.connect_write_pipe(lambda: write_proto, write_file))
    self.assertIs(returned_proto, write_proto)
    self.assertIs(transport, write_proto.transport)
    self.assertEqual('CONNECTED', write_proto.state)

    transport.write(b'1')
    received = loop.run_until_complete(loop.sock_recv(peer_sock, 1024))
    self.assertEqual(b'1', received)

    # Closing the peer is what the test expects to end the connection.
    peer_sock.close()

    loop.run_until_complete(write_proto.done)
    self.assertEqual('CLOSED', write_proto.state)


@unittest.skipUnless(sys.platform != 'win32',
                     "Don't support pipes for Windows")
@unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()')
# select, poll and kqueue don't support character devices (PTY) on Mac OS X
# older than 10.6 (Snow Leopard)
@support.requires_mac_ver(10, 6)
def test_write_pty(self):
    """Write through a pipe transport attached to the slave side of a PTY.

    Bytes written to the transport are read back from the master
    descriptor; the protocol must stay 'CONNECTED' while data flows and
    reach 'CLOSED' after the transport is closed.
    """
    loop = self.loop
    master_fd, slave_fd = os.openpty()
    slave_file = io.open(slave_fd, 'wb', 0)

    write_proto = MyWritePipeProto(loop=loop)
    transport, returned_proto = loop.run_until_complete(
        loop.connect_write_pipe(lambda: write_proto, slave_file))
    self.assertIs(returned_proto, write_proto)
    self.assertIs(transport, write_proto.transport)
    self.assertEqual('CONNECTED', write_proto.state)

    transport.write(b'1')

    received = bytearray()

    def received_at_least(count):
        """Build a predicate that drains the master and checks the total."""
        def predicate():
            received.extend(os.read(master_fd, 1024))
            return len(received) >= count
        return predicate

    test_utils.run_until(loop, received_at_least(1),
                         timeout=support.SHORT_TIMEOUT)
    self.assertEqual(b'1', received)

    transport.write(b'2345')
    test_utils.run_until(loop, received_at_least(5),
                         timeout=support.SHORT_TIMEOUT)
    self.assertEqual(b'12345', received)
    self.assertEqual('CONNECTED', write_proto.state)

    os.close(master_fd)

    # extra info is available
    self.assertIsNotNone(write_proto.transport.get_extra_info('pipe'))

    # close connection
    write_proto.transport.close()
    loop.run_until_complete(write_proto.done)
    self.assertEqual('CLOSED', write_proto.state)

@unittest.skipUnless(sys.platform != 'win32',
                     "Don't support pipes for Windows")
@unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()')
# select, poll and kqueue don't support character devices (PTY) on Mac OS X
# older than 10.6 (Snow Leopard)
@support.requires_mac_ver(10, 6)
def test_bidirectional_pty(self):
    """Drive a read pipe and a write pipe over the same PTY slave.

    The slave descriptor is duplicated so that one copy backs a read-pipe
    transport and the other a write-pipe transport.  Traffic is exchanged
    in both directions through the master descriptor, and the protocol
    states are checked after every step.
    """
    master, read_slave = os.openpty()
    write_slave = os.dup(read_slave)
    tty.setraw(read_slave)

    slave_read_obj = io.open(read_slave, 'rb', 0)
    read_proto = MyReadPipeProto(loop=self.loop)
    read_connect = self.loop.connect_read_pipe(lambda: read_proto,
                                               slave_read_obj)
    read_transport, p = self.loop.run_until_complete(read_connect)
    self.assertIs(p, read_proto)
    self.assertIs(read_transport, read_proto.transport)
    self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
    self.assertEqual(0, read_proto.nbytes)

    slave_write_obj = io.open(write_slave, 'wb', 0)
    write_proto = MyWritePipeProto(loop=self.loop)
    write_connect = self.loop.connect_write_pipe(lambda: write_proto,
                                                 slave_write_obj)
    write_transport, p = self.loop.run_until_complete(write_connect)
    self.assertIs(p, write_proto)
    self.assertIs(write_transport, write_proto.transport)
    self.assertEqual('CONNECTED', write_proto.state)

    data = bytearray()

    def reader(data):
        # Accumulate whatever is available on the master side and report
        # the running total.
        chunk = os.read(master, 1024)
        data += chunk
        return len(data)

    # slave -> master
    write_transport.write(b'1')
    test_utils.run_until(self.loop, lambda: reader(data) >= 1,
                         timeout=support.SHORT_TIMEOUT)
    self.assertEqual(b'1', data)
    self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
    self.assertEqual('CONNECTED', write_proto.state)

    # master -> slave
    os.write(master, b'a')
    test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
                         timeout=support.SHORT_TIMEOUT)
    self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
    self.assertEqual(1, read_proto.nbytes)
    self.assertEqual('CONNECTED', write_proto.state)

    # slave -> master, second batch
    write_transport.write(b'2345')
    test_utils.run_until(self.loop, lambda: reader(data) >= 5,
                         timeout=support.SHORT_TIMEOUT)
    self.assertEqual(b'12345', data)
    self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
    self.assertEqual('CONNECTED', write_proto.state)

    # master -> slave, second batch
    os.write(master, b'bcde')
    test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
                         timeout=support.SHORT_TIMEOUT)
    self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
    self.assertEqual(5, read_proto.nbytes)
    self.assertEqual('CONNECTED', write_proto.state)

    os.close(master)

    read_transport.close()
    self.loop.run_until_complete(read_proto.done)
    self.assertEqual(
        ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)

    write_transport.close()
    self.loop.run_until_complete(write_proto.done)
    self.assertEqual('CLOSED', write_proto.state)


def test_prompt_cancellation(self):
    """Cancelling a pending ``sock_recv`` task must take effect promptly."""
    r, w = socket.socketpair()
    # Safety net: release both sockets even when an assertion below fails
    # (the elapsed-time check is timing sensitive).  socket.close() may be
    # called more than once, so the explicit closes at the end stay valid.
    self.addCleanup(r.close)
    self.addCleanup(w.close)
    r.setblocking(False)
    f = self.loop.create_task(self.loop.sock_recv(r, 1))
    # NOTE(review): 'ov' is presumably only present on tasks created by
    # proactor-style loops -- confirm; other loops skip these checks.
    ov = getattr(f, 'ov', None)
    if ov is not None:
        self.assertTrue(ov.pending)

    async def main():
        try:
            self.loop.call_soon(f.cancel)
            await f
        except asyncio.CancelledError:
            res = 'cancelled'
        else:
            res = None
        finally:
            self.loop.stop()
        return res

    start = time.monotonic()
    t = self.loop.create_task(main())
    self.loop.run_forever()
    elapsed = time.monotonic() - start

    self.assertLess(elapsed, 0.1)
    self.assertEqual(t.result(), 'cancelled')
    self.assertRaises(asyncio.CancelledError, f.result)
    if ov is not None:
        self.assertFalse(ov.pending)
        self.loop._stop_serving(r)

    r.close()
    w.close()


def test_timeout_rounding(self):
    """Very short sleeps must not make the loop iterate excessively."""
    def _run_once():
        # Count every loop iteration, then delegate to the real method.
        self.loop._run_once_counter += 1
        orig_run_once()

    orig_run_once = self.loop._run_once
    self.loop._run_once_counter = 0
    self.loop._run_once = _run_once

    async def wait():
        await asyncio.sleep(1e-2)
        await asyncio.sleep(1e-4)
        await asyncio.sleep(1e-6)
        await asyncio.sleep(1e-8)
        await asyncio.sleep(1e-10)

    self.loop.run_until_complete(wait())
    # The ideal number of calls is 12, but on some platforms the selector
    # may sleep a little bit less than the timeout, depending on the
    # resolution of the clock used by the kernel.  Tolerate a few useless
    # calls on these platforms.
    self.assertLessEqual(self.loop._run_once_counter, 20,
        {'clock_resolution': self.loop._clock_resolution,
         'selector': self.loop._selector.__class__.__name__})


def test_remove_fds_after_closing(self):
    """Removing a reader/writer from a closed loop reports False."""
    loop = self.create_event_loop()
    callback = lambda: None
    r, w = socket.socketpair()
    self.addCleanup(r.close)
    self.addCleanup(w.close)
    loop.add_reader(r, callback)
    loop.add_writer(w, callback)
    loop.close()
    self.assertFalse(loop.remove_reader(r))
    self.assertFalse(loop.remove_writer(w))


def test_add_fds_after_closing(self):
    """Adding a reader/writer to a closed loop raises RuntimeError."""
    loop = self.create_event_loop()
    callback = lambda: None
    r, w = socket.socketpair()
    self.addCleanup(r.close)
    self.addCleanup(w.close)
    loop.close()
    with self.assertRaises(RuntimeError):
        loop.add_reader(r, callback)
    with self.assertRaises(RuntimeError):
        loop.add_writer(w, callback)


def test_close_running_event_loop(self):
    """Closing the loop from a coroutine it is running raises RuntimeError."""
    async def close_loop(loop):
        # 'loop' is self.loop (see the call below); use the parameter
        # instead of silently ignoring it.
        loop.close()

    coro = close_loop(self.loop)
    with self.assertRaises(RuntimeError):
        self.loop.run_until_complete(coro)

    def test_close(self):
        # After close(), every running/scheduling entry point exercised below
        # is expected to raise RuntimeError.
        self.loop.close()

        async def test():
            pass

        func = lambda: False
        coro = test()
        # Make sure the coroutine object gets closed when the test finishes.
        self.addCleanup(coro.close)

        # operation blocked when the loop is closed
        with self.assertRaises(RuntimeError):
            self.loop.run_forever()
        with self.assertRaises(RuntimeError):
            fut = self.loop.create_future()
            self.loop.run_until_complete(fut)
        with self.assertRaises(RuntimeError):
            self.loop.call_soon(func)
        with self.assertRaises(RuntimeError):
            self.loop.call_soon_threadsafe(func)
        with self.assertRaises(RuntimeError):
            self.loop.call_later(1.0, func)
        with self.assertRaises(RuntimeError):
            self.loop.call_at(self.loop.time() + .0, func)
        with self.assertRaises(RuntimeError):
            self.loop.create_task(coro)
        with self.assertRaises(RuntimeError):
            self.loop.add_signal_handler(signal.SIGTERM, func)

        # run_in_executor test is tricky: the method is a coroutine,
        # but run_until_complete cannot be called on closed loop.
        # Thus iterate once explicitly.
        with self.assertRaises(RuntimeError):
            it = self.loop.run_in_executor(None, func).__await__()
            next(it)


# Subprocess transport tests, written as a mixin: the methods rely on
# ``self.loop`` and the unittest assertion helpers supplied by the concrete
# test classes that inherit from it.
class SubprocessTestsMixin:

    # Assert that *returncode* is what a SIGTERM-terminated child reports
    # (on Windows only the type is checked).
    def check_terminated(self, returncode):
        if sys.platform == 'win32':
            self.assertIsInstance(returncode, int)
            # expect 1 but sometimes get 0
        else:
            self.assertEqual(-signal.SIGTERM, returncode)

    # Assert that *returncode* is what a SIGKILL-killed child reports
    # (on Windows only the type is checked).
    def check_killed(self, returncode):
        if sys.platform == 'win32':
            self.assertIsInstance(returncode, int)
            # expect 1 but sometimes get 0
        else:
            self.assertEqual(-signal.SIGKILL, returncode)

    def test_subprocess_exec(self):
        # Spawn echo.py (located next to this file), write to its stdin and
        # expect the same bytes to be collected for fd 1.
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)

        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)
        self.assertEqual('CONNECTED', proto.state)

        stdin = transp.get_pipe_transport(0)
        stdin.write(b'Python The Winner')
        self.loop.run_until_complete(proto.got_data[1].wait())
        # Closing the transport while the child is still running; the
        # return code is then checked with check_killed().
        with test_utils.disable_logger():
            transp.close()
        self.loop.run_until_complete(proto.completed)
        self.check_killed(proto.returncode)
        self.assertEqual(b'Python The Winner', proto.data[1])

    def test_subprocess_interactive(self):
        # Same child as test_subprocess_exec, but the input is written in
        # two steps and the accumulated output is checked after each one.
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)

        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)
        self.assertEqual('CONNECTED', proto.state)

        stdin = transp.get_pipe_transport(0)
        stdin.write(b'Python ')
        self.loop.run_until_complete(proto.got_data[1].wait())
        # Reset the event so the next wait() blocks until new data arrives.
        proto.got_data[1].clear()
        self.assertEqual(b'Python ', proto.data[1])

        stdin.write(b'The Winner')
        self.loop.run_until_complete(proto.got_data[1].wait())
        self.assertEqual(b'Python The Winner', proto.data[1])

        with test_utils.disable_logger():
            transp.close()
        self.loop.run_until_complete(proto.completed)
        self.check_killed(proto.returncode)
    def test_subprocess_shell(self):
        # Run 'echo Python' through the shell, close stdin, wait for the
        # process to finish and check exit status, pipe disconnects and the
        # data collected for fds 1 and 2.
        connect = self.loop.subprocess_shell(
            functools.partial(MySubprocessProtocol, self.loop),
            'echo Python')
        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)

        transp.get_pipe_transport(0).close()
        self.loop.run_until_complete(proto.completed)
        self.assertEqual(0, proto.returncode)
        self.assertTrue(all(f.done() for f in proto.disconnects.values()))
        # Strip the line ending, which may be '\n' or '\r\n'.
        self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python')
        self.assertEqual(proto.data[2], b'')
        transp.close()

    def test_subprocess_exitcode(self):
        # A shell command run without any pipes must report its exit status.
        connect = self.loop.subprocess_shell(
            functools.partial(MySubprocessProtocol, self.loop),
            'exit 7', stdin=None, stdout=None, stderr=None)

        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.completed)
        self.assertEqual(7, proto.returncode)
        transp.close()

    def test_subprocess_close_after_finish(self):
        # With stdin/stdout/stderr set to None there are no pipe transports,
        # and close() after the process has completed returns None.
        connect = self.loop.subprocess_shell(
            functools.partial(MySubprocessProtocol, self.loop),
            'exit 7', stdin=None, stdout=None, stderr=None)

        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.assertIsNone(transp.get_pipe_transport(0))
        self.assertIsNone(transp.get_pipe_transport(1))
        self.assertIsNone(transp.get_pipe_transport(2))
        self.loop.run_until_complete(proto.completed)
        self.assertEqual(7, proto.returncode)
        self.assertIsNone(transp.close())

    def test_subprocess_kill(self):
        # kill() on the transport must end the child; the return code is
        # validated by check_killed().
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)

        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)

        transp.kill()
        self.loop.run_until_complete(proto.completed)
        self.check_killed(proto.returncode)
        transp.close()

    def test_subprocess_terminate(self):
        # terminate() on the transport must end the child; the return code
        # is validated by check_terminated().
        prog = os.path.join(os.path.dirname(__file__), 'echo.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)

        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)

        transp.terminate()
        self.loop.run_until_complete(proto.completed)
        self.check_terminated(proto.returncode)
        transp.close()

    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
    def test_subprocess_send_signal(self):
        # bpo-31034: Make sure that we get the default signal handler (killing
        # the process). The parent process may have decided to ignore SIGHUP,
        # and signal handlers are inherited.
        old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
        try:
            prog = os.path.join(os.path.dirname(__file__), 'echo.py')

            connect = self.loop.subprocess_exec(
                functools.partial(MySubprocessProtocol, self.loop),
                sys.executable, prog)


            transp, proto = self.loop.run_until_complete(connect)
            self.assertIsInstance(proto, MySubprocessProtocol)
            self.loop.run_until_complete(proto.connected)

            transp.send_signal(signal.SIGHUP)
            self.loop.run_until_complete(proto.completed)
            self.assertEqual(-signal.SIGHUP, proto.returncode)
            transp.close()
        finally:
            # Always restore the SIGHUP handler that was active before.
            signal.signal(signal.SIGHUP, old_handler)

    def test_subprocess_stderr(self):
        # echo2.py (next to this file) is expected, per the assertions below,
        # to produce 'OUT:<input>' on fd 1 and 'ERR:<input>' on fd 2.
        prog = os.path.join(os.path.dirname(__file__), 'echo2.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)

        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)

        stdin = transp.get_pipe_transport(0)
        stdin.write(b'test')

        self.loop.run_until_complete(proto.completed)

        transp.close()
        self.assertEqual(b'OUT:test', proto.data[1])
        # Only the prefix is checked; the full value is used as the failure
        # message.
        self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2])
        self.assertEqual(0, proto.returncode)

    def test_subprocess_stderr_redirect_to_stdout(self):
        # With stderr=subprocess.STDOUT there is no separate transport for
        # fd 2 and both outputs are collected under fd 1.
        prog = os.path.join(os.path.dirname(__file__), 'echo2.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog, stderr=subprocess.STDOUT)


        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)

        stdin = transp.get_pipe_transport(0)
        self.assertIsNotNone(transp.get_pipe_transport(1))
        self.assertIsNone(transp.get_pipe_transport(2))

        stdin.write(b'test')
        self.loop.run_until_complete(proto.completed)
        self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'),
                        proto.data[1])
        self.assertEqual(b'', proto.data[2])

        transp.close()
        self.assertEqual(0, proto.returncode)

    def test_subprocess_close_client_stream(self):
        # Close our end of the child's stdout, then write again and check
        # the error name the child reports on fd 2.
        prog = os.path.join(os.path.dirname(__file__), 'echo3.py')

        connect = self.loop.subprocess_exec(
            functools.partial(MySubprocessProtocol, self.loop),
            sys.executable, prog)

        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.connected)

        stdin = transp.get_pipe_transport(0)
        stdout = transp.get_pipe_transport(1)
        stdin.write(b'test')
        self.loop.run_until_complete(proto.got_data[1].wait())
        self.assertEqual(b'OUT:test', proto.data[1])

        stdout.close()
        self.loop.run_until_complete(proto.disconnects[1])
        stdin.write(b'xxx')
        self.loop.run_until_complete(proto.got_data[2].wait())
        if sys.platform != 'win32':
            self.assertEqual(b'ERR:BrokenPipeError', proto.data[2])
        else:
            # After closing the read-end of a pipe, writing to the
            # write-end using os.write() fails with errno==EINVAL and
            # GetLastError()==ERROR_INVALID_NAME on Windows!?!  (Using
            # WriteFile() we get ERROR_BROKEN_PIPE as expected.)
            self.assertEqual(b'ERR:OSError', proto.data[2])
        with test_utils.disable_logger():
            transp.close()
        self.loop.run_until_complete(proto.completed)
        self.check_killed(proto.returncode)

    def test_subprocess_wait_no_same_group(self):
        # start the new process in a new session
        connect = self.loop.subprocess_shell(
            functools.partial(MySubprocessProtocol, self.loop),
            'exit 7', stdin=None, stdout=None, stderr=None,
            start_new_session=True)
        transp, proto = self.loop.run_until_complete(connect)
        self.assertIsInstance(proto, MySubprocessProtocol)
        self.loop.run_until_complete(proto.completed)
        self.assertEqual(7, proto.returncode)
        transp.close()

    def test_subprocess_exec_invalid_args(self):
        # subprocess_exec() must reject universal_newlines, bufsize and
        # shell=True with ValueError.
        async def connect(**kwds):
            await self.loop.subprocess_exec(
                asyncio.SubprocessProtocol,
                'pwd', **kwds)

        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(universal_newlines=True))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(bufsize=4096))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(shell=True))

    def test_subprocess_shell_invalid_args(self):
        # subprocess_shell() must reject a list command, universal_newlines,
        # bufsize and shell=False with ValueError.

        async def connect(cmd=None, **kwds):
            if not cmd:
                cmd = 'pwd'
            await self.loop.subprocess_shell(
                asyncio.SubprocessProtocol,
                cmd, **kwds)

        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(['ls', '-l']))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(universal_newlines=True))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(bufsize=4096))
        with self.assertRaises(ValueError):
            self.loop.run_until_complete(connect(shell=False))


# Concrete test classes: which event loop implementations get exercised
# depends on the platform.
if sys.platform == 'win32':

    class SelectEventLoopTests(EventLoopTestsMixin,
                               test_utils.TestCase):

        def create_event_loop(self):
            return asyncio.SelectorEventLoop()

    class ProactorEventLoopTests(EventLoopTestsMixin,
                                 SubprocessTestsMixin,
                                 test_utils.TestCase):

        def create_event_loop(self):
            return asyncio.ProactorEventLoop()

        # The overrides below skip inherited tests that need
        # add_reader()/add_writer().

        def test_reader_callback(self):
            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")

        def test_reader_callback_cancel(self):
            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")

        def test_writer_callback(self):
            raise unittest.SkipTest("IocpEventLoop does not have add_writer()")

        def test_writer_callback_cancel(self):
            raise unittest.SkipTest("IocpEventLoop does not have add_writer()")

        def test_remove_fds_after_closing(self):
            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
else:
    import selectors

    # Common base for the non-Windows loop tests: installs a child watcher
    # attached to the test loop for the duration of each test.
    class UnixEventLoopTestsMixin(EventLoopTestsMixin):
        def setUp(self):
            super().setUp()
            watcher = asyncio.SafeChildWatcher()
            watcher.attach_loop(self.loop)
            asyncio.set_child_watcher(watcher)

        def tearDown(self):
            # Reset the child watcher before the base class tears down.
            asyncio.set_child_watcher(None)
            super().tearDown()
    # One test class per selector implementation available in the
    # ``selectors`` module on this platform.
    if hasattr(selectors, 'KqueueSelector'):
        class KqueueEventLoopTests(UnixEventLoopTestsMixin,
                                   SubprocessTestsMixin,
                                   test_utils.TestCase):

            def create_event_loop(self):
                return asyncio.SelectorEventLoop(
                    selectors.KqueueSelector())

            # kqueue doesn't support character devices (PTY) on Mac OS X older
            # than 10.9 (Mavericks)
            @support.requires_mac_ver(10, 9)
            # Issue #20667: KqueueEventLoopTests.test_read_pty_output()
            # hangs on OpenBSD 5.5
            @unittest.skipIf(sys.platform.startswith('openbsd'),
                             'test hangs on OpenBSD')
            def test_read_pty_output(self):
                super().test_read_pty_output()

            # kqueue doesn't support character devices (PTY) on Mac OS X older
            # than 10.9 (Mavericks)
            @support.requires_mac_ver(10, 9)
            def test_write_pty(self):
                super().test_write_pty()

    if hasattr(selectors, 'EpollSelector'):
        class EPollEventLoopTests(UnixEventLoopTestsMixin,
                                  SubprocessTestsMixin,
                                  test_utils.TestCase):

            def create_event_loop(self):
                return asyncio.SelectorEventLoop(selectors.EpollSelector())

    if hasattr(selectors, 'PollSelector'):
        class PollEventLoopTests(UnixEventLoopTestsMixin,
                                 SubprocessTestsMixin,
                                 test_utils.TestCase):

            def create_event_loop(self):
                return asyncio.SelectorEventLoop(selectors.PollSelector())

    # Should always exist.
    class SelectEventLoopTests(UnixEventLoopTestsMixin,
                               SubprocessTestsMixin,
                               test_utils.TestCase):

        def create_event_loop(self):
            return asyncio.SelectorEventLoop(selectors.SelectSelector())


# Do-nothing callback accepting any arguments; used as the callback for the
# handles created in the tests below.
def noop(*args, **kwargs):
    pass


class HandleTests(test_utils.TestCase):

    def setUp(self):
        super().setUp()
        # The loop is a mock reporting debug mode as enabled; individual
        # tests override get_debug() where they need the non-debug behaviour.
        self.loop = mock.Mock()
        self.loop.get_debug.return_value = True

    def test_handle(self):
        # A new Handle stores the callback and args objects it was given and
        # reports cancelled() only after cancel().
        def callback(*args):
            return args

        args = ()
        h = asyncio.Handle(callback, args, self.loop)
        self.assertIs(h._callback, callback)
        self.assertIs(h._args, args)
        self.assertFalse(h.cancelled())

        h.cancel()
        self.assertTrue(h.cancelled())

    def test_callback_with_exception(self):
        # An exception raised by the callback must be reported through the
        # loop's call_exception_handler() with the context checked below.
        def callback():
            raise ValueError()

        # Replace the loop from setUp() with a fresh mock for this test.
        self.loop = mock.Mock()
        self.loop.call_exception_handler = mock.Mock()

        h = asyncio.Handle(callback, (), self.loop)
        h._run()

        self.loop.call_exception_handler.assert_called_with({
            'message': test_utils.MockPattern('Exception in callback.*'),
            'exception': mock.ANY,
            'handle': h,
            'source_traceback': h._source_traceback,
        })

    def test_handle_weakref(self):
        # Storing the handle in a WeakValueDictionary requires it to be
        # weak-referenceable.
        wd = weakref.WeakValueDictionary()
        h = asyncio.Handle(lambda: None, (), self.loop)
        wd['h'] = h  # Would fail without __weakref__ slot.
22047db96d56Sopenharmony_ci 22057db96d56Sopenharmony_ci def test_handle_repr(self): 22067db96d56Sopenharmony_ci self.loop.get_debug.return_value = False 22077db96d56Sopenharmony_ci 22087db96d56Sopenharmony_ci # simple function 22097db96d56Sopenharmony_ci h = asyncio.Handle(noop, (1, 2), self.loop) 22107db96d56Sopenharmony_ci filename, lineno = test_utils.get_function_source(noop) 22117db96d56Sopenharmony_ci self.assertEqual(repr(h), 22127db96d56Sopenharmony_ci '<Handle noop(1, 2) at %s:%s>' 22137db96d56Sopenharmony_ci % (filename, lineno)) 22147db96d56Sopenharmony_ci 22157db96d56Sopenharmony_ci # cancelled handle 22167db96d56Sopenharmony_ci h.cancel() 22177db96d56Sopenharmony_ci self.assertEqual(repr(h), 22187db96d56Sopenharmony_ci '<Handle cancelled>') 22197db96d56Sopenharmony_ci 22207db96d56Sopenharmony_ci # decorated function 22217db96d56Sopenharmony_ci cb = types.coroutine(noop) 22227db96d56Sopenharmony_ci h = asyncio.Handle(cb, (), self.loop) 22237db96d56Sopenharmony_ci self.assertEqual(repr(h), 22247db96d56Sopenharmony_ci '<Handle noop() at %s:%s>' 22257db96d56Sopenharmony_ci % (filename, lineno)) 22267db96d56Sopenharmony_ci 22277db96d56Sopenharmony_ci # partial function 22287db96d56Sopenharmony_ci cb = functools.partial(noop, 1, 2) 22297db96d56Sopenharmony_ci h = asyncio.Handle(cb, (3,), self.loop) 22307db96d56Sopenharmony_ci regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$' 22317db96d56Sopenharmony_ci % (re.escape(filename), lineno)) 22327db96d56Sopenharmony_ci self.assertRegex(repr(h), regex) 22337db96d56Sopenharmony_ci 22347db96d56Sopenharmony_ci # partial function with keyword args 22357db96d56Sopenharmony_ci cb = functools.partial(noop, x=1) 22367db96d56Sopenharmony_ci h = asyncio.Handle(cb, (2, 3), self.loop) 22377db96d56Sopenharmony_ci regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$' 22387db96d56Sopenharmony_ci % (re.escape(filename), lineno)) 22397db96d56Sopenharmony_ci self.assertRegex(repr(h), regex) 22407db96d56Sopenharmony_ci 
22417db96d56Sopenharmony_ci # partial method 22427db96d56Sopenharmony_ci method = HandleTests.test_handle_repr 22437db96d56Sopenharmony_ci cb = functools.partialmethod(method) 22447db96d56Sopenharmony_ci filename, lineno = test_utils.get_function_source(method) 22457db96d56Sopenharmony_ci h = asyncio.Handle(cb, (), self.loop) 22467db96d56Sopenharmony_ci 22477db96d56Sopenharmony_ci cb_regex = r'<function HandleTests.test_handle_repr .*>' 22487db96d56Sopenharmony_ci cb_regex = fr'functools.partialmethod\({cb_regex}, , \)\(\)' 22497db96d56Sopenharmony_ci regex = fr'^<Handle {cb_regex} at {re.escape(filename)}:{lineno}>$' 22507db96d56Sopenharmony_ci self.assertRegex(repr(h), regex) 22517db96d56Sopenharmony_ci 22527db96d56Sopenharmony_ci def test_handle_repr_debug(self): 22537db96d56Sopenharmony_ci self.loop.get_debug.return_value = True 22547db96d56Sopenharmony_ci 22557db96d56Sopenharmony_ci # simple function 22567db96d56Sopenharmony_ci create_filename = __file__ 22577db96d56Sopenharmony_ci create_lineno = sys._getframe().f_lineno + 1 22587db96d56Sopenharmony_ci h = asyncio.Handle(noop, (1, 2), self.loop) 22597db96d56Sopenharmony_ci filename, lineno = test_utils.get_function_source(noop) 22607db96d56Sopenharmony_ci self.assertEqual(repr(h), 22617db96d56Sopenharmony_ci '<Handle noop(1, 2) at %s:%s created at %s:%s>' 22627db96d56Sopenharmony_ci % (filename, lineno, create_filename, create_lineno)) 22637db96d56Sopenharmony_ci 22647db96d56Sopenharmony_ci # cancelled handle 22657db96d56Sopenharmony_ci h.cancel() 22667db96d56Sopenharmony_ci self.assertEqual( 22677db96d56Sopenharmony_ci repr(h), 22687db96d56Sopenharmony_ci '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' 22697db96d56Sopenharmony_ci % (filename, lineno, create_filename, create_lineno)) 22707db96d56Sopenharmony_ci 22717db96d56Sopenharmony_ci # double cancellation won't overwrite _repr 22727db96d56Sopenharmony_ci h.cancel() 22737db96d56Sopenharmony_ci self.assertEqual( 22747db96d56Sopenharmony_ci 
repr(h), 22757db96d56Sopenharmony_ci '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>' 22767db96d56Sopenharmony_ci % (filename, lineno, create_filename, create_lineno)) 22777db96d56Sopenharmony_ci 22787db96d56Sopenharmony_ci def test_handle_source_traceback(self): 22797db96d56Sopenharmony_ci loop = asyncio.get_event_loop_policy().new_event_loop() 22807db96d56Sopenharmony_ci loop.set_debug(True) 22817db96d56Sopenharmony_ci self.set_event_loop(loop) 22827db96d56Sopenharmony_ci 22837db96d56Sopenharmony_ci def check_source_traceback(h): 22847db96d56Sopenharmony_ci lineno = sys._getframe(1).f_lineno - 1 22857db96d56Sopenharmony_ci self.assertIsInstance(h._source_traceback, list) 22867db96d56Sopenharmony_ci self.assertEqual(h._source_traceback[-1][:3], 22877db96d56Sopenharmony_ci (__file__, 22887db96d56Sopenharmony_ci lineno, 22897db96d56Sopenharmony_ci 'test_handle_source_traceback')) 22907db96d56Sopenharmony_ci 22917db96d56Sopenharmony_ci # call_soon 22927db96d56Sopenharmony_ci h = loop.call_soon(noop) 22937db96d56Sopenharmony_ci check_source_traceback(h) 22947db96d56Sopenharmony_ci 22957db96d56Sopenharmony_ci # call_soon_threadsafe 22967db96d56Sopenharmony_ci h = loop.call_soon_threadsafe(noop) 22977db96d56Sopenharmony_ci check_source_traceback(h) 22987db96d56Sopenharmony_ci 22997db96d56Sopenharmony_ci # call_later 23007db96d56Sopenharmony_ci h = loop.call_later(0, noop) 23017db96d56Sopenharmony_ci check_source_traceback(h) 23027db96d56Sopenharmony_ci 23037db96d56Sopenharmony_ci # call_at 23047db96d56Sopenharmony_ci h = loop.call_later(0, noop) 23057db96d56Sopenharmony_ci check_source_traceback(h) 23067db96d56Sopenharmony_ci 23077db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'), 23087db96d56Sopenharmony_ci 'No collections.abc.Coroutine') 23097db96d56Sopenharmony_ci def test_coroutine_like_object_debug_formatting(self): 23107db96d56Sopenharmony_ci # Test that asyncio can format coroutines that are instances of 
23117db96d56Sopenharmony_ci # collections.abc.Coroutine, but lack cr_core or gi_code attributes 23127db96d56Sopenharmony_ci # (such as ones compiled with Cython). 23137db96d56Sopenharmony_ci 23147db96d56Sopenharmony_ci coro = CoroLike() 23157db96d56Sopenharmony_ci coro.__name__ = 'AAA' 23167db96d56Sopenharmony_ci self.assertTrue(asyncio.iscoroutine(coro)) 23177db96d56Sopenharmony_ci self.assertEqual(coroutines._format_coroutine(coro), 'AAA()') 23187db96d56Sopenharmony_ci 23197db96d56Sopenharmony_ci coro.__qualname__ = 'BBB' 23207db96d56Sopenharmony_ci self.assertEqual(coroutines._format_coroutine(coro), 'BBB()') 23217db96d56Sopenharmony_ci 23227db96d56Sopenharmony_ci coro.cr_running = True 23237db96d56Sopenharmony_ci self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running') 23247db96d56Sopenharmony_ci 23257db96d56Sopenharmony_ci coro.__name__ = coro.__qualname__ = None 23267db96d56Sopenharmony_ci self.assertEqual(coroutines._format_coroutine(coro), 23277db96d56Sopenharmony_ci '<CoroLike without __name__>() running') 23287db96d56Sopenharmony_ci 23297db96d56Sopenharmony_ci coro = CoroLike() 23307db96d56Sopenharmony_ci coro.__qualname__ = 'CoroLike' 23317db96d56Sopenharmony_ci # Some coroutines might not have '__name__', such as 23327db96d56Sopenharmony_ci # built-in async_gen.asend(). 
23337db96d56Sopenharmony_ci self.assertEqual(coroutines._format_coroutine(coro), 'CoroLike()') 23347db96d56Sopenharmony_ci 23357db96d56Sopenharmony_ci coro = CoroLike() 23367db96d56Sopenharmony_ci coro.__qualname__ = 'AAA' 23377db96d56Sopenharmony_ci coro.cr_code = None 23387db96d56Sopenharmony_ci self.assertEqual(coroutines._format_coroutine(coro), 'AAA()') 23397db96d56Sopenharmony_ci 23407db96d56Sopenharmony_ci 23417db96d56Sopenharmony_ciclass TimerTests(unittest.TestCase): 23427db96d56Sopenharmony_ci 23437db96d56Sopenharmony_ci def setUp(self): 23447db96d56Sopenharmony_ci super().setUp() 23457db96d56Sopenharmony_ci self.loop = mock.Mock() 23467db96d56Sopenharmony_ci 23477db96d56Sopenharmony_ci def test_hash(self): 23487db96d56Sopenharmony_ci when = time.monotonic() 23497db96d56Sopenharmony_ci h = asyncio.TimerHandle(when, lambda: False, (), 23507db96d56Sopenharmony_ci mock.Mock()) 23517db96d56Sopenharmony_ci self.assertEqual(hash(h), hash(when)) 23527db96d56Sopenharmony_ci 23537db96d56Sopenharmony_ci def test_when(self): 23547db96d56Sopenharmony_ci when = time.monotonic() 23557db96d56Sopenharmony_ci h = asyncio.TimerHandle(when, lambda: False, (), 23567db96d56Sopenharmony_ci mock.Mock()) 23577db96d56Sopenharmony_ci self.assertEqual(when, h.when()) 23587db96d56Sopenharmony_ci 23597db96d56Sopenharmony_ci def test_timer(self): 23607db96d56Sopenharmony_ci def callback(*args): 23617db96d56Sopenharmony_ci return args 23627db96d56Sopenharmony_ci 23637db96d56Sopenharmony_ci args = (1, 2, 3) 23647db96d56Sopenharmony_ci when = time.monotonic() 23657db96d56Sopenharmony_ci h = asyncio.TimerHandle(when, callback, args, mock.Mock()) 23667db96d56Sopenharmony_ci self.assertIs(h._callback, callback) 23677db96d56Sopenharmony_ci self.assertIs(h._args, args) 23687db96d56Sopenharmony_ci self.assertFalse(h.cancelled()) 23697db96d56Sopenharmony_ci 23707db96d56Sopenharmony_ci # cancel 23717db96d56Sopenharmony_ci h.cancel() 23727db96d56Sopenharmony_ci self.assertTrue(h.cancelled()) 
23737db96d56Sopenharmony_ci self.assertIsNone(h._callback) 23747db96d56Sopenharmony_ci self.assertIsNone(h._args) 23757db96d56Sopenharmony_ci 23767db96d56Sopenharmony_ci 23777db96d56Sopenharmony_ci def test_timer_repr(self): 23787db96d56Sopenharmony_ci self.loop.get_debug.return_value = False 23797db96d56Sopenharmony_ci 23807db96d56Sopenharmony_ci # simple function 23817db96d56Sopenharmony_ci h = asyncio.TimerHandle(123, noop, (), self.loop) 23827db96d56Sopenharmony_ci src = test_utils.get_function_source(noop) 23837db96d56Sopenharmony_ci self.assertEqual(repr(h), 23847db96d56Sopenharmony_ci '<TimerHandle when=123 noop() at %s:%s>' % src) 23857db96d56Sopenharmony_ci 23867db96d56Sopenharmony_ci # cancelled handle 23877db96d56Sopenharmony_ci h.cancel() 23887db96d56Sopenharmony_ci self.assertEqual(repr(h), 23897db96d56Sopenharmony_ci '<TimerHandle cancelled when=123>') 23907db96d56Sopenharmony_ci 23917db96d56Sopenharmony_ci def test_timer_repr_debug(self): 23927db96d56Sopenharmony_ci self.loop.get_debug.return_value = True 23937db96d56Sopenharmony_ci 23947db96d56Sopenharmony_ci # simple function 23957db96d56Sopenharmony_ci create_filename = __file__ 23967db96d56Sopenharmony_ci create_lineno = sys._getframe().f_lineno + 1 23977db96d56Sopenharmony_ci h = asyncio.TimerHandle(123, noop, (), self.loop) 23987db96d56Sopenharmony_ci filename, lineno = test_utils.get_function_source(noop) 23997db96d56Sopenharmony_ci self.assertEqual(repr(h), 24007db96d56Sopenharmony_ci '<TimerHandle when=123 noop() ' 24017db96d56Sopenharmony_ci 'at %s:%s created at %s:%s>' 24027db96d56Sopenharmony_ci % (filename, lineno, create_filename, create_lineno)) 24037db96d56Sopenharmony_ci 24047db96d56Sopenharmony_ci # cancelled handle 24057db96d56Sopenharmony_ci h.cancel() 24067db96d56Sopenharmony_ci self.assertEqual(repr(h), 24077db96d56Sopenharmony_ci '<TimerHandle cancelled when=123 noop() ' 24087db96d56Sopenharmony_ci 'at %s:%s created at %s:%s>' 24097db96d56Sopenharmony_ci % (filename, lineno, 
create_filename, create_lineno)) 24107db96d56Sopenharmony_ci 24117db96d56Sopenharmony_ci 24127db96d56Sopenharmony_ci def test_timer_comparison(self): 24137db96d56Sopenharmony_ci def callback(*args): 24147db96d56Sopenharmony_ci return args 24157db96d56Sopenharmony_ci 24167db96d56Sopenharmony_ci when = time.monotonic() 24177db96d56Sopenharmony_ci 24187db96d56Sopenharmony_ci h1 = asyncio.TimerHandle(when, callback, (), self.loop) 24197db96d56Sopenharmony_ci h2 = asyncio.TimerHandle(when, callback, (), self.loop) 24207db96d56Sopenharmony_ci # TODO: Use assertLess etc. 24217db96d56Sopenharmony_ci self.assertFalse(h1 < h2) 24227db96d56Sopenharmony_ci self.assertFalse(h2 < h1) 24237db96d56Sopenharmony_ci self.assertTrue(h1 <= h2) 24247db96d56Sopenharmony_ci self.assertTrue(h2 <= h1) 24257db96d56Sopenharmony_ci self.assertFalse(h1 > h2) 24267db96d56Sopenharmony_ci self.assertFalse(h2 > h1) 24277db96d56Sopenharmony_ci self.assertTrue(h1 >= h2) 24287db96d56Sopenharmony_ci self.assertTrue(h2 >= h1) 24297db96d56Sopenharmony_ci self.assertTrue(h1 == h2) 24307db96d56Sopenharmony_ci self.assertFalse(h1 != h2) 24317db96d56Sopenharmony_ci 24327db96d56Sopenharmony_ci h2.cancel() 24337db96d56Sopenharmony_ci self.assertFalse(h1 == h2) 24347db96d56Sopenharmony_ci 24357db96d56Sopenharmony_ci h1 = asyncio.TimerHandle(when, callback, (), self.loop) 24367db96d56Sopenharmony_ci h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop) 24377db96d56Sopenharmony_ci self.assertTrue(h1 < h2) 24387db96d56Sopenharmony_ci self.assertFalse(h2 < h1) 24397db96d56Sopenharmony_ci self.assertTrue(h1 <= h2) 24407db96d56Sopenharmony_ci self.assertFalse(h2 <= h1) 24417db96d56Sopenharmony_ci self.assertFalse(h1 > h2) 24427db96d56Sopenharmony_ci self.assertTrue(h2 > h1) 24437db96d56Sopenharmony_ci self.assertFalse(h1 >= h2) 24447db96d56Sopenharmony_ci self.assertTrue(h2 >= h1) 24457db96d56Sopenharmony_ci self.assertFalse(h1 == h2) 24467db96d56Sopenharmony_ci self.assertTrue(h1 != h2) 
24477db96d56Sopenharmony_ci 24487db96d56Sopenharmony_ci h3 = asyncio.Handle(callback, (), self.loop) 24497db96d56Sopenharmony_ci self.assertIs(NotImplemented, h1.__eq__(h3)) 24507db96d56Sopenharmony_ci self.assertIs(NotImplemented, h1.__ne__(h3)) 24517db96d56Sopenharmony_ci 24527db96d56Sopenharmony_ci with self.assertRaises(TypeError): 24537db96d56Sopenharmony_ci h1 < () 24547db96d56Sopenharmony_ci with self.assertRaises(TypeError): 24557db96d56Sopenharmony_ci h1 > () 24567db96d56Sopenharmony_ci with self.assertRaises(TypeError): 24577db96d56Sopenharmony_ci h1 <= () 24587db96d56Sopenharmony_ci with self.assertRaises(TypeError): 24597db96d56Sopenharmony_ci h1 >= () 24607db96d56Sopenharmony_ci self.assertFalse(h1 == ()) 24617db96d56Sopenharmony_ci self.assertTrue(h1 != ()) 24627db96d56Sopenharmony_ci 24637db96d56Sopenharmony_ci self.assertTrue(h1 == ALWAYS_EQ) 24647db96d56Sopenharmony_ci self.assertFalse(h1 != ALWAYS_EQ) 24657db96d56Sopenharmony_ci self.assertTrue(h1 < LARGEST) 24667db96d56Sopenharmony_ci self.assertFalse(h1 > LARGEST) 24677db96d56Sopenharmony_ci self.assertTrue(h1 <= LARGEST) 24687db96d56Sopenharmony_ci self.assertFalse(h1 >= LARGEST) 24697db96d56Sopenharmony_ci self.assertFalse(h1 < SMALLEST) 24707db96d56Sopenharmony_ci self.assertTrue(h1 > SMALLEST) 24717db96d56Sopenharmony_ci self.assertFalse(h1 <= SMALLEST) 24727db96d56Sopenharmony_ci self.assertTrue(h1 >= SMALLEST) 24737db96d56Sopenharmony_ci 24747db96d56Sopenharmony_ci 24757db96d56Sopenharmony_ciclass AbstractEventLoopTests(unittest.TestCase): 24767db96d56Sopenharmony_ci 24777db96d56Sopenharmony_ci def test_not_implemented(self): 24787db96d56Sopenharmony_ci f = mock.Mock() 24797db96d56Sopenharmony_ci loop = asyncio.AbstractEventLoop() 24807db96d56Sopenharmony_ci self.assertRaises( 24817db96d56Sopenharmony_ci NotImplementedError, loop.run_forever) 24827db96d56Sopenharmony_ci self.assertRaises( 24837db96d56Sopenharmony_ci NotImplementedError, loop.run_until_complete, None) 
24847db96d56Sopenharmony_ci self.assertRaises( 24857db96d56Sopenharmony_ci NotImplementedError, loop.stop) 24867db96d56Sopenharmony_ci self.assertRaises( 24877db96d56Sopenharmony_ci NotImplementedError, loop.is_running) 24887db96d56Sopenharmony_ci self.assertRaises( 24897db96d56Sopenharmony_ci NotImplementedError, loop.is_closed) 24907db96d56Sopenharmony_ci self.assertRaises( 24917db96d56Sopenharmony_ci NotImplementedError, loop.close) 24927db96d56Sopenharmony_ci self.assertRaises( 24937db96d56Sopenharmony_ci NotImplementedError, loop.create_task, None) 24947db96d56Sopenharmony_ci self.assertRaises( 24957db96d56Sopenharmony_ci NotImplementedError, loop.call_later, None, None) 24967db96d56Sopenharmony_ci self.assertRaises( 24977db96d56Sopenharmony_ci NotImplementedError, loop.call_at, f, f) 24987db96d56Sopenharmony_ci self.assertRaises( 24997db96d56Sopenharmony_ci NotImplementedError, loop.call_soon, None) 25007db96d56Sopenharmony_ci self.assertRaises( 25017db96d56Sopenharmony_ci NotImplementedError, loop.time) 25027db96d56Sopenharmony_ci self.assertRaises( 25037db96d56Sopenharmony_ci NotImplementedError, loop.call_soon_threadsafe, None) 25047db96d56Sopenharmony_ci self.assertRaises( 25057db96d56Sopenharmony_ci NotImplementedError, loop.set_default_executor, f) 25067db96d56Sopenharmony_ci self.assertRaises( 25077db96d56Sopenharmony_ci NotImplementedError, loop.add_reader, 1, f) 25087db96d56Sopenharmony_ci self.assertRaises( 25097db96d56Sopenharmony_ci NotImplementedError, loop.remove_reader, 1) 25107db96d56Sopenharmony_ci self.assertRaises( 25117db96d56Sopenharmony_ci NotImplementedError, loop.add_writer, 1, f) 25127db96d56Sopenharmony_ci self.assertRaises( 25137db96d56Sopenharmony_ci NotImplementedError, loop.remove_writer, 1) 25147db96d56Sopenharmony_ci self.assertRaises( 25157db96d56Sopenharmony_ci NotImplementedError, loop.add_signal_handler, 1, f) 25167db96d56Sopenharmony_ci self.assertRaises( 25177db96d56Sopenharmony_ci NotImplementedError, 
loop.remove_signal_handler, 1) 25187db96d56Sopenharmony_ci self.assertRaises( 25197db96d56Sopenharmony_ci NotImplementedError, loop.remove_signal_handler, 1) 25207db96d56Sopenharmony_ci self.assertRaises( 25217db96d56Sopenharmony_ci NotImplementedError, loop.set_exception_handler, f) 25227db96d56Sopenharmony_ci self.assertRaises( 25237db96d56Sopenharmony_ci NotImplementedError, loop.default_exception_handler, f) 25247db96d56Sopenharmony_ci self.assertRaises( 25257db96d56Sopenharmony_ci NotImplementedError, loop.call_exception_handler, f) 25267db96d56Sopenharmony_ci self.assertRaises( 25277db96d56Sopenharmony_ci NotImplementedError, loop.get_debug) 25287db96d56Sopenharmony_ci self.assertRaises( 25297db96d56Sopenharmony_ci NotImplementedError, loop.set_debug, f) 25307db96d56Sopenharmony_ci 25317db96d56Sopenharmony_ci def test_not_implemented_async(self): 25327db96d56Sopenharmony_ci 25337db96d56Sopenharmony_ci async def inner(): 25347db96d56Sopenharmony_ci f = mock.Mock() 25357db96d56Sopenharmony_ci loop = asyncio.AbstractEventLoop() 25367db96d56Sopenharmony_ci 25377db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25387db96d56Sopenharmony_ci await loop.run_in_executor(f, f) 25397db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25407db96d56Sopenharmony_ci await loop.getaddrinfo('localhost', 8080) 25417db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25427db96d56Sopenharmony_ci await loop.getnameinfo(('localhost', 8080)) 25437db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25447db96d56Sopenharmony_ci await loop.create_connection(f) 25457db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25467db96d56Sopenharmony_ci await loop.create_server(f) 25477db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25487db96d56Sopenharmony_ci await loop.create_datagram_endpoint(f) 25497db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25507db96d56Sopenharmony_ci 
await loop.sock_recv(f, 10) 25517db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25527db96d56Sopenharmony_ci await loop.sock_recv_into(f, 10) 25537db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25547db96d56Sopenharmony_ci await loop.sock_sendall(f, 10) 25557db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25567db96d56Sopenharmony_ci await loop.sock_connect(f, f) 25577db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25587db96d56Sopenharmony_ci await loop.sock_accept(f) 25597db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25607db96d56Sopenharmony_ci await loop.sock_sendfile(f, f) 25617db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25627db96d56Sopenharmony_ci await loop.sendfile(f, f) 25637db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25647db96d56Sopenharmony_ci await loop.connect_read_pipe(f, mock.sentinel.pipe) 25657db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25667db96d56Sopenharmony_ci await loop.connect_write_pipe(f, mock.sentinel.pipe) 25677db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25687db96d56Sopenharmony_ci await loop.subprocess_shell(f, mock.sentinel) 25697db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 25707db96d56Sopenharmony_ci await loop.subprocess_exec(f) 25717db96d56Sopenharmony_ci 25727db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 25737db96d56Sopenharmony_ci loop.run_until_complete(inner()) 25747db96d56Sopenharmony_ci loop.close() 25757db96d56Sopenharmony_ci 25767db96d56Sopenharmony_ci 25777db96d56Sopenharmony_ciclass PolicyTests(unittest.TestCase): 25787db96d56Sopenharmony_ci 25797db96d56Sopenharmony_ci def test_event_loop_policy(self): 25807db96d56Sopenharmony_ci policy = asyncio.AbstractEventLoopPolicy() 25817db96d56Sopenharmony_ci self.assertRaises(NotImplementedError, policy.get_event_loop) 25827db96d56Sopenharmony_ci 
self.assertRaises(NotImplementedError, policy.set_event_loop, object()) 25837db96d56Sopenharmony_ci self.assertRaises(NotImplementedError, policy.new_event_loop) 25847db96d56Sopenharmony_ci self.assertRaises(NotImplementedError, policy.get_child_watcher) 25857db96d56Sopenharmony_ci self.assertRaises(NotImplementedError, policy.set_child_watcher, 25867db96d56Sopenharmony_ci object()) 25877db96d56Sopenharmony_ci 25887db96d56Sopenharmony_ci def test_get_event_loop(self): 25897db96d56Sopenharmony_ci policy = asyncio.DefaultEventLoopPolicy() 25907db96d56Sopenharmony_ci self.assertIsNone(policy._local._loop) 25917db96d56Sopenharmony_ci loop = policy.get_event_loop() 25927db96d56Sopenharmony_ci self.assertIsInstance(loop, asyncio.AbstractEventLoop) 25937db96d56Sopenharmony_ci 25947db96d56Sopenharmony_ci self.assertIs(policy._local._loop, loop) 25957db96d56Sopenharmony_ci self.assertIs(loop, policy.get_event_loop()) 25967db96d56Sopenharmony_ci loop.close() 25977db96d56Sopenharmony_ci 25987db96d56Sopenharmony_ci def test_get_event_loop_calls_set_event_loop(self): 25997db96d56Sopenharmony_ci policy = asyncio.DefaultEventLoopPolicy() 26007db96d56Sopenharmony_ci 26017db96d56Sopenharmony_ci with mock.patch.object( 26027db96d56Sopenharmony_ci policy, "set_event_loop", 26037db96d56Sopenharmony_ci wraps=policy.set_event_loop) as m_set_event_loop: 26047db96d56Sopenharmony_ci 26057db96d56Sopenharmony_ci loop = policy.get_event_loop() 26067db96d56Sopenharmony_ci self.addCleanup(loop.close) 26077db96d56Sopenharmony_ci 26087db96d56Sopenharmony_ci # policy._local._loop must be set through .set_event_loop() 26097db96d56Sopenharmony_ci # (the unix DefaultEventLoopPolicy needs this call to attach 26107db96d56Sopenharmony_ci # the child watcher correctly) 26117db96d56Sopenharmony_ci m_set_event_loop.assert_called_with(loop) 26127db96d56Sopenharmony_ci 26137db96d56Sopenharmony_ci loop.close() 26147db96d56Sopenharmony_ci 26157db96d56Sopenharmony_ci def 
test_get_event_loop_after_set_none(self): 26167db96d56Sopenharmony_ci policy = asyncio.DefaultEventLoopPolicy() 26177db96d56Sopenharmony_ci policy.set_event_loop(None) 26187db96d56Sopenharmony_ci self.assertRaises(RuntimeError, policy.get_event_loop) 26197db96d56Sopenharmony_ci 26207db96d56Sopenharmony_ci @mock.patch('asyncio.events.threading.current_thread') 26217db96d56Sopenharmony_ci def test_get_event_loop_thread(self, m_current_thread): 26227db96d56Sopenharmony_ci 26237db96d56Sopenharmony_ci def f(): 26247db96d56Sopenharmony_ci policy = asyncio.DefaultEventLoopPolicy() 26257db96d56Sopenharmony_ci self.assertRaises(RuntimeError, policy.get_event_loop) 26267db96d56Sopenharmony_ci 26277db96d56Sopenharmony_ci th = threading.Thread(target=f) 26287db96d56Sopenharmony_ci th.start() 26297db96d56Sopenharmony_ci th.join() 26307db96d56Sopenharmony_ci 26317db96d56Sopenharmony_ci def test_new_event_loop(self): 26327db96d56Sopenharmony_ci policy = asyncio.DefaultEventLoopPolicy() 26337db96d56Sopenharmony_ci 26347db96d56Sopenharmony_ci loop = policy.new_event_loop() 26357db96d56Sopenharmony_ci self.assertIsInstance(loop, asyncio.AbstractEventLoop) 26367db96d56Sopenharmony_ci loop.close() 26377db96d56Sopenharmony_ci 26387db96d56Sopenharmony_ci def test_set_event_loop(self): 26397db96d56Sopenharmony_ci policy = asyncio.DefaultEventLoopPolicy() 26407db96d56Sopenharmony_ci old_loop = policy.new_event_loop() 26417db96d56Sopenharmony_ci policy.set_event_loop(old_loop) 26427db96d56Sopenharmony_ci 26437db96d56Sopenharmony_ci self.assertRaises(TypeError, policy.set_event_loop, object()) 26447db96d56Sopenharmony_ci 26457db96d56Sopenharmony_ci loop = policy.new_event_loop() 26467db96d56Sopenharmony_ci policy.set_event_loop(loop) 26477db96d56Sopenharmony_ci self.assertIs(loop, policy.get_event_loop()) 26487db96d56Sopenharmony_ci self.assertIsNot(old_loop, policy.get_event_loop()) 26497db96d56Sopenharmony_ci loop.close() 26507db96d56Sopenharmony_ci old_loop.close() 
26517db96d56Sopenharmony_ci 26527db96d56Sopenharmony_ci def test_get_event_loop_policy(self): 26537db96d56Sopenharmony_ci policy = asyncio.get_event_loop_policy() 26547db96d56Sopenharmony_ci self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy) 26557db96d56Sopenharmony_ci self.assertIs(policy, asyncio.get_event_loop_policy()) 26567db96d56Sopenharmony_ci 26577db96d56Sopenharmony_ci def test_set_event_loop_policy(self): 26587db96d56Sopenharmony_ci self.assertRaises( 26597db96d56Sopenharmony_ci TypeError, asyncio.set_event_loop_policy, object()) 26607db96d56Sopenharmony_ci 26617db96d56Sopenharmony_ci old_policy = asyncio.get_event_loop_policy() 26627db96d56Sopenharmony_ci 26637db96d56Sopenharmony_ci policy = asyncio.DefaultEventLoopPolicy() 26647db96d56Sopenharmony_ci asyncio.set_event_loop_policy(policy) 26657db96d56Sopenharmony_ci self.assertIs(policy, asyncio.get_event_loop_policy()) 26667db96d56Sopenharmony_ci self.assertIsNot(policy, old_policy) 26677db96d56Sopenharmony_ci 26687db96d56Sopenharmony_ci 26697db96d56Sopenharmony_ciclass GetEventLoopTestsMixin: 26707db96d56Sopenharmony_ci 26717db96d56Sopenharmony_ci _get_running_loop_impl = None 26727db96d56Sopenharmony_ci _set_running_loop_impl = None 26737db96d56Sopenharmony_ci get_running_loop_impl = None 26747db96d56Sopenharmony_ci get_event_loop_impl = None 26757db96d56Sopenharmony_ci 26767db96d56Sopenharmony_ci def setUp(self): 26777db96d56Sopenharmony_ci self._get_running_loop_saved = events._get_running_loop 26787db96d56Sopenharmony_ci self._set_running_loop_saved = events._set_running_loop 26797db96d56Sopenharmony_ci self.get_running_loop_saved = events.get_running_loop 26807db96d56Sopenharmony_ci self.get_event_loop_saved = events.get_event_loop 26817db96d56Sopenharmony_ci 26827db96d56Sopenharmony_ci events._get_running_loop = type(self)._get_running_loop_impl 26837db96d56Sopenharmony_ci events._set_running_loop = type(self)._set_running_loop_impl 26847db96d56Sopenharmony_ci events.get_running_loop 
= type(self).get_running_loop_impl 26857db96d56Sopenharmony_ci events.get_event_loop = type(self).get_event_loop_impl 26867db96d56Sopenharmony_ci 26877db96d56Sopenharmony_ci asyncio._get_running_loop = type(self)._get_running_loop_impl 26887db96d56Sopenharmony_ci asyncio._set_running_loop = type(self)._set_running_loop_impl 26897db96d56Sopenharmony_ci asyncio.get_running_loop = type(self).get_running_loop_impl 26907db96d56Sopenharmony_ci asyncio.get_event_loop = type(self).get_event_loop_impl 26917db96d56Sopenharmony_ci 26927db96d56Sopenharmony_ci super().setUp() 26937db96d56Sopenharmony_ci 26947db96d56Sopenharmony_ci self.loop = asyncio.new_event_loop() 26957db96d56Sopenharmony_ci asyncio.set_event_loop(self.loop) 26967db96d56Sopenharmony_ci 26977db96d56Sopenharmony_ci if sys.platform != 'win32': 26987db96d56Sopenharmony_ci watcher = asyncio.SafeChildWatcher() 26997db96d56Sopenharmony_ci watcher.attach_loop(self.loop) 27007db96d56Sopenharmony_ci asyncio.set_child_watcher(watcher) 27017db96d56Sopenharmony_ci 27027db96d56Sopenharmony_ci def tearDown(self): 27037db96d56Sopenharmony_ci try: 27047db96d56Sopenharmony_ci if sys.platform != 'win32': 27057db96d56Sopenharmony_ci asyncio.set_child_watcher(None) 27067db96d56Sopenharmony_ci 27077db96d56Sopenharmony_ci super().tearDown() 27087db96d56Sopenharmony_ci finally: 27097db96d56Sopenharmony_ci self.loop.close() 27107db96d56Sopenharmony_ci asyncio.set_event_loop(None) 27117db96d56Sopenharmony_ci 27127db96d56Sopenharmony_ci events._get_running_loop = self._get_running_loop_saved 27137db96d56Sopenharmony_ci events._set_running_loop = self._set_running_loop_saved 27147db96d56Sopenharmony_ci events.get_running_loop = self.get_running_loop_saved 27157db96d56Sopenharmony_ci events.get_event_loop = self.get_event_loop_saved 27167db96d56Sopenharmony_ci 27177db96d56Sopenharmony_ci asyncio._get_running_loop = self._get_running_loop_saved 27187db96d56Sopenharmony_ci asyncio._set_running_loop = self._set_running_loop_saved 
27197db96d56Sopenharmony_ci asyncio.get_running_loop = self.get_running_loop_saved 27207db96d56Sopenharmony_ci asyncio.get_event_loop = self.get_event_loop_saved 27217db96d56Sopenharmony_ci 27227db96d56Sopenharmony_ci if sys.platform != 'win32': 27237db96d56Sopenharmony_ci 27247db96d56Sopenharmony_ci def test_get_event_loop_new_process(self): 27257db96d56Sopenharmony_ci # bpo-32126: The multiprocessing module used by 27267db96d56Sopenharmony_ci # ProcessPoolExecutor is not functional when the 27277db96d56Sopenharmony_ci # multiprocessing.synchronize module cannot be imported. 27287db96d56Sopenharmony_ci support.skip_if_broken_multiprocessing_synchronize() 27297db96d56Sopenharmony_ci 27307db96d56Sopenharmony_ci async def main(): 27317db96d56Sopenharmony_ci pool = concurrent.futures.ProcessPoolExecutor() 27327db96d56Sopenharmony_ci result = await self.loop.run_in_executor( 27337db96d56Sopenharmony_ci pool, _test_get_event_loop_new_process__sub_proc) 27347db96d56Sopenharmony_ci pool.shutdown() 27357db96d56Sopenharmony_ci return result 27367db96d56Sopenharmony_ci 27377db96d56Sopenharmony_ci self.assertEqual( 27387db96d56Sopenharmony_ci self.loop.run_until_complete(main()), 27397db96d56Sopenharmony_ci 'hello') 27407db96d56Sopenharmony_ci 27417db96d56Sopenharmony_ci def test_get_event_loop_returns_running_loop(self): 27427db96d56Sopenharmony_ci class TestError(Exception): 27437db96d56Sopenharmony_ci pass 27447db96d56Sopenharmony_ci 27457db96d56Sopenharmony_ci class Policy(asyncio.DefaultEventLoopPolicy): 27467db96d56Sopenharmony_ci def get_event_loop(self): 27477db96d56Sopenharmony_ci raise TestError 27487db96d56Sopenharmony_ci 27497db96d56Sopenharmony_ci old_policy = asyncio.get_event_loop_policy() 27507db96d56Sopenharmony_ci try: 27517db96d56Sopenharmony_ci asyncio.set_event_loop_policy(Policy()) 27527db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 27537db96d56Sopenharmony_ci 27547db96d56Sopenharmony_ci with self.assertRaises(TestError): 
27557db96d56Sopenharmony_ci asyncio.get_event_loop() 27567db96d56Sopenharmony_ci asyncio.set_event_loop(None) 27577db96d56Sopenharmony_ci with self.assertRaises(TestError): 27587db96d56Sopenharmony_ci asyncio.get_event_loop() 27597db96d56Sopenharmony_ci 27607db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, 'no running'): 27617db96d56Sopenharmony_ci asyncio.get_running_loop() 27627db96d56Sopenharmony_ci self.assertIs(asyncio._get_running_loop(), None) 27637db96d56Sopenharmony_ci 27647db96d56Sopenharmony_ci async def func(): 27657db96d56Sopenharmony_ci self.assertIs(asyncio.get_event_loop(), loop) 27667db96d56Sopenharmony_ci self.assertIs(asyncio.get_running_loop(), loop) 27677db96d56Sopenharmony_ci self.assertIs(asyncio._get_running_loop(), loop) 27687db96d56Sopenharmony_ci 27697db96d56Sopenharmony_ci loop.run_until_complete(func()) 27707db96d56Sopenharmony_ci 27717db96d56Sopenharmony_ci asyncio.set_event_loop(loop) 27727db96d56Sopenharmony_ci with self.assertRaises(TestError): 27737db96d56Sopenharmony_ci asyncio.get_event_loop() 27747db96d56Sopenharmony_ci asyncio.set_event_loop(None) 27757db96d56Sopenharmony_ci with self.assertRaises(TestError): 27767db96d56Sopenharmony_ci asyncio.get_event_loop() 27777db96d56Sopenharmony_ci 27787db96d56Sopenharmony_ci finally: 27797db96d56Sopenharmony_ci asyncio.set_event_loop_policy(old_policy) 27807db96d56Sopenharmony_ci if loop is not None: 27817db96d56Sopenharmony_ci loop.close() 27827db96d56Sopenharmony_ci 27837db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, 'no running'): 27847db96d56Sopenharmony_ci asyncio.get_running_loop() 27857db96d56Sopenharmony_ci 27867db96d56Sopenharmony_ci self.assertIs(asyncio._get_running_loop(), None) 27877db96d56Sopenharmony_ci 27887db96d56Sopenharmony_ci def test_get_event_loop_returns_running_loop2(self): 27897db96d56Sopenharmony_ci old_policy = asyncio.get_event_loop_policy() 27907db96d56Sopenharmony_ci try: 27917db96d56Sopenharmony_ci 
asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy()) 27927db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 27937db96d56Sopenharmony_ci self.addCleanup(loop.close) 27947db96d56Sopenharmony_ci 27957db96d56Sopenharmony_ci loop2 = asyncio.get_event_loop() 27967db96d56Sopenharmony_ci self.addCleanup(loop2.close) 27977db96d56Sopenharmony_ci asyncio.set_event_loop(None) 27987db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, 'no current'): 27997db96d56Sopenharmony_ci asyncio.get_event_loop() 28007db96d56Sopenharmony_ci 28017db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, 'no running'): 28027db96d56Sopenharmony_ci asyncio.get_running_loop() 28037db96d56Sopenharmony_ci self.assertIs(asyncio._get_running_loop(), None) 28047db96d56Sopenharmony_ci 28057db96d56Sopenharmony_ci async def func(): 28067db96d56Sopenharmony_ci self.assertIs(asyncio.get_event_loop(), loop) 28077db96d56Sopenharmony_ci self.assertIs(asyncio.get_running_loop(), loop) 28087db96d56Sopenharmony_ci self.assertIs(asyncio._get_running_loop(), loop) 28097db96d56Sopenharmony_ci 28107db96d56Sopenharmony_ci loop.run_until_complete(func()) 28117db96d56Sopenharmony_ci 28127db96d56Sopenharmony_ci asyncio.set_event_loop(loop) 28137db96d56Sopenharmony_ci self.assertIs(asyncio.get_event_loop(), loop) 28147db96d56Sopenharmony_ci 28157db96d56Sopenharmony_ci asyncio.set_event_loop(None) 28167db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, 'no current'): 28177db96d56Sopenharmony_ci asyncio.get_event_loop() 28187db96d56Sopenharmony_ci 28197db96d56Sopenharmony_ci finally: 28207db96d56Sopenharmony_ci asyncio.set_event_loop_policy(old_policy) 28217db96d56Sopenharmony_ci if loop is not None: 28227db96d56Sopenharmony_ci loop.close() 28237db96d56Sopenharmony_ci 28247db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, 'no running'): 28257db96d56Sopenharmony_ci asyncio.get_running_loop() 28267db96d56Sopenharmony_ci 28277db96d56Sopenharmony_ci 
self.assertIs(asyncio._get_running_loop(), None) 28287db96d56Sopenharmony_ci 28297db96d56Sopenharmony_ci 28307db96d56Sopenharmony_ciclass TestPyGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase): 28317db96d56Sopenharmony_ci 28327db96d56Sopenharmony_ci _get_running_loop_impl = events._py__get_running_loop 28337db96d56Sopenharmony_ci _set_running_loop_impl = events._py__set_running_loop 28347db96d56Sopenharmony_ci get_running_loop_impl = events._py_get_running_loop 28357db96d56Sopenharmony_ci get_event_loop_impl = events._py_get_event_loop 28367db96d56Sopenharmony_ci 28377db96d56Sopenharmony_ci 28387db96d56Sopenharmony_citry: 28397db96d56Sopenharmony_ci import _asyncio # NoQA 28407db96d56Sopenharmony_ciexcept ImportError: 28417db96d56Sopenharmony_ci pass 28427db96d56Sopenharmony_cielse: 28437db96d56Sopenharmony_ci 28447db96d56Sopenharmony_ci class TestCGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase): 28457db96d56Sopenharmony_ci 28467db96d56Sopenharmony_ci _get_running_loop_impl = events._c__get_running_loop 28477db96d56Sopenharmony_ci _set_running_loop_impl = events._c__set_running_loop 28487db96d56Sopenharmony_ci get_running_loop_impl = events._c_get_running_loop 28497db96d56Sopenharmony_ci get_event_loop_impl = events._c_get_event_loop 28507db96d56Sopenharmony_ci 28517db96d56Sopenharmony_ci 28527db96d56Sopenharmony_ciclass TestServer(unittest.TestCase): 28537db96d56Sopenharmony_ci 28547db96d56Sopenharmony_ci def test_get_loop(self): 28557db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 28567db96d56Sopenharmony_ci self.addCleanup(loop.close) 28577db96d56Sopenharmony_ci proto = MyProto(loop) 28587db96d56Sopenharmony_ci server = loop.run_until_complete(loop.create_server(lambda: proto, '0.0.0.0', 0)) 28597db96d56Sopenharmony_ci self.assertEqual(server.get_loop(), loop) 28607db96d56Sopenharmony_ci server.close() 28617db96d56Sopenharmony_ci loop.run_until_complete(server.wait_closed()) 28627db96d56Sopenharmony_ci 28637db96d56Sopenharmony_ci 

class TestAbstractServer(unittest.TestCase):
    """Every events.AbstractServer method must raise NotImplementedError."""

    def test_close(self):
        with self.assertRaises(NotImplementedError):
            events.AbstractServer().close()

    def test_wait_closed(self):
        # wait_closed() is awaited, so a real loop is needed to drive it.
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        with self.assertRaises(NotImplementedError):
            loop.run_until_complete(events.AbstractServer().wait_closed())

    def test_get_loop(self):
        with self.assertRaises(NotImplementedError):
            events.AbstractServer().get_loop()


if __name__ == '__main__':
    unittest.main()