17db96d56Sopenharmony_ci"""Tests for events.py."""
27db96d56Sopenharmony_ci
37db96d56Sopenharmony_ciimport collections.abc
47db96d56Sopenharmony_ciimport concurrent.futures
57db96d56Sopenharmony_ciimport functools
67db96d56Sopenharmony_ciimport io
77db96d56Sopenharmony_ciimport os
87db96d56Sopenharmony_ciimport platform
97db96d56Sopenharmony_ciimport re
107db96d56Sopenharmony_ciimport signal
117db96d56Sopenharmony_ciimport socket
127db96d56Sopenharmony_citry:
137db96d56Sopenharmony_ci    import ssl
147db96d56Sopenharmony_ciexcept ImportError:
157db96d56Sopenharmony_ci    ssl = None
167db96d56Sopenharmony_ciimport subprocess
177db96d56Sopenharmony_ciimport sys
187db96d56Sopenharmony_ciimport threading
197db96d56Sopenharmony_ciimport time
207db96d56Sopenharmony_ciimport types
217db96d56Sopenharmony_ciimport errno
227db96d56Sopenharmony_ciimport unittest
237db96d56Sopenharmony_cifrom unittest import mock
247db96d56Sopenharmony_ciimport weakref
257db96d56Sopenharmony_ci
267db96d56Sopenharmony_ciif sys.platform not in ('win32', 'vxworks'):
277db96d56Sopenharmony_ci    import tty
287db96d56Sopenharmony_ci
297db96d56Sopenharmony_ciimport asyncio
307db96d56Sopenharmony_cifrom asyncio import coroutines
317db96d56Sopenharmony_cifrom asyncio import events
327db96d56Sopenharmony_cifrom asyncio import proactor_events
337db96d56Sopenharmony_cifrom asyncio import selector_events
347db96d56Sopenharmony_cifrom test.test_asyncio import utils as test_utils
357db96d56Sopenharmony_cifrom test import support
367db96d56Sopenharmony_cifrom test.support import socket_helper
377db96d56Sopenharmony_cifrom test.support import threading_helper
387db96d56Sopenharmony_cifrom test.support import ALWAYS_EQ, LARGEST, SMALLEST
397db96d56Sopenharmony_ci
407db96d56Sopenharmony_ci
417db96d56Sopenharmony_cidef tearDownModule():
427db96d56Sopenharmony_ci    asyncio.set_event_loop_policy(None)
437db96d56Sopenharmony_ci
447db96d56Sopenharmony_ci
457db96d56Sopenharmony_cidef broken_unix_getsockname():
467db96d56Sopenharmony_ci    """Return True if the platform is Mac OS 10.4 or older."""
477db96d56Sopenharmony_ci    if sys.platform.startswith("aix"):
487db96d56Sopenharmony_ci        return True
497db96d56Sopenharmony_ci    elif sys.platform != 'darwin':
507db96d56Sopenharmony_ci        return False
517db96d56Sopenharmony_ci    version = platform.mac_ver()[0]
527db96d56Sopenharmony_ci    version = tuple(map(int, version.split('.')))
537db96d56Sopenharmony_ci    return version < (10, 5)
547db96d56Sopenharmony_ci
557db96d56Sopenharmony_ci
567db96d56Sopenharmony_cidef _test_get_event_loop_new_process__sub_proc():
577db96d56Sopenharmony_ci    async def doit():
587db96d56Sopenharmony_ci        return 'hello'
597db96d56Sopenharmony_ci
607db96d56Sopenharmony_ci    loop = asyncio.new_event_loop()
617db96d56Sopenharmony_ci    asyncio.set_event_loop(loop)
627db96d56Sopenharmony_ci    return loop.run_until_complete(doit())
637db96d56Sopenharmony_ci
647db96d56Sopenharmony_ci
657db96d56Sopenharmony_ciclass CoroLike:
667db96d56Sopenharmony_ci    def send(self, v):
677db96d56Sopenharmony_ci        pass
687db96d56Sopenharmony_ci
697db96d56Sopenharmony_ci    def throw(self, *exc):
707db96d56Sopenharmony_ci        pass
717db96d56Sopenharmony_ci
727db96d56Sopenharmony_ci    def close(self):
737db96d56Sopenharmony_ci        pass
747db96d56Sopenharmony_ci
757db96d56Sopenharmony_ci    def __await__(self):
767db96d56Sopenharmony_ci        pass
777db96d56Sopenharmony_ci
787db96d56Sopenharmony_ci
797db96d56Sopenharmony_ciclass MyBaseProto(asyncio.Protocol):
807db96d56Sopenharmony_ci    connected = None
817db96d56Sopenharmony_ci    done = None
827db96d56Sopenharmony_ci
837db96d56Sopenharmony_ci    def __init__(self, loop=None):
847db96d56Sopenharmony_ci        self.transport = None
857db96d56Sopenharmony_ci        self.state = 'INITIAL'
867db96d56Sopenharmony_ci        self.nbytes = 0
877db96d56Sopenharmony_ci        if loop is not None:
887db96d56Sopenharmony_ci            self.connected = loop.create_future()
897db96d56Sopenharmony_ci            self.done = loop.create_future()
907db96d56Sopenharmony_ci
917db96d56Sopenharmony_ci    def _assert_state(self, *expected):
927db96d56Sopenharmony_ci        if self.state not in expected:
937db96d56Sopenharmony_ci            raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
947db96d56Sopenharmony_ci
957db96d56Sopenharmony_ci    def connection_made(self, transport):
967db96d56Sopenharmony_ci        self.transport = transport
977db96d56Sopenharmony_ci        self._assert_state('INITIAL')
987db96d56Sopenharmony_ci        self.state = 'CONNECTED'
997db96d56Sopenharmony_ci        if self.connected:
1007db96d56Sopenharmony_ci            self.connected.set_result(None)
1017db96d56Sopenharmony_ci
1027db96d56Sopenharmony_ci    def data_received(self, data):
1037db96d56Sopenharmony_ci        self._assert_state('CONNECTED')
1047db96d56Sopenharmony_ci        self.nbytes += len(data)
1057db96d56Sopenharmony_ci
1067db96d56Sopenharmony_ci    def eof_received(self):
1077db96d56Sopenharmony_ci        self._assert_state('CONNECTED')
1087db96d56Sopenharmony_ci        self.state = 'EOF'
1097db96d56Sopenharmony_ci
1107db96d56Sopenharmony_ci    def connection_lost(self, exc):
1117db96d56Sopenharmony_ci        self._assert_state('CONNECTED', 'EOF')
1127db96d56Sopenharmony_ci        self.state = 'CLOSED'
1137db96d56Sopenharmony_ci        if self.done:
1147db96d56Sopenharmony_ci            self.done.set_result(None)
1157db96d56Sopenharmony_ci
1167db96d56Sopenharmony_ci
1177db96d56Sopenharmony_ciclass MyProto(MyBaseProto):
1187db96d56Sopenharmony_ci    def connection_made(self, transport):
1197db96d56Sopenharmony_ci        super().connection_made(transport)
1207db96d56Sopenharmony_ci        transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
1217db96d56Sopenharmony_ci
1227db96d56Sopenharmony_ci
1237db96d56Sopenharmony_ciclass MyDatagramProto(asyncio.DatagramProtocol):
1247db96d56Sopenharmony_ci    done = None
1257db96d56Sopenharmony_ci
1267db96d56Sopenharmony_ci    def __init__(self, loop=None):
1277db96d56Sopenharmony_ci        self.state = 'INITIAL'
1287db96d56Sopenharmony_ci        self.nbytes = 0
1297db96d56Sopenharmony_ci        if loop is not None:
1307db96d56Sopenharmony_ci            self.done = loop.create_future()
1317db96d56Sopenharmony_ci
1327db96d56Sopenharmony_ci    def _assert_state(self, expected):
1337db96d56Sopenharmony_ci        if self.state != expected:
1347db96d56Sopenharmony_ci            raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
1357db96d56Sopenharmony_ci
1367db96d56Sopenharmony_ci    def connection_made(self, transport):
1377db96d56Sopenharmony_ci        self.transport = transport
1387db96d56Sopenharmony_ci        self._assert_state('INITIAL')
1397db96d56Sopenharmony_ci        self.state = 'INITIALIZED'
1407db96d56Sopenharmony_ci
1417db96d56Sopenharmony_ci    def datagram_received(self, data, addr):
1427db96d56Sopenharmony_ci        self._assert_state('INITIALIZED')
1437db96d56Sopenharmony_ci        self.nbytes += len(data)
1447db96d56Sopenharmony_ci
1457db96d56Sopenharmony_ci    def error_received(self, exc):
1467db96d56Sopenharmony_ci        self._assert_state('INITIALIZED')
1477db96d56Sopenharmony_ci
1487db96d56Sopenharmony_ci    def connection_lost(self, exc):
1497db96d56Sopenharmony_ci        self._assert_state('INITIALIZED')
1507db96d56Sopenharmony_ci        self.state = 'CLOSED'
1517db96d56Sopenharmony_ci        if self.done:
1527db96d56Sopenharmony_ci            self.done.set_result(None)
1537db96d56Sopenharmony_ci
1547db96d56Sopenharmony_ci
1557db96d56Sopenharmony_ciclass MyReadPipeProto(asyncio.Protocol):
1567db96d56Sopenharmony_ci    done = None
1577db96d56Sopenharmony_ci
1587db96d56Sopenharmony_ci    def __init__(self, loop=None):
1597db96d56Sopenharmony_ci        self.state = ['INITIAL']
1607db96d56Sopenharmony_ci        self.nbytes = 0
1617db96d56Sopenharmony_ci        self.transport = None
1627db96d56Sopenharmony_ci        if loop is not None:
1637db96d56Sopenharmony_ci            self.done = loop.create_future()
1647db96d56Sopenharmony_ci
1657db96d56Sopenharmony_ci    def _assert_state(self, expected):
1667db96d56Sopenharmony_ci        if self.state != expected:
1677db96d56Sopenharmony_ci            raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
1687db96d56Sopenharmony_ci
1697db96d56Sopenharmony_ci    def connection_made(self, transport):
1707db96d56Sopenharmony_ci        self.transport = transport
1717db96d56Sopenharmony_ci        self._assert_state(['INITIAL'])
1727db96d56Sopenharmony_ci        self.state.append('CONNECTED')
1737db96d56Sopenharmony_ci
1747db96d56Sopenharmony_ci    def data_received(self, data):
1757db96d56Sopenharmony_ci        self._assert_state(['INITIAL', 'CONNECTED'])
1767db96d56Sopenharmony_ci        self.nbytes += len(data)
1777db96d56Sopenharmony_ci
1787db96d56Sopenharmony_ci    def eof_received(self):
1797db96d56Sopenharmony_ci        self._assert_state(['INITIAL', 'CONNECTED'])
1807db96d56Sopenharmony_ci        self.state.append('EOF')
1817db96d56Sopenharmony_ci
1827db96d56Sopenharmony_ci    def connection_lost(self, exc):
1837db96d56Sopenharmony_ci        if 'EOF' not in self.state:
1847db96d56Sopenharmony_ci            self.state.append('EOF')  # It is okay if EOF is missed.
1857db96d56Sopenharmony_ci        self._assert_state(['INITIAL', 'CONNECTED', 'EOF'])
1867db96d56Sopenharmony_ci        self.state.append('CLOSED')
1877db96d56Sopenharmony_ci        if self.done:
1887db96d56Sopenharmony_ci            self.done.set_result(None)
1897db96d56Sopenharmony_ci
1907db96d56Sopenharmony_ci
1917db96d56Sopenharmony_ciclass MyWritePipeProto(asyncio.BaseProtocol):
1927db96d56Sopenharmony_ci    done = None
1937db96d56Sopenharmony_ci
1947db96d56Sopenharmony_ci    def __init__(self, loop=None):
1957db96d56Sopenharmony_ci        self.state = 'INITIAL'
1967db96d56Sopenharmony_ci        self.transport = None
1977db96d56Sopenharmony_ci        if loop is not None:
1987db96d56Sopenharmony_ci            self.done = loop.create_future()
1997db96d56Sopenharmony_ci
2007db96d56Sopenharmony_ci    def _assert_state(self, expected):
2017db96d56Sopenharmony_ci        if self.state != expected:
2027db96d56Sopenharmony_ci            raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
2037db96d56Sopenharmony_ci
2047db96d56Sopenharmony_ci    def connection_made(self, transport):
2057db96d56Sopenharmony_ci        self.transport = transport
2067db96d56Sopenharmony_ci        self._assert_state('INITIAL')
2077db96d56Sopenharmony_ci        self.state = 'CONNECTED'
2087db96d56Sopenharmony_ci
2097db96d56Sopenharmony_ci    def connection_lost(self, exc):
2107db96d56Sopenharmony_ci        self._assert_state('CONNECTED')
2117db96d56Sopenharmony_ci        self.state = 'CLOSED'
2127db96d56Sopenharmony_ci        if self.done:
2137db96d56Sopenharmony_ci            self.done.set_result(None)
2147db96d56Sopenharmony_ci
2157db96d56Sopenharmony_ci
2167db96d56Sopenharmony_ciclass MySubprocessProtocol(asyncio.SubprocessProtocol):
2177db96d56Sopenharmony_ci
2187db96d56Sopenharmony_ci    def __init__(self, loop):
2197db96d56Sopenharmony_ci        self.state = 'INITIAL'
2207db96d56Sopenharmony_ci        self.transport = None
2217db96d56Sopenharmony_ci        self.connected = loop.create_future()
2227db96d56Sopenharmony_ci        self.completed = loop.create_future()
2237db96d56Sopenharmony_ci        self.disconnects = {fd: loop.create_future() for fd in range(3)}
2247db96d56Sopenharmony_ci        self.data = {1: b'', 2: b''}
2257db96d56Sopenharmony_ci        self.returncode = None
2267db96d56Sopenharmony_ci        self.got_data = {1: asyncio.Event(),
2277db96d56Sopenharmony_ci                         2: asyncio.Event()}
2287db96d56Sopenharmony_ci
2297db96d56Sopenharmony_ci    def _assert_state(self, expected):
2307db96d56Sopenharmony_ci        if self.state != expected:
2317db96d56Sopenharmony_ci            raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
2327db96d56Sopenharmony_ci
2337db96d56Sopenharmony_ci    def connection_made(self, transport):
2347db96d56Sopenharmony_ci        self.transport = transport
2357db96d56Sopenharmony_ci        self._assert_state('INITIAL')
2367db96d56Sopenharmony_ci        self.state = 'CONNECTED'
2377db96d56Sopenharmony_ci        self.connected.set_result(None)
2387db96d56Sopenharmony_ci
2397db96d56Sopenharmony_ci    def connection_lost(self, exc):
2407db96d56Sopenharmony_ci        self._assert_state('CONNECTED')
2417db96d56Sopenharmony_ci        self.state = 'CLOSED'
2427db96d56Sopenharmony_ci        self.completed.set_result(None)
2437db96d56Sopenharmony_ci
2447db96d56Sopenharmony_ci    def pipe_data_received(self, fd, data):
2457db96d56Sopenharmony_ci        self._assert_state('CONNECTED')
2467db96d56Sopenharmony_ci        self.data[fd] += data
2477db96d56Sopenharmony_ci        self.got_data[fd].set()
2487db96d56Sopenharmony_ci
2497db96d56Sopenharmony_ci    def pipe_connection_lost(self, fd, exc):
2507db96d56Sopenharmony_ci        self._assert_state('CONNECTED')
2517db96d56Sopenharmony_ci        if exc:
2527db96d56Sopenharmony_ci            self.disconnects[fd].set_exception(exc)
2537db96d56Sopenharmony_ci        else:
2547db96d56Sopenharmony_ci            self.disconnects[fd].set_result(exc)
2557db96d56Sopenharmony_ci
2567db96d56Sopenharmony_ci    def process_exited(self):
2577db96d56Sopenharmony_ci        self._assert_state('CONNECTED')
2587db96d56Sopenharmony_ci        self.returncode = self.transport.get_returncode()
2597db96d56Sopenharmony_ci
2607db96d56Sopenharmony_ci
2617db96d56Sopenharmony_ciclass EventLoopTestsMixin:
2627db96d56Sopenharmony_ci
2637db96d56Sopenharmony_ci    def setUp(self):
2647db96d56Sopenharmony_ci        super().setUp()
2657db96d56Sopenharmony_ci        self.loop = self.create_event_loop()
2667db96d56Sopenharmony_ci        self.set_event_loop(self.loop)
2677db96d56Sopenharmony_ci
2687db96d56Sopenharmony_ci    def tearDown(self):
2697db96d56Sopenharmony_ci        # just in case if we have transport close callbacks
2707db96d56Sopenharmony_ci        if not self.loop.is_closed():
2717db96d56Sopenharmony_ci            test_utils.run_briefly(self.loop)
2727db96d56Sopenharmony_ci
2737db96d56Sopenharmony_ci        self.doCleanups()
2747db96d56Sopenharmony_ci        support.gc_collect()
2757db96d56Sopenharmony_ci        super().tearDown()
2767db96d56Sopenharmony_ci
2777db96d56Sopenharmony_ci    def test_run_until_complete_nesting(self):
2787db96d56Sopenharmony_ci        async def coro1():
2797db96d56Sopenharmony_ci            await asyncio.sleep(0)
2807db96d56Sopenharmony_ci
2817db96d56Sopenharmony_ci        async def coro2():
2827db96d56Sopenharmony_ci            self.assertTrue(self.loop.is_running())
2837db96d56Sopenharmony_ci            self.loop.run_until_complete(coro1())
2847db96d56Sopenharmony_ci
2857db96d56Sopenharmony_ci        with self.assertWarnsRegex(
2867db96d56Sopenharmony_ci            RuntimeWarning,
2877db96d56Sopenharmony_ci            r"coroutine \S+ was never awaited"
2887db96d56Sopenharmony_ci        ):
2897db96d56Sopenharmony_ci            self.assertRaises(
2907db96d56Sopenharmony_ci                RuntimeError, self.loop.run_until_complete, coro2())
2917db96d56Sopenharmony_ci
2927db96d56Sopenharmony_ci    # Note: because of the default Windows timing granularity of
2937db96d56Sopenharmony_ci    # 15.6 msec, we use fairly long sleep times here (~100 msec).
2947db96d56Sopenharmony_ci
2957db96d56Sopenharmony_ci    def test_run_until_complete(self):
2967db96d56Sopenharmony_ci        t0 = self.loop.time()
2977db96d56Sopenharmony_ci        self.loop.run_until_complete(asyncio.sleep(0.1))
2987db96d56Sopenharmony_ci        t1 = self.loop.time()
2997db96d56Sopenharmony_ci        self.assertTrue(0.08 <= t1-t0 <= 0.8, t1-t0)
3007db96d56Sopenharmony_ci
3017db96d56Sopenharmony_ci    def test_run_until_complete_stopped(self):
3027db96d56Sopenharmony_ci
3037db96d56Sopenharmony_ci        async def cb():
3047db96d56Sopenharmony_ci            self.loop.stop()
3057db96d56Sopenharmony_ci            await asyncio.sleep(0.1)
3067db96d56Sopenharmony_ci        task = cb()
3077db96d56Sopenharmony_ci        self.assertRaises(RuntimeError,
3087db96d56Sopenharmony_ci                          self.loop.run_until_complete, task)
3097db96d56Sopenharmony_ci
3107db96d56Sopenharmony_ci    def test_call_later(self):
3117db96d56Sopenharmony_ci        results = []
3127db96d56Sopenharmony_ci
3137db96d56Sopenharmony_ci        def callback(arg):
3147db96d56Sopenharmony_ci            results.append(arg)
3157db96d56Sopenharmony_ci            self.loop.stop()
3167db96d56Sopenharmony_ci
3177db96d56Sopenharmony_ci        self.loop.call_later(0.1, callback, 'hello world')
3187db96d56Sopenharmony_ci        self.loop.run_forever()
3197db96d56Sopenharmony_ci        self.assertEqual(results, ['hello world'])
3207db96d56Sopenharmony_ci
3217db96d56Sopenharmony_ci    def test_call_soon(self):
3227db96d56Sopenharmony_ci        results = []
3237db96d56Sopenharmony_ci
3247db96d56Sopenharmony_ci        def callback(arg1, arg2):
3257db96d56Sopenharmony_ci            results.append((arg1, arg2))
3267db96d56Sopenharmony_ci            self.loop.stop()
3277db96d56Sopenharmony_ci
3287db96d56Sopenharmony_ci        self.loop.call_soon(callback, 'hello', 'world')
3297db96d56Sopenharmony_ci        self.loop.run_forever()
3307db96d56Sopenharmony_ci        self.assertEqual(results, [('hello', 'world')])
3317db96d56Sopenharmony_ci
3327db96d56Sopenharmony_ci    def test_call_soon_threadsafe(self):
3337db96d56Sopenharmony_ci        results = []
3347db96d56Sopenharmony_ci        lock = threading.Lock()
3357db96d56Sopenharmony_ci
3367db96d56Sopenharmony_ci        def callback(arg):
3377db96d56Sopenharmony_ci            results.append(arg)
3387db96d56Sopenharmony_ci            if len(results) >= 2:
3397db96d56Sopenharmony_ci                self.loop.stop()
3407db96d56Sopenharmony_ci
3417db96d56Sopenharmony_ci        def run_in_thread():
3427db96d56Sopenharmony_ci            self.loop.call_soon_threadsafe(callback, 'hello')
3437db96d56Sopenharmony_ci            lock.release()
3447db96d56Sopenharmony_ci
3457db96d56Sopenharmony_ci        lock.acquire()
3467db96d56Sopenharmony_ci        t = threading.Thread(target=run_in_thread)
3477db96d56Sopenharmony_ci        t.start()
3487db96d56Sopenharmony_ci
3497db96d56Sopenharmony_ci        with lock:
3507db96d56Sopenharmony_ci            self.loop.call_soon(callback, 'world')
3517db96d56Sopenharmony_ci            self.loop.run_forever()
3527db96d56Sopenharmony_ci        t.join()
3537db96d56Sopenharmony_ci        self.assertEqual(results, ['hello', 'world'])
3547db96d56Sopenharmony_ci
3557db96d56Sopenharmony_ci    def test_call_soon_threadsafe_same_thread(self):
3567db96d56Sopenharmony_ci        results = []
3577db96d56Sopenharmony_ci
3587db96d56Sopenharmony_ci        def callback(arg):
3597db96d56Sopenharmony_ci            results.append(arg)
3607db96d56Sopenharmony_ci            if len(results) >= 2:
3617db96d56Sopenharmony_ci                self.loop.stop()
3627db96d56Sopenharmony_ci
3637db96d56Sopenharmony_ci        self.loop.call_soon_threadsafe(callback, 'hello')
3647db96d56Sopenharmony_ci        self.loop.call_soon(callback, 'world')
3657db96d56Sopenharmony_ci        self.loop.run_forever()
3667db96d56Sopenharmony_ci        self.assertEqual(results, ['hello', 'world'])
3677db96d56Sopenharmony_ci
3687db96d56Sopenharmony_ci    def test_run_in_executor(self):
3697db96d56Sopenharmony_ci        def run(arg):
3707db96d56Sopenharmony_ci            return (arg, threading.get_ident())
3717db96d56Sopenharmony_ci        f2 = self.loop.run_in_executor(None, run, 'yo')
3727db96d56Sopenharmony_ci        res, thread_id = self.loop.run_until_complete(f2)
3737db96d56Sopenharmony_ci        self.assertEqual(res, 'yo')
3747db96d56Sopenharmony_ci        self.assertNotEqual(thread_id, threading.get_ident())
3757db96d56Sopenharmony_ci
3767db96d56Sopenharmony_ci    def test_run_in_executor_cancel(self):
3777db96d56Sopenharmony_ci        called = False
3787db96d56Sopenharmony_ci
3797db96d56Sopenharmony_ci        def patched_call_soon(*args):
3807db96d56Sopenharmony_ci            nonlocal called
3817db96d56Sopenharmony_ci            called = True
3827db96d56Sopenharmony_ci
3837db96d56Sopenharmony_ci        def run():
3847db96d56Sopenharmony_ci            time.sleep(0.05)
3857db96d56Sopenharmony_ci
3867db96d56Sopenharmony_ci        f2 = self.loop.run_in_executor(None, run)
3877db96d56Sopenharmony_ci        f2.cancel()
3887db96d56Sopenharmony_ci        self.loop.run_until_complete(
3897db96d56Sopenharmony_ci                self.loop.shutdown_default_executor())
3907db96d56Sopenharmony_ci        self.loop.close()
3917db96d56Sopenharmony_ci        self.loop.call_soon = patched_call_soon
3927db96d56Sopenharmony_ci        self.loop.call_soon_threadsafe = patched_call_soon
3937db96d56Sopenharmony_ci        time.sleep(0.4)
3947db96d56Sopenharmony_ci        self.assertFalse(called)
3957db96d56Sopenharmony_ci
3967db96d56Sopenharmony_ci    def test_reader_callback(self):
3977db96d56Sopenharmony_ci        r, w = socket.socketpair()
3987db96d56Sopenharmony_ci        r.setblocking(False)
3997db96d56Sopenharmony_ci        bytes_read = bytearray()
4007db96d56Sopenharmony_ci
4017db96d56Sopenharmony_ci        def reader():
4027db96d56Sopenharmony_ci            try:
4037db96d56Sopenharmony_ci                data = r.recv(1024)
4047db96d56Sopenharmony_ci            except BlockingIOError:
4057db96d56Sopenharmony_ci                # Spurious readiness notifications are possible
4067db96d56Sopenharmony_ci                # at least on Linux -- see man select.
4077db96d56Sopenharmony_ci                return
4087db96d56Sopenharmony_ci            if data:
4097db96d56Sopenharmony_ci                bytes_read.extend(data)
4107db96d56Sopenharmony_ci            else:
4117db96d56Sopenharmony_ci                self.assertTrue(self.loop.remove_reader(r.fileno()))
4127db96d56Sopenharmony_ci                r.close()
4137db96d56Sopenharmony_ci
4147db96d56Sopenharmony_ci        self.loop.add_reader(r.fileno(), reader)
4157db96d56Sopenharmony_ci        self.loop.call_soon(w.send, b'abc')
4167db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
4177db96d56Sopenharmony_ci        self.loop.call_soon(w.send, b'def')
4187db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
4197db96d56Sopenharmony_ci        self.loop.call_soon(w.close)
4207db96d56Sopenharmony_ci        self.loop.call_soon(self.loop.stop)
4217db96d56Sopenharmony_ci        self.loop.run_forever()
4227db96d56Sopenharmony_ci        self.assertEqual(bytes_read, b'abcdef')
4237db96d56Sopenharmony_ci
4247db96d56Sopenharmony_ci    def test_writer_callback(self):
4257db96d56Sopenharmony_ci        r, w = socket.socketpair()
4267db96d56Sopenharmony_ci        w.setblocking(False)
4277db96d56Sopenharmony_ci
4287db96d56Sopenharmony_ci        def writer(data):
4297db96d56Sopenharmony_ci            w.send(data)
4307db96d56Sopenharmony_ci            self.loop.stop()
4317db96d56Sopenharmony_ci
4327db96d56Sopenharmony_ci        data = b'x' * 1024
4337db96d56Sopenharmony_ci        self.loop.add_writer(w.fileno(), writer, data)
4347db96d56Sopenharmony_ci        self.loop.run_forever()
4357db96d56Sopenharmony_ci
4367db96d56Sopenharmony_ci        self.assertTrue(self.loop.remove_writer(w.fileno()))
4377db96d56Sopenharmony_ci        self.assertFalse(self.loop.remove_writer(w.fileno()))
4387db96d56Sopenharmony_ci
4397db96d56Sopenharmony_ci        w.close()
4407db96d56Sopenharmony_ci        read = r.recv(len(data) * 2)
4417db96d56Sopenharmony_ci        r.close()
4427db96d56Sopenharmony_ci        self.assertEqual(read, data)
4437db96d56Sopenharmony_ci
4447db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'SIGKILL'), 'No SIGKILL')
4457db96d56Sopenharmony_ci    def test_add_signal_handler(self):
4467db96d56Sopenharmony_ci        caught = 0
4477db96d56Sopenharmony_ci
4487db96d56Sopenharmony_ci        def my_handler():
4497db96d56Sopenharmony_ci            nonlocal caught
4507db96d56Sopenharmony_ci            caught += 1
4517db96d56Sopenharmony_ci
4527db96d56Sopenharmony_ci        # Check error behavior first.
4537db96d56Sopenharmony_ci        self.assertRaises(
4547db96d56Sopenharmony_ci            TypeError, self.loop.add_signal_handler, 'boom', my_handler)
4557db96d56Sopenharmony_ci        self.assertRaises(
4567db96d56Sopenharmony_ci            TypeError, self.loop.remove_signal_handler, 'boom')
4577db96d56Sopenharmony_ci        self.assertRaises(
4587db96d56Sopenharmony_ci            ValueError, self.loop.add_signal_handler, signal.NSIG+1,
4597db96d56Sopenharmony_ci            my_handler)
4607db96d56Sopenharmony_ci        self.assertRaises(
4617db96d56Sopenharmony_ci            ValueError, self.loop.remove_signal_handler, signal.NSIG+1)
4627db96d56Sopenharmony_ci        self.assertRaises(
4637db96d56Sopenharmony_ci            ValueError, self.loop.add_signal_handler, 0, my_handler)
4647db96d56Sopenharmony_ci        self.assertRaises(
4657db96d56Sopenharmony_ci            ValueError, self.loop.remove_signal_handler, 0)
4667db96d56Sopenharmony_ci        self.assertRaises(
4677db96d56Sopenharmony_ci            ValueError, self.loop.add_signal_handler, -1, my_handler)
4687db96d56Sopenharmony_ci        self.assertRaises(
4697db96d56Sopenharmony_ci            ValueError, self.loop.remove_signal_handler, -1)
4707db96d56Sopenharmony_ci        self.assertRaises(
4717db96d56Sopenharmony_ci            RuntimeError, self.loop.add_signal_handler, signal.SIGKILL,
4727db96d56Sopenharmony_ci            my_handler)
4737db96d56Sopenharmony_ci        # Removing SIGKILL doesn't raise, since we don't call signal().
4747db96d56Sopenharmony_ci        self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
4757db96d56Sopenharmony_ci        # Now set a handler and handle it.
4767db96d56Sopenharmony_ci        self.loop.add_signal_handler(signal.SIGINT, my_handler)
4777db96d56Sopenharmony_ci
4787db96d56Sopenharmony_ci        os.kill(os.getpid(), signal.SIGINT)
4797db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: caught)
4807db96d56Sopenharmony_ci
4817db96d56Sopenharmony_ci        # Removing it should restore the default handler.
4827db96d56Sopenharmony_ci        self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
4837db96d56Sopenharmony_ci        self.assertEqual(signal.getsignal(signal.SIGINT),
4847db96d56Sopenharmony_ci                         signal.default_int_handler)
4857db96d56Sopenharmony_ci        # Removing again returns False.
4867db96d56Sopenharmony_ci        self.assertFalse(self.loop.remove_signal_handler(signal.SIGINT))
4877db96d56Sopenharmony_ci
4887db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
4897db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'setitimer'),
4907db96d56Sopenharmony_ci                         'need signal.setitimer()')
4917db96d56Sopenharmony_ci    def test_signal_handling_while_selecting(self):
4927db96d56Sopenharmony_ci        # Test with a signal actually arriving during a select() call.
4937db96d56Sopenharmony_ci        caught = 0
4947db96d56Sopenharmony_ci
4957db96d56Sopenharmony_ci        def my_handler():
4967db96d56Sopenharmony_ci            nonlocal caught
4977db96d56Sopenharmony_ci            caught += 1
4987db96d56Sopenharmony_ci            self.loop.stop()
4997db96d56Sopenharmony_ci
5007db96d56Sopenharmony_ci        self.loop.add_signal_handler(signal.SIGALRM, my_handler)
5017db96d56Sopenharmony_ci
5027db96d56Sopenharmony_ci        signal.setitimer(signal.ITIMER_REAL, 0.01, 0)  # Send SIGALRM once.
5037db96d56Sopenharmony_ci        self.loop.call_later(60, self.loop.stop)
5047db96d56Sopenharmony_ci        self.loop.run_forever()
5057db96d56Sopenharmony_ci        self.assertEqual(caught, 1)
5067db96d56Sopenharmony_ci
5077db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'SIGALRM'), 'No SIGALRM')
5087db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'setitimer'),
5097db96d56Sopenharmony_ci                         'need signal.setitimer()')
5107db96d56Sopenharmony_ci    def test_signal_handling_args(self):
5117db96d56Sopenharmony_ci        some_args = (42,)
5127db96d56Sopenharmony_ci        caught = 0
5137db96d56Sopenharmony_ci
5147db96d56Sopenharmony_ci        def my_handler(*args):
5157db96d56Sopenharmony_ci            nonlocal caught
5167db96d56Sopenharmony_ci            caught += 1
5177db96d56Sopenharmony_ci            self.assertEqual(args, some_args)
5187db96d56Sopenharmony_ci            self.loop.stop()
5197db96d56Sopenharmony_ci
5207db96d56Sopenharmony_ci        self.loop.add_signal_handler(signal.SIGALRM, my_handler, *some_args)
5217db96d56Sopenharmony_ci
5227db96d56Sopenharmony_ci        signal.setitimer(signal.ITIMER_REAL, 0.1, 0)  # Send SIGALRM once.
5237db96d56Sopenharmony_ci        self.loop.call_later(60, self.loop.stop)
5247db96d56Sopenharmony_ci        self.loop.run_forever()
5257db96d56Sopenharmony_ci        self.assertEqual(caught, 1)
5267db96d56Sopenharmony_ci
5277db96d56Sopenharmony_ci    def _basetest_create_connection(self, connection_fut, check_sockname=True):
5287db96d56Sopenharmony_ci        tr, pr = self.loop.run_until_complete(connection_fut)
5297db96d56Sopenharmony_ci        self.assertIsInstance(tr, asyncio.Transport)
5307db96d56Sopenharmony_ci        self.assertIsInstance(pr, asyncio.Protocol)
5317db96d56Sopenharmony_ci        self.assertIs(pr.transport, tr)
5327db96d56Sopenharmony_ci        if check_sockname:
5337db96d56Sopenharmony_ci            self.assertIsNotNone(tr.get_extra_info('sockname'))
5347db96d56Sopenharmony_ci        self.loop.run_until_complete(pr.done)
5357db96d56Sopenharmony_ci        self.assertGreater(pr.nbytes, 0)
5367db96d56Sopenharmony_ci        tr.close()
5377db96d56Sopenharmony_ci
5387db96d56Sopenharmony_ci    def test_create_connection(self):
5397db96d56Sopenharmony_ci        with test_utils.run_test_server() as httpd:
5407db96d56Sopenharmony_ci            conn_fut = self.loop.create_connection(
5417db96d56Sopenharmony_ci                lambda: MyProto(loop=self.loop), *httpd.address)
5427db96d56Sopenharmony_ci            self._basetest_create_connection(conn_fut)
5437db96d56Sopenharmony_ci
5447db96d56Sopenharmony_ci    @socket_helper.skip_unless_bind_unix_socket
5457db96d56Sopenharmony_ci    def test_create_unix_connection(self):
5467db96d56Sopenharmony_ci        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
5477db96d56Sopenharmony_ci        # zero-length address for UNIX socket.
5487db96d56Sopenharmony_ci        check_sockname = not broken_unix_getsockname()
5497db96d56Sopenharmony_ci
5507db96d56Sopenharmony_ci        with test_utils.run_test_unix_server() as httpd:
5517db96d56Sopenharmony_ci            conn_fut = self.loop.create_unix_connection(
5527db96d56Sopenharmony_ci                lambda: MyProto(loop=self.loop), httpd.address)
5537db96d56Sopenharmony_ci            self._basetest_create_connection(conn_fut, check_sockname)
5547db96d56Sopenharmony_ci
5557db96d56Sopenharmony_ci    def check_ssl_extra_info(self, client, check_sockname=True,
5567db96d56Sopenharmony_ci                             peername=None, peercert={}):
5577db96d56Sopenharmony_ci        if check_sockname:
5587db96d56Sopenharmony_ci            self.assertIsNotNone(client.get_extra_info('sockname'))
5597db96d56Sopenharmony_ci        if peername:
5607db96d56Sopenharmony_ci            self.assertEqual(peername,
5617db96d56Sopenharmony_ci                             client.get_extra_info('peername'))
5627db96d56Sopenharmony_ci        else:
5637db96d56Sopenharmony_ci            self.assertIsNotNone(client.get_extra_info('peername'))
5647db96d56Sopenharmony_ci        self.assertEqual(peercert,
5657db96d56Sopenharmony_ci                         client.get_extra_info('peercert'))
5667db96d56Sopenharmony_ci
5677db96d56Sopenharmony_ci        # test SSL cipher
5687db96d56Sopenharmony_ci        cipher = client.get_extra_info('cipher')
5697db96d56Sopenharmony_ci        self.assertIsInstance(cipher, tuple)
5707db96d56Sopenharmony_ci        self.assertEqual(len(cipher), 3, cipher)
5717db96d56Sopenharmony_ci        self.assertIsInstance(cipher[0], str)
5727db96d56Sopenharmony_ci        self.assertIsInstance(cipher[1], str)
5737db96d56Sopenharmony_ci        self.assertIsInstance(cipher[2], int)
5747db96d56Sopenharmony_ci
5757db96d56Sopenharmony_ci        # test SSL object
5767db96d56Sopenharmony_ci        sslobj = client.get_extra_info('ssl_object')
5777db96d56Sopenharmony_ci        self.assertIsNotNone(sslobj)
5787db96d56Sopenharmony_ci        self.assertEqual(sslobj.compression(),
5797db96d56Sopenharmony_ci                         client.get_extra_info('compression'))
5807db96d56Sopenharmony_ci        self.assertEqual(sslobj.cipher(),
5817db96d56Sopenharmony_ci                         client.get_extra_info('cipher'))
5827db96d56Sopenharmony_ci        self.assertEqual(sslobj.getpeercert(),
5837db96d56Sopenharmony_ci                         client.get_extra_info('peercert'))
5847db96d56Sopenharmony_ci        self.assertEqual(sslobj.compression(),
5857db96d56Sopenharmony_ci                         client.get_extra_info('compression'))
5867db96d56Sopenharmony_ci
5877db96d56Sopenharmony_ci    def _basetest_create_ssl_connection(self, connection_fut,
5887db96d56Sopenharmony_ci                                        check_sockname=True,
5897db96d56Sopenharmony_ci                                        peername=None):
5907db96d56Sopenharmony_ci        tr, pr = self.loop.run_until_complete(connection_fut)
5917db96d56Sopenharmony_ci        self.assertIsInstance(tr, asyncio.Transport)
5927db96d56Sopenharmony_ci        self.assertIsInstance(pr, asyncio.Protocol)
5937db96d56Sopenharmony_ci        self.assertTrue('ssl' in tr.__class__.__name__.lower())
5947db96d56Sopenharmony_ci        self.check_ssl_extra_info(tr, check_sockname, peername)
5957db96d56Sopenharmony_ci        self.loop.run_until_complete(pr.done)
5967db96d56Sopenharmony_ci        self.assertGreater(pr.nbytes, 0)
5977db96d56Sopenharmony_ci        tr.close()
5987db96d56Sopenharmony_ci
5997db96d56Sopenharmony_ci    def _test_create_ssl_connection(self, httpd, create_connection,
6007db96d56Sopenharmony_ci                                    check_sockname=True, peername=None):
6017db96d56Sopenharmony_ci        conn_fut = create_connection(ssl=test_utils.dummy_ssl_context())
6027db96d56Sopenharmony_ci        self._basetest_create_ssl_connection(conn_fut, check_sockname,
6037db96d56Sopenharmony_ci                                             peername)
6047db96d56Sopenharmony_ci
6057db96d56Sopenharmony_ci        # ssl.Purpose was introduced in Python 3.4
6067db96d56Sopenharmony_ci        if hasattr(ssl, 'Purpose'):
6077db96d56Sopenharmony_ci            def _dummy_ssl_create_context(purpose=ssl.Purpose.SERVER_AUTH, *,
6087db96d56Sopenharmony_ci                                          cafile=None, capath=None,
6097db96d56Sopenharmony_ci                                          cadata=None):
6107db96d56Sopenharmony_ci                """
6117db96d56Sopenharmony_ci                A ssl.create_default_context() replacement that doesn't enable
6127db96d56Sopenharmony_ci                cert validation.
6137db96d56Sopenharmony_ci                """
6147db96d56Sopenharmony_ci                self.assertEqual(purpose, ssl.Purpose.SERVER_AUTH)
6157db96d56Sopenharmony_ci                return test_utils.dummy_ssl_context()
6167db96d56Sopenharmony_ci
6177db96d56Sopenharmony_ci            # With ssl=True, ssl.create_default_context() should be called
6187db96d56Sopenharmony_ci            with mock.patch('ssl.create_default_context',
6197db96d56Sopenharmony_ci                            side_effect=_dummy_ssl_create_context) as m:
6207db96d56Sopenharmony_ci                conn_fut = create_connection(ssl=True)
6217db96d56Sopenharmony_ci                self._basetest_create_ssl_connection(conn_fut, check_sockname,
6227db96d56Sopenharmony_ci                                                     peername)
6237db96d56Sopenharmony_ci                self.assertEqual(m.call_count, 1)
6247db96d56Sopenharmony_ci
6257db96d56Sopenharmony_ci        # With the real ssl.create_default_context(), certificate
6267db96d56Sopenharmony_ci        # validation will fail
6277db96d56Sopenharmony_ci        with self.assertRaises(ssl.SSLError) as cm:
6287db96d56Sopenharmony_ci            conn_fut = create_connection(ssl=True)
6297db96d56Sopenharmony_ci            # Ignore the "SSL handshake failed" log in debug mode
6307db96d56Sopenharmony_ci            with test_utils.disable_logger():
6317db96d56Sopenharmony_ci                self._basetest_create_ssl_connection(conn_fut, check_sockname,
6327db96d56Sopenharmony_ci                                                     peername)
6337db96d56Sopenharmony_ci
6347db96d56Sopenharmony_ci        self.assertEqual(cm.exception.reason, 'CERTIFICATE_VERIFY_FAILED')
6357db96d56Sopenharmony_ci
6367db96d56Sopenharmony_ci    @unittest.skipIf(ssl is None, 'No ssl module')
6377db96d56Sopenharmony_ci    def test_create_ssl_connection(self):
6387db96d56Sopenharmony_ci        with test_utils.run_test_server(use_ssl=True) as httpd:
6397db96d56Sopenharmony_ci            create_connection = functools.partial(
6407db96d56Sopenharmony_ci                self.loop.create_connection,
6417db96d56Sopenharmony_ci                lambda: MyProto(loop=self.loop),
6427db96d56Sopenharmony_ci                *httpd.address)
6437db96d56Sopenharmony_ci            self._test_create_ssl_connection(httpd, create_connection,
6447db96d56Sopenharmony_ci                                             peername=httpd.address)
6457db96d56Sopenharmony_ci
6467db96d56Sopenharmony_ci    @socket_helper.skip_unless_bind_unix_socket
6477db96d56Sopenharmony_ci    @unittest.skipIf(ssl is None, 'No ssl module')
6487db96d56Sopenharmony_ci    def test_create_ssl_unix_connection(self):
6497db96d56Sopenharmony_ci        # Issue #20682: On Mac OS X Tiger, getsockname() returns a
6507db96d56Sopenharmony_ci        # zero-length address for UNIX socket.
6517db96d56Sopenharmony_ci        check_sockname = not broken_unix_getsockname()
6527db96d56Sopenharmony_ci
6537db96d56Sopenharmony_ci        with test_utils.run_test_unix_server(use_ssl=True) as httpd:
6547db96d56Sopenharmony_ci            create_connection = functools.partial(
6557db96d56Sopenharmony_ci                self.loop.create_unix_connection,
6567db96d56Sopenharmony_ci                lambda: MyProto(loop=self.loop), httpd.address,
6577db96d56Sopenharmony_ci                server_hostname='127.0.0.1')
6587db96d56Sopenharmony_ci
6597db96d56Sopenharmony_ci            self._test_create_ssl_connection(httpd, create_connection,
6607db96d56Sopenharmony_ci                                             check_sockname,
6617db96d56Sopenharmony_ci                                             peername=httpd.address)
6627db96d56Sopenharmony_ci
6637db96d56Sopenharmony_ci    def test_create_connection_local_addr(self):
6647db96d56Sopenharmony_ci        with test_utils.run_test_server() as httpd:
6657db96d56Sopenharmony_ci            port = socket_helper.find_unused_port()
6667db96d56Sopenharmony_ci            f = self.loop.create_connection(
6677db96d56Sopenharmony_ci                lambda: MyProto(loop=self.loop),
6687db96d56Sopenharmony_ci                *httpd.address, local_addr=(httpd.address[0], port))
6697db96d56Sopenharmony_ci            tr, pr = self.loop.run_until_complete(f)
6707db96d56Sopenharmony_ci            expected = pr.transport.get_extra_info('sockname')[1]
6717db96d56Sopenharmony_ci            self.assertEqual(port, expected)
6727db96d56Sopenharmony_ci            tr.close()
6737db96d56Sopenharmony_ci
6747db96d56Sopenharmony_ci    def test_create_connection_local_addr_skip_different_family(self):
6757db96d56Sopenharmony_ci        # See https://github.com/python/cpython/issues/86508
6767db96d56Sopenharmony_ci        port1 = socket_helper.find_unused_port()
6777db96d56Sopenharmony_ci        port2 = socket_helper.find_unused_port()
6787db96d56Sopenharmony_ci        getaddrinfo_orig = self.loop.getaddrinfo
6797db96d56Sopenharmony_ci
6807db96d56Sopenharmony_ci        async def getaddrinfo(host, port, *args, **kwargs):
6817db96d56Sopenharmony_ci            if port == port2:
6827db96d56Sopenharmony_ci                return [(socket.AF_INET6, socket.SOCK_STREAM, 0, '', ('::1', 0, 0, 0)),
6837db96d56Sopenharmony_ci                        (socket.AF_INET, socket.SOCK_STREAM, 0, '', ('127.0.0.1', 0))]
6847db96d56Sopenharmony_ci            return await getaddrinfo_orig(host, port, *args, **kwargs)
6857db96d56Sopenharmony_ci
6867db96d56Sopenharmony_ci        self.loop.getaddrinfo = getaddrinfo
6877db96d56Sopenharmony_ci
6887db96d56Sopenharmony_ci        f = self.loop.create_connection(
6897db96d56Sopenharmony_ci            lambda: MyProto(loop=self.loop),
6907db96d56Sopenharmony_ci            'localhost', port1, local_addr=('localhost', port2))
6917db96d56Sopenharmony_ci
6927db96d56Sopenharmony_ci        with self.assertRaises(OSError):
6937db96d56Sopenharmony_ci            self.loop.run_until_complete(f)
6947db96d56Sopenharmony_ci
6957db96d56Sopenharmony_ci    def test_create_connection_local_addr_nomatch_family(self):
6967db96d56Sopenharmony_ci        # See https://github.com/python/cpython/issues/86508
6977db96d56Sopenharmony_ci        port1 = socket_helper.find_unused_port()
6987db96d56Sopenharmony_ci        port2 = socket_helper.find_unused_port()
6997db96d56Sopenharmony_ci        getaddrinfo_orig = self.loop.getaddrinfo
7007db96d56Sopenharmony_ci
7017db96d56Sopenharmony_ci        async def getaddrinfo(host, port, *args, **kwargs):
7027db96d56Sopenharmony_ci            if port == port2:
7037db96d56Sopenharmony_ci                return [(socket.AF_INET6, socket.SOCK_STREAM, 0, '', ('::1', 0, 0, 0))]
7047db96d56Sopenharmony_ci            return await getaddrinfo_orig(host, port, *args, **kwargs)
7057db96d56Sopenharmony_ci
7067db96d56Sopenharmony_ci        self.loop.getaddrinfo = getaddrinfo
7077db96d56Sopenharmony_ci
7087db96d56Sopenharmony_ci        f = self.loop.create_connection(
7097db96d56Sopenharmony_ci            lambda: MyProto(loop=self.loop),
7107db96d56Sopenharmony_ci            'localhost', port1, local_addr=('localhost', port2))
7117db96d56Sopenharmony_ci
7127db96d56Sopenharmony_ci        with self.assertRaises(OSError):
7137db96d56Sopenharmony_ci            self.loop.run_until_complete(f)
7147db96d56Sopenharmony_ci
7157db96d56Sopenharmony_ci    def test_create_connection_local_addr_in_use(self):
7167db96d56Sopenharmony_ci        with test_utils.run_test_server() as httpd:
7177db96d56Sopenharmony_ci            f = self.loop.create_connection(
7187db96d56Sopenharmony_ci                lambda: MyProto(loop=self.loop),
7197db96d56Sopenharmony_ci                *httpd.address, local_addr=httpd.address)
7207db96d56Sopenharmony_ci            with self.assertRaises(OSError) as cm:
7217db96d56Sopenharmony_ci                self.loop.run_until_complete(f)
7227db96d56Sopenharmony_ci            self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
7237db96d56Sopenharmony_ci            self.assertIn(str(httpd.address), cm.exception.strerror)
7247db96d56Sopenharmony_ci
7257db96d56Sopenharmony_ci    def test_connect_accepted_socket(self, server_ssl=None, client_ssl=None):
7267db96d56Sopenharmony_ci        loop = self.loop
7277db96d56Sopenharmony_ci
7287db96d56Sopenharmony_ci        class MyProto(MyBaseProto):
7297db96d56Sopenharmony_ci
7307db96d56Sopenharmony_ci            def connection_lost(self, exc):
7317db96d56Sopenharmony_ci                super().connection_lost(exc)
7327db96d56Sopenharmony_ci                loop.call_soon(loop.stop)
7337db96d56Sopenharmony_ci
7347db96d56Sopenharmony_ci            def data_received(self, data):
7357db96d56Sopenharmony_ci                super().data_received(data)
7367db96d56Sopenharmony_ci                self.transport.write(expected_response)
7377db96d56Sopenharmony_ci
7387db96d56Sopenharmony_ci        lsock = socket.create_server(('127.0.0.1', 0), backlog=1)
7397db96d56Sopenharmony_ci        addr = lsock.getsockname()
7407db96d56Sopenharmony_ci
7417db96d56Sopenharmony_ci        message = b'test data'
7427db96d56Sopenharmony_ci        response = None
7437db96d56Sopenharmony_ci        expected_response = b'roger'
7447db96d56Sopenharmony_ci
7457db96d56Sopenharmony_ci        def client():
7467db96d56Sopenharmony_ci            nonlocal response
7477db96d56Sopenharmony_ci            try:
7487db96d56Sopenharmony_ci                csock = socket.socket()
7497db96d56Sopenharmony_ci                if client_ssl is not None:
7507db96d56Sopenharmony_ci                    csock = client_ssl.wrap_socket(csock)
7517db96d56Sopenharmony_ci                csock.connect(addr)
7527db96d56Sopenharmony_ci                csock.sendall(message)
7537db96d56Sopenharmony_ci                response = csock.recv(99)
7547db96d56Sopenharmony_ci                csock.close()
7557db96d56Sopenharmony_ci            except Exception as exc:
7567db96d56Sopenharmony_ci                print(
7577db96d56Sopenharmony_ci                    "Failure in client thread in test_connect_accepted_socket",
7587db96d56Sopenharmony_ci                    exc)
7597db96d56Sopenharmony_ci
7607db96d56Sopenharmony_ci        thread = threading.Thread(target=client, daemon=True)
7617db96d56Sopenharmony_ci        thread.start()
7627db96d56Sopenharmony_ci
7637db96d56Sopenharmony_ci        conn, _ = lsock.accept()
7647db96d56Sopenharmony_ci        proto = MyProto(loop=loop)
7657db96d56Sopenharmony_ci        proto.loop = loop
7667db96d56Sopenharmony_ci        loop.run_until_complete(
7677db96d56Sopenharmony_ci            loop.connect_accepted_socket(
7687db96d56Sopenharmony_ci                (lambda: proto), conn, ssl=server_ssl))
7697db96d56Sopenharmony_ci        loop.run_forever()
7707db96d56Sopenharmony_ci        proto.transport.close()
7717db96d56Sopenharmony_ci        lsock.close()
7727db96d56Sopenharmony_ci
7737db96d56Sopenharmony_ci        threading_helper.join_thread(thread)
7747db96d56Sopenharmony_ci        self.assertFalse(thread.is_alive())
7757db96d56Sopenharmony_ci        self.assertEqual(proto.state, 'CLOSED')
7767db96d56Sopenharmony_ci        self.assertEqual(proto.nbytes, len(message))
7777db96d56Sopenharmony_ci        self.assertEqual(response, expected_response)
7787db96d56Sopenharmony_ci
7797db96d56Sopenharmony_ci    @unittest.skipIf(ssl is None, 'No ssl module')
7807db96d56Sopenharmony_ci    def test_ssl_connect_accepted_socket(self):
7817db96d56Sopenharmony_ci        server_context = test_utils.simple_server_sslcontext()
7827db96d56Sopenharmony_ci        client_context = test_utils.simple_client_sslcontext()
7837db96d56Sopenharmony_ci
7847db96d56Sopenharmony_ci        self.test_connect_accepted_socket(server_context, client_context)
7857db96d56Sopenharmony_ci
7867db96d56Sopenharmony_ci    def test_connect_accepted_socket_ssl_timeout_for_plain_socket(self):
7877db96d56Sopenharmony_ci        sock = socket.socket()
7887db96d56Sopenharmony_ci        self.addCleanup(sock.close)
7897db96d56Sopenharmony_ci        coro = self.loop.connect_accepted_socket(
7907db96d56Sopenharmony_ci            MyProto, sock, ssl_handshake_timeout=support.LOOPBACK_TIMEOUT)
7917db96d56Sopenharmony_ci        with self.assertRaisesRegex(
7927db96d56Sopenharmony_ci                ValueError,
7937db96d56Sopenharmony_ci                'ssl_handshake_timeout is only meaningful with ssl'):
7947db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
7957db96d56Sopenharmony_ci
7967db96d56Sopenharmony_ci    @mock.patch('asyncio.base_events.socket')
7977db96d56Sopenharmony_ci    def create_server_multiple_hosts(self, family, hosts, mock_sock):
7987db96d56Sopenharmony_ci        async def getaddrinfo(host, port, *args, **kw):
7997db96d56Sopenharmony_ci            if family == socket.AF_INET:
8007db96d56Sopenharmony_ci                return [(family, socket.SOCK_STREAM, 6, '', (host, port))]
8017db96d56Sopenharmony_ci            else:
8027db96d56Sopenharmony_ci                return [(family, socket.SOCK_STREAM, 6, '', (host, port, 0, 0))]
8037db96d56Sopenharmony_ci
8047db96d56Sopenharmony_ci        def getaddrinfo_task(*args, **kwds):
8057db96d56Sopenharmony_ci            return self.loop.create_task(getaddrinfo(*args, **kwds))
8067db96d56Sopenharmony_ci
8077db96d56Sopenharmony_ci        unique_hosts = set(hosts)
8087db96d56Sopenharmony_ci
8097db96d56Sopenharmony_ci        if family == socket.AF_INET:
8107db96d56Sopenharmony_ci            mock_sock.socket().getsockbyname.side_effect = [
8117db96d56Sopenharmony_ci                (host, 80) for host in unique_hosts]
8127db96d56Sopenharmony_ci        else:
8137db96d56Sopenharmony_ci            mock_sock.socket().getsockbyname.side_effect = [
8147db96d56Sopenharmony_ci                (host, 80, 0, 0) for host in unique_hosts]
8157db96d56Sopenharmony_ci        self.loop.getaddrinfo = getaddrinfo_task
8167db96d56Sopenharmony_ci        self.loop._start_serving = mock.Mock()
8177db96d56Sopenharmony_ci        self.loop._stop_serving = mock.Mock()
8187db96d56Sopenharmony_ci        f = self.loop.create_server(lambda: MyProto(self.loop), hosts, 80)
8197db96d56Sopenharmony_ci        server = self.loop.run_until_complete(f)
8207db96d56Sopenharmony_ci        self.addCleanup(server.close)
8217db96d56Sopenharmony_ci        server_hosts = {sock.getsockbyname()[0] for sock in server.sockets}
8227db96d56Sopenharmony_ci        self.assertEqual(server_hosts, unique_hosts)
8237db96d56Sopenharmony_ci
8247db96d56Sopenharmony_ci    def test_create_server_multiple_hosts_ipv4(self):
8257db96d56Sopenharmony_ci        self.create_server_multiple_hosts(socket.AF_INET,
8267db96d56Sopenharmony_ci                                          ['1.2.3.4', '5.6.7.8', '1.2.3.4'])
8277db96d56Sopenharmony_ci
8287db96d56Sopenharmony_ci    def test_create_server_multiple_hosts_ipv6(self):
8297db96d56Sopenharmony_ci        self.create_server_multiple_hosts(socket.AF_INET6,
8307db96d56Sopenharmony_ci                                          ['::1', '::2', '::1'])
8317db96d56Sopenharmony_ci
8327db96d56Sopenharmony_ci    def test_create_server(self):
8337db96d56Sopenharmony_ci        proto = MyProto(self.loop)
8347db96d56Sopenharmony_ci        f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
8357db96d56Sopenharmony_ci        server = self.loop.run_until_complete(f)
8367db96d56Sopenharmony_ci        self.assertEqual(len(server.sockets), 1)
8377db96d56Sopenharmony_ci        sock = server.sockets[0]
8387db96d56Sopenharmony_ci        host, port = sock.getsockname()
8397db96d56Sopenharmony_ci        self.assertEqual(host, '0.0.0.0')
8407db96d56Sopenharmony_ci        client = socket.socket()
8417db96d56Sopenharmony_ci        client.connect(('127.0.0.1', port))
8427db96d56Sopenharmony_ci        client.sendall(b'xxx')
8437db96d56Sopenharmony_ci
8447db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.connected)
8457db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', proto.state)
8467db96d56Sopenharmony_ci
8477db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
8487db96d56Sopenharmony_ci        self.assertEqual(3, proto.nbytes)
8497db96d56Sopenharmony_ci
8507db96d56Sopenharmony_ci        # extra info is available
8517db96d56Sopenharmony_ci        self.assertIsNotNone(proto.transport.get_extra_info('sockname'))
8527db96d56Sopenharmony_ci        self.assertEqual('127.0.0.1',
8537db96d56Sopenharmony_ci                         proto.transport.get_extra_info('peername')[0])
8547db96d56Sopenharmony_ci
8557db96d56Sopenharmony_ci        # close connection
8567db96d56Sopenharmony_ci        proto.transport.close()
8577db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.done)
8587db96d56Sopenharmony_ci
8597db96d56Sopenharmony_ci        self.assertEqual('CLOSED', proto.state)
8607db96d56Sopenharmony_ci
8617db96d56Sopenharmony_ci        # the client socket must be closed after to avoid ECONNRESET upon
8627db96d56Sopenharmony_ci        # recv()/send() on the serving socket
8637db96d56Sopenharmony_ci        client.close()
8647db96d56Sopenharmony_ci
8657db96d56Sopenharmony_ci        # close server
8667db96d56Sopenharmony_ci        server.close()
8677db96d56Sopenharmony_ci
8687db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(socket, 'SO_REUSEPORT'), 'No SO_REUSEPORT')
8697db96d56Sopenharmony_ci    def test_create_server_reuse_port(self):
8707db96d56Sopenharmony_ci        proto = MyProto(self.loop)
8717db96d56Sopenharmony_ci        f = self.loop.create_server(
8727db96d56Sopenharmony_ci            lambda: proto, '0.0.0.0', 0)
8737db96d56Sopenharmony_ci        server = self.loop.run_until_complete(f)
8747db96d56Sopenharmony_ci        self.assertEqual(len(server.sockets), 1)
8757db96d56Sopenharmony_ci        sock = server.sockets[0]
8767db96d56Sopenharmony_ci        self.assertFalse(
8777db96d56Sopenharmony_ci            sock.getsockopt(
8787db96d56Sopenharmony_ci                socket.SOL_SOCKET, socket.SO_REUSEPORT))
8797db96d56Sopenharmony_ci        server.close()
8807db96d56Sopenharmony_ci
8817db96d56Sopenharmony_ci        test_utils.run_briefly(self.loop)
8827db96d56Sopenharmony_ci
8837db96d56Sopenharmony_ci        proto = MyProto(self.loop)
8847db96d56Sopenharmony_ci        f = self.loop.create_server(
8857db96d56Sopenharmony_ci            lambda: proto, '0.0.0.0', 0, reuse_port=True)
8867db96d56Sopenharmony_ci        server = self.loop.run_until_complete(f)
8877db96d56Sopenharmony_ci        self.assertEqual(len(server.sockets), 1)
8887db96d56Sopenharmony_ci        sock = server.sockets[0]
8897db96d56Sopenharmony_ci        self.assertTrue(
8907db96d56Sopenharmony_ci            sock.getsockopt(
8917db96d56Sopenharmony_ci                socket.SOL_SOCKET, socket.SO_REUSEPORT))
8927db96d56Sopenharmony_ci        server.close()
8937db96d56Sopenharmony_ci
8947db96d56Sopenharmony_ci    def _make_unix_server(self, factory, **kwargs):
8957db96d56Sopenharmony_ci        path = test_utils.gen_unix_socket_path()
8967db96d56Sopenharmony_ci        self.addCleanup(lambda: os.path.exists(path) and os.unlink(path))
8977db96d56Sopenharmony_ci
8987db96d56Sopenharmony_ci        f = self.loop.create_unix_server(factory, path, **kwargs)
8997db96d56Sopenharmony_ci        server = self.loop.run_until_complete(f)
9007db96d56Sopenharmony_ci
9017db96d56Sopenharmony_ci        return server, path
9027db96d56Sopenharmony_ci
9037db96d56Sopenharmony_ci    @socket_helper.skip_unless_bind_unix_socket
9047db96d56Sopenharmony_ci    def test_create_unix_server(self):
9057db96d56Sopenharmony_ci        proto = MyProto(loop=self.loop)
9067db96d56Sopenharmony_ci        server, path = self._make_unix_server(lambda: proto)
9077db96d56Sopenharmony_ci        self.assertEqual(len(server.sockets), 1)
9087db96d56Sopenharmony_ci
9097db96d56Sopenharmony_ci        client = socket.socket(socket.AF_UNIX)
9107db96d56Sopenharmony_ci        client.connect(path)
9117db96d56Sopenharmony_ci        client.sendall(b'xxx')
9127db96d56Sopenharmony_ci
9137db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.connected)
9147db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', proto.state)
9157db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
9167db96d56Sopenharmony_ci        self.assertEqual(3, proto.nbytes)
9177db96d56Sopenharmony_ci
9187db96d56Sopenharmony_ci        # close connection
9197db96d56Sopenharmony_ci        proto.transport.close()
9207db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.done)
9217db96d56Sopenharmony_ci
9227db96d56Sopenharmony_ci        self.assertEqual('CLOSED', proto.state)
9237db96d56Sopenharmony_ci
9247db96d56Sopenharmony_ci        # the client socket must be closed after to avoid ECONNRESET upon
9257db96d56Sopenharmony_ci        # recv()/send() on the serving socket
9267db96d56Sopenharmony_ci        client.close()
9277db96d56Sopenharmony_ci
9287db96d56Sopenharmony_ci        # close server
9297db96d56Sopenharmony_ci        server.close()
9307db96d56Sopenharmony_ci
9317db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
9327db96d56Sopenharmony_ci    def test_create_unix_server_path_socket_error(self):
9337db96d56Sopenharmony_ci        proto = MyProto(loop=self.loop)
9347db96d56Sopenharmony_ci        sock = socket.socket()
9357db96d56Sopenharmony_ci        with sock:
9367db96d56Sopenharmony_ci            f = self.loop.create_unix_server(lambda: proto, '/test', sock=sock)
9377db96d56Sopenharmony_ci            with self.assertRaisesRegex(ValueError,
9387db96d56Sopenharmony_ci                                        'path and sock can not be specified '
9397db96d56Sopenharmony_ci                                        'at the same time'):
9407db96d56Sopenharmony_ci                self.loop.run_until_complete(f)
9417db96d56Sopenharmony_ci
9427db96d56Sopenharmony_ci    def _create_ssl_context(self, certfile, keyfile=None):
9437db96d56Sopenharmony_ci        sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
9447db96d56Sopenharmony_ci        sslcontext.options |= ssl.OP_NO_SSLv2
9457db96d56Sopenharmony_ci        sslcontext.load_cert_chain(certfile, keyfile)
9467db96d56Sopenharmony_ci        return sslcontext
9477db96d56Sopenharmony_ci
9487db96d56Sopenharmony_ci    def _make_ssl_server(self, factory, certfile, keyfile=None):
9497db96d56Sopenharmony_ci        sslcontext = self._create_ssl_context(certfile, keyfile)
9507db96d56Sopenharmony_ci
9517db96d56Sopenharmony_ci        f = self.loop.create_server(factory, '127.0.0.1', 0, ssl=sslcontext)
9527db96d56Sopenharmony_ci        server = self.loop.run_until_complete(f)
9537db96d56Sopenharmony_ci
9547db96d56Sopenharmony_ci        sock = server.sockets[0]
9557db96d56Sopenharmony_ci        host, port = sock.getsockname()
9567db96d56Sopenharmony_ci        self.assertEqual(host, '127.0.0.1')
9577db96d56Sopenharmony_ci        return server, host, port
9587db96d56Sopenharmony_ci
9597db96d56Sopenharmony_ci    def _make_ssl_unix_server(self, factory, certfile, keyfile=None):
9607db96d56Sopenharmony_ci        sslcontext = self._create_ssl_context(certfile, keyfile)
9617db96d56Sopenharmony_ci        return self._make_unix_server(factory, ssl=sslcontext)
9627db96d56Sopenharmony_ci
9637db96d56Sopenharmony_ci    @unittest.skipIf(ssl is None, 'No ssl module')
9647db96d56Sopenharmony_ci    def test_create_server_ssl(self):
9657db96d56Sopenharmony_ci        proto = MyProto(loop=self.loop)
9667db96d56Sopenharmony_ci        server, host, port = self._make_ssl_server(
9677db96d56Sopenharmony_ci            lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
9687db96d56Sopenharmony_ci
9697db96d56Sopenharmony_ci        f_c = self.loop.create_connection(MyBaseProto, host, port,
9707db96d56Sopenharmony_ci                                          ssl=test_utils.dummy_ssl_context())
9717db96d56Sopenharmony_ci        client, pr = self.loop.run_until_complete(f_c)
9727db96d56Sopenharmony_ci
9737db96d56Sopenharmony_ci        client.write(b'xxx')
9747db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.connected)
9757db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', proto.state)
9767db96d56Sopenharmony_ci
9777db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
9787db96d56Sopenharmony_ci        self.assertEqual(3, proto.nbytes)
9797db96d56Sopenharmony_ci
9807db96d56Sopenharmony_ci        # extra info is available
9817db96d56Sopenharmony_ci        self.check_ssl_extra_info(client, peername=(host, port))
9827db96d56Sopenharmony_ci
9837db96d56Sopenharmony_ci        # close connection
9847db96d56Sopenharmony_ci        proto.transport.close()
9857db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.done)
9867db96d56Sopenharmony_ci        self.assertEqual('CLOSED', proto.state)
9877db96d56Sopenharmony_ci
9887db96d56Sopenharmony_ci        # the client socket must be closed after to avoid ECONNRESET upon
9897db96d56Sopenharmony_ci        # recv()/send() on the serving socket
9907db96d56Sopenharmony_ci        client.close()
9917db96d56Sopenharmony_ci
9927db96d56Sopenharmony_ci        # stop serving
9937db96d56Sopenharmony_ci        server.close()
9947db96d56Sopenharmony_ci
9957db96d56Sopenharmony_ci    @socket_helper.skip_unless_bind_unix_socket
9967db96d56Sopenharmony_ci    @unittest.skipIf(ssl is None, 'No ssl module')
9977db96d56Sopenharmony_ci    def test_create_unix_server_ssl(self):
9987db96d56Sopenharmony_ci        proto = MyProto(loop=self.loop)
9997db96d56Sopenharmony_ci        server, path = self._make_ssl_unix_server(
10007db96d56Sopenharmony_ci            lambda: proto, test_utils.ONLYCERT, test_utils.ONLYKEY)
10017db96d56Sopenharmony_ci
10027db96d56Sopenharmony_ci        f_c = self.loop.create_unix_connection(
10037db96d56Sopenharmony_ci            MyBaseProto, path, ssl=test_utils.dummy_ssl_context(),
10047db96d56Sopenharmony_ci            server_hostname='')
10057db96d56Sopenharmony_ci
10067db96d56Sopenharmony_ci        client, pr = self.loop.run_until_complete(f_c)
10077db96d56Sopenharmony_ci
10087db96d56Sopenharmony_ci        client.write(b'xxx')
10097db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.connected)
10107db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', proto.state)
10117db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
10127db96d56Sopenharmony_ci        self.assertEqual(3, proto.nbytes)
10137db96d56Sopenharmony_ci
10147db96d56Sopenharmony_ci        # close connection
10157db96d56Sopenharmony_ci        proto.transport.close()
10167db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.done)
10177db96d56Sopenharmony_ci        self.assertEqual('CLOSED', proto.state)
10187db96d56Sopenharmony_ci
10197db96d56Sopenharmony_ci        # the client socket must be closed after to avoid ECONNRESET upon
10207db96d56Sopenharmony_ci        # recv()/send() on the serving socket
10217db96d56Sopenharmony_ci        client.close()
10227db96d56Sopenharmony_ci
10237db96d56Sopenharmony_ci        # stop serving
10247db96d56Sopenharmony_ci        server.close()
10257db96d56Sopenharmony_ci
10267db96d56Sopenharmony_ci    @unittest.skipIf(ssl is None, 'No ssl module')
10277db96d56Sopenharmony_ci    def test_create_server_ssl_verify_failed(self):
10287db96d56Sopenharmony_ci        proto = MyProto(loop=self.loop)
10297db96d56Sopenharmony_ci        server, host, port = self._make_ssl_server(
10307db96d56Sopenharmony_ci            lambda: proto, test_utils.SIGNED_CERTFILE)
10317db96d56Sopenharmony_ci
10327db96d56Sopenharmony_ci        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
10337db96d56Sopenharmony_ci        sslcontext_client.options |= ssl.OP_NO_SSLv2
10347db96d56Sopenharmony_ci        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
10357db96d56Sopenharmony_ci        if hasattr(sslcontext_client, 'check_hostname'):
10367db96d56Sopenharmony_ci            sslcontext_client.check_hostname = True
10377db96d56Sopenharmony_ci
10387db96d56Sopenharmony_ci
10397db96d56Sopenharmony_ci        # no CA loaded
10407db96d56Sopenharmony_ci        f_c = self.loop.create_connection(MyProto, host, port,
10417db96d56Sopenharmony_ci                                          ssl=sslcontext_client)
10427db96d56Sopenharmony_ci        with mock.patch.object(self.loop, 'call_exception_handler'):
10437db96d56Sopenharmony_ci            with test_utils.disable_logger():
10447db96d56Sopenharmony_ci                with self.assertRaisesRegex(ssl.SSLError,
10457db96d56Sopenharmony_ci                                            '(?i)certificate.verify.failed'):
10467db96d56Sopenharmony_ci                    self.loop.run_until_complete(f_c)
10477db96d56Sopenharmony_ci
10487db96d56Sopenharmony_ci            # execute the loop to log the connection error
10497db96d56Sopenharmony_ci            test_utils.run_briefly(self.loop)
10507db96d56Sopenharmony_ci
10517db96d56Sopenharmony_ci        # close connection
10527db96d56Sopenharmony_ci        self.assertIsNone(proto.transport)
10537db96d56Sopenharmony_ci        server.close()
10547db96d56Sopenharmony_ci
10557db96d56Sopenharmony_ci    @socket_helper.skip_unless_bind_unix_socket
10567db96d56Sopenharmony_ci    @unittest.skipIf(ssl is None, 'No ssl module')
10577db96d56Sopenharmony_ci    def test_create_unix_server_ssl_verify_failed(self):
10587db96d56Sopenharmony_ci        proto = MyProto(loop=self.loop)
10597db96d56Sopenharmony_ci        server, path = self._make_ssl_unix_server(
10607db96d56Sopenharmony_ci            lambda: proto, test_utils.SIGNED_CERTFILE)
10617db96d56Sopenharmony_ci
10627db96d56Sopenharmony_ci        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
10637db96d56Sopenharmony_ci        sslcontext_client.options |= ssl.OP_NO_SSLv2
10647db96d56Sopenharmony_ci        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
10657db96d56Sopenharmony_ci        if hasattr(sslcontext_client, 'check_hostname'):
10667db96d56Sopenharmony_ci            sslcontext_client.check_hostname = True
10677db96d56Sopenharmony_ci
10687db96d56Sopenharmony_ci        # no CA loaded
10697db96d56Sopenharmony_ci        f_c = self.loop.create_unix_connection(MyProto, path,
10707db96d56Sopenharmony_ci                                               ssl=sslcontext_client,
10717db96d56Sopenharmony_ci                                               server_hostname='invalid')
10727db96d56Sopenharmony_ci        with mock.patch.object(self.loop, 'call_exception_handler'):
10737db96d56Sopenharmony_ci            with test_utils.disable_logger():
10747db96d56Sopenharmony_ci                with self.assertRaisesRegex(ssl.SSLError,
10757db96d56Sopenharmony_ci                                            '(?i)certificate.verify.failed'):
10767db96d56Sopenharmony_ci                    self.loop.run_until_complete(f_c)
10777db96d56Sopenharmony_ci
10787db96d56Sopenharmony_ci            # execute the loop to log the connection error
10797db96d56Sopenharmony_ci            test_utils.run_briefly(self.loop)
10807db96d56Sopenharmony_ci
10817db96d56Sopenharmony_ci        # close connection
10827db96d56Sopenharmony_ci        self.assertIsNone(proto.transport)
10837db96d56Sopenharmony_ci        server.close()
10847db96d56Sopenharmony_ci
10857db96d56Sopenharmony_ci    @unittest.skipIf(ssl is None, 'No ssl module')
10867db96d56Sopenharmony_ci    def test_create_server_ssl_match_failed(self):
10877db96d56Sopenharmony_ci        proto = MyProto(loop=self.loop)
10887db96d56Sopenharmony_ci        server, host, port = self._make_ssl_server(
10897db96d56Sopenharmony_ci            lambda: proto, test_utils.SIGNED_CERTFILE)
10907db96d56Sopenharmony_ci
10917db96d56Sopenharmony_ci        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
10927db96d56Sopenharmony_ci        sslcontext_client.options |= ssl.OP_NO_SSLv2
10937db96d56Sopenharmony_ci        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
10947db96d56Sopenharmony_ci        sslcontext_client.load_verify_locations(
10957db96d56Sopenharmony_ci            cafile=test_utils.SIGNING_CA)
10967db96d56Sopenharmony_ci        if hasattr(sslcontext_client, 'check_hostname'):
10977db96d56Sopenharmony_ci            sslcontext_client.check_hostname = True
10987db96d56Sopenharmony_ci
10997db96d56Sopenharmony_ci        # incorrect server_hostname
11007db96d56Sopenharmony_ci        f_c = self.loop.create_connection(MyProto, host, port,
11017db96d56Sopenharmony_ci                                          ssl=sslcontext_client)
11027db96d56Sopenharmony_ci        with mock.patch.object(self.loop, 'call_exception_handler'):
11037db96d56Sopenharmony_ci            with test_utils.disable_logger():
11047db96d56Sopenharmony_ci                with self.assertRaisesRegex(
11057db96d56Sopenharmony_ci                        ssl.CertificateError,
11067db96d56Sopenharmony_ci                        "IP address mismatch, certificate is not valid for "
11077db96d56Sopenharmony_ci                        "'127.0.0.1'"):
11087db96d56Sopenharmony_ci                    self.loop.run_until_complete(f_c)
11097db96d56Sopenharmony_ci
11107db96d56Sopenharmony_ci        # close connection
11117db96d56Sopenharmony_ci        # transport is None because TLS ALERT aborted the handshake
11127db96d56Sopenharmony_ci        self.assertIsNone(proto.transport)
11137db96d56Sopenharmony_ci        server.close()
11147db96d56Sopenharmony_ci
11157db96d56Sopenharmony_ci    @socket_helper.skip_unless_bind_unix_socket
11167db96d56Sopenharmony_ci    @unittest.skipIf(ssl is None, 'No ssl module')
11177db96d56Sopenharmony_ci    def test_create_unix_server_ssl_verified(self):
11187db96d56Sopenharmony_ci        proto = MyProto(loop=self.loop)
11197db96d56Sopenharmony_ci        server, path = self._make_ssl_unix_server(
11207db96d56Sopenharmony_ci            lambda: proto, test_utils.SIGNED_CERTFILE)
11217db96d56Sopenharmony_ci
11227db96d56Sopenharmony_ci        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
11237db96d56Sopenharmony_ci        sslcontext_client.options |= ssl.OP_NO_SSLv2
11247db96d56Sopenharmony_ci        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
11257db96d56Sopenharmony_ci        sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
11267db96d56Sopenharmony_ci        if hasattr(sslcontext_client, 'check_hostname'):
11277db96d56Sopenharmony_ci            sslcontext_client.check_hostname = True
11287db96d56Sopenharmony_ci
11297db96d56Sopenharmony_ci        # Connection succeeds with correct CA and server hostname.
11307db96d56Sopenharmony_ci        f_c = self.loop.create_unix_connection(MyProto, path,
11317db96d56Sopenharmony_ci                                               ssl=sslcontext_client,
11327db96d56Sopenharmony_ci                                               server_hostname='localhost')
11337db96d56Sopenharmony_ci        client, pr = self.loop.run_until_complete(f_c)
11347db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.connected)
11357db96d56Sopenharmony_ci
11367db96d56Sopenharmony_ci        # close connection
11377db96d56Sopenharmony_ci        proto.transport.close()
11387db96d56Sopenharmony_ci        client.close()
11397db96d56Sopenharmony_ci        server.close()
11407db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.done)
11417db96d56Sopenharmony_ci
11427db96d56Sopenharmony_ci    @unittest.skipIf(ssl is None, 'No ssl module')
11437db96d56Sopenharmony_ci    def test_create_server_ssl_verified(self):
11447db96d56Sopenharmony_ci        proto = MyProto(loop=self.loop)
11457db96d56Sopenharmony_ci        server, host, port = self._make_ssl_server(
11467db96d56Sopenharmony_ci            lambda: proto, test_utils.SIGNED_CERTFILE)
11477db96d56Sopenharmony_ci
11487db96d56Sopenharmony_ci        sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
11497db96d56Sopenharmony_ci        sslcontext_client.options |= ssl.OP_NO_SSLv2
11507db96d56Sopenharmony_ci        sslcontext_client.verify_mode = ssl.CERT_REQUIRED
11517db96d56Sopenharmony_ci        sslcontext_client.load_verify_locations(cafile=test_utils.SIGNING_CA)
11527db96d56Sopenharmony_ci        if hasattr(sslcontext_client, 'check_hostname'):
11537db96d56Sopenharmony_ci            sslcontext_client.check_hostname = True
11547db96d56Sopenharmony_ci
11557db96d56Sopenharmony_ci        # Connection succeeds with correct CA and server hostname.
11567db96d56Sopenharmony_ci        f_c = self.loop.create_connection(MyProto, host, port,
11577db96d56Sopenharmony_ci                                          ssl=sslcontext_client,
11587db96d56Sopenharmony_ci                                          server_hostname='localhost')
11597db96d56Sopenharmony_ci        client, pr = self.loop.run_until_complete(f_c)
11607db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.connected)
11617db96d56Sopenharmony_ci
11627db96d56Sopenharmony_ci        # extra info is available
11637db96d56Sopenharmony_ci        self.check_ssl_extra_info(client, peername=(host, port),
11647db96d56Sopenharmony_ci                                  peercert=test_utils.PEERCERT)
11657db96d56Sopenharmony_ci
11667db96d56Sopenharmony_ci        # close connection
11677db96d56Sopenharmony_ci        proto.transport.close()
11687db96d56Sopenharmony_ci        client.close()
11697db96d56Sopenharmony_ci        server.close()
11707db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.done)
11717db96d56Sopenharmony_ci
11727db96d56Sopenharmony_ci    def test_create_server_sock(self):
11737db96d56Sopenharmony_ci        proto = self.loop.create_future()
11747db96d56Sopenharmony_ci
11757db96d56Sopenharmony_ci        class TestMyProto(MyProto):
11767db96d56Sopenharmony_ci            def connection_made(self, transport):
11777db96d56Sopenharmony_ci                super().connection_made(transport)
11787db96d56Sopenharmony_ci                proto.set_result(self)
11797db96d56Sopenharmony_ci
11807db96d56Sopenharmony_ci        sock_ob = socket.create_server(('0.0.0.0', 0))
11817db96d56Sopenharmony_ci
11827db96d56Sopenharmony_ci        f = self.loop.create_server(TestMyProto, sock=sock_ob)
11837db96d56Sopenharmony_ci        server = self.loop.run_until_complete(f)
11847db96d56Sopenharmony_ci        sock = server.sockets[0]
11857db96d56Sopenharmony_ci        self.assertEqual(sock.fileno(), sock_ob.fileno())
11867db96d56Sopenharmony_ci
11877db96d56Sopenharmony_ci        host, port = sock.getsockname()
11887db96d56Sopenharmony_ci        self.assertEqual(host, '0.0.0.0')
11897db96d56Sopenharmony_ci        client = socket.socket()
11907db96d56Sopenharmony_ci        client.connect(('127.0.0.1', port))
11917db96d56Sopenharmony_ci        client.send(b'xxx')
11927db96d56Sopenharmony_ci        client.close()
11937db96d56Sopenharmony_ci        server.close()
11947db96d56Sopenharmony_ci
11957db96d56Sopenharmony_ci    def test_create_server_addr_in_use(self):
11967db96d56Sopenharmony_ci        sock_ob = socket.create_server(('0.0.0.0', 0))
11977db96d56Sopenharmony_ci
11987db96d56Sopenharmony_ci        f = self.loop.create_server(MyProto, sock=sock_ob)
11997db96d56Sopenharmony_ci        server = self.loop.run_until_complete(f)
12007db96d56Sopenharmony_ci        sock = server.sockets[0]
12017db96d56Sopenharmony_ci        host, port = sock.getsockname()
12027db96d56Sopenharmony_ci
12037db96d56Sopenharmony_ci        f = self.loop.create_server(MyProto, host=host, port=port)
12047db96d56Sopenharmony_ci        with self.assertRaises(OSError) as cm:
12057db96d56Sopenharmony_ci            self.loop.run_until_complete(f)
12067db96d56Sopenharmony_ci        self.assertEqual(cm.exception.errno, errno.EADDRINUSE)
12077db96d56Sopenharmony_ci
12087db96d56Sopenharmony_ci        server.close()
12097db96d56Sopenharmony_ci
12107db96d56Sopenharmony_ci    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
12117db96d56Sopenharmony_ci    def test_create_server_dual_stack(self):
12127db96d56Sopenharmony_ci        f_proto = self.loop.create_future()
12137db96d56Sopenharmony_ci
12147db96d56Sopenharmony_ci        class TestMyProto(MyProto):
12157db96d56Sopenharmony_ci            def connection_made(self, transport):
12167db96d56Sopenharmony_ci                super().connection_made(transport)
12177db96d56Sopenharmony_ci                f_proto.set_result(self)
12187db96d56Sopenharmony_ci
12197db96d56Sopenharmony_ci        try_count = 0
12207db96d56Sopenharmony_ci        while True:
12217db96d56Sopenharmony_ci            try:
12227db96d56Sopenharmony_ci                port = socket_helper.find_unused_port()
12237db96d56Sopenharmony_ci                f = self.loop.create_server(TestMyProto, host=None, port=port)
12247db96d56Sopenharmony_ci                server = self.loop.run_until_complete(f)
12257db96d56Sopenharmony_ci            except OSError as ex:
12267db96d56Sopenharmony_ci                if ex.errno == errno.EADDRINUSE:
12277db96d56Sopenharmony_ci                    try_count += 1
12287db96d56Sopenharmony_ci                    self.assertGreaterEqual(5, try_count)
12297db96d56Sopenharmony_ci                    continue
12307db96d56Sopenharmony_ci                else:
12317db96d56Sopenharmony_ci                    raise
12327db96d56Sopenharmony_ci            else:
12337db96d56Sopenharmony_ci                break
12347db96d56Sopenharmony_ci        client = socket.socket()
12357db96d56Sopenharmony_ci        client.connect(('127.0.0.1', port))
12367db96d56Sopenharmony_ci        client.send(b'xxx')
12377db96d56Sopenharmony_ci        proto = self.loop.run_until_complete(f_proto)
12387db96d56Sopenharmony_ci        proto.transport.close()
12397db96d56Sopenharmony_ci        client.close()
12407db96d56Sopenharmony_ci
12417db96d56Sopenharmony_ci        f_proto = self.loop.create_future()
12427db96d56Sopenharmony_ci        client = socket.socket(socket.AF_INET6)
12437db96d56Sopenharmony_ci        client.connect(('::1', port))
12447db96d56Sopenharmony_ci        client.send(b'xxx')
12457db96d56Sopenharmony_ci        proto = self.loop.run_until_complete(f_proto)
12467db96d56Sopenharmony_ci        proto.transport.close()
12477db96d56Sopenharmony_ci        client.close()
12487db96d56Sopenharmony_ci
12497db96d56Sopenharmony_ci        server.close()
12507db96d56Sopenharmony_ci
12517db96d56Sopenharmony_ci    def test_server_close(self):
12527db96d56Sopenharmony_ci        f = self.loop.create_server(MyProto, '0.0.0.0', 0)
12537db96d56Sopenharmony_ci        server = self.loop.run_until_complete(f)
12547db96d56Sopenharmony_ci        sock = server.sockets[0]
12557db96d56Sopenharmony_ci        host, port = sock.getsockname()
12567db96d56Sopenharmony_ci
12577db96d56Sopenharmony_ci        client = socket.socket()
12587db96d56Sopenharmony_ci        client.connect(('127.0.0.1', port))
12597db96d56Sopenharmony_ci        client.send(b'xxx')
12607db96d56Sopenharmony_ci        client.close()
12617db96d56Sopenharmony_ci
12627db96d56Sopenharmony_ci        server.close()
12637db96d56Sopenharmony_ci
12647db96d56Sopenharmony_ci        client = socket.socket()
12657db96d56Sopenharmony_ci        self.assertRaises(
12667db96d56Sopenharmony_ci            ConnectionRefusedError, client.connect, ('127.0.0.1', port))
12677db96d56Sopenharmony_ci        client.close()
12687db96d56Sopenharmony_ci
12697db96d56Sopenharmony_ci    def _test_create_datagram_endpoint(self, local_addr, family):
12707db96d56Sopenharmony_ci        class TestMyDatagramProto(MyDatagramProto):
12717db96d56Sopenharmony_ci            def __init__(inner_self):
12727db96d56Sopenharmony_ci                super().__init__(loop=self.loop)
12737db96d56Sopenharmony_ci
12747db96d56Sopenharmony_ci            def datagram_received(self, data, addr):
12757db96d56Sopenharmony_ci                super().datagram_received(data, addr)
12767db96d56Sopenharmony_ci                self.transport.sendto(b'resp:'+data, addr)
12777db96d56Sopenharmony_ci
12787db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
12797db96d56Sopenharmony_ci            TestMyDatagramProto, local_addr=local_addr, family=family)
12807db96d56Sopenharmony_ci        s_transport, server = self.loop.run_until_complete(coro)
12817db96d56Sopenharmony_ci        sockname = s_transport.get_extra_info('sockname')
12827db96d56Sopenharmony_ci        host, port = socket.getnameinfo(
12837db96d56Sopenharmony_ci            sockname, socket.NI_NUMERICHOST|socket.NI_NUMERICSERV)
12847db96d56Sopenharmony_ci
12857db96d56Sopenharmony_ci        self.assertIsInstance(s_transport, asyncio.Transport)
12867db96d56Sopenharmony_ci        self.assertIsInstance(server, TestMyDatagramProto)
12877db96d56Sopenharmony_ci        self.assertEqual('INITIALIZED', server.state)
12887db96d56Sopenharmony_ci        self.assertIs(server.transport, s_transport)
12897db96d56Sopenharmony_ci
12907db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
12917db96d56Sopenharmony_ci            lambda: MyDatagramProto(loop=self.loop),
12927db96d56Sopenharmony_ci            remote_addr=(host, port))
12937db96d56Sopenharmony_ci        transport, client = self.loop.run_until_complete(coro)
12947db96d56Sopenharmony_ci
12957db96d56Sopenharmony_ci        self.assertIsInstance(transport, asyncio.Transport)
12967db96d56Sopenharmony_ci        self.assertIsInstance(client, MyDatagramProto)
12977db96d56Sopenharmony_ci        self.assertEqual('INITIALIZED', client.state)
12987db96d56Sopenharmony_ci        self.assertIs(client.transport, transport)
12997db96d56Sopenharmony_ci
13007db96d56Sopenharmony_ci        transport.sendto(b'xxx')
13017db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: server.nbytes)
13027db96d56Sopenharmony_ci        self.assertEqual(3, server.nbytes)
13037db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: client.nbytes)
13047db96d56Sopenharmony_ci
13057db96d56Sopenharmony_ci        # received
13067db96d56Sopenharmony_ci        self.assertEqual(8, client.nbytes)
13077db96d56Sopenharmony_ci
13087db96d56Sopenharmony_ci        # extra info is available
13097db96d56Sopenharmony_ci        self.assertIsNotNone(transport.get_extra_info('sockname'))
13107db96d56Sopenharmony_ci
13117db96d56Sopenharmony_ci        # close connection
13127db96d56Sopenharmony_ci        transport.close()
13137db96d56Sopenharmony_ci        self.loop.run_until_complete(client.done)
13147db96d56Sopenharmony_ci        self.assertEqual('CLOSED', client.state)
13157db96d56Sopenharmony_ci        server.transport.close()
13167db96d56Sopenharmony_ci
13177db96d56Sopenharmony_ci    def test_create_datagram_endpoint(self):
13187db96d56Sopenharmony_ci        self._test_create_datagram_endpoint(('127.0.0.1', 0), socket.AF_INET)
13197db96d56Sopenharmony_ci
13207db96d56Sopenharmony_ci    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
13217db96d56Sopenharmony_ci    def test_create_datagram_endpoint_ipv6(self):
13227db96d56Sopenharmony_ci        self._test_create_datagram_endpoint(('::1', 0), socket.AF_INET6)
13237db96d56Sopenharmony_ci
13247db96d56Sopenharmony_ci    def test_create_datagram_endpoint_sock(self):
13257db96d56Sopenharmony_ci        sock = None
13267db96d56Sopenharmony_ci        local_address = ('127.0.0.1', 0)
13277db96d56Sopenharmony_ci        infos = self.loop.run_until_complete(
13287db96d56Sopenharmony_ci            self.loop.getaddrinfo(
13297db96d56Sopenharmony_ci                *local_address, type=socket.SOCK_DGRAM))
13307db96d56Sopenharmony_ci        for family, type, proto, cname, address in infos:
13317db96d56Sopenharmony_ci            try:
13327db96d56Sopenharmony_ci                sock = socket.socket(family=family, type=type, proto=proto)
13337db96d56Sopenharmony_ci                sock.setblocking(False)
13347db96d56Sopenharmony_ci                sock.bind(address)
13357db96d56Sopenharmony_ci            except:
13367db96d56Sopenharmony_ci                pass
13377db96d56Sopenharmony_ci            else:
13387db96d56Sopenharmony_ci                break
13397db96d56Sopenharmony_ci        else:
13407db96d56Sopenharmony_ci            self.fail('Can not create socket.')
13417db96d56Sopenharmony_ci
13427db96d56Sopenharmony_ci        f = self.loop.create_datagram_endpoint(
13437db96d56Sopenharmony_ci            lambda: MyDatagramProto(loop=self.loop), sock=sock)
13447db96d56Sopenharmony_ci        tr, pr = self.loop.run_until_complete(f)
13457db96d56Sopenharmony_ci        self.assertIsInstance(tr, asyncio.Transport)
13467db96d56Sopenharmony_ci        self.assertIsInstance(pr, MyDatagramProto)
13477db96d56Sopenharmony_ci        tr.close()
13487db96d56Sopenharmony_ci        self.loop.run_until_complete(pr.done)
13497db96d56Sopenharmony_ci
13507db96d56Sopenharmony_ci    def test_internal_fds(self):
13517db96d56Sopenharmony_ci        loop = self.create_event_loop()
13527db96d56Sopenharmony_ci        if not isinstance(loop, selector_events.BaseSelectorEventLoop):
13537db96d56Sopenharmony_ci            loop.close()
13547db96d56Sopenharmony_ci            self.skipTest('loop is not a BaseSelectorEventLoop')
13557db96d56Sopenharmony_ci
13567db96d56Sopenharmony_ci        self.assertEqual(1, loop._internal_fds)
13577db96d56Sopenharmony_ci        loop.close()
13587db96d56Sopenharmony_ci        self.assertEqual(0, loop._internal_fds)
13597db96d56Sopenharmony_ci        self.assertIsNone(loop._csock)
13607db96d56Sopenharmony_ci        self.assertIsNone(loop._ssock)
13617db96d56Sopenharmony_ci
13627db96d56Sopenharmony_ci    @unittest.skipUnless(sys.platform != 'win32',
13637db96d56Sopenharmony_ci                         "Don't support pipes for Windows")
13647db96d56Sopenharmony_ci    def test_read_pipe(self):
13657db96d56Sopenharmony_ci        proto = MyReadPipeProto(loop=self.loop)
13667db96d56Sopenharmony_ci
13677db96d56Sopenharmony_ci        rpipe, wpipe = os.pipe()
13687db96d56Sopenharmony_ci        pipeobj = io.open(rpipe, 'rb', 1024)
13697db96d56Sopenharmony_ci
13707db96d56Sopenharmony_ci        async def connect():
13717db96d56Sopenharmony_ci            t, p = await self.loop.connect_read_pipe(
13727db96d56Sopenharmony_ci                lambda: proto, pipeobj)
13737db96d56Sopenharmony_ci            self.assertIs(p, proto)
13747db96d56Sopenharmony_ci            self.assertIs(t, proto.transport)
13757db96d56Sopenharmony_ci            self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
13767db96d56Sopenharmony_ci            self.assertEqual(0, proto.nbytes)
13777db96d56Sopenharmony_ci
13787db96d56Sopenharmony_ci        self.loop.run_until_complete(connect())
13797db96d56Sopenharmony_ci
13807db96d56Sopenharmony_ci        os.write(wpipe, b'1')
13817db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
13827db96d56Sopenharmony_ci        self.assertEqual(1, proto.nbytes)
13837db96d56Sopenharmony_ci
13847db96d56Sopenharmony_ci        os.write(wpipe, b'2345')
13857db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
13867db96d56Sopenharmony_ci        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
13877db96d56Sopenharmony_ci        self.assertEqual(5, proto.nbytes)
13887db96d56Sopenharmony_ci
13897db96d56Sopenharmony_ci        os.close(wpipe)
13907db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.done)
13917db96d56Sopenharmony_ci        self.assertEqual(
13927db96d56Sopenharmony_ci            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
13937db96d56Sopenharmony_ci        # extra info is available
13947db96d56Sopenharmony_ci        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
13957db96d56Sopenharmony_ci
13967db96d56Sopenharmony_ci    @unittest.skipUnless(sys.platform != 'win32',
13977db96d56Sopenharmony_ci                         "Don't support pipes for Windows")
13987db96d56Sopenharmony_ci    def test_unclosed_pipe_transport(self):
13997db96d56Sopenharmony_ci        # This test reproduces the issue #314 on GitHub
14007db96d56Sopenharmony_ci        loop = self.create_event_loop()
14017db96d56Sopenharmony_ci        read_proto = MyReadPipeProto(loop=loop)
14027db96d56Sopenharmony_ci        write_proto = MyWritePipeProto(loop=loop)
14037db96d56Sopenharmony_ci
14047db96d56Sopenharmony_ci        rpipe, wpipe = os.pipe()
14057db96d56Sopenharmony_ci        rpipeobj = io.open(rpipe, 'rb', 1024)
14067db96d56Sopenharmony_ci        wpipeobj = io.open(wpipe, 'w', 1024, encoding="utf-8")
14077db96d56Sopenharmony_ci
14087db96d56Sopenharmony_ci        async def connect():
14097db96d56Sopenharmony_ci            read_transport, _ = await loop.connect_read_pipe(
14107db96d56Sopenharmony_ci                lambda: read_proto, rpipeobj)
14117db96d56Sopenharmony_ci            write_transport, _ = await loop.connect_write_pipe(
14127db96d56Sopenharmony_ci                lambda: write_proto, wpipeobj)
14137db96d56Sopenharmony_ci            return read_transport, write_transport
14147db96d56Sopenharmony_ci
14157db96d56Sopenharmony_ci        # Run and close the loop without closing the transports
14167db96d56Sopenharmony_ci        read_transport, write_transport = loop.run_until_complete(connect())
14177db96d56Sopenharmony_ci        loop.close()
14187db96d56Sopenharmony_ci
14197db96d56Sopenharmony_ci        # These 'repr' calls used to raise an AttributeError
14207db96d56Sopenharmony_ci        # See Issue #314 on GitHub
14217db96d56Sopenharmony_ci        self.assertIn('open', repr(read_transport))
14227db96d56Sopenharmony_ci        self.assertIn('open', repr(write_transport))
14237db96d56Sopenharmony_ci
14247db96d56Sopenharmony_ci        # Clean up (avoid ResourceWarning)
14257db96d56Sopenharmony_ci        rpipeobj.close()
14267db96d56Sopenharmony_ci        wpipeobj.close()
14277db96d56Sopenharmony_ci        read_transport._pipe = None
14287db96d56Sopenharmony_ci        write_transport._pipe = None
14297db96d56Sopenharmony_ci
14307db96d56Sopenharmony_ci    @unittest.skipUnless(sys.platform != 'win32',
14317db96d56Sopenharmony_ci                         "Don't support pipes for Windows")
14327db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()')
14337db96d56Sopenharmony_ci    def test_read_pty_output(self):
14347db96d56Sopenharmony_ci        proto = MyReadPipeProto(loop=self.loop)
14357db96d56Sopenharmony_ci
14367db96d56Sopenharmony_ci        master, slave = os.openpty()
14377db96d56Sopenharmony_ci        master_read_obj = io.open(master, 'rb', 0)
14387db96d56Sopenharmony_ci
14397db96d56Sopenharmony_ci        async def connect():
14407db96d56Sopenharmony_ci            t, p = await self.loop.connect_read_pipe(lambda: proto,
14417db96d56Sopenharmony_ci                                                     master_read_obj)
14427db96d56Sopenharmony_ci            self.assertIs(p, proto)
14437db96d56Sopenharmony_ci            self.assertIs(t, proto.transport)
14447db96d56Sopenharmony_ci            self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
14457db96d56Sopenharmony_ci            self.assertEqual(0, proto.nbytes)
14467db96d56Sopenharmony_ci
14477db96d56Sopenharmony_ci        self.loop.run_until_complete(connect())
14487db96d56Sopenharmony_ci
14497db96d56Sopenharmony_ci        os.write(slave, b'1')
14507db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: proto.nbytes)
14517db96d56Sopenharmony_ci        self.assertEqual(1, proto.nbytes)
14527db96d56Sopenharmony_ci
14537db96d56Sopenharmony_ci        os.write(slave, b'2345')
14547db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
14557db96d56Sopenharmony_ci        self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
14567db96d56Sopenharmony_ci        self.assertEqual(5, proto.nbytes)
14577db96d56Sopenharmony_ci
14587db96d56Sopenharmony_ci        os.close(slave)
14597db96d56Sopenharmony_ci        proto.transport.close()
14607db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.done)
14617db96d56Sopenharmony_ci        self.assertEqual(
14627db96d56Sopenharmony_ci            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], proto.state)
14637db96d56Sopenharmony_ci        # extra info is available
14647db96d56Sopenharmony_ci        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
14657db96d56Sopenharmony_ci
14667db96d56Sopenharmony_ci    @unittest.skipUnless(sys.platform != 'win32',
14677db96d56Sopenharmony_ci                         "Don't support pipes for Windows")
14687db96d56Sopenharmony_ci    def test_write_pipe(self):
14697db96d56Sopenharmony_ci        rpipe, wpipe = os.pipe()
14707db96d56Sopenharmony_ci        pipeobj = io.open(wpipe, 'wb', 1024)
14717db96d56Sopenharmony_ci
14727db96d56Sopenharmony_ci        proto = MyWritePipeProto(loop=self.loop)
14737db96d56Sopenharmony_ci        connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
14747db96d56Sopenharmony_ci        transport, p = self.loop.run_until_complete(connect)
14757db96d56Sopenharmony_ci        self.assertIs(p, proto)
14767db96d56Sopenharmony_ci        self.assertIs(transport, proto.transport)
14777db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', proto.state)
14787db96d56Sopenharmony_ci
14797db96d56Sopenharmony_ci        transport.write(b'1')
14807db96d56Sopenharmony_ci
14817db96d56Sopenharmony_ci        data = bytearray()
14827db96d56Sopenharmony_ci        def reader(data):
14837db96d56Sopenharmony_ci            chunk = os.read(rpipe, 1024)
14847db96d56Sopenharmony_ci            data += chunk
14857db96d56Sopenharmony_ci            return len(data)
14867db96d56Sopenharmony_ci
14877db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: reader(data) >= 1)
14887db96d56Sopenharmony_ci        self.assertEqual(b'1', data)
14897db96d56Sopenharmony_ci
14907db96d56Sopenharmony_ci        transport.write(b'2345')
14917db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: reader(data) >= 5)
14927db96d56Sopenharmony_ci        self.assertEqual(b'12345', data)
14937db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', proto.state)
14947db96d56Sopenharmony_ci
14957db96d56Sopenharmony_ci        os.close(rpipe)
14967db96d56Sopenharmony_ci
14977db96d56Sopenharmony_ci        # extra info is available
14987db96d56Sopenharmony_ci        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
14997db96d56Sopenharmony_ci
15007db96d56Sopenharmony_ci        # close connection
15017db96d56Sopenharmony_ci        proto.transport.close()
15027db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.done)
15037db96d56Sopenharmony_ci        self.assertEqual('CLOSED', proto.state)
15047db96d56Sopenharmony_ci
15057db96d56Sopenharmony_ci    @unittest.skipUnless(sys.platform != 'win32',
15067db96d56Sopenharmony_ci                         "Don't support pipes for Windows")
15077db96d56Sopenharmony_ci    def test_write_pipe_disconnect_on_close(self):
15087db96d56Sopenharmony_ci        rsock, wsock = socket.socketpair()
15097db96d56Sopenharmony_ci        rsock.setblocking(False)
15107db96d56Sopenharmony_ci        pipeobj = io.open(wsock.detach(), 'wb', 1024)
15117db96d56Sopenharmony_ci
15127db96d56Sopenharmony_ci        proto = MyWritePipeProto(loop=self.loop)
15137db96d56Sopenharmony_ci        connect = self.loop.connect_write_pipe(lambda: proto, pipeobj)
15147db96d56Sopenharmony_ci        transport, p = self.loop.run_until_complete(connect)
15157db96d56Sopenharmony_ci        self.assertIs(p, proto)
15167db96d56Sopenharmony_ci        self.assertIs(transport, proto.transport)
15177db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', proto.state)
15187db96d56Sopenharmony_ci
15197db96d56Sopenharmony_ci        transport.write(b'1')
15207db96d56Sopenharmony_ci        data = self.loop.run_until_complete(self.loop.sock_recv(rsock, 1024))
15217db96d56Sopenharmony_ci        self.assertEqual(b'1', data)
15227db96d56Sopenharmony_ci
15237db96d56Sopenharmony_ci        rsock.close()
15247db96d56Sopenharmony_ci
15257db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.done)
15267db96d56Sopenharmony_ci        self.assertEqual('CLOSED', proto.state)
15277db96d56Sopenharmony_ci
15287db96d56Sopenharmony_ci    @unittest.skipUnless(sys.platform != 'win32',
15297db96d56Sopenharmony_ci                         "Don't support pipes for Windows")
15307db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()')
15317db96d56Sopenharmony_ci    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
15327db96d56Sopenharmony_ci    # older than 10.6 (Snow Leopard)
15337db96d56Sopenharmony_ci    @support.requires_mac_ver(10, 6)
15347db96d56Sopenharmony_ci    def test_write_pty(self):
15357db96d56Sopenharmony_ci        master, slave = os.openpty()
15367db96d56Sopenharmony_ci        slave_write_obj = io.open(slave, 'wb', 0)
15377db96d56Sopenharmony_ci
15387db96d56Sopenharmony_ci        proto = MyWritePipeProto(loop=self.loop)
15397db96d56Sopenharmony_ci        connect = self.loop.connect_write_pipe(lambda: proto, slave_write_obj)
15407db96d56Sopenharmony_ci        transport, p = self.loop.run_until_complete(connect)
15417db96d56Sopenharmony_ci        self.assertIs(p, proto)
15427db96d56Sopenharmony_ci        self.assertIs(transport, proto.transport)
15437db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', proto.state)
15447db96d56Sopenharmony_ci
15457db96d56Sopenharmony_ci        transport.write(b'1')
15467db96d56Sopenharmony_ci
15477db96d56Sopenharmony_ci        data = bytearray()
15487db96d56Sopenharmony_ci        def reader(data):
15497db96d56Sopenharmony_ci            chunk = os.read(master, 1024)
15507db96d56Sopenharmony_ci            data += chunk
15517db96d56Sopenharmony_ci            return len(data)
15527db96d56Sopenharmony_ci
15537db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: reader(data) >= 1,
15547db96d56Sopenharmony_ci                             timeout=support.SHORT_TIMEOUT)
15557db96d56Sopenharmony_ci        self.assertEqual(b'1', data)
15567db96d56Sopenharmony_ci
15577db96d56Sopenharmony_ci        transport.write(b'2345')
15587db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: reader(data) >= 5,
15597db96d56Sopenharmony_ci                             timeout=support.SHORT_TIMEOUT)
15607db96d56Sopenharmony_ci        self.assertEqual(b'12345', data)
15617db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', proto.state)
15627db96d56Sopenharmony_ci
15637db96d56Sopenharmony_ci        os.close(master)
15647db96d56Sopenharmony_ci
15657db96d56Sopenharmony_ci        # extra info is available
15667db96d56Sopenharmony_ci        self.assertIsNotNone(proto.transport.get_extra_info('pipe'))
15677db96d56Sopenharmony_ci
15687db96d56Sopenharmony_ci        # close connection
15697db96d56Sopenharmony_ci        proto.transport.close()
15707db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.done)
15717db96d56Sopenharmony_ci        self.assertEqual('CLOSED', proto.state)
15727db96d56Sopenharmony_ci
15737db96d56Sopenharmony_ci    @unittest.skipUnless(sys.platform != 'win32',
15747db96d56Sopenharmony_ci                         "Don't support pipes for Windows")
15757db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(os, 'openpty'), 'need os.openpty()')
15767db96d56Sopenharmony_ci    # select, poll and kqueue don't support character devices (PTY) on Mac OS X
15777db96d56Sopenharmony_ci    # older than 10.6 (Snow Leopard)
15787db96d56Sopenharmony_ci    @support.requires_mac_ver(10, 6)
15797db96d56Sopenharmony_ci    def test_bidirectional_pty(self):
15807db96d56Sopenharmony_ci        master, read_slave = os.openpty()
15817db96d56Sopenharmony_ci        write_slave = os.dup(read_slave)
15827db96d56Sopenharmony_ci        tty.setraw(read_slave)
15837db96d56Sopenharmony_ci
15847db96d56Sopenharmony_ci        slave_read_obj = io.open(read_slave, 'rb', 0)
15857db96d56Sopenharmony_ci        read_proto = MyReadPipeProto(loop=self.loop)
15867db96d56Sopenharmony_ci        read_connect = self.loop.connect_read_pipe(lambda: read_proto,
15877db96d56Sopenharmony_ci                                                   slave_read_obj)
15887db96d56Sopenharmony_ci        read_transport, p = self.loop.run_until_complete(read_connect)
15897db96d56Sopenharmony_ci        self.assertIs(p, read_proto)
15907db96d56Sopenharmony_ci        self.assertIs(read_transport, read_proto.transport)
15917db96d56Sopenharmony_ci        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
15927db96d56Sopenharmony_ci        self.assertEqual(0, read_proto.nbytes)
15937db96d56Sopenharmony_ci
15947db96d56Sopenharmony_ci
15957db96d56Sopenharmony_ci        slave_write_obj = io.open(write_slave, 'wb', 0)
15967db96d56Sopenharmony_ci        write_proto = MyWritePipeProto(loop=self.loop)
15977db96d56Sopenharmony_ci        write_connect = self.loop.connect_write_pipe(lambda: write_proto,
15987db96d56Sopenharmony_ci                                                     slave_write_obj)
15997db96d56Sopenharmony_ci        write_transport, p = self.loop.run_until_complete(write_connect)
16007db96d56Sopenharmony_ci        self.assertIs(p, write_proto)
16017db96d56Sopenharmony_ci        self.assertIs(write_transport, write_proto.transport)
16027db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', write_proto.state)
16037db96d56Sopenharmony_ci
16047db96d56Sopenharmony_ci        data = bytearray()
16057db96d56Sopenharmony_ci        def reader(data):
16067db96d56Sopenharmony_ci            chunk = os.read(master, 1024)
16077db96d56Sopenharmony_ci            data += chunk
16087db96d56Sopenharmony_ci            return len(data)
16097db96d56Sopenharmony_ci
16107db96d56Sopenharmony_ci        write_transport.write(b'1')
16117db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: reader(data) >= 1,
16127db96d56Sopenharmony_ci                             timeout=support.SHORT_TIMEOUT)
16137db96d56Sopenharmony_ci        self.assertEqual(b'1', data)
16147db96d56Sopenharmony_ci        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
16157db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', write_proto.state)
16167db96d56Sopenharmony_ci
16177db96d56Sopenharmony_ci        os.write(master, b'a')
16187db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 1,
16197db96d56Sopenharmony_ci                             timeout=support.SHORT_TIMEOUT)
16207db96d56Sopenharmony_ci        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
16217db96d56Sopenharmony_ci        self.assertEqual(1, read_proto.nbytes)
16227db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', write_proto.state)
16237db96d56Sopenharmony_ci
16247db96d56Sopenharmony_ci        write_transport.write(b'2345')
16257db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: reader(data) >= 5,
16267db96d56Sopenharmony_ci                             timeout=support.SHORT_TIMEOUT)
16277db96d56Sopenharmony_ci        self.assertEqual(b'12345', data)
16287db96d56Sopenharmony_ci        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
16297db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', write_proto.state)
16307db96d56Sopenharmony_ci
16317db96d56Sopenharmony_ci        os.write(master, b'bcde')
16327db96d56Sopenharmony_ci        test_utils.run_until(self.loop, lambda: read_proto.nbytes >= 5,
16337db96d56Sopenharmony_ci                             timeout=support.SHORT_TIMEOUT)
16347db96d56Sopenharmony_ci        self.assertEqual(['INITIAL', 'CONNECTED'], read_proto.state)
16357db96d56Sopenharmony_ci        self.assertEqual(5, read_proto.nbytes)
16367db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', write_proto.state)
16377db96d56Sopenharmony_ci
16387db96d56Sopenharmony_ci        os.close(master)
16397db96d56Sopenharmony_ci
16407db96d56Sopenharmony_ci        read_transport.close()
16417db96d56Sopenharmony_ci        self.loop.run_until_complete(read_proto.done)
16427db96d56Sopenharmony_ci        self.assertEqual(
16437db96d56Sopenharmony_ci            ['INITIAL', 'CONNECTED', 'EOF', 'CLOSED'], read_proto.state)
16447db96d56Sopenharmony_ci
16457db96d56Sopenharmony_ci        write_transport.close()
16467db96d56Sopenharmony_ci        self.loop.run_until_complete(write_proto.done)
16477db96d56Sopenharmony_ci        self.assertEqual('CLOSED', write_proto.state)
16487db96d56Sopenharmony_ci
16497db96d56Sopenharmony_ci    def test_prompt_cancellation(self):
16507db96d56Sopenharmony_ci        r, w = socket.socketpair()
16517db96d56Sopenharmony_ci        r.setblocking(False)
16527db96d56Sopenharmony_ci        f = self.loop.create_task(self.loop.sock_recv(r, 1))
16537db96d56Sopenharmony_ci        ov = getattr(f, 'ov', None)
16547db96d56Sopenharmony_ci        if ov is not None:
16557db96d56Sopenharmony_ci            self.assertTrue(ov.pending)
16567db96d56Sopenharmony_ci
16577db96d56Sopenharmony_ci        async def main():
16587db96d56Sopenharmony_ci            try:
16597db96d56Sopenharmony_ci                self.loop.call_soon(f.cancel)
16607db96d56Sopenharmony_ci                await f
16617db96d56Sopenharmony_ci            except asyncio.CancelledError:
16627db96d56Sopenharmony_ci                res = 'cancelled'
16637db96d56Sopenharmony_ci            else:
16647db96d56Sopenharmony_ci                res = None
16657db96d56Sopenharmony_ci            finally:
16667db96d56Sopenharmony_ci                self.loop.stop()
16677db96d56Sopenharmony_ci            return res
16687db96d56Sopenharmony_ci
16697db96d56Sopenharmony_ci        start = time.monotonic()
16707db96d56Sopenharmony_ci        t = self.loop.create_task(main())
16717db96d56Sopenharmony_ci        self.loop.run_forever()
16727db96d56Sopenharmony_ci        elapsed = time.monotonic() - start
16737db96d56Sopenharmony_ci
16747db96d56Sopenharmony_ci        self.assertLess(elapsed, 0.1)
16757db96d56Sopenharmony_ci        self.assertEqual(t.result(), 'cancelled')
16767db96d56Sopenharmony_ci        self.assertRaises(asyncio.CancelledError, f.result)
16777db96d56Sopenharmony_ci        if ov is not None:
16787db96d56Sopenharmony_ci            self.assertFalse(ov.pending)
16797db96d56Sopenharmony_ci        self.loop._stop_serving(r)
16807db96d56Sopenharmony_ci
16817db96d56Sopenharmony_ci        r.close()
16827db96d56Sopenharmony_ci        w.close()
16837db96d56Sopenharmony_ci
16847db96d56Sopenharmony_ci    def test_timeout_rounding(self):
16857db96d56Sopenharmony_ci        def _run_once():
16867db96d56Sopenharmony_ci            self.loop._run_once_counter += 1
16877db96d56Sopenharmony_ci            orig_run_once()
16887db96d56Sopenharmony_ci
16897db96d56Sopenharmony_ci        orig_run_once = self.loop._run_once
16907db96d56Sopenharmony_ci        self.loop._run_once_counter = 0
16917db96d56Sopenharmony_ci        self.loop._run_once = _run_once
16927db96d56Sopenharmony_ci
16937db96d56Sopenharmony_ci        async def wait():
16947db96d56Sopenharmony_ci            loop = self.loop
16957db96d56Sopenharmony_ci            await asyncio.sleep(1e-2)
16967db96d56Sopenharmony_ci            await asyncio.sleep(1e-4)
16977db96d56Sopenharmony_ci            await asyncio.sleep(1e-6)
16987db96d56Sopenharmony_ci            await asyncio.sleep(1e-8)
16997db96d56Sopenharmony_ci            await asyncio.sleep(1e-10)
17007db96d56Sopenharmony_ci
17017db96d56Sopenharmony_ci        self.loop.run_until_complete(wait())
17027db96d56Sopenharmony_ci        # The ideal number of call is 12, but on some platforms, the selector
17037db96d56Sopenharmony_ci        # may sleep at little bit less than timeout depending on the resolution
17047db96d56Sopenharmony_ci        # of the clock used by the kernel. Tolerate a few useless calls on
17057db96d56Sopenharmony_ci        # these platforms.
17067db96d56Sopenharmony_ci        self.assertLessEqual(self.loop._run_once_counter, 20,
17077db96d56Sopenharmony_ci            {'clock_resolution': self.loop._clock_resolution,
17087db96d56Sopenharmony_ci             'selector': self.loop._selector.__class__.__name__})
17097db96d56Sopenharmony_ci
17107db96d56Sopenharmony_ci    def test_remove_fds_after_closing(self):
17117db96d56Sopenharmony_ci        loop = self.create_event_loop()
17127db96d56Sopenharmony_ci        callback = lambda: None
17137db96d56Sopenharmony_ci        r, w = socket.socketpair()
17147db96d56Sopenharmony_ci        self.addCleanup(r.close)
17157db96d56Sopenharmony_ci        self.addCleanup(w.close)
17167db96d56Sopenharmony_ci        loop.add_reader(r, callback)
17177db96d56Sopenharmony_ci        loop.add_writer(w, callback)
17187db96d56Sopenharmony_ci        loop.close()
17197db96d56Sopenharmony_ci        self.assertFalse(loop.remove_reader(r))
17207db96d56Sopenharmony_ci        self.assertFalse(loop.remove_writer(w))
17217db96d56Sopenharmony_ci
17227db96d56Sopenharmony_ci    def test_add_fds_after_closing(self):
17237db96d56Sopenharmony_ci        loop = self.create_event_loop()
17247db96d56Sopenharmony_ci        callback = lambda: None
17257db96d56Sopenharmony_ci        r, w = socket.socketpair()
17267db96d56Sopenharmony_ci        self.addCleanup(r.close)
17277db96d56Sopenharmony_ci        self.addCleanup(w.close)
17287db96d56Sopenharmony_ci        loop.close()
17297db96d56Sopenharmony_ci        with self.assertRaises(RuntimeError):
17307db96d56Sopenharmony_ci            loop.add_reader(r, callback)
17317db96d56Sopenharmony_ci        with self.assertRaises(RuntimeError):
17327db96d56Sopenharmony_ci            loop.add_writer(w, callback)
17337db96d56Sopenharmony_ci
17347db96d56Sopenharmony_ci    def test_close_running_event_loop(self):
17357db96d56Sopenharmony_ci        async def close_loop(loop):
17367db96d56Sopenharmony_ci            self.loop.close()
17377db96d56Sopenharmony_ci
17387db96d56Sopenharmony_ci        coro = close_loop(self.loop)
17397db96d56Sopenharmony_ci        with self.assertRaises(RuntimeError):
17407db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
17417db96d56Sopenharmony_ci
17427db96d56Sopenharmony_ci    def test_close(self):
17437db96d56Sopenharmony_ci        self.loop.close()
17447db96d56Sopenharmony_ci
17457db96d56Sopenharmony_ci        async def test():
17467db96d56Sopenharmony_ci            pass
17477db96d56Sopenharmony_ci
17487db96d56Sopenharmony_ci        func = lambda: False
17497db96d56Sopenharmony_ci        coro = test()
17507db96d56Sopenharmony_ci        self.addCleanup(coro.close)
17517db96d56Sopenharmony_ci
17527db96d56Sopenharmony_ci        # operation blocked when the loop is closed
17537db96d56Sopenharmony_ci        with self.assertRaises(RuntimeError):
17547db96d56Sopenharmony_ci            self.loop.run_forever()
17557db96d56Sopenharmony_ci        with self.assertRaises(RuntimeError):
17567db96d56Sopenharmony_ci            fut = self.loop.create_future()
17577db96d56Sopenharmony_ci            self.loop.run_until_complete(fut)
17587db96d56Sopenharmony_ci        with self.assertRaises(RuntimeError):
17597db96d56Sopenharmony_ci            self.loop.call_soon(func)
17607db96d56Sopenharmony_ci        with self.assertRaises(RuntimeError):
17617db96d56Sopenharmony_ci            self.loop.call_soon_threadsafe(func)
17627db96d56Sopenharmony_ci        with self.assertRaises(RuntimeError):
17637db96d56Sopenharmony_ci            self.loop.call_later(1.0, func)
17647db96d56Sopenharmony_ci        with self.assertRaises(RuntimeError):
17657db96d56Sopenharmony_ci            self.loop.call_at(self.loop.time() + .0, func)
17667db96d56Sopenharmony_ci        with self.assertRaises(RuntimeError):
17677db96d56Sopenharmony_ci            self.loop.create_task(coro)
17687db96d56Sopenharmony_ci        with self.assertRaises(RuntimeError):
17697db96d56Sopenharmony_ci            self.loop.add_signal_handler(signal.SIGTERM, func)
17707db96d56Sopenharmony_ci
17717db96d56Sopenharmony_ci        # run_in_executor test is tricky: the method is a coroutine,
17727db96d56Sopenharmony_ci        # but run_until_complete cannot be called on closed loop.
17737db96d56Sopenharmony_ci        # Thus iterate once explicitly.
17747db96d56Sopenharmony_ci        with self.assertRaises(RuntimeError):
17757db96d56Sopenharmony_ci            it = self.loop.run_in_executor(None, func).__await__()
17767db96d56Sopenharmony_ci            next(it)
17777db96d56Sopenharmony_ci
17787db96d56Sopenharmony_ci
17797db96d56Sopenharmony_ciclass SubprocessTestsMixin:
17807db96d56Sopenharmony_ci
17817db96d56Sopenharmony_ci    def check_terminated(self, returncode):
17827db96d56Sopenharmony_ci        if sys.platform == 'win32':
17837db96d56Sopenharmony_ci            self.assertIsInstance(returncode, int)
17847db96d56Sopenharmony_ci            # expect 1 but sometimes get 0
17857db96d56Sopenharmony_ci        else:
17867db96d56Sopenharmony_ci            self.assertEqual(-signal.SIGTERM, returncode)
17877db96d56Sopenharmony_ci
17887db96d56Sopenharmony_ci    def check_killed(self, returncode):
17897db96d56Sopenharmony_ci        if sys.platform == 'win32':
17907db96d56Sopenharmony_ci            self.assertIsInstance(returncode, int)
17917db96d56Sopenharmony_ci            # expect 1 but sometimes get 0
17927db96d56Sopenharmony_ci        else:
17937db96d56Sopenharmony_ci            self.assertEqual(-signal.SIGKILL, returncode)
17947db96d56Sopenharmony_ci
17957db96d56Sopenharmony_ci    def test_subprocess_exec(self):
17967db96d56Sopenharmony_ci        prog = os.path.join(os.path.dirname(__file__), 'echo.py')
17977db96d56Sopenharmony_ci
17987db96d56Sopenharmony_ci        connect = self.loop.subprocess_exec(
17997db96d56Sopenharmony_ci                        functools.partial(MySubprocessProtocol, self.loop),
18007db96d56Sopenharmony_ci                        sys.executable, prog)
18017db96d56Sopenharmony_ci
18027db96d56Sopenharmony_ci        transp, proto = self.loop.run_until_complete(connect)
18037db96d56Sopenharmony_ci        self.assertIsInstance(proto, MySubprocessProtocol)
18047db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.connected)
18057db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', proto.state)
18067db96d56Sopenharmony_ci
18077db96d56Sopenharmony_ci        stdin = transp.get_pipe_transport(0)
18087db96d56Sopenharmony_ci        stdin.write(b'Python The Winner')
18097db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.got_data[1].wait())
18107db96d56Sopenharmony_ci        with test_utils.disable_logger():
18117db96d56Sopenharmony_ci            transp.close()
18127db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.completed)
18137db96d56Sopenharmony_ci        self.check_killed(proto.returncode)
18147db96d56Sopenharmony_ci        self.assertEqual(b'Python The Winner', proto.data[1])
18157db96d56Sopenharmony_ci
18167db96d56Sopenharmony_ci    def test_subprocess_interactive(self):
18177db96d56Sopenharmony_ci        prog = os.path.join(os.path.dirname(__file__), 'echo.py')
18187db96d56Sopenharmony_ci
18197db96d56Sopenharmony_ci        connect = self.loop.subprocess_exec(
18207db96d56Sopenharmony_ci                        functools.partial(MySubprocessProtocol, self.loop),
18217db96d56Sopenharmony_ci                        sys.executable, prog)
18227db96d56Sopenharmony_ci
18237db96d56Sopenharmony_ci        transp, proto = self.loop.run_until_complete(connect)
18247db96d56Sopenharmony_ci        self.assertIsInstance(proto, MySubprocessProtocol)
18257db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.connected)
18267db96d56Sopenharmony_ci        self.assertEqual('CONNECTED', proto.state)
18277db96d56Sopenharmony_ci
18287db96d56Sopenharmony_ci        stdin = transp.get_pipe_transport(0)
18297db96d56Sopenharmony_ci        stdin.write(b'Python ')
18307db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.got_data[1].wait())
18317db96d56Sopenharmony_ci        proto.got_data[1].clear()
18327db96d56Sopenharmony_ci        self.assertEqual(b'Python ', proto.data[1])
18337db96d56Sopenharmony_ci
18347db96d56Sopenharmony_ci        stdin.write(b'The Winner')
18357db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.got_data[1].wait())
18367db96d56Sopenharmony_ci        self.assertEqual(b'Python The Winner', proto.data[1])
18377db96d56Sopenharmony_ci
18387db96d56Sopenharmony_ci        with test_utils.disable_logger():
18397db96d56Sopenharmony_ci            transp.close()
18407db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.completed)
18417db96d56Sopenharmony_ci        self.check_killed(proto.returncode)
18427db96d56Sopenharmony_ci
18437db96d56Sopenharmony_ci    def test_subprocess_shell(self):
18447db96d56Sopenharmony_ci        connect = self.loop.subprocess_shell(
18457db96d56Sopenharmony_ci                        functools.partial(MySubprocessProtocol, self.loop),
18467db96d56Sopenharmony_ci                        'echo Python')
18477db96d56Sopenharmony_ci        transp, proto = self.loop.run_until_complete(connect)
18487db96d56Sopenharmony_ci        self.assertIsInstance(proto, MySubprocessProtocol)
18497db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.connected)
18507db96d56Sopenharmony_ci
18517db96d56Sopenharmony_ci        transp.get_pipe_transport(0).close()
18527db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.completed)
18537db96d56Sopenharmony_ci        self.assertEqual(0, proto.returncode)
18547db96d56Sopenharmony_ci        self.assertTrue(all(f.done() for f in proto.disconnects.values()))
18557db96d56Sopenharmony_ci        self.assertEqual(proto.data[1].rstrip(b'\r\n'), b'Python')
18567db96d56Sopenharmony_ci        self.assertEqual(proto.data[2], b'')
18577db96d56Sopenharmony_ci        transp.close()
18587db96d56Sopenharmony_ci
18597db96d56Sopenharmony_ci    def test_subprocess_exitcode(self):
18607db96d56Sopenharmony_ci        connect = self.loop.subprocess_shell(
18617db96d56Sopenharmony_ci                        functools.partial(MySubprocessProtocol, self.loop),
18627db96d56Sopenharmony_ci                        'exit 7', stdin=None, stdout=None, stderr=None)
18637db96d56Sopenharmony_ci
18647db96d56Sopenharmony_ci        transp, proto = self.loop.run_until_complete(connect)
18657db96d56Sopenharmony_ci        self.assertIsInstance(proto, MySubprocessProtocol)
18667db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.completed)
18677db96d56Sopenharmony_ci        self.assertEqual(7, proto.returncode)
18687db96d56Sopenharmony_ci        transp.close()
18697db96d56Sopenharmony_ci
18707db96d56Sopenharmony_ci    def test_subprocess_close_after_finish(self):
18717db96d56Sopenharmony_ci        connect = self.loop.subprocess_shell(
18727db96d56Sopenharmony_ci                        functools.partial(MySubprocessProtocol, self.loop),
18737db96d56Sopenharmony_ci                        'exit 7', stdin=None, stdout=None, stderr=None)
18747db96d56Sopenharmony_ci
18757db96d56Sopenharmony_ci        transp, proto = self.loop.run_until_complete(connect)
18767db96d56Sopenharmony_ci        self.assertIsInstance(proto, MySubprocessProtocol)
18777db96d56Sopenharmony_ci        self.assertIsNone(transp.get_pipe_transport(0))
18787db96d56Sopenharmony_ci        self.assertIsNone(transp.get_pipe_transport(1))
18797db96d56Sopenharmony_ci        self.assertIsNone(transp.get_pipe_transport(2))
18807db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.completed)
18817db96d56Sopenharmony_ci        self.assertEqual(7, proto.returncode)
18827db96d56Sopenharmony_ci        self.assertIsNone(transp.close())
18837db96d56Sopenharmony_ci
18847db96d56Sopenharmony_ci    def test_subprocess_kill(self):
18857db96d56Sopenharmony_ci        prog = os.path.join(os.path.dirname(__file__), 'echo.py')
18867db96d56Sopenharmony_ci
18877db96d56Sopenharmony_ci        connect = self.loop.subprocess_exec(
18887db96d56Sopenharmony_ci                        functools.partial(MySubprocessProtocol, self.loop),
18897db96d56Sopenharmony_ci                        sys.executable, prog)
18907db96d56Sopenharmony_ci
18917db96d56Sopenharmony_ci        transp, proto = self.loop.run_until_complete(connect)
18927db96d56Sopenharmony_ci        self.assertIsInstance(proto, MySubprocessProtocol)
18937db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.connected)
18947db96d56Sopenharmony_ci
18957db96d56Sopenharmony_ci        transp.kill()
18967db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.completed)
18977db96d56Sopenharmony_ci        self.check_killed(proto.returncode)
18987db96d56Sopenharmony_ci        transp.close()
18997db96d56Sopenharmony_ci
19007db96d56Sopenharmony_ci    def test_subprocess_terminate(self):
19017db96d56Sopenharmony_ci        prog = os.path.join(os.path.dirname(__file__), 'echo.py')
19027db96d56Sopenharmony_ci
19037db96d56Sopenharmony_ci        connect = self.loop.subprocess_exec(
19047db96d56Sopenharmony_ci                        functools.partial(MySubprocessProtocol, self.loop),
19057db96d56Sopenharmony_ci                        sys.executable, prog)
19067db96d56Sopenharmony_ci
19077db96d56Sopenharmony_ci        transp, proto = self.loop.run_until_complete(connect)
19087db96d56Sopenharmony_ci        self.assertIsInstance(proto, MySubprocessProtocol)
19097db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.connected)
19107db96d56Sopenharmony_ci
19117db96d56Sopenharmony_ci        transp.terminate()
19127db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.completed)
19137db96d56Sopenharmony_ci        self.check_terminated(proto.returncode)
19147db96d56Sopenharmony_ci        transp.close()
19157db96d56Sopenharmony_ci
19167db96d56Sopenharmony_ci    @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
19177db96d56Sopenharmony_ci    def test_subprocess_send_signal(self):
19187db96d56Sopenharmony_ci        # bpo-31034: Make sure that we get the default signal handler (killing
19197db96d56Sopenharmony_ci        # the process). The parent process may have decided to ignore SIGHUP,
19207db96d56Sopenharmony_ci        # and signal handlers are inherited.
19217db96d56Sopenharmony_ci        old_handler = signal.signal(signal.SIGHUP, signal.SIG_DFL)
19227db96d56Sopenharmony_ci        try:
19237db96d56Sopenharmony_ci            prog = os.path.join(os.path.dirname(__file__), 'echo.py')
19247db96d56Sopenharmony_ci
19257db96d56Sopenharmony_ci            connect = self.loop.subprocess_exec(
19267db96d56Sopenharmony_ci                            functools.partial(MySubprocessProtocol, self.loop),
19277db96d56Sopenharmony_ci                            sys.executable, prog)
19287db96d56Sopenharmony_ci
19297db96d56Sopenharmony_ci
19307db96d56Sopenharmony_ci            transp, proto = self.loop.run_until_complete(connect)
19317db96d56Sopenharmony_ci            self.assertIsInstance(proto, MySubprocessProtocol)
19327db96d56Sopenharmony_ci            self.loop.run_until_complete(proto.connected)
19337db96d56Sopenharmony_ci
19347db96d56Sopenharmony_ci            transp.send_signal(signal.SIGHUP)
19357db96d56Sopenharmony_ci            self.loop.run_until_complete(proto.completed)
19367db96d56Sopenharmony_ci            self.assertEqual(-signal.SIGHUP, proto.returncode)
19377db96d56Sopenharmony_ci            transp.close()
19387db96d56Sopenharmony_ci        finally:
19397db96d56Sopenharmony_ci            signal.signal(signal.SIGHUP, old_handler)
19407db96d56Sopenharmony_ci
19417db96d56Sopenharmony_ci    def test_subprocess_stderr(self):
19427db96d56Sopenharmony_ci        prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
19437db96d56Sopenharmony_ci
19447db96d56Sopenharmony_ci        connect = self.loop.subprocess_exec(
19457db96d56Sopenharmony_ci                        functools.partial(MySubprocessProtocol, self.loop),
19467db96d56Sopenharmony_ci                        sys.executable, prog)
19477db96d56Sopenharmony_ci
19487db96d56Sopenharmony_ci        transp, proto = self.loop.run_until_complete(connect)
19497db96d56Sopenharmony_ci        self.assertIsInstance(proto, MySubprocessProtocol)
19507db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.connected)
19517db96d56Sopenharmony_ci
19527db96d56Sopenharmony_ci        stdin = transp.get_pipe_transport(0)
19537db96d56Sopenharmony_ci        stdin.write(b'test')
19547db96d56Sopenharmony_ci
19557db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.completed)
19567db96d56Sopenharmony_ci
19577db96d56Sopenharmony_ci        transp.close()
19587db96d56Sopenharmony_ci        self.assertEqual(b'OUT:test', proto.data[1])
19597db96d56Sopenharmony_ci        self.assertTrue(proto.data[2].startswith(b'ERR:test'), proto.data[2])
19607db96d56Sopenharmony_ci        self.assertEqual(0, proto.returncode)
19617db96d56Sopenharmony_ci
19627db96d56Sopenharmony_ci    def test_subprocess_stderr_redirect_to_stdout(self):
19637db96d56Sopenharmony_ci        prog = os.path.join(os.path.dirname(__file__), 'echo2.py')
19647db96d56Sopenharmony_ci
19657db96d56Sopenharmony_ci        connect = self.loop.subprocess_exec(
19667db96d56Sopenharmony_ci                        functools.partial(MySubprocessProtocol, self.loop),
19677db96d56Sopenharmony_ci                        sys.executable, prog, stderr=subprocess.STDOUT)
19687db96d56Sopenharmony_ci
19697db96d56Sopenharmony_ci
19707db96d56Sopenharmony_ci        transp, proto = self.loop.run_until_complete(connect)
19717db96d56Sopenharmony_ci        self.assertIsInstance(proto, MySubprocessProtocol)
19727db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.connected)
19737db96d56Sopenharmony_ci
19747db96d56Sopenharmony_ci        stdin = transp.get_pipe_transport(0)
19757db96d56Sopenharmony_ci        self.assertIsNotNone(transp.get_pipe_transport(1))
19767db96d56Sopenharmony_ci        self.assertIsNone(transp.get_pipe_transport(2))
19777db96d56Sopenharmony_ci
19787db96d56Sopenharmony_ci        stdin.write(b'test')
19797db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.completed)
19807db96d56Sopenharmony_ci        self.assertTrue(proto.data[1].startswith(b'OUT:testERR:test'),
19817db96d56Sopenharmony_ci                        proto.data[1])
19827db96d56Sopenharmony_ci        self.assertEqual(b'', proto.data[2])
19837db96d56Sopenharmony_ci
19847db96d56Sopenharmony_ci        transp.close()
19857db96d56Sopenharmony_ci        self.assertEqual(0, proto.returncode)
19867db96d56Sopenharmony_ci
19877db96d56Sopenharmony_ci    def test_subprocess_close_client_stream(self):
19887db96d56Sopenharmony_ci        prog = os.path.join(os.path.dirname(__file__), 'echo3.py')
19897db96d56Sopenharmony_ci
19907db96d56Sopenharmony_ci        connect = self.loop.subprocess_exec(
19917db96d56Sopenharmony_ci                        functools.partial(MySubprocessProtocol, self.loop),
19927db96d56Sopenharmony_ci                        sys.executable, prog)
19937db96d56Sopenharmony_ci
19947db96d56Sopenharmony_ci        transp, proto = self.loop.run_until_complete(connect)
19957db96d56Sopenharmony_ci        self.assertIsInstance(proto, MySubprocessProtocol)
19967db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.connected)
19977db96d56Sopenharmony_ci
19987db96d56Sopenharmony_ci        stdin = transp.get_pipe_transport(0)
19997db96d56Sopenharmony_ci        stdout = transp.get_pipe_transport(1)
20007db96d56Sopenharmony_ci        stdin.write(b'test')
20017db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.got_data[1].wait())
20027db96d56Sopenharmony_ci        self.assertEqual(b'OUT:test', proto.data[1])
20037db96d56Sopenharmony_ci
20047db96d56Sopenharmony_ci        stdout.close()
20057db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.disconnects[1])
20067db96d56Sopenharmony_ci        stdin.write(b'xxx')
20077db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.got_data[2].wait())
20087db96d56Sopenharmony_ci        if sys.platform != 'win32':
20097db96d56Sopenharmony_ci            self.assertEqual(b'ERR:BrokenPipeError', proto.data[2])
20107db96d56Sopenharmony_ci        else:
20117db96d56Sopenharmony_ci            # After closing the read-end of a pipe, writing to the
20127db96d56Sopenharmony_ci            # write-end using os.write() fails with errno==EINVAL and
20137db96d56Sopenharmony_ci            # GetLastError()==ERROR_INVALID_NAME on Windows!?!  (Using
20147db96d56Sopenharmony_ci            # WriteFile() we get ERROR_BROKEN_PIPE as expected.)
20157db96d56Sopenharmony_ci            self.assertEqual(b'ERR:OSError', proto.data[2])
20167db96d56Sopenharmony_ci        with test_utils.disable_logger():
20177db96d56Sopenharmony_ci            transp.close()
20187db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.completed)
20197db96d56Sopenharmony_ci        self.check_killed(proto.returncode)
20207db96d56Sopenharmony_ci
20217db96d56Sopenharmony_ci    def test_subprocess_wait_no_same_group(self):
20227db96d56Sopenharmony_ci        # start the new process in a new session
20237db96d56Sopenharmony_ci        connect = self.loop.subprocess_shell(
20247db96d56Sopenharmony_ci                        functools.partial(MySubprocessProtocol, self.loop),
20257db96d56Sopenharmony_ci                        'exit 7', stdin=None, stdout=None, stderr=None,
20267db96d56Sopenharmony_ci                        start_new_session=True)
20277db96d56Sopenharmony_ci        transp, proto = self.loop.run_until_complete(connect)
20287db96d56Sopenharmony_ci        self.assertIsInstance(proto, MySubprocessProtocol)
20297db96d56Sopenharmony_ci        self.loop.run_until_complete(proto.completed)
20307db96d56Sopenharmony_ci        self.assertEqual(7, proto.returncode)
20317db96d56Sopenharmony_ci        transp.close()
20327db96d56Sopenharmony_ci
20337db96d56Sopenharmony_ci    def test_subprocess_exec_invalid_args(self):
20347db96d56Sopenharmony_ci        async def connect(**kwds):
20357db96d56Sopenharmony_ci            await self.loop.subprocess_exec(
20367db96d56Sopenharmony_ci                asyncio.SubprocessProtocol,
20377db96d56Sopenharmony_ci                'pwd', **kwds)
20387db96d56Sopenharmony_ci
20397db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
20407db96d56Sopenharmony_ci            self.loop.run_until_complete(connect(universal_newlines=True))
20417db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
20427db96d56Sopenharmony_ci            self.loop.run_until_complete(connect(bufsize=4096))
20437db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
20447db96d56Sopenharmony_ci            self.loop.run_until_complete(connect(shell=True))
20457db96d56Sopenharmony_ci
20467db96d56Sopenharmony_ci    def test_subprocess_shell_invalid_args(self):
20477db96d56Sopenharmony_ci
20487db96d56Sopenharmony_ci        async def connect(cmd=None, **kwds):
20497db96d56Sopenharmony_ci            if not cmd:
20507db96d56Sopenharmony_ci                cmd = 'pwd'
20517db96d56Sopenharmony_ci            await self.loop.subprocess_shell(
20527db96d56Sopenharmony_ci                asyncio.SubprocessProtocol,
20537db96d56Sopenharmony_ci                cmd, **kwds)
20547db96d56Sopenharmony_ci
20557db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
20567db96d56Sopenharmony_ci            self.loop.run_until_complete(connect(['ls', '-l']))
20577db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
20587db96d56Sopenharmony_ci            self.loop.run_until_complete(connect(universal_newlines=True))
20597db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
20607db96d56Sopenharmony_ci            self.loop.run_until_complete(connect(bufsize=4096))
20617db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
20627db96d56Sopenharmony_ci            self.loop.run_until_complete(connect(shell=False))
20637db96d56Sopenharmony_ci
20647db96d56Sopenharmony_ci
20657db96d56Sopenharmony_ciif sys.platform == 'win32':
20667db96d56Sopenharmony_ci
20677db96d56Sopenharmony_ci    class SelectEventLoopTests(EventLoopTestsMixin,
20687db96d56Sopenharmony_ci                               test_utils.TestCase):
20697db96d56Sopenharmony_ci
20707db96d56Sopenharmony_ci        def create_event_loop(self):
20717db96d56Sopenharmony_ci            return asyncio.SelectorEventLoop()
20727db96d56Sopenharmony_ci
20737db96d56Sopenharmony_ci    class ProactorEventLoopTests(EventLoopTestsMixin,
20747db96d56Sopenharmony_ci                                 SubprocessTestsMixin,
20757db96d56Sopenharmony_ci                                 test_utils.TestCase):
20767db96d56Sopenharmony_ci
20777db96d56Sopenharmony_ci        def create_event_loop(self):
20787db96d56Sopenharmony_ci            return asyncio.ProactorEventLoop()
20797db96d56Sopenharmony_ci
20807db96d56Sopenharmony_ci        def test_reader_callback(self):
20817db96d56Sopenharmony_ci            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
20827db96d56Sopenharmony_ci
20837db96d56Sopenharmony_ci        def test_reader_callback_cancel(self):
20847db96d56Sopenharmony_ci            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
20857db96d56Sopenharmony_ci
20867db96d56Sopenharmony_ci        def test_writer_callback(self):
20877db96d56Sopenharmony_ci            raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
20887db96d56Sopenharmony_ci
20897db96d56Sopenharmony_ci        def test_writer_callback_cancel(self):
20907db96d56Sopenharmony_ci            raise unittest.SkipTest("IocpEventLoop does not have add_writer()")
20917db96d56Sopenharmony_ci
20927db96d56Sopenharmony_ci        def test_remove_fds_after_closing(self):
20937db96d56Sopenharmony_ci            raise unittest.SkipTest("IocpEventLoop does not have add_reader()")
20947db96d56Sopenharmony_cielse:
20957db96d56Sopenharmony_ci    import selectors
20967db96d56Sopenharmony_ci
20977db96d56Sopenharmony_ci    class UnixEventLoopTestsMixin(EventLoopTestsMixin):
20987db96d56Sopenharmony_ci        def setUp(self):
20997db96d56Sopenharmony_ci            super().setUp()
21007db96d56Sopenharmony_ci            watcher = asyncio.SafeChildWatcher()
21017db96d56Sopenharmony_ci            watcher.attach_loop(self.loop)
21027db96d56Sopenharmony_ci            asyncio.set_child_watcher(watcher)
21037db96d56Sopenharmony_ci
21047db96d56Sopenharmony_ci        def tearDown(self):
21057db96d56Sopenharmony_ci            asyncio.set_child_watcher(None)
21067db96d56Sopenharmony_ci            super().tearDown()
21077db96d56Sopenharmony_ci
21087db96d56Sopenharmony_ci
21097db96d56Sopenharmony_ci    if hasattr(selectors, 'KqueueSelector'):
21107db96d56Sopenharmony_ci        class KqueueEventLoopTests(UnixEventLoopTestsMixin,
21117db96d56Sopenharmony_ci                                   SubprocessTestsMixin,
21127db96d56Sopenharmony_ci                                   test_utils.TestCase):
21137db96d56Sopenharmony_ci
21147db96d56Sopenharmony_ci            def create_event_loop(self):
21157db96d56Sopenharmony_ci                return asyncio.SelectorEventLoop(
21167db96d56Sopenharmony_ci                    selectors.KqueueSelector())
21177db96d56Sopenharmony_ci
21187db96d56Sopenharmony_ci            # kqueue doesn't support character devices (PTY) on Mac OS X older
21197db96d56Sopenharmony_ci            # than 10.9 (Maverick)
21207db96d56Sopenharmony_ci            @support.requires_mac_ver(10, 9)
21217db96d56Sopenharmony_ci            # Issue #20667: KqueueEventLoopTests.test_read_pty_output()
21227db96d56Sopenharmony_ci            # hangs on OpenBSD 5.5
21237db96d56Sopenharmony_ci            @unittest.skipIf(sys.platform.startswith('openbsd'),
21247db96d56Sopenharmony_ci                             'test hangs on OpenBSD')
21257db96d56Sopenharmony_ci            def test_read_pty_output(self):
21267db96d56Sopenharmony_ci                super().test_read_pty_output()
21277db96d56Sopenharmony_ci
21287db96d56Sopenharmony_ci            # kqueue doesn't support character devices (PTY) on Mac OS X older
21297db96d56Sopenharmony_ci            # than 10.9 (Maverick)
21307db96d56Sopenharmony_ci            @support.requires_mac_ver(10, 9)
21317db96d56Sopenharmony_ci            def test_write_pty(self):
21327db96d56Sopenharmony_ci                super().test_write_pty()
21337db96d56Sopenharmony_ci
21347db96d56Sopenharmony_ci    if hasattr(selectors, 'EpollSelector'):
21357db96d56Sopenharmony_ci        class EPollEventLoopTests(UnixEventLoopTestsMixin,
21367db96d56Sopenharmony_ci                                  SubprocessTestsMixin,
21377db96d56Sopenharmony_ci                                  test_utils.TestCase):
21387db96d56Sopenharmony_ci
21397db96d56Sopenharmony_ci            def create_event_loop(self):
21407db96d56Sopenharmony_ci                return asyncio.SelectorEventLoop(selectors.EpollSelector())
21417db96d56Sopenharmony_ci
21427db96d56Sopenharmony_ci    if hasattr(selectors, 'PollSelector'):
21437db96d56Sopenharmony_ci        class PollEventLoopTests(UnixEventLoopTestsMixin,
21447db96d56Sopenharmony_ci                                 SubprocessTestsMixin,
21457db96d56Sopenharmony_ci                                 test_utils.TestCase):
21467db96d56Sopenharmony_ci
21477db96d56Sopenharmony_ci            def create_event_loop(self):
21487db96d56Sopenharmony_ci                return asyncio.SelectorEventLoop(selectors.PollSelector())
21497db96d56Sopenharmony_ci
21507db96d56Sopenharmony_ci    # Should always exist.
21517db96d56Sopenharmony_ci    class SelectEventLoopTests(UnixEventLoopTestsMixin,
21527db96d56Sopenharmony_ci                               SubprocessTestsMixin,
21537db96d56Sopenharmony_ci                               test_utils.TestCase):
21547db96d56Sopenharmony_ci
21557db96d56Sopenharmony_ci        def create_event_loop(self):
21567db96d56Sopenharmony_ci            return asyncio.SelectorEventLoop(selectors.SelectSelector())
21577db96d56Sopenharmony_ci
21587db96d56Sopenharmony_ci
21597db96d56Sopenharmony_cidef noop(*args, **kwargs):
21607db96d56Sopenharmony_ci    pass
21617db96d56Sopenharmony_ci
21627db96d56Sopenharmony_ci
21637db96d56Sopenharmony_ciclass HandleTests(test_utils.TestCase):
21647db96d56Sopenharmony_ci
21657db96d56Sopenharmony_ci    def setUp(self):
21667db96d56Sopenharmony_ci        super().setUp()
21677db96d56Sopenharmony_ci        self.loop = mock.Mock()
21687db96d56Sopenharmony_ci        self.loop.get_debug.return_value = True
21697db96d56Sopenharmony_ci
21707db96d56Sopenharmony_ci    def test_handle(self):
21717db96d56Sopenharmony_ci        def callback(*args):
21727db96d56Sopenharmony_ci            return args
21737db96d56Sopenharmony_ci
21747db96d56Sopenharmony_ci        args = ()
21757db96d56Sopenharmony_ci        h = asyncio.Handle(callback, args, self.loop)
21767db96d56Sopenharmony_ci        self.assertIs(h._callback, callback)
21777db96d56Sopenharmony_ci        self.assertIs(h._args, args)
21787db96d56Sopenharmony_ci        self.assertFalse(h.cancelled())
21797db96d56Sopenharmony_ci
21807db96d56Sopenharmony_ci        h.cancel()
21817db96d56Sopenharmony_ci        self.assertTrue(h.cancelled())
21827db96d56Sopenharmony_ci
21837db96d56Sopenharmony_ci    def test_callback_with_exception(self):
21847db96d56Sopenharmony_ci        def callback():
21857db96d56Sopenharmony_ci            raise ValueError()
21867db96d56Sopenharmony_ci
21877db96d56Sopenharmony_ci        self.loop = mock.Mock()
21887db96d56Sopenharmony_ci        self.loop.call_exception_handler = mock.Mock()
21897db96d56Sopenharmony_ci
21907db96d56Sopenharmony_ci        h = asyncio.Handle(callback, (), self.loop)
21917db96d56Sopenharmony_ci        h._run()
21927db96d56Sopenharmony_ci
21937db96d56Sopenharmony_ci        self.loop.call_exception_handler.assert_called_with({
21947db96d56Sopenharmony_ci            'message': test_utils.MockPattern('Exception in callback.*'),
21957db96d56Sopenharmony_ci            'exception': mock.ANY,
21967db96d56Sopenharmony_ci            'handle': h,
21977db96d56Sopenharmony_ci            'source_traceback': h._source_traceback,
21987db96d56Sopenharmony_ci        })
21997db96d56Sopenharmony_ci
22007db96d56Sopenharmony_ci    def test_handle_weakref(self):
22017db96d56Sopenharmony_ci        wd = weakref.WeakValueDictionary()
22027db96d56Sopenharmony_ci        h = asyncio.Handle(lambda: None, (), self.loop)
22037db96d56Sopenharmony_ci        wd['h'] = h  # Would fail without __weakref__ slot.
22047db96d56Sopenharmony_ci
22057db96d56Sopenharmony_ci    def test_handle_repr(self):
22067db96d56Sopenharmony_ci        self.loop.get_debug.return_value = False
22077db96d56Sopenharmony_ci
22087db96d56Sopenharmony_ci        # simple function
22097db96d56Sopenharmony_ci        h = asyncio.Handle(noop, (1, 2), self.loop)
22107db96d56Sopenharmony_ci        filename, lineno = test_utils.get_function_source(noop)
22117db96d56Sopenharmony_ci        self.assertEqual(repr(h),
22127db96d56Sopenharmony_ci                        '<Handle noop(1, 2) at %s:%s>'
22137db96d56Sopenharmony_ci                        % (filename, lineno))
22147db96d56Sopenharmony_ci
22157db96d56Sopenharmony_ci        # cancelled handle
22167db96d56Sopenharmony_ci        h.cancel()
22177db96d56Sopenharmony_ci        self.assertEqual(repr(h),
22187db96d56Sopenharmony_ci                        '<Handle cancelled>')
22197db96d56Sopenharmony_ci
22207db96d56Sopenharmony_ci        # decorated function
22217db96d56Sopenharmony_ci        cb = types.coroutine(noop)
22227db96d56Sopenharmony_ci        h = asyncio.Handle(cb, (), self.loop)
22237db96d56Sopenharmony_ci        self.assertEqual(repr(h),
22247db96d56Sopenharmony_ci                        '<Handle noop() at %s:%s>'
22257db96d56Sopenharmony_ci                        % (filename, lineno))
22267db96d56Sopenharmony_ci
22277db96d56Sopenharmony_ci        # partial function
22287db96d56Sopenharmony_ci        cb = functools.partial(noop, 1, 2)
22297db96d56Sopenharmony_ci        h = asyncio.Handle(cb, (3,), self.loop)
22307db96d56Sopenharmony_ci        regex = (r'^<Handle noop\(1, 2\)\(3\) at %s:%s>$'
22317db96d56Sopenharmony_ci                 % (re.escape(filename), lineno))
22327db96d56Sopenharmony_ci        self.assertRegex(repr(h), regex)
22337db96d56Sopenharmony_ci
22347db96d56Sopenharmony_ci        # partial function with keyword args
22357db96d56Sopenharmony_ci        cb = functools.partial(noop, x=1)
22367db96d56Sopenharmony_ci        h = asyncio.Handle(cb, (2, 3), self.loop)
22377db96d56Sopenharmony_ci        regex = (r'^<Handle noop\(x=1\)\(2, 3\) at %s:%s>$'
22387db96d56Sopenharmony_ci                 % (re.escape(filename), lineno))
22397db96d56Sopenharmony_ci        self.assertRegex(repr(h), regex)
22407db96d56Sopenharmony_ci
22417db96d56Sopenharmony_ci        # partial method
22427db96d56Sopenharmony_ci        method = HandleTests.test_handle_repr
22437db96d56Sopenharmony_ci        cb = functools.partialmethod(method)
22447db96d56Sopenharmony_ci        filename, lineno = test_utils.get_function_source(method)
22457db96d56Sopenharmony_ci        h = asyncio.Handle(cb, (), self.loop)
22467db96d56Sopenharmony_ci
22477db96d56Sopenharmony_ci        cb_regex = r'<function HandleTests.test_handle_repr .*>'
22487db96d56Sopenharmony_ci        cb_regex = fr'functools.partialmethod\({cb_regex}, , \)\(\)'
22497db96d56Sopenharmony_ci        regex = fr'^<Handle {cb_regex} at {re.escape(filename)}:{lineno}>$'
22507db96d56Sopenharmony_ci        self.assertRegex(repr(h), regex)
22517db96d56Sopenharmony_ci
22527db96d56Sopenharmony_ci    def test_handle_repr_debug(self):
22537db96d56Sopenharmony_ci        self.loop.get_debug.return_value = True
22547db96d56Sopenharmony_ci
22557db96d56Sopenharmony_ci        # simple function
22567db96d56Sopenharmony_ci        create_filename = __file__
22577db96d56Sopenharmony_ci        create_lineno = sys._getframe().f_lineno + 1
22587db96d56Sopenharmony_ci        h = asyncio.Handle(noop, (1, 2), self.loop)
22597db96d56Sopenharmony_ci        filename, lineno = test_utils.get_function_source(noop)
22607db96d56Sopenharmony_ci        self.assertEqual(repr(h),
22617db96d56Sopenharmony_ci                        '<Handle noop(1, 2) at %s:%s created at %s:%s>'
22627db96d56Sopenharmony_ci                        % (filename, lineno, create_filename, create_lineno))
22637db96d56Sopenharmony_ci
22647db96d56Sopenharmony_ci        # cancelled handle
22657db96d56Sopenharmony_ci        h.cancel()
22667db96d56Sopenharmony_ci        self.assertEqual(
22677db96d56Sopenharmony_ci            repr(h),
22687db96d56Sopenharmony_ci            '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
22697db96d56Sopenharmony_ci            % (filename, lineno, create_filename, create_lineno))
22707db96d56Sopenharmony_ci
22717db96d56Sopenharmony_ci        # double cancellation won't overwrite _repr
22727db96d56Sopenharmony_ci        h.cancel()
22737db96d56Sopenharmony_ci        self.assertEqual(
22747db96d56Sopenharmony_ci            repr(h),
22757db96d56Sopenharmony_ci            '<Handle cancelled noop(1, 2) at %s:%s created at %s:%s>'
22767db96d56Sopenharmony_ci            % (filename, lineno, create_filename, create_lineno))
22777db96d56Sopenharmony_ci
22787db96d56Sopenharmony_ci    def test_handle_source_traceback(self):
22797db96d56Sopenharmony_ci        loop = asyncio.get_event_loop_policy().new_event_loop()
22807db96d56Sopenharmony_ci        loop.set_debug(True)
22817db96d56Sopenharmony_ci        self.set_event_loop(loop)
22827db96d56Sopenharmony_ci
22837db96d56Sopenharmony_ci        def check_source_traceback(h):
22847db96d56Sopenharmony_ci            lineno = sys._getframe(1).f_lineno - 1
22857db96d56Sopenharmony_ci            self.assertIsInstance(h._source_traceback, list)
22867db96d56Sopenharmony_ci            self.assertEqual(h._source_traceback[-1][:3],
22877db96d56Sopenharmony_ci                             (__file__,
22887db96d56Sopenharmony_ci                              lineno,
22897db96d56Sopenharmony_ci                              'test_handle_source_traceback'))
22907db96d56Sopenharmony_ci
22917db96d56Sopenharmony_ci        # call_soon
22927db96d56Sopenharmony_ci        h = loop.call_soon(noop)
22937db96d56Sopenharmony_ci        check_source_traceback(h)
22947db96d56Sopenharmony_ci
22957db96d56Sopenharmony_ci        # call_soon_threadsafe
22967db96d56Sopenharmony_ci        h = loop.call_soon_threadsafe(noop)
22977db96d56Sopenharmony_ci        check_source_traceback(h)
22987db96d56Sopenharmony_ci
22997db96d56Sopenharmony_ci        # call_later
23007db96d56Sopenharmony_ci        h = loop.call_later(0, noop)
23017db96d56Sopenharmony_ci        check_source_traceback(h)
23027db96d56Sopenharmony_ci
23037db96d56Sopenharmony_ci        # call_at
23047db96d56Sopenharmony_ci        h = loop.call_later(0, noop)
23057db96d56Sopenharmony_ci        check_source_traceback(h)
23067db96d56Sopenharmony_ci
23077db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(collections.abc, 'Coroutine'),
23087db96d56Sopenharmony_ci                         'No collections.abc.Coroutine')
23097db96d56Sopenharmony_ci    def test_coroutine_like_object_debug_formatting(self):
23107db96d56Sopenharmony_ci        # Test that asyncio can format coroutines that are instances of
23117db96d56Sopenharmony_ci        # collections.abc.Coroutine, but lack cr_core or gi_code attributes
23127db96d56Sopenharmony_ci        # (such as ones compiled with Cython).
23137db96d56Sopenharmony_ci
23147db96d56Sopenharmony_ci        coro = CoroLike()
23157db96d56Sopenharmony_ci        coro.__name__ = 'AAA'
23167db96d56Sopenharmony_ci        self.assertTrue(asyncio.iscoroutine(coro))
23177db96d56Sopenharmony_ci        self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')
23187db96d56Sopenharmony_ci
23197db96d56Sopenharmony_ci        coro.__qualname__ = 'BBB'
23207db96d56Sopenharmony_ci        self.assertEqual(coroutines._format_coroutine(coro), 'BBB()')
23217db96d56Sopenharmony_ci
23227db96d56Sopenharmony_ci        coro.cr_running = True
23237db96d56Sopenharmony_ci        self.assertEqual(coroutines._format_coroutine(coro), 'BBB() running')
23247db96d56Sopenharmony_ci
23257db96d56Sopenharmony_ci        coro.__name__ = coro.__qualname__ = None
23267db96d56Sopenharmony_ci        self.assertEqual(coroutines._format_coroutine(coro),
23277db96d56Sopenharmony_ci                         '<CoroLike without __name__>() running')
23287db96d56Sopenharmony_ci
23297db96d56Sopenharmony_ci        coro = CoroLike()
23307db96d56Sopenharmony_ci        coro.__qualname__ = 'CoroLike'
23317db96d56Sopenharmony_ci        # Some coroutines might not have '__name__', such as
23327db96d56Sopenharmony_ci        # built-in async_gen.asend().
23337db96d56Sopenharmony_ci        self.assertEqual(coroutines._format_coroutine(coro), 'CoroLike()')
23347db96d56Sopenharmony_ci
23357db96d56Sopenharmony_ci        coro = CoroLike()
23367db96d56Sopenharmony_ci        coro.__qualname__ = 'AAA'
23377db96d56Sopenharmony_ci        coro.cr_code = None
23387db96d56Sopenharmony_ci        self.assertEqual(coroutines._format_coroutine(coro), 'AAA()')
23397db96d56Sopenharmony_ci
23407db96d56Sopenharmony_ci
23417db96d56Sopenharmony_ciclass TimerTests(unittest.TestCase):
23427db96d56Sopenharmony_ci
23437db96d56Sopenharmony_ci    def setUp(self):
23447db96d56Sopenharmony_ci        super().setUp()
23457db96d56Sopenharmony_ci        self.loop = mock.Mock()
23467db96d56Sopenharmony_ci
23477db96d56Sopenharmony_ci    def test_hash(self):
23487db96d56Sopenharmony_ci        when = time.monotonic()
23497db96d56Sopenharmony_ci        h = asyncio.TimerHandle(when, lambda: False, (),
23507db96d56Sopenharmony_ci                                mock.Mock())
23517db96d56Sopenharmony_ci        self.assertEqual(hash(h), hash(when))
23527db96d56Sopenharmony_ci
23537db96d56Sopenharmony_ci    def test_when(self):
23547db96d56Sopenharmony_ci        when = time.monotonic()
23557db96d56Sopenharmony_ci        h = asyncio.TimerHandle(when, lambda: False, (),
23567db96d56Sopenharmony_ci                                mock.Mock())
23577db96d56Sopenharmony_ci        self.assertEqual(when, h.when())
23587db96d56Sopenharmony_ci
23597db96d56Sopenharmony_ci    def test_timer(self):
23607db96d56Sopenharmony_ci        def callback(*args):
23617db96d56Sopenharmony_ci            return args
23627db96d56Sopenharmony_ci
23637db96d56Sopenharmony_ci        args = (1, 2, 3)
23647db96d56Sopenharmony_ci        when = time.monotonic()
23657db96d56Sopenharmony_ci        h = asyncio.TimerHandle(when, callback, args, mock.Mock())
23667db96d56Sopenharmony_ci        self.assertIs(h._callback, callback)
23677db96d56Sopenharmony_ci        self.assertIs(h._args, args)
23687db96d56Sopenharmony_ci        self.assertFalse(h.cancelled())
23697db96d56Sopenharmony_ci
23707db96d56Sopenharmony_ci        # cancel
23717db96d56Sopenharmony_ci        h.cancel()
23727db96d56Sopenharmony_ci        self.assertTrue(h.cancelled())
23737db96d56Sopenharmony_ci        self.assertIsNone(h._callback)
23747db96d56Sopenharmony_ci        self.assertIsNone(h._args)
23757db96d56Sopenharmony_ci
23767db96d56Sopenharmony_ci
23777db96d56Sopenharmony_ci    def test_timer_repr(self):
23787db96d56Sopenharmony_ci        self.loop.get_debug.return_value = False
23797db96d56Sopenharmony_ci
23807db96d56Sopenharmony_ci        # simple function
23817db96d56Sopenharmony_ci        h = asyncio.TimerHandle(123, noop, (), self.loop)
23827db96d56Sopenharmony_ci        src = test_utils.get_function_source(noop)
23837db96d56Sopenharmony_ci        self.assertEqual(repr(h),
23847db96d56Sopenharmony_ci                        '<TimerHandle when=123 noop() at %s:%s>' % src)
23857db96d56Sopenharmony_ci
23867db96d56Sopenharmony_ci        # cancelled handle
23877db96d56Sopenharmony_ci        h.cancel()
23887db96d56Sopenharmony_ci        self.assertEqual(repr(h),
23897db96d56Sopenharmony_ci                        '<TimerHandle cancelled when=123>')
23907db96d56Sopenharmony_ci
23917db96d56Sopenharmony_ci    def test_timer_repr_debug(self):
23927db96d56Sopenharmony_ci        self.loop.get_debug.return_value = True
23937db96d56Sopenharmony_ci
23947db96d56Sopenharmony_ci        # simple function
23957db96d56Sopenharmony_ci        create_filename = __file__
23967db96d56Sopenharmony_ci        create_lineno = sys._getframe().f_lineno + 1
23977db96d56Sopenharmony_ci        h = asyncio.TimerHandle(123, noop, (), self.loop)
23987db96d56Sopenharmony_ci        filename, lineno = test_utils.get_function_source(noop)
23997db96d56Sopenharmony_ci        self.assertEqual(repr(h),
24007db96d56Sopenharmony_ci                        '<TimerHandle when=123 noop() '
24017db96d56Sopenharmony_ci                        'at %s:%s created at %s:%s>'
24027db96d56Sopenharmony_ci                        % (filename, lineno, create_filename, create_lineno))
24037db96d56Sopenharmony_ci
24047db96d56Sopenharmony_ci        # cancelled handle
24057db96d56Sopenharmony_ci        h.cancel()
24067db96d56Sopenharmony_ci        self.assertEqual(repr(h),
24077db96d56Sopenharmony_ci                        '<TimerHandle cancelled when=123 noop() '
24087db96d56Sopenharmony_ci                        'at %s:%s created at %s:%s>'
24097db96d56Sopenharmony_ci                        % (filename, lineno, create_filename, create_lineno))
24107db96d56Sopenharmony_ci
24117db96d56Sopenharmony_ci
24127db96d56Sopenharmony_ci    def test_timer_comparison(self):
24137db96d56Sopenharmony_ci        def callback(*args):
24147db96d56Sopenharmony_ci            return args
24157db96d56Sopenharmony_ci
24167db96d56Sopenharmony_ci        when = time.monotonic()
24177db96d56Sopenharmony_ci
24187db96d56Sopenharmony_ci        h1 = asyncio.TimerHandle(when, callback, (), self.loop)
24197db96d56Sopenharmony_ci        h2 = asyncio.TimerHandle(when, callback, (), self.loop)
24207db96d56Sopenharmony_ci        # TODO: Use assertLess etc.
24217db96d56Sopenharmony_ci        self.assertFalse(h1 < h2)
24227db96d56Sopenharmony_ci        self.assertFalse(h2 < h1)
24237db96d56Sopenharmony_ci        self.assertTrue(h1 <= h2)
24247db96d56Sopenharmony_ci        self.assertTrue(h2 <= h1)
24257db96d56Sopenharmony_ci        self.assertFalse(h1 > h2)
24267db96d56Sopenharmony_ci        self.assertFalse(h2 > h1)
24277db96d56Sopenharmony_ci        self.assertTrue(h1 >= h2)
24287db96d56Sopenharmony_ci        self.assertTrue(h2 >= h1)
24297db96d56Sopenharmony_ci        self.assertTrue(h1 == h2)
24307db96d56Sopenharmony_ci        self.assertFalse(h1 != h2)
24317db96d56Sopenharmony_ci
24327db96d56Sopenharmony_ci        h2.cancel()
24337db96d56Sopenharmony_ci        self.assertFalse(h1 == h2)
24347db96d56Sopenharmony_ci
24357db96d56Sopenharmony_ci        h1 = asyncio.TimerHandle(when, callback, (), self.loop)
24367db96d56Sopenharmony_ci        h2 = asyncio.TimerHandle(when + 10.0, callback, (), self.loop)
24377db96d56Sopenharmony_ci        self.assertTrue(h1 < h2)
24387db96d56Sopenharmony_ci        self.assertFalse(h2 < h1)
24397db96d56Sopenharmony_ci        self.assertTrue(h1 <= h2)
24407db96d56Sopenharmony_ci        self.assertFalse(h2 <= h1)
24417db96d56Sopenharmony_ci        self.assertFalse(h1 > h2)
24427db96d56Sopenharmony_ci        self.assertTrue(h2 > h1)
24437db96d56Sopenharmony_ci        self.assertFalse(h1 >= h2)
24447db96d56Sopenharmony_ci        self.assertTrue(h2 >= h1)
24457db96d56Sopenharmony_ci        self.assertFalse(h1 == h2)
24467db96d56Sopenharmony_ci        self.assertTrue(h1 != h2)
24477db96d56Sopenharmony_ci
24487db96d56Sopenharmony_ci        h3 = asyncio.Handle(callback, (), self.loop)
24497db96d56Sopenharmony_ci        self.assertIs(NotImplemented, h1.__eq__(h3))
24507db96d56Sopenharmony_ci        self.assertIs(NotImplemented, h1.__ne__(h3))
24517db96d56Sopenharmony_ci
24527db96d56Sopenharmony_ci        with self.assertRaises(TypeError):
24537db96d56Sopenharmony_ci            h1 < ()
24547db96d56Sopenharmony_ci        with self.assertRaises(TypeError):
24557db96d56Sopenharmony_ci            h1 > ()
24567db96d56Sopenharmony_ci        with self.assertRaises(TypeError):
24577db96d56Sopenharmony_ci            h1 <= ()
24587db96d56Sopenharmony_ci        with self.assertRaises(TypeError):
24597db96d56Sopenharmony_ci            h1 >= ()
24607db96d56Sopenharmony_ci        self.assertFalse(h1 == ())
24617db96d56Sopenharmony_ci        self.assertTrue(h1 != ())
24627db96d56Sopenharmony_ci
24637db96d56Sopenharmony_ci        self.assertTrue(h1 == ALWAYS_EQ)
24647db96d56Sopenharmony_ci        self.assertFalse(h1 != ALWAYS_EQ)
24657db96d56Sopenharmony_ci        self.assertTrue(h1 < LARGEST)
24667db96d56Sopenharmony_ci        self.assertFalse(h1 > LARGEST)
24677db96d56Sopenharmony_ci        self.assertTrue(h1 <= LARGEST)
24687db96d56Sopenharmony_ci        self.assertFalse(h1 >= LARGEST)
24697db96d56Sopenharmony_ci        self.assertFalse(h1 < SMALLEST)
24707db96d56Sopenharmony_ci        self.assertTrue(h1 > SMALLEST)
24717db96d56Sopenharmony_ci        self.assertFalse(h1 <= SMALLEST)
24727db96d56Sopenharmony_ci        self.assertTrue(h1 >= SMALLEST)
24737db96d56Sopenharmony_ci
24747db96d56Sopenharmony_ci
24757db96d56Sopenharmony_ciclass AbstractEventLoopTests(unittest.TestCase):
24767db96d56Sopenharmony_ci
24777db96d56Sopenharmony_ci    def test_not_implemented(self):
24787db96d56Sopenharmony_ci        f = mock.Mock()
24797db96d56Sopenharmony_ci        loop = asyncio.AbstractEventLoop()
24807db96d56Sopenharmony_ci        self.assertRaises(
24817db96d56Sopenharmony_ci            NotImplementedError, loop.run_forever)
24827db96d56Sopenharmony_ci        self.assertRaises(
24837db96d56Sopenharmony_ci            NotImplementedError, loop.run_until_complete, None)
24847db96d56Sopenharmony_ci        self.assertRaises(
24857db96d56Sopenharmony_ci            NotImplementedError, loop.stop)
24867db96d56Sopenharmony_ci        self.assertRaises(
24877db96d56Sopenharmony_ci            NotImplementedError, loop.is_running)
24887db96d56Sopenharmony_ci        self.assertRaises(
24897db96d56Sopenharmony_ci            NotImplementedError, loop.is_closed)
24907db96d56Sopenharmony_ci        self.assertRaises(
24917db96d56Sopenharmony_ci            NotImplementedError, loop.close)
24927db96d56Sopenharmony_ci        self.assertRaises(
24937db96d56Sopenharmony_ci            NotImplementedError, loop.create_task, None)
24947db96d56Sopenharmony_ci        self.assertRaises(
24957db96d56Sopenharmony_ci            NotImplementedError, loop.call_later, None, None)
24967db96d56Sopenharmony_ci        self.assertRaises(
24977db96d56Sopenharmony_ci            NotImplementedError, loop.call_at, f, f)
24987db96d56Sopenharmony_ci        self.assertRaises(
24997db96d56Sopenharmony_ci            NotImplementedError, loop.call_soon, None)
25007db96d56Sopenharmony_ci        self.assertRaises(
25017db96d56Sopenharmony_ci            NotImplementedError, loop.time)
25027db96d56Sopenharmony_ci        self.assertRaises(
25037db96d56Sopenharmony_ci            NotImplementedError, loop.call_soon_threadsafe, None)
25047db96d56Sopenharmony_ci        self.assertRaises(
25057db96d56Sopenharmony_ci            NotImplementedError, loop.set_default_executor, f)
25067db96d56Sopenharmony_ci        self.assertRaises(
25077db96d56Sopenharmony_ci            NotImplementedError, loop.add_reader, 1, f)
25087db96d56Sopenharmony_ci        self.assertRaises(
25097db96d56Sopenharmony_ci            NotImplementedError, loop.remove_reader, 1)
25107db96d56Sopenharmony_ci        self.assertRaises(
25117db96d56Sopenharmony_ci            NotImplementedError, loop.add_writer, 1, f)
25127db96d56Sopenharmony_ci        self.assertRaises(
25137db96d56Sopenharmony_ci            NotImplementedError, loop.remove_writer, 1)
25147db96d56Sopenharmony_ci        self.assertRaises(
25157db96d56Sopenharmony_ci            NotImplementedError, loop.add_signal_handler, 1, f)
25167db96d56Sopenharmony_ci        self.assertRaises(
25177db96d56Sopenharmony_ci            NotImplementedError, loop.remove_signal_handler, 1)
25187db96d56Sopenharmony_ci        self.assertRaises(
25197db96d56Sopenharmony_ci            NotImplementedError, loop.remove_signal_handler, 1)
25207db96d56Sopenharmony_ci        self.assertRaises(
25217db96d56Sopenharmony_ci            NotImplementedError, loop.set_exception_handler, f)
25227db96d56Sopenharmony_ci        self.assertRaises(
25237db96d56Sopenharmony_ci            NotImplementedError, loop.default_exception_handler, f)
25247db96d56Sopenharmony_ci        self.assertRaises(
25257db96d56Sopenharmony_ci            NotImplementedError, loop.call_exception_handler, f)
25267db96d56Sopenharmony_ci        self.assertRaises(
25277db96d56Sopenharmony_ci            NotImplementedError, loop.get_debug)
25287db96d56Sopenharmony_ci        self.assertRaises(
25297db96d56Sopenharmony_ci            NotImplementedError, loop.set_debug, f)
25307db96d56Sopenharmony_ci
25317db96d56Sopenharmony_ci    def test_not_implemented_async(self):
25327db96d56Sopenharmony_ci
25337db96d56Sopenharmony_ci        async def inner():
25347db96d56Sopenharmony_ci            f = mock.Mock()
25357db96d56Sopenharmony_ci            loop = asyncio.AbstractEventLoop()
25367db96d56Sopenharmony_ci
25377db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25387db96d56Sopenharmony_ci                await loop.run_in_executor(f, f)
25397db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25407db96d56Sopenharmony_ci                await loop.getaddrinfo('localhost', 8080)
25417db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25427db96d56Sopenharmony_ci                await loop.getnameinfo(('localhost', 8080))
25437db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25447db96d56Sopenharmony_ci                await loop.create_connection(f)
25457db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25467db96d56Sopenharmony_ci                await loop.create_server(f)
25477db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25487db96d56Sopenharmony_ci                await loop.create_datagram_endpoint(f)
25497db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25507db96d56Sopenharmony_ci                await loop.sock_recv(f, 10)
25517db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25527db96d56Sopenharmony_ci                await loop.sock_recv_into(f, 10)
25537db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25547db96d56Sopenharmony_ci                await loop.sock_sendall(f, 10)
25557db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25567db96d56Sopenharmony_ci                await loop.sock_connect(f, f)
25577db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25587db96d56Sopenharmony_ci                await loop.sock_accept(f)
25597db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25607db96d56Sopenharmony_ci                await loop.sock_sendfile(f, f)
25617db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25627db96d56Sopenharmony_ci                await loop.sendfile(f, f)
25637db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25647db96d56Sopenharmony_ci                await loop.connect_read_pipe(f, mock.sentinel.pipe)
25657db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25667db96d56Sopenharmony_ci                await loop.connect_write_pipe(f, mock.sentinel.pipe)
25677db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25687db96d56Sopenharmony_ci                await loop.subprocess_shell(f, mock.sentinel)
25697db96d56Sopenharmony_ci            with self.assertRaises(NotImplementedError):
25707db96d56Sopenharmony_ci                await loop.subprocess_exec(f)
25717db96d56Sopenharmony_ci
25727db96d56Sopenharmony_ci        loop = asyncio.new_event_loop()
25737db96d56Sopenharmony_ci        loop.run_until_complete(inner())
25747db96d56Sopenharmony_ci        loop.close()
25757db96d56Sopenharmony_ci
25767db96d56Sopenharmony_ci
25777db96d56Sopenharmony_ciclass PolicyTests(unittest.TestCase):
25787db96d56Sopenharmony_ci
25797db96d56Sopenharmony_ci    def test_event_loop_policy(self):
25807db96d56Sopenharmony_ci        policy = asyncio.AbstractEventLoopPolicy()
25817db96d56Sopenharmony_ci        self.assertRaises(NotImplementedError, policy.get_event_loop)
25827db96d56Sopenharmony_ci        self.assertRaises(NotImplementedError, policy.set_event_loop, object())
25837db96d56Sopenharmony_ci        self.assertRaises(NotImplementedError, policy.new_event_loop)
25847db96d56Sopenharmony_ci        self.assertRaises(NotImplementedError, policy.get_child_watcher)
25857db96d56Sopenharmony_ci        self.assertRaises(NotImplementedError, policy.set_child_watcher,
25867db96d56Sopenharmony_ci                          object())
25877db96d56Sopenharmony_ci
25887db96d56Sopenharmony_ci    def test_get_event_loop(self):
25897db96d56Sopenharmony_ci        policy = asyncio.DefaultEventLoopPolicy()
25907db96d56Sopenharmony_ci        self.assertIsNone(policy._local._loop)
25917db96d56Sopenharmony_ci        loop = policy.get_event_loop()
25927db96d56Sopenharmony_ci        self.assertIsInstance(loop, asyncio.AbstractEventLoop)
25937db96d56Sopenharmony_ci
25947db96d56Sopenharmony_ci        self.assertIs(policy._local._loop, loop)
25957db96d56Sopenharmony_ci        self.assertIs(loop, policy.get_event_loop())
25967db96d56Sopenharmony_ci        loop.close()
25977db96d56Sopenharmony_ci
25987db96d56Sopenharmony_ci    def test_get_event_loop_calls_set_event_loop(self):
25997db96d56Sopenharmony_ci        policy = asyncio.DefaultEventLoopPolicy()
26007db96d56Sopenharmony_ci
26017db96d56Sopenharmony_ci        with mock.patch.object(
26027db96d56Sopenharmony_ci                policy, "set_event_loop",
26037db96d56Sopenharmony_ci                wraps=policy.set_event_loop) as m_set_event_loop:
26047db96d56Sopenharmony_ci
26057db96d56Sopenharmony_ci            loop = policy.get_event_loop()
26067db96d56Sopenharmony_ci            self.addCleanup(loop.close)
26077db96d56Sopenharmony_ci
26087db96d56Sopenharmony_ci            # policy._local._loop must be set through .set_event_loop()
26097db96d56Sopenharmony_ci            # (the unix DefaultEventLoopPolicy needs this call to attach
26107db96d56Sopenharmony_ci            # the child watcher correctly)
26117db96d56Sopenharmony_ci            m_set_event_loop.assert_called_with(loop)
26127db96d56Sopenharmony_ci
26137db96d56Sopenharmony_ci        loop.close()
26147db96d56Sopenharmony_ci
26157db96d56Sopenharmony_ci    def test_get_event_loop_after_set_none(self):
26167db96d56Sopenharmony_ci        policy = asyncio.DefaultEventLoopPolicy()
26177db96d56Sopenharmony_ci        policy.set_event_loop(None)
26187db96d56Sopenharmony_ci        self.assertRaises(RuntimeError, policy.get_event_loop)
26197db96d56Sopenharmony_ci
26207db96d56Sopenharmony_ci    @mock.patch('asyncio.events.threading.current_thread')
26217db96d56Sopenharmony_ci    def test_get_event_loop_thread(self, m_current_thread):
26227db96d56Sopenharmony_ci
26237db96d56Sopenharmony_ci        def f():
26247db96d56Sopenharmony_ci            policy = asyncio.DefaultEventLoopPolicy()
26257db96d56Sopenharmony_ci            self.assertRaises(RuntimeError, policy.get_event_loop)
26267db96d56Sopenharmony_ci
26277db96d56Sopenharmony_ci        th = threading.Thread(target=f)
26287db96d56Sopenharmony_ci        th.start()
26297db96d56Sopenharmony_ci        th.join()
26307db96d56Sopenharmony_ci
26317db96d56Sopenharmony_ci    def test_new_event_loop(self):
26327db96d56Sopenharmony_ci        policy = asyncio.DefaultEventLoopPolicy()
26337db96d56Sopenharmony_ci
26347db96d56Sopenharmony_ci        loop = policy.new_event_loop()
26357db96d56Sopenharmony_ci        self.assertIsInstance(loop, asyncio.AbstractEventLoop)
26367db96d56Sopenharmony_ci        loop.close()
26377db96d56Sopenharmony_ci
26387db96d56Sopenharmony_ci    def test_set_event_loop(self):
26397db96d56Sopenharmony_ci        policy = asyncio.DefaultEventLoopPolicy()
26407db96d56Sopenharmony_ci        old_loop = policy.new_event_loop()
26417db96d56Sopenharmony_ci        policy.set_event_loop(old_loop)
26427db96d56Sopenharmony_ci
26437db96d56Sopenharmony_ci        self.assertRaises(TypeError, policy.set_event_loop, object())
26447db96d56Sopenharmony_ci
26457db96d56Sopenharmony_ci        loop = policy.new_event_loop()
26467db96d56Sopenharmony_ci        policy.set_event_loop(loop)
26477db96d56Sopenharmony_ci        self.assertIs(loop, policy.get_event_loop())
26487db96d56Sopenharmony_ci        self.assertIsNot(old_loop, policy.get_event_loop())
26497db96d56Sopenharmony_ci        loop.close()
26507db96d56Sopenharmony_ci        old_loop.close()
26517db96d56Sopenharmony_ci
26527db96d56Sopenharmony_ci    def test_get_event_loop_policy(self):
26537db96d56Sopenharmony_ci        policy = asyncio.get_event_loop_policy()
26547db96d56Sopenharmony_ci        self.assertIsInstance(policy, asyncio.AbstractEventLoopPolicy)
26557db96d56Sopenharmony_ci        self.assertIs(policy, asyncio.get_event_loop_policy())
26567db96d56Sopenharmony_ci
26577db96d56Sopenharmony_ci    def test_set_event_loop_policy(self):
26587db96d56Sopenharmony_ci        self.assertRaises(
26597db96d56Sopenharmony_ci            TypeError, asyncio.set_event_loop_policy, object())
26607db96d56Sopenharmony_ci
26617db96d56Sopenharmony_ci        old_policy = asyncio.get_event_loop_policy()
26627db96d56Sopenharmony_ci
26637db96d56Sopenharmony_ci        policy = asyncio.DefaultEventLoopPolicy()
26647db96d56Sopenharmony_ci        asyncio.set_event_loop_policy(policy)
26657db96d56Sopenharmony_ci        self.assertIs(policy, asyncio.get_event_loop_policy())
26667db96d56Sopenharmony_ci        self.assertIsNot(policy, old_policy)
26677db96d56Sopenharmony_ci
26687db96d56Sopenharmony_ci
26697db96d56Sopenharmony_ciclass GetEventLoopTestsMixin:
26707db96d56Sopenharmony_ci
26717db96d56Sopenharmony_ci    _get_running_loop_impl = None
26727db96d56Sopenharmony_ci    _set_running_loop_impl = None
26737db96d56Sopenharmony_ci    get_running_loop_impl = None
26747db96d56Sopenharmony_ci    get_event_loop_impl = None
26757db96d56Sopenharmony_ci
26767db96d56Sopenharmony_ci    def setUp(self):
26777db96d56Sopenharmony_ci        self._get_running_loop_saved = events._get_running_loop
26787db96d56Sopenharmony_ci        self._set_running_loop_saved = events._set_running_loop
26797db96d56Sopenharmony_ci        self.get_running_loop_saved = events.get_running_loop
26807db96d56Sopenharmony_ci        self.get_event_loop_saved = events.get_event_loop
26817db96d56Sopenharmony_ci
26827db96d56Sopenharmony_ci        events._get_running_loop = type(self)._get_running_loop_impl
26837db96d56Sopenharmony_ci        events._set_running_loop = type(self)._set_running_loop_impl
26847db96d56Sopenharmony_ci        events.get_running_loop = type(self).get_running_loop_impl
26857db96d56Sopenharmony_ci        events.get_event_loop = type(self).get_event_loop_impl
26867db96d56Sopenharmony_ci
26877db96d56Sopenharmony_ci        asyncio._get_running_loop = type(self)._get_running_loop_impl
26887db96d56Sopenharmony_ci        asyncio._set_running_loop = type(self)._set_running_loop_impl
26897db96d56Sopenharmony_ci        asyncio.get_running_loop = type(self).get_running_loop_impl
26907db96d56Sopenharmony_ci        asyncio.get_event_loop = type(self).get_event_loop_impl
26917db96d56Sopenharmony_ci
26927db96d56Sopenharmony_ci        super().setUp()
26937db96d56Sopenharmony_ci
26947db96d56Sopenharmony_ci        self.loop = asyncio.new_event_loop()
26957db96d56Sopenharmony_ci        asyncio.set_event_loop(self.loop)
26967db96d56Sopenharmony_ci
26977db96d56Sopenharmony_ci        if sys.platform != 'win32':
26987db96d56Sopenharmony_ci            watcher = asyncio.SafeChildWatcher()
26997db96d56Sopenharmony_ci            watcher.attach_loop(self.loop)
27007db96d56Sopenharmony_ci            asyncio.set_child_watcher(watcher)
27017db96d56Sopenharmony_ci
27027db96d56Sopenharmony_ci    def tearDown(self):
27037db96d56Sopenharmony_ci        try:
27047db96d56Sopenharmony_ci            if sys.platform != 'win32':
27057db96d56Sopenharmony_ci                asyncio.set_child_watcher(None)
27067db96d56Sopenharmony_ci
27077db96d56Sopenharmony_ci            super().tearDown()
27087db96d56Sopenharmony_ci        finally:
27097db96d56Sopenharmony_ci            self.loop.close()
27107db96d56Sopenharmony_ci            asyncio.set_event_loop(None)
27117db96d56Sopenharmony_ci
27127db96d56Sopenharmony_ci            events._get_running_loop = self._get_running_loop_saved
27137db96d56Sopenharmony_ci            events._set_running_loop = self._set_running_loop_saved
27147db96d56Sopenharmony_ci            events.get_running_loop = self.get_running_loop_saved
27157db96d56Sopenharmony_ci            events.get_event_loop = self.get_event_loop_saved
27167db96d56Sopenharmony_ci
27177db96d56Sopenharmony_ci            asyncio._get_running_loop = self._get_running_loop_saved
27187db96d56Sopenharmony_ci            asyncio._set_running_loop = self._set_running_loop_saved
27197db96d56Sopenharmony_ci            asyncio.get_running_loop = self.get_running_loop_saved
27207db96d56Sopenharmony_ci            asyncio.get_event_loop = self.get_event_loop_saved
27217db96d56Sopenharmony_ci
27227db96d56Sopenharmony_ci    if sys.platform != 'win32':
27237db96d56Sopenharmony_ci
27247db96d56Sopenharmony_ci        def test_get_event_loop_new_process(self):
27257db96d56Sopenharmony_ci            # bpo-32126: The multiprocessing module used by
27267db96d56Sopenharmony_ci            # ProcessPoolExecutor is not functional when the
27277db96d56Sopenharmony_ci            # multiprocessing.synchronize module cannot be imported.
27287db96d56Sopenharmony_ci            support.skip_if_broken_multiprocessing_synchronize()
27297db96d56Sopenharmony_ci
27307db96d56Sopenharmony_ci            async def main():
27317db96d56Sopenharmony_ci                pool = concurrent.futures.ProcessPoolExecutor()
27327db96d56Sopenharmony_ci                result = await self.loop.run_in_executor(
27337db96d56Sopenharmony_ci                    pool, _test_get_event_loop_new_process__sub_proc)
27347db96d56Sopenharmony_ci                pool.shutdown()
27357db96d56Sopenharmony_ci                return result
27367db96d56Sopenharmony_ci
27377db96d56Sopenharmony_ci            self.assertEqual(
27387db96d56Sopenharmony_ci                self.loop.run_until_complete(main()),
27397db96d56Sopenharmony_ci                'hello')
27407db96d56Sopenharmony_ci
27417db96d56Sopenharmony_ci    def test_get_event_loop_returns_running_loop(self):
27427db96d56Sopenharmony_ci        class TestError(Exception):
27437db96d56Sopenharmony_ci            pass
27447db96d56Sopenharmony_ci
27457db96d56Sopenharmony_ci        class Policy(asyncio.DefaultEventLoopPolicy):
27467db96d56Sopenharmony_ci            def get_event_loop(self):
27477db96d56Sopenharmony_ci                raise TestError
27487db96d56Sopenharmony_ci
27497db96d56Sopenharmony_ci        old_policy = asyncio.get_event_loop_policy()
27507db96d56Sopenharmony_ci        try:
27517db96d56Sopenharmony_ci            asyncio.set_event_loop_policy(Policy())
27527db96d56Sopenharmony_ci            loop = asyncio.new_event_loop()
27537db96d56Sopenharmony_ci
27547db96d56Sopenharmony_ci            with self.assertRaises(TestError):
27557db96d56Sopenharmony_ci                asyncio.get_event_loop()
27567db96d56Sopenharmony_ci            asyncio.set_event_loop(None)
27577db96d56Sopenharmony_ci            with self.assertRaises(TestError):
27587db96d56Sopenharmony_ci                asyncio.get_event_loop()
27597db96d56Sopenharmony_ci
27607db96d56Sopenharmony_ci            with self.assertRaisesRegex(RuntimeError, 'no running'):
27617db96d56Sopenharmony_ci                asyncio.get_running_loop()
27627db96d56Sopenharmony_ci            self.assertIs(asyncio._get_running_loop(), None)
27637db96d56Sopenharmony_ci
27647db96d56Sopenharmony_ci            async def func():
27657db96d56Sopenharmony_ci                self.assertIs(asyncio.get_event_loop(), loop)
27667db96d56Sopenharmony_ci                self.assertIs(asyncio.get_running_loop(), loop)
27677db96d56Sopenharmony_ci                self.assertIs(asyncio._get_running_loop(), loop)
27687db96d56Sopenharmony_ci
27697db96d56Sopenharmony_ci            loop.run_until_complete(func())
27707db96d56Sopenharmony_ci
27717db96d56Sopenharmony_ci            asyncio.set_event_loop(loop)
27727db96d56Sopenharmony_ci            with self.assertRaises(TestError):
27737db96d56Sopenharmony_ci                asyncio.get_event_loop()
27747db96d56Sopenharmony_ci            asyncio.set_event_loop(None)
27757db96d56Sopenharmony_ci            with self.assertRaises(TestError):
27767db96d56Sopenharmony_ci                asyncio.get_event_loop()
27777db96d56Sopenharmony_ci
27787db96d56Sopenharmony_ci        finally:
27797db96d56Sopenharmony_ci            asyncio.set_event_loop_policy(old_policy)
27807db96d56Sopenharmony_ci            if loop is not None:
27817db96d56Sopenharmony_ci                loop.close()
27827db96d56Sopenharmony_ci
27837db96d56Sopenharmony_ci        with self.assertRaisesRegex(RuntimeError, 'no running'):
27847db96d56Sopenharmony_ci            asyncio.get_running_loop()
27857db96d56Sopenharmony_ci
27867db96d56Sopenharmony_ci        self.assertIs(asyncio._get_running_loop(), None)
27877db96d56Sopenharmony_ci
27887db96d56Sopenharmony_ci    def test_get_event_loop_returns_running_loop2(self):
27897db96d56Sopenharmony_ci        old_policy = asyncio.get_event_loop_policy()
27907db96d56Sopenharmony_ci        try:
27917db96d56Sopenharmony_ci            asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
27927db96d56Sopenharmony_ci            loop = asyncio.new_event_loop()
27937db96d56Sopenharmony_ci            self.addCleanup(loop.close)
27947db96d56Sopenharmony_ci
27957db96d56Sopenharmony_ci            loop2 = asyncio.get_event_loop()
27967db96d56Sopenharmony_ci            self.addCleanup(loop2.close)
27977db96d56Sopenharmony_ci            asyncio.set_event_loop(None)
27987db96d56Sopenharmony_ci            with self.assertRaisesRegex(RuntimeError, 'no current'):
27997db96d56Sopenharmony_ci                asyncio.get_event_loop()
28007db96d56Sopenharmony_ci
28017db96d56Sopenharmony_ci            with self.assertRaisesRegex(RuntimeError, 'no running'):
28027db96d56Sopenharmony_ci                asyncio.get_running_loop()
28037db96d56Sopenharmony_ci            self.assertIs(asyncio._get_running_loop(), None)
28047db96d56Sopenharmony_ci
28057db96d56Sopenharmony_ci            async def func():
28067db96d56Sopenharmony_ci                self.assertIs(asyncio.get_event_loop(), loop)
28077db96d56Sopenharmony_ci                self.assertIs(asyncio.get_running_loop(), loop)
28087db96d56Sopenharmony_ci                self.assertIs(asyncio._get_running_loop(), loop)
28097db96d56Sopenharmony_ci
28107db96d56Sopenharmony_ci            loop.run_until_complete(func())
28117db96d56Sopenharmony_ci
28127db96d56Sopenharmony_ci            asyncio.set_event_loop(loop)
28137db96d56Sopenharmony_ci            self.assertIs(asyncio.get_event_loop(), loop)
28147db96d56Sopenharmony_ci
28157db96d56Sopenharmony_ci            asyncio.set_event_loop(None)
28167db96d56Sopenharmony_ci            with self.assertRaisesRegex(RuntimeError, 'no current'):
28177db96d56Sopenharmony_ci                asyncio.get_event_loop()
28187db96d56Sopenharmony_ci
28197db96d56Sopenharmony_ci        finally:
28207db96d56Sopenharmony_ci            asyncio.set_event_loop_policy(old_policy)
28217db96d56Sopenharmony_ci            if loop is not None:
28227db96d56Sopenharmony_ci                loop.close()
28237db96d56Sopenharmony_ci
28247db96d56Sopenharmony_ci        with self.assertRaisesRegex(RuntimeError, 'no running'):
28257db96d56Sopenharmony_ci            asyncio.get_running_loop()
28267db96d56Sopenharmony_ci
28277db96d56Sopenharmony_ci        self.assertIs(asyncio._get_running_loop(), None)
28287db96d56Sopenharmony_ci
28297db96d56Sopenharmony_ci
28307db96d56Sopenharmony_ciclass TestPyGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
28317db96d56Sopenharmony_ci
28327db96d56Sopenharmony_ci    _get_running_loop_impl = events._py__get_running_loop
28337db96d56Sopenharmony_ci    _set_running_loop_impl = events._py__set_running_loop
28347db96d56Sopenharmony_ci    get_running_loop_impl = events._py_get_running_loop
28357db96d56Sopenharmony_ci    get_event_loop_impl = events._py_get_event_loop
28367db96d56Sopenharmony_ci
28377db96d56Sopenharmony_ci
28387db96d56Sopenharmony_citry:
28397db96d56Sopenharmony_ci    import _asyncio  # NoQA
28407db96d56Sopenharmony_ciexcept ImportError:
28417db96d56Sopenharmony_ci    pass
28427db96d56Sopenharmony_cielse:
28437db96d56Sopenharmony_ci
28447db96d56Sopenharmony_ci    class TestCGetEventLoop(GetEventLoopTestsMixin, unittest.TestCase):
28457db96d56Sopenharmony_ci
28467db96d56Sopenharmony_ci        _get_running_loop_impl = events._c__get_running_loop
28477db96d56Sopenharmony_ci        _set_running_loop_impl = events._c__set_running_loop
28487db96d56Sopenharmony_ci        get_running_loop_impl = events._c_get_running_loop
28497db96d56Sopenharmony_ci        get_event_loop_impl = events._c_get_event_loop
28507db96d56Sopenharmony_ci
28517db96d56Sopenharmony_ci
28527db96d56Sopenharmony_ciclass TestServer(unittest.TestCase):
28537db96d56Sopenharmony_ci
28547db96d56Sopenharmony_ci    def test_get_loop(self):
28557db96d56Sopenharmony_ci        loop = asyncio.new_event_loop()
28567db96d56Sopenharmony_ci        self.addCleanup(loop.close)
28577db96d56Sopenharmony_ci        proto = MyProto(loop)
28587db96d56Sopenharmony_ci        server = loop.run_until_complete(loop.create_server(lambda: proto, '0.0.0.0', 0))
28597db96d56Sopenharmony_ci        self.assertEqual(server.get_loop(), loop)
28607db96d56Sopenharmony_ci        server.close()
28617db96d56Sopenharmony_ci        loop.run_until_complete(server.wait_closed())
28627db96d56Sopenharmony_ci
28637db96d56Sopenharmony_ci
28647db96d56Sopenharmony_ciclass TestAbstractServer(unittest.TestCase):
28657db96d56Sopenharmony_ci
28667db96d56Sopenharmony_ci    def test_close(self):
28677db96d56Sopenharmony_ci        with self.assertRaises(NotImplementedError):
28687db96d56Sopenharmony_ci            events.AbstractServer().close()
28697db96d56Sopenharmony_ci
28707db96d56Sopenharmony_ci    def test_wait_closed(self):
28717db96d56Sopenharmony_ci        loop = asyncio.new_event_loop()
28727db96d56Sopenharmony_ci        self.addCleanup(loop.close)
28737db96d56Sopenharmony_ci
28747db96d56Sopenharmony_ci        with self.assertRaises(NotImplementedError):
28757db96d56Sopenharmony_ci            loop.run_until_complete(events.AbstractServer().wait_closed())
28767db96d56Sopenharmony_ci
28777db96d56Sopenharmony_ci    def test_get_loop(self):
28787db96d56Sopenharmony_ci        with self.assertRaises(NotImplementedError):
28797db96d56Sopenharmony_ci            events.AbstractServer().get_loop()
28807db96d56Sopenharmony_ci
28817db96d56Sopenharmony_ci
28827db96d56Sopenharmony_ciif __name__ == '__main__':
28837db96d56Sopenharmony_ci    unittest.main()
2884