17db96d56Sopenharmony_ci"""Tests for base_events.py"""
27db96d56Sopenharmony_ci
37db96d56Sopenharmony_ciimport concurrent.futures
47db96d56Sopenharmony_ciimport errno
57db96d56Sopenharmony_ciimport math
67db96d56Sopenharmony_ciimport socket
77db96d56Sopenharmony_ciimport sys
87db96d56Sopenharmony_ciimport threading
97db96d56Sopenharmony_ciimport time
107db96d56Sopenharmony_ciimport unittest
117db96d56Sopenharmony_cifrom unittest import mock
127db96d56Sopenharmony_ci
137db96d56Sopenharmony_ciimport asyncio
147db96d56Sopenharmony_cifrom asyncio import base_events
157db96d56Sopenharmony_cifrom asyncio import constants
167db96d56Sopenharmony_cifrom test.test_asyncio import utils as test_utils
177db96d56Sopenharmony_cifrom test import support
187db96d56Sopenharmony_cifrom test.support.script_helper import assert_python_ok
197db96d56Sopenharmony_cifrom test.support import os_helper
207db96d56Sopenharmony_cifrom test.support import socket_helper
217db96d56Sopenharmony_ciimport warnings
227db96d56Sopenharmony_ci
237db96d56Sopenharmony_ciMOCK_ANY = mock.ANY
247db96d56Sopenharmony_ci
257db96d56Sopenharmony_ci
267db96d56Sopenharmony_cidef tearDownModule():
277db96d56Sopenharmony_ci    asyncio.set_event_loop_policy(None)
287db96d56Sopenharmony_ci
297db96d56Sopenharmony_ci
307db96d56Sopenharmony_cidef mock_socket_module():
317db96d56Sopenharmony_ci    m_socket = mock.MagicMock(spec=socket)
327db96d56Sopenharmony_ci    for name in (
337db96d56Sopenharmony_ci        'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP',
347db96d56Sopenharmony_ci        'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton'
357db96d56Sopenharmony_ci    ):
367db96d56Sopenharmony_ci        if hasattr(socket, name):
377db96d56Sopenharmony_ci            setattr(m_socket, name, getattr(socket, name))
387db96d56Sopenharmony_ci        else:
397db96d56Sopenharmony_ci            delattr(m_socket, name)
407db96d56Sopenharmony_ci
417db96d56Sopenharmony_ci    m_socket.socket = mock.MagicMock()
427db96d56Sopenharmony_ci    m_socket.socket.return_value = test_utils.mock_nonblocking_socket()
437db96d56Sopenharmony_ci
447db96d56Sopenharmony_ci    return m_socket
457db96d56Sopenharmony_ci
467db96d56Sopenharmony_ci
477db96d56Sopenharmony_cidef patch_socket(f):
487db96d56Sopenharmony_ci    return mock.patch('asyncio.base_events.socket',
497db96d56Sopenharmony_ci                      new_callable=mock_socket_module)(f)
507db96d56Sopenharmony_ci
517db96d56Sopenharmony_ci
527db96d56Sopenharmony_ciclass BaseEventTests(test_utils.TestCase):
537db96d56Sopenharmony_ci
547db96d56Sopenharmony_ci    def test_ipaddr_info(self):
557db96d56Sopenharmony_ci        UNSPEC = socket.AF_UNSPEC
567db96d56Sopenharmony_ci        INET = socket.AF_INET
577db96d56Sopenharmony_ci        INET6 = socket.AF_INET6
587db96d56Sopenharmony_ci        STREAM = socket.SOCK_STREAM
597db96d56Sopenharmony_ci        DGRAM = socket.SOCK_DGRAM
607db96d56Sopenharmony_ci        TCP = socket.IPPROTO_TCP
617db96d56Sopenharmony_ci        UDP = socket.IPPROTO_UDP
627db96d56Sopenharmony_ci
637db96d56Sopenharmony_ci        self.assertEqual(
647db96d56Sopenharmony_ci            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
657db96d56Sopenharmony_ci            base_events._ipaddr_info('1.2.3.4', 1, INET, STREAM, TCP))
667db96d56Sopenharmony_ci
677db96d56Sopenharmony_ci        self.assertEqual(
687db96d56Sopenharmony_ci            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
697db96d56Sopenharmony_ci            base_events._ipaddr_info(b'1.2.3.4', 1, INET, STREAM, TCP))
707db96d56Sopenharmony_ci
717db96d56Sopenharmony_ci        self.assertEqual(
727db96d56Sopenharmony_ci            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
737db96d56Sopenharmony_ci            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, TCP))
747db96d56Sopenharmony_ci
757db96d56Sopenharmony_ci        self.assertEqual(
767db96d56Sopenharmony_ci            (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
777db96d56Sopenharmony_ci            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, UDP))
787db96d56Sopenharmony_ci
797db96d56Sopenharmony_ci        # Socket type STREAM implies TCP protocol.
807db96d56Sopenharmony_ci        self.assertEqual(
817db96d56Sopenharmony_ci            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
827db96d56Sopenharmony_ci            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, 0))
837db96d56Sopenharmony_ci
847db96d56Sopenharmony_ci        # Socket type DGRAM implies UDP protocol.
857db96d56Sopenharmony_ci        self.assertEqual(
867db96d56Sopenharmony_ci            (INET, DGRAM, UDP, '', ('1.2.3.4', 1)),
877db96d56Sopenharmony_ci            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, 0))
887db96d56Sopenharmony_ci
897db96d56Sopenharmony_ci        # No socket type.
907db96d56Sopenharmony_ci        self.assertIsNone(
917db96d56Sopenharmony_ci            base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0))
927db96d56Sopenharmony_ci
937db96d56Sopenharmony_ci        if socket_helper.IPV6_ENABLED:
947db96d56Sopenharmony_ci            # IPv4 address with family IPv6.
957db96d56Sopenharmony_ci            self.assertIsNone(
967db96d56Sopenharmony_ci                base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP))
977db96d56Sopenharmony_ci
987db96d56Sopenharmony_ci            self.assertEqual(
997db96d56Sopenharmony_ci                (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)),
1007db96d56Sopenharmony_ci                base_events._ipaddr_info('::3', 1, INET6, STREAM, TCP))
1017db96d56Sopenharmony_ci
1027db96d56Sopenharmony_ci            self.assertEqual(
1037db96d56Sopenharmony_ci                (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)),
1047db96d56Sopenharmony_ci                base_events._ipaddr_info('::3', 1, UNSPEC, STREAM, TCP))
1057db96d56Sopenharmony_ci
1067db96d56Sopenharmony_ci            # IPv6 address with family IPv4.
1077db96d56Sopenharmony_ci            self.assertIsNone(
1087db96d56Sopenharmony_ci                base_events._ipaddr_info('::3', 1, INET, STREAM, TCP))
1097db96d56Sopenharmony_ci
1107db96d56Sopenharmony_ci            # IPv6 address with zone index.
1117db96d56Sopenharmony_ci            self.assertIsNone(
1127db96d56Sopenharmony_ci                base_events._ipaddr_info('::3%lo0', 1, INET6, STREAM, TCP))
1137db96d56Sopenharmony_ci
1147db96d56Sopenharmony_ci    def test_port_parameter_types(self):
1157db96d56Sopenharmony_ci        # Test obscure kinds of arguments for "port".
1167db96d56Sopenharmony_ci        INET = socket.AF_INET
1177db96d56Sopenharmony_ci        STREAM = socket.SOCK_STREAM
1187db96d56Sopenharmony_ci        TCP = socket.IPPROTO_TCP
1197db96d56Sopenharmony_ci
1207db96d56Sopenharmony_ci        self.assertEqual(
1217db96d56Sopenharmony_ci            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
1227db96d56Sopenharmony_ci            base_events._ipaddr_info('1.2.3.4', None, INET, STREAM, TCP))
1237db96d56Sopenharmony_ci
1247db96d56Sopenharmony_ci        self.assertEqual(
1257db96d56Sopenharmony_ci            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
1267db96d56Sopenharmony_ci            base_events._ipaddr_info('1.2.3.4', b'', INET, STREAM, TCP))
1277db96d56Sopenharmony_ci
1287db96d56Sopenharmony_ci        self.assertEqual(
1297db96d56Sopenharmony_ci            (INET, STREAM, TCP, '', ('1.2.3.4', 0)),
1307db96d56Sopenharmony_ci            base_events._ipaddr_info('1.2.3.4', '', INET, STREAM, TCP))
1317db96d56Sopenharmony_ci
1327db96d56Sopenharmony_ci        self.assertEqual(
1337db96d56Sopenharmony_ci            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
1347db96d56Sopenharmony_ci            base_events._ipaddr_info('1.2.3.4', '1', INET, STREAM, TCP))
1357db96d56Sopenharmony_ci
1367db96d56Sopenharmony_ci        self.assertEqual(
1377db96d56Sopenharmony_ci            (INET, STREAM, TCP, '', ('1.2.3.4', 1)),
1387db96d56Sopenharmony_ci            base_events._ipaddr_info('1.2.3.4', b'1', INET, STREAM, TCP))
1397db96d56Sopenharmony_ci
1407db96d56Sopenharmony_ci    @patch_socket
1417db96d56Sopenharmony_ci    def test_ipaddr_info_no_inet_pton(self, m_socket):
1427db96d56Sopenharmony_ci        del m_socket.inet_pton
1437db96d56Sopenharmony_ci        self.assertIsNone(base_events._ipaddr_info('1.2.3.4', 1,
1447db96d56Sopenharmony_ci                                                   socket.AF_INET,
1457db96d56Sopenharmony_ci                                                   socket.SOCK_STREAM,
1467db96d56Sopenharmony_ci                                                   socket.IPPROTO_TCP))
1477db96d56Sopenharmony_ci
1487db96d56Sopenharmony_ci
1497db96d56Sopenharmony_ciclass BaseEventLoopTests(test_utils.TestCase):
1507db96d56Sopenharmony_ci
1517db96d56Sopenharmony_ci    def setUp(self):
1527db96d56Sopenharmony_ci        super().setUp()
1537db96d56Sopenharmony_ci        self.loop = base_events.BaseEventLoop()
1547db96d56Sopenharmony_ci        self.loop._selector = mock.Mock()
1557db96d56Sopenharmony_ci        self.loop._selector.select.return_value = ()
1567db96d56Sopenharmony_ci        self.set_event_loop(self.loop)
1577db96d56Sopenharmony_ci
1587db96d56Sopenharmony_ci    def test_not_implemented(self):
1597db96d56Sopenharmony_ci        m = mock.Mock()
1607db96d56Sopenharmony_ci        self.assertRaises(
1617db96d56Sopenharmony_ci            NotImplementedError,
1627db96d56Sopenharmony_ci            self.loop._make_socket_transport, m, m)
1637db96d56Sopenharmony_ci        self.assertRaises(
1647db96d56Sopenharmony_ci            NotImplementedError,
1657db96d56Sopenharmony_ci            self.loop._make_ssl_transport, m, m, m, m)
1667db96d56Sopenharmony_ci        self.assertRaises(
1677db96d56Sopenharmony_ci            NotImplementedError,
1687db96d56Sopenharmony_ci            self.loop._make_datagram_transport, m, m)
1697db96d56Sopenharmony_ci        self.assertRaises(
1707db96d56Sopenharmony_ci            NotImplementedError, self.loop._process_events, [])
1717db96d56Sopenharmony_ci        self.assertRaises(
1727db96d56Sopenharmony_ci            NotImplementedError, self.loop._write_to_self)
1737db96d56Sopenharmony_ci        self.assertRaises(
1747db96d56Sopenharmony_ci            NotImplementedError,
1757db96d56Sopenharmony_ci            self.loop._make_read_pipe_transport, m, m)
1767db96d56Sopenharmony_ci        self.assertRaises(
1777db96d56Sopenharmony_ci            NotImplementedError,
1787db96d56Sopenharmony_ci            self.loop._make_write_pipe_transport, m, m)
1797db96d56Sopenharmony_ci        gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m)
1807db96d56Sopenharmony_ci        with self.assertRaises(NotImplementedError):
1817db96d56Sopenharmony_ci            gen.send(None)
1827db96d56Sopenharmony_ci
1837db96d56Sopenharmony_ci    def test_close(self):
1847db96d56Sopenharmony_ci        self.assertFalse(self.loop.is_closed())
1857db96d56Sopenharmony_ci        self.loop.close()
1867db96d56Sopenharmony_ci        self.assertTrue(self.loop.is_closed())
1877db96d56Sopenharmony_ci
1887db96d56Sopenharmony_ci        # it should be possible to call close() more than once
1897db96d56Sopenharmony_ci        self.loop.close()
1907db96d56Sopenharmony_ci        self.loop.close()
1917db96d56Sopenharmony_ci
1927db96d56Sopenharmony_ci        # operation blocked when the loop is closed
1937db96d56Sopenharmony_ci        f = self.loop.create_future()
1947db96d56Sopenharmony_ci        self.assertRaises(RuntimeError, self.loop.run_forever)
1957db96d56Sopenharmony_ci        self.assertRaises(RuntimeError, self.loop.run_until_complete, f)
1967db96d56Sopenharmony_ci
1977db96d56Sopenharmony_ci    def test__add_callback_handle(self):
1987db96d56Sopenharmony_ci        h = asyncio.Handle(lambda: False, (), self.loop, None)
1997db96d56Sopenharmony_ci
2007db96d56Sopenharmony_ci        self.loop._add_callback(h)
2017db96d56Sopenharmony_ci        self.assertFalse(self.loop._scheduled)
2027db96d56Sopenharmony_ci        self.assertIn(h, self.loop._ready)
2037db96d56Sopenharmony_ci
2047db96d56Sopenharmony_ci    def test__add_callback_cancelled_handle(self):
2057db96d56Sopenharmony_ci        h = asyncio.Handle(lambda: False, (), self.loop, None)
2067db96d56Sopenharmony_ci        h.cancel()
2077db96d56Sopenharmony_ci
2087db96d56Sopenharmony_ci        self.loop._add_callback(h)
2097db96d56Sopenharmony_ci        self.assertFalse(self.loop._scheduled)
2107db96d56Sopenharmony_ci        self.assertFalse(self.loop._ready)
2117db96d56Sopenharmony_ci
2127db96d56Sopenharmony_ci    def test_set_default_executor(self):
2137db96d56Sopenharmony_ci        class DummyExecutor(concurrent.futures.ThreadPoolExecutor):
2147db96d56Sopenharmony_ci            def submit(self, fn, *args, **kwargs):
2157db96d56Sopenharmony_ci                raise NotImplementedError(
2167db96d56Sopenharmony_ci                    'cannot submit into a dummy executor')
2177db96d56Sopenharmony_ci
2187db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
2197db96d56Sopenharmony_ci        self.loop._write_to_self = mock.Mock()
2207db96d56Sopenharmony_ci
2217db96d56Sopenharmony_ci        executor = DummyExecutor()
2227db96d56Sopenharmony_ci        self.loop.set_default_executor(executor)
2237db96d56Sopenharmony_ci        self.assertIs(executor, self.loop._default_executor)
2247db96d56Sopenharmony_ci
2257db96d56Sopenharmony_ci    def test_set_default_executor_error(self):
2267db96d56Sopenharmony_ci        executor = mock.Mock()
2277db96d56Sopenharmony_ci
2287db96d56Sopenharmony_ci        msg = 'executor must be ThreadPoolExecutor instance'
2297db96d56Sopenharmony_ci        with self.assertRaisesRegex(TypeError, msg):
2307db96d56Sopenharmony_ci            self.loop.set_default_executor(executor)
2317db96d56Sopenharmony_ci
2327db96d56Sopenharmony_ci        self.assertIsNone(self.loop._default_executor)
2337db96d56Sopenharmony_ci
2347db96d56Sopenharmony_ci    def test_call_soon(self):
2357db96d56Sopenharmony_ci        def cb():
2367db96d56Sopenharmony_ci            pass
2377db96d56Sopenharmony_ci
2387db96d56Sopenharmony_ci        h = self.loop.call_soon(cb)
2397db96d56Sopenharmony_ci        self.assertEqual(h._callback, cb)
2407db96d56Sopenharmony_ci        self.assertIsInstance(h, asyncio.Handle)
2417db96d56Sopenharmony_ci        self.assertIn(h, self.loop._ready)
2427db96d56Sopenharmony_ci
2437db96d56Sopenharmony_ci    def test_call_soon_non_callable(self):
2447db96d56Sopenharmony_ci        self.loop.set_debug(True)
2457db96d56Sopenharmony_ci        with self.assertRaisesRegex(TypeError, 'a callable object'):
2467db96d56Sopenharmony_ci            self.loop.call_soon(1)
2477db96d56Sopenharmony_ci
2487db96d56Sopenharmony_ci    def test_call_later(self):
2497db96d56Sopenharmony_ci        def cb():
2507db96d56Sopenharmony_ci            pass
2517db96d56Sopenharmony_ci
2527db96d56Sopenharmony_ci        h = self.loop.call_later(10.0, cb)
2537db96d56Sopenharmony_ci        self.assertIsInstance(h, asyncio.TimerHandle)
2547db96d56Sopenharmony_ci        self.assertIn(h, self.loop._scheduled)
2557db96d56Sopenharmony_ci        self.assertNotIn(h, self.loop._ready)
2567db96d56Sopenharmony_ci        with self.assertRaises(TypeError, msg="delay must not be None"):
2577db96d56Sopenharmony_ci            self.loop.call_later(None, cb)
2587db96d56Sopenharmony_ci
2597db96d56Sopenharmony_ci    def test_call_later_negative_delays(self):
2607db96d56Sopenharmony_ci        calls = []
2617db96d56Sopenharmony_ci
2627db96d56Sopenharmony_ci        def cb(arg):
2637db96d56Sopenharmony_ci            calls.append(arg)
2647db96d56Sopenharmony_ci
2657db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
2667db96d56Sopenharmony_ci        self.loop.call_later(-1, cb, 'a')
2677db96d56Sopenharmony_ci        self.loop.call_later(-2, cb, 'b')
2687db96d56Sopenharmony_ci        test_utils.run_briefly(self.loop)
2697db96d56Sopenharmony_ci        self.assertEqual(calls, ['b', 'a'])
2707db96d56Sopenharmony_ci
2717db96d56Sopenharmony_ci    def test_time_and_call_at(self):
2727db96d56Sopenharmony_ci        def cb():
2737db96d56Sopenharmony_ci            self.loop.stop()
2747db96d56Sopenharmony_ci
2757db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
2767db96d56Sopenharmony_ci        delay = 0.1
2777db96d56Sopenharmony_ci
2787db96d56Sopenharmony_ci        when = self.loop.time() + delay
2797db96d56Sopenharmony_ci        self.loop.call_at(when, cb)
2807db96d56Sopenharmony_ci        t0 = self.loop.time()
2817db96d56Sopenharmony_ci        self.loop.run_forever()
2827db96d56Sopenharmony_ci        dt = self.loop.time() - t0
2837db96d56Sopenharmony_ci
2847db96d56Sopenharmony_ci        # 50 ms: maximum granularity of the event loop
2857db96d56Sopenharmony_ci        self.assertGreaterEqual(dt, delay - 0.050, dt)
2867db96d56Sopenharmony_ci        # tolerate a difference of +800 ms because some Python buildbots
2877db96d56Sopenharmony_ci        # are really slow
2887db96d56Sopenharmony_ci        self.assertLessEqual(dt, 0.9, dt)
2897db96d56Sopenharmony_ci        with self.assertRaises(TypeError, msg="when cannot be None"):
2907db96d56Sopenharmony_ci            self.loop.call_at(None, cb)
2917db96d56Sopenharmony_ci
2927db96d56Sopenharmony_ci    def check_thread(self, loop, debug):
2937db96d56Sopenharmony_ci        def cb():
2947db96d56Sopenharmony_ci            pass
2957db96d56Sopenharmony_ci
2967db96d56Sopenharmony_ci        loop.set_debug(debug)
2977db96d56Sopenharmony_ci        if debug:
2987db96d56Sopenharmony_ci            msg = ("Non-thread-safe operation invoked on an event loop other "
2997db96d56Sopenharmony_ci                   "than the current one")
3007db96d56Sopenharmony_ci            with self.assertRaisesRegex(RuntimeError, msg):
3017db96d56Sopenharmony_ci                loop.call_soon(cb)
3027db96d56Sopenharmony_ci            with self.assertRaisesRegex(RuntimeError, msg):
3037db96d56Sopenharmony_ci                loop.call_later(60, cb)
3047db96d56Sopenharmony_ci            with self.assertRaisesRegex(RuntimeError, msg):
3057db96d56Sopenharmony_ci                loop.call_at(loop.time() + 60, cb)
3067db96d56Sopenharmony_ci        else:
3077db96d56Sopenharmony_ci            loop.call_soon(cb)
3087db96d56Sopenharmony_ci            loop.call_later(60, cb)
3097db96d56Sopenharmony_ci            loop.call_at(loop.time() + 60, cb)
3107db96d56Sopenharmony_ci
3117db96d56Sopenharmony_ci    def test_check_thread(self):
3127db96d56Sopenharmony_ci        def check_in_thread(loop, event, debug, create_loop, fut):
3137db96d56Sopenharmony_ci            # wait until the event loop is running
3147db96d56Sopenharmony_ci            event.wait()
3157db96d56Sopenharmony_ci
3167db96d56Sopenharmony_ci            try:
3177db96d56Sopenharmony_ci                if create_loop:
3187db96d56Sopenharmony_ci                    loop2 = base_events.BaseEventLoop()
3197db96d56Sopenharmony_ci                    try:
3207db96d56Sopenharmony_ci                        asyncio.set_event_loop(loop2)
3217db96d56Sopenharmony_ci                        self.check_thread(loop, debug)
3227db96d56Sopenharmony_ci                    finally:
3237db96d56Sopenharmony_ci                        asyncio.set_event_loop(None)
3247db96d56Sopenharmony_ci                        loop2.close()
3257db96d56Sopenharmony_ci                else:
3267db96d56Sopenharmony_ci                    self.check_thread(loop, debug)
3277db96d56Sopenharmony_ci            except Exception as exc:
3287db96d56Sopenharmony_ci                loop.call_soon_threadsafe(fut.set_exception, exc)
3297db96d56Sopenharmony_ci            else:
3307db96d56Sopenharmony_ci                loop.call_soon_threadsafe(fut.set_result, None)
3317db96d56Sopenharmony_ci
3327db96d56Sopenharmony_ci        def test_thread(loop, debug, create_loop=False):
3337db96d56Sopenharmony_ci            event = threading.Event()
3347db96d56Sopenharmony_ci            fut = loop.create_future()
3357db96d56Sopenharmony_ci            loop.call_soon(event.set)
3367db96d56Sopenharmony_ci            args = (loop, event, debug, create_loop, fut)
3377db96d56Sopenharmony_ci            thread = threading.Thread(target=check_in_thread, args=args)
3387db96d56Sopenharmony_ci            thread.start()
3397db96d56Sopenharmony_ci            loop.run_until_complete(fut)
3407db96d56Sopenharmony_ci            thread.join()
3417db96d56Sopenharmony_ci
3427db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
3437db96d56Sopenharmony_ci        self.loop._write_to_self = mock.Mock()
3447db96d56Sopenharmony_ci
3457db96d56Sopenharmony_ci        # raise RuntimeError if the thread has no event loop
3467db96d56Sopenharmony_ci        test_thread(self.loop, True)
3477db96d56Sopenharmony_ci
3487db96d56Sopenharmony_ci        # check disabled if debug mode is disabled
3497db96d56Sopenharmony_ci        test_thread(self.loop, False)
3507db96d56Sopenharmony_ci
3517db96d56Sopenharmony_ci        # raise RuntimeError if the event loop of the thread is not the called
3527db96d56Sopenharmony_ci        # event loop
3537db96d56Sopenharmony_ci        test_thread(self.loop, True, create_loop=True)
3547db96d56Sopenharmony_ci
3557db96d56Sopenharmony_ci        # check disabled if debug mode is disabled
3567db96d56Sopenharmony_ci        test_thread(self.loop, False, create_loop=True)
3577db96d56Sopenharmony_ci
3587db96d56Sopenharmony_ci    def test__run_once(self):
3597db96d56Sopenharmony_ci        h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (),
3607db96d56Sopenharmony_ci                                 self.loop, None)
3617db96d56Sopenharmony_ci        h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (),
3627db96d56Sopenharmony_ci                                 self.loop, None)
3637db96d56Sopenharmony_ci
3647db96d56Sopenharmony_ci        h1.cancel()
3657db96d56Sopenharmony_ci
3667db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
3677db96d56Sopenharmony_ci        self.loop._scheduled.append(h1)
3687db96d56Sopenharmony_ci        self.loop._scheduled.append(h2)
3697db96d56Sopenharmony_ci        self.loop._run_once()
3707db96d56Sopenharmony_ci
3717db96d56Sopenharmony_ci        t = self.loop._selector.select.call_args[0][0]
3727db96d56Sopenharmony_ci        self.assertTrue(9.5 < t < 10.5, t)
3737db96d56Sopenharmony_ci        self.assertEqual([h2], self.loop._scheduled)
3747db96d56Sopenharmony_ci        self.assertTrue(self.loop._process_events.called)
3757db96d56Sopenharmony_ci
3767db96d56Sopenharmony_ci    def test_set_debug(self):
3777db96d56Sopenharmony_ci        self.loop.set_debug(True)
3787db96d56Sopenharmony_ci        self.assertTrue(self.loop.get_debug())
3797db96d56Sopenharmony_ci        self.loop.set_debug(False)
3807db96d56Sopenharmony_ci        self.assertFalse(self.loop.get_debug())
3817db96d56Sopenharmony_ci
3827db96d56Sopenharmony_ci    def test__run_once_schedule_handle(self):
3837db96d56Sopenharmony_ci        handle = None
3847db96d56Sopenharmony_ci        processed = False
3857db96d56Sopenharmony_ci
3867db96d56Sopenharmony_ci        def cb(loop):
3877db96d56Sopenharmony_ci            nonlocal processed, handle
3887db96d56Sopenharmony_ci            processed = True
3897db96d56Sopenharmony_ci            handle = loop.call_soon(lambda: True)
3907db96d56Sopenharmony_ci
3917db96d56Sopenharmony_ci        h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
3927db96d56Sopenharmony_ci                                self.loop, None)
3937db96d56Sopenharmony_ci
3947db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
3957db96d56Sopenharmony_ci        self.loop._scheduled.append(h)
3967db96d56Sopenharmony_ci        self.loop._run_once()
3977db96d56Sopenharmony_ci
3987db96d56Sopenharmony_ci        self.assertTrue(processed)
3997db96d56Sopenharmony_ci        self.assertEqual([handle], list(self.loop._ready))
4007db96d56Sopenharmony_ci
4017db96d56Sopenharmony_ci    def test__run_once_cancelled_event_cleanup(self):
4027db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
4037db96d56Sopenharmony_ci
4047db96d56Sopenharmony_ci        self.assertTrue(
4057db96d56Sopenharmony_ci            0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0)
4067db96d56Sopenharmony_ci
4077db96d56Sopenharmony_ci        def cb():
4087db96d56Sopenharmony_ci            pass
4097db96d56Sopenharmony_ci
4107db96d56Sopenharmony_ci        # Set up one "blocking" event that will not be cancelled to
4117db96d56Sopenharmony_ci        # ensure later cancelled events do not make it to the head
4127db96d56Sopenharmony_ci        # of the queue and get cleaned.
4137db96d56Sopenharmony_ci        not_cancelled_count = 1
4147db96d56Sopenharmony_ci        self.loop.call_later(3000, cb)
4157db96d56Sopenharmony_ci
4167db96d56Sopenharmony_ci        # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES)
4177db96d56Sopenharmony_ci        # cancelled handles, ensure they aren't removed
4187db96d56Sopenharmony_ci
4197db96d56Sopenharmony_ci        cancelled_count = 2
4207db96d56Sopenharmony_ci        for x in range(2):
4217db96d56Sopenharmony_ci            h = self.loop.call_later(3600, cb)
4227db96d56Sopenharmony_ci            h.cancel()
4237db96d56Sopenharmony_ci
4247db96d56Sopenharmony_ci        # Add some cancelled events that will be at head and removed
4257db96d56Sopenharmony_ci        cancelled_count += 2
4267db96d56Sopenharmony_ci        for x in range(2):
4277db96d56Sopenharmony_ci            h = self.loop.call_later(100, cb)
4287db96d56Sopenharmony_ci            h.cancel()
4297db96d56Sopenharmony_ci
4307db96d56Sopenharmony_ci        # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low
4317db96d56Sopenharmony_ci        self.assertLessEqual(cancelled_count + not_cancelled_count,
4327db96d56Sopenharmony_ci            base_events._MIN_SCHEDULED_TIMER_HANDLES)
4337db96d56Sopenharmony_ci
4347db96d56Sopenharmony_ci        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
4357db96d56Sopenharmony_ci
4367db96d56Sopenharmony_ci        self.loop._run_once()
4377db96d56Sopenharmony_ci
4387db96d56Sopenharmony_ci        cancelled_count -= 2
4397db96d56Sopenharmony_ci
4407db96d56Sopenharmony_ci        self.assertEqual(self.loop._timer_cancelled_count, cancelled_count)
4417db96d56Sopenharmony_ci
4427db96d56Sopenharmony_ci        self.assertEqual(len(self.loop._scheduled),
4437db96d56Sopenharmony_ci            cancelled_count + not_cancelled_count)
4447db96d56Sopenharmony_ci
4457db96d56Sopenharmony_ci        # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION
4467db96d56Sopenharmony_ci        # so that deletion of cancelled events will occur on next _run_once
4477db96d56Sopenharmony_ci        add_cancel_count = int(math.ceil(
4487db96d56Sopenharmony_ci            base_events._MIN_SCHEDULED_TIMER_HANDLES *
4497db96d56Sopenharmony_ci            base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1
4507db96d56Sopenharmony_ci
4517db96d56Sopenharmony_ci        add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES -
4527db96d56Sopenharmony_ci            add_cancel_count, 0)
4537db96d56Sopenharmony_ci
4547db96d56Sopenharmony_ci        # Add some events that will not be cancelled
4557db96d56Sopenharmony_ci        not_cancelled_count += add_not_cancel_count
4567db96d56Sopenharmony_ci        for x in range(add_not_cancel_count):
4577db96d56Sopenharmony_ci            self.loop.call_later(3600, cb)
4587db96d56Sopenharmony_ci
4597db96d56Sopenharmony_ci        # Add enough cancelled events
4607db96d56Sopenharmony_ci        cancelled_count += add_cancel_count
4617db96d56Sopenharmony_ci        for x in range(add_cancel_count):
4627db96d56Sopenharmony_ci            h = self.loop.call_later(3600, cb)
4637db96d56Sopenharmony_ci            h.cancel()
4647db96d56Sopenharmony_ci
4657db96d56Sopenharmony_ci        # Ensure all handles are still scheduled
4667db96d56Sopenharmony_ci        self.assertEqual(len(self.loop._scheduled),
4677db96d56Sopenharmony_ci            cancelled_count + not_cancelled_count)
4687db96d56Sopenharmony_ci
4697db96d56Sopenharmony_ci        self.loop._run_once()
4707db96d56Sopenharmony_ci
4717db96d56Sopenharmony_ci        # Ensure cancelled events were removed
4727db96d56Sopenharmony_ci        self.assertEqual(len(self.loop._scheduled), not_cancelled_count)
4737db96d56Sopenharmony_ci
4747db96d56Sopenharmony_ci        # Ensure only uncancelled events remain scheduled
4757db96d56Sopenharmony_ci        self.assertTrue(all([not x._cancelled for x in self.loop._scheduled]))
4767db96d56Sopenharmony_ci
4777db96d56Sopenharmony_ci    def test_run_until_complete_type_error(self):
4787db96d56Sopenharmony_ci        self.assertRaises(TypeError,
4797db96d56Sopenharmony_ci            self.loop.run_until_complete, 'blah')
4807db96d56Sopenharmony_ci
4817db96d56Sopenharmony_ci    def test_run_until_complete_loop(self):
4827db96d56Sopenharmony_ci        task = self.loop.create_future()
4837db96d56Sopenharmony_ci        other_loop = self.new_test_loop()
4847db96d56Sopenharmony_ci        self.addCleanup(other_loop.close)
4857db96d56Sopenharmony_ci        self.assertRaises(ValueError,
4867db96d56Sopenharmony_ci            other_loop.run_until_complete, task)
4877db96d56Sopenharmony_ci
4887db96d56Sopenharmony_ci    def test_run_until_complete_loop_orphan_future_close_loop(self):
4897db96d56Sopenharmony_ci        class ShowStopper(SystemExit):
4907db96d56Sopenharmony_ci            pass
4917db96d56Sopenharmony_ci
4927db96d56Sopenharmony_ci        async def foo(delay):
4937db96d56Sopenharmony_ci            await asyncio.sleep(delay)
4947db96d56Sopenharmony_ci
4957db96d56Sopenharmony_ci        def throw():
4967db96d56Sopenharmony_ci            raise ShowStopper
4977db96d56Sopenharmony_ci
4987db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
4997db96d56Sopenharmony_ci        self.loop.call_soon(throw)
5007db96d56Sopenharmony_ci        with self.assertRaises(ShowStopper):
5017db96d56Sopenharmony_ci            self.loop.run_until_complete(foo(0.1))
5027db96d56Sopenharmony_ci
5037db96d56Sopenharmony_ci        # This call fails if run_until_complete does not clean up
5047db96d56Sopenharmony_ci        # done-callback for the previous future.
5057db96d56Sopenharmony_ci        self.loop.run_until_complete(foo(0.2))
5067db96d56Sopenharmony_ci
5077db96d56Sopenharmony_ci    def test_subprocess_exec_invalid_args(self):
5087db96d56Sopenharmony_ci        args = [sys.executable, '-c', 'pass']
5097db96d56Sopenharmony_ci
5107db96d56Sopenharmony_ci        # missing program parameter (empty args)
5117db96d56Sopenharmony_ci        self.assertRaises(TypeError,
5127db96d56Sopenharmony_ci            self.loop.run_until_complete, self.loop.subprocess_exec,
5137db96d56Sopenharmony_ci            asyncio.SubprocessProtocol)
5147db96d56Sopenharmony_ci
5157db96d56Sopenharmony_ci        # expected multiple arguments, not a list
5167db96d56Sopenharmony_ci        self.assertRaises(TypeError,
5177db96d56Sopenharmony_ci            self.loop.run_until_complete, self.loop.subprocess_exec,
5187db96d56Sopenharmony_ci            asyncio.SubprocessProtocol, args)
5197db96d56Sopenharmony_ci
5207db96d56Sopenharmony_ci        # program arguments must be strings, not int
5217db96d56Sopenharmony_ci        self.assertRaises(TypeError,
5227db96d56Sopenharmony_ci            self.loop.run_until_complete, self.loop.subprocess_exec,
5237db96d56Sopenharmony_ci            asyncio.SubprocessProtocol, sys.executable, 123)
5247db96d56Sopenharmony_ci
5257db96d56Sopenharmony_ci        # universal_newlines, shell, bufsize must not be set
5267db96d56Sopenharmony_ci        self.assertRaises(TypeError,
5277db96d56Sopenharmony_ci        self.loop.run_until_complete, self.loop.subprocess_exec,
5287db96d56Sopenharmony_ci            asyncio.SubprocessProtocol, *args, universal_newlines=True)
5297db96d56Sopenharmony_ci        self.assertRaises(TypeError,
5307db96d56Sopenharmony_ci            self.loop.run_until_complete, self.loop.subprocess_exec,
5317db96d56Sopenharmony_ci            asyncio.SubprocessProtocol, *args, shell=True)
5327db96d56Sopenharmony_ci        self.assertRaises(TypeError,
5337db96d56Sopenharmony_ci            self.loop.run_until_complete, self.loop.subprocess_exec,
5347db96d56Sopenharmony_ci            asyncio.SubprocessProtocol, *args, bufsize=4096)
5357db96d56Sopenharmony_ci
5367db96d56Sopenharmony_ci    def test_subprocess_shell_invalid_args(self):
5377db96d56Sopenharmony_ci        # expected a string, not an int or a list
5387db96d56Sopenharmony_ci        self.assertRaises(TypeError,
5397db96d56Sopenharmony_ci            self.loop.run_until_complete, self.loop.subprocess_shell,
5407db96d56Sopenharmony_ci            asyncio.SubprocessProtocol, 123)
5417db96d56Sopenharmony_ci        self.assertRaises(TypeError,
5427db96d56Sopenharmony_ci            self.loop.run_until_complete, self.loop.subprocess_shell,
5437db96d56Sopenharmony_ci            asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass'])
5447db96d56Sopenharmony_ci
5457db96d56Sopenharmony_ci        # universal_newlines, shell, bufsize must not be set
5467db96d56Sopenharmony_ci        self.assertRaises(TypeError,
5477db96d56Sopenharmony_ci            self.loop.run_until_complete, self.loop.subprocess_shell,
5487db96d56Sopenharmony_ci            asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True)
5497db96d56Sopenharmony_ci        self.assertRaises(TypeError,
5507db96d56Sopenharmony_ci            self.loop.run_until_complete, self.loop.subprocess_shell,
5517db96d56Sopenharmony_ci            asyncio.SubprocessProtocol, 'exit 0', shell=True)
5527db96d56Sopenharmony_ci        self.assertRaises(TypeError,
5537db96d56Sopenharmony_ci            self.loop.run_until_complete, self.loop.subprocess_shell,
5547db96d56Sopenharmony_ci            asyncio.SubprocessProtocol, 'exit 0', bufsize=4096)
5557db96d56Sopenharmony_ci
5567db96d56Sopenharmony_ci    def test_default_exc_handler_callback(self):
5577db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
5587db96d56Sopenharmony_ci
5597db96d56Sopenharmony_ci        def zero_error(fut):
5607db96d56Sopenharmony_ci            fut.set_result(True)
5617db96d56Sopenharmony_ci            1/0
5627db96d56Sopenharmony_ci
5637db96d56Sopenharmony_ci        # Test call_soon (events.Handle)
5647db96d56Sopenharmony_ci        with mock.patch('asyncio.base_events.logger') as log:
5657db96d56Sopenharmony_ci            fut = self.loop.create_future()
5667db96d56Sopenharmony_ci            self.loop.call_soon(zero_error, fut)
5677db96d56Sopenharmony_ci            fut.add_done_callback(lambda fut: self.loop.stop())
5687db96d56Sopenharmony_ci            self.loop.run_forever()
5697db96d56Sopenharmony_ci            log.error.assert_called_with(
5707db96d56Sopenharmony_ci                test_utils.MockPattern('Exception in callback.*zero'),
5717db96d56Sopenharmony_ci                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
5727db96d56Sopenharmony_ci
5737db96d56Sopenharmony_ci        # Test call_later (events.TimerHandle)
5747db96d56Sopenharmony_ci        with mock.patch('asyncio.base_events.logger') as log:
5757db96d56Sopenharmony_ci            fut = self.loop.create_future()
5767db96d56Sopenharmony_ci            self.loop.call_later(0.01, zero_error, fut)
5777db96d56Sopenharmony_ci            fut.add_done_callback(lambda fut: self.loop.stop())
5787db96d56Sopenharmony_ci            self.loop.run_forever()
5797db96d56Sopenharmony_ci            log.error.assert_called_with(
5807db96d56Sopenharmony_ci                test_utils.MockPattern('Exception in callback.*zero'),
5817db96d56Sopenharmony_ci                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
5827db96d56Sopenharmony_ci
5837db96d56Sopenharmony_ci    def test_default_exc_handler_coro(self):
5847db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
5857db96d56Sopenharmony_ci
5867db96d56Sopenharmony_ci        async def zero_error_coro():
5877db96d56Sopenharmony_ci            await asyncio.sleep(0.01)
5887db96d56Sopenharmony_ci            1/0
5897db96d56Sopenharmony_ci
5907db96d56Sopenharmony_ci        # Test Future.__del__
5917db96d56Sopenharmony_ci        with mock.patch('asyncio.base_events.logger') as log:
5927db96d56Sopenharmony_ci            fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop)
5937db96d56Sopenharmony_ci            fut.add_done_callback(lambda *args: self.loop.stop())
5947db96d56Sopenharmony_ci            self.loop.run_forever()
5957db96d56Sopenharmony_ci            fut = None # Trigger Future.__del__ or futures._TracebackLogger
5967db96d56Sopenharmony_ci            support.gc_collect()
5977db96d56Sopenharmony_ci            # Future.__del__ in logs error with an actual exception context
5987db96d56Sopenharmony_ci            log.error.assert_called_with(
5997db96d56Sopenharmony_ci                test_utils.MockPattern('.*exception was never retrieved'),
6007db96d56Sopenharmony_ci                exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
6017db96d56Sopenharmony_ci
6027db96d56Sopenharmony_ci    def test_set_exc_handler_invalid(self):
6037db96d56Sopenharmony_ci        with self.assertRaisesRegex(TypeError, 'A callable object or None'):
6047db96d56Sopenharmony_ci            self.loop.set_exception_handler('spam')
6057db96d56Sopenharmony_ci
6067db96d56Sopenharmony_ci    def test_set_exc_handler_custom(self):
6077db96d56Sopenharmony_ci        def zero_error():
6087db96d56Sopenharmony_ci            1/0
6097db96d56Sopenharmony_ci
6107db96d56Sopenharmony_ci        def run_loop():
6117db96d56Sopenharmony_ci            handle = self.loop.call_soon(zero_error)
6127db96d56Sopenharmony_ci            self.loop._run_once()
6137db96d56Sopenharmony_ci            return handle
6147db96d56Sopenharmony_ci
6157db96d56Sopenharmony_ci        self.loop.set_debug(True)
6167db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
6177db96d56Sopenharmony_ci
6187db96d56Sopenharmony_ci        self.assertIsNone(self.loop.get_exception_handler())
6197db96d56Sopenharmony_ci        mock_handler = mock.Mock()
6207db96d56Sopenharmony_ci        self.loop.set_exception_handler(mock_handler)
6217db96d56Sopenharmony_ci        self.assertIs(self.loop.get_exception_handler(), mock_handler)
6227db96d56Sopenharmony_ci        handle = run_loop()
6237db96d56Sopenharmony_ci        mock_handler.assert_called_with(self.loop, {
6247db96d56Sopenharmony_ci            'exception': MOCK_ANY,
6257db96d56Sopenharmony_ci            'message': test_utils.MockPattern(
6267db96d56Sopenharmony_ci                                'Exception in callback.*zero_error'),
6277db96d56Sopenharmony_ci            'handle': handle,
6287db96d56Sopenharmony_ci            'source_traceback': handle._source_traceback,
6297db96d56Sopenharmony_ci        })
6307db96d56Sopenharmony_ci        mock_handler.reset_mock()
6317db96d56Sopenharmony_ci
6327db96d56Sopenharmony_ci        self.loop.set_exception_handler(None)
6337db96d56Sopenharmony_ci        with mock.patch('asyncio.base_events.logger') as log:
6347db96d56Sopenharmony_ci            run_loop()
6357db96d56Sopenharmony_ci            log.error.assert_called_with(
6367db96d56Sopenharmony_ci                        test_utils.MockPattern(
6377db96d56Sopenharmony_ci                                'Exception in callback.*zero'),
6387db96d56Sopenharmony_ci                        exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
6397db96d56Sopenharmony_ci
6407db96d56Sopenharmony_ci        self.assertFalse(mock_handler.called)
6417db96d56Sopenharmony_ci
6427db96d56Sopenharmony_ci    def test_set_exc_handler_broken(self):
6437db96d56Sopenharmony_ci        def run_loop():
6447db96d56Sopenharmony_ci            def zero_error():
6457db96d56Sopenharmony_ci                1/0
6467db96d56Sopenharmony_ci            self.loop.call_soon(zero_error)
6477db96d56Sopenharmony_ci            self.loop._run_once()
6487db96d56Sopenharmony_ci
6497db96d56Sopenharmony_ci        def handler(loop, context):
6507db96d56Sopenharmony_ci            raise AttributeError('spam')
6517db96d56Sopenharmony_ci
6527db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
6537db96d56Sopenharmony_ci
6547db96d56Sopenharmony_ci        self.loop.set_exception_handler(handler)
6557db96d56Sopenharmony_ci
6567db96d56Sopenharmony_ci        with mock.patch('asyncio.base_events.logger') as log:
6577db96d56Sopenharmony_ci            run_loop()
6587db96d56Sopenharmony_ci            log.error.assert_called_with(
6597db96d56Sopenharmony_ci                test_utils.MockPattern(
6607db96d56Sopenharmony_ci                    'Unhandled error in exception handler'),
6617db96d56Sopenharmony_ci                exc_info=(AttributeError, MOCK_ANY, MOCK_ANY))
6627db96d56Sopenharmony_ci
6637db96d56Sopenharmony_ci    def test_default_exc_handler_broken(self):
6647db96d56Sopenharmony_ci        _context = None
6657db96d56Sopenharmony_ci
6667db96d56Sopenharmony_ci        class Loop(base_events.BaseEventLoop):
6677db96d56Sopenharmony_ci
6687db96d56Sopenharmony_ci            _selector = mock.Mock()
6697db96d56Sopenharmony_ci            _process_events = mock.Mock()
6707db96d56Sopenharmony_ci
6717db96d56Sopenharmony_ci            def default_exception_handler(self, context):
6727db96d56Sopenharmony_ci                nonlocal _context
6737db96d56Sopenharmony_ci                _context = context
6747db96d56Sopenharmony_ci                # Simulates custom buggy "default_exception_handler"
6757db96d56Sopenharmony_ci                raise ValueError('spam')
6767db96d56Sopenharmony_ci
6777db96d56Sopenharmony_ci        loop = Loop()
6787db96d56Sopenharmony_ci        self.addCleanup(loop.close)
6797db96d56Sopenharmony_ci        asyncio.set_event_loop(loop)
6807db96d56Sopenharmony_ci
6817db96d56Sopenharmony_ci        def run_loop():
6827db96d56Sopenharmony_ci            def zero_error():
6837db96d56Sopenharmony_ci                1/0
6847db96d56Sopenharmony_ci            loop.call_soon(zero_error)
6857db96d56Sopenharmony_ci            loop._run_once()
6867db96d56Sopenharmony_ci
6877db96d56Sopenharmony_ci        with mock.patch('asyncio.base_events.logger') as log:
6887db96d56Sopenharmony_ci            run_loop()
6897db96d56Sopenharmony_ci            log.error.assert_called_with(
6907db96d56Sopenharmony_ci                'Exception in default exception handler',
6917db96d56Sopenharmony_ci                exc_info=True)
6927db96d56Sopenharmony_ci
6937db96d56Sopenharmony_ci        def custom_handler(loop, context):
6947db96d56Sopenharmony_ci            raise ValueError('ham')
6957db96d56Sopenharmony_ci
6967db96d56Sopenharmony_ci        _context = None
6977db96d56Sopenharmony_ci        loop.set_exception_handler(custom_handler)
6987db96d56Sopenharmony_ci        with mock.patch('asyncio.base_events.logger') as log:
6997db96d56Sopenharmony_ci            run_loop()
7007db96d56Sopenharmony_ci            log.error.assert_called_with(
7017db96d56Sopenharmony_ci                test_utils.MockPattern('Exception in default exception.*'
7027db96d56Sopenharmony_ci                                       'while handling.*in custom'),
7037db96d56Sopenharmony_ci                exc_info=True)
7047db96d56Sopenharmony_ci
7057db96d56Sopenharmony_ci            # Check that original context was passed to default
7067db96d56Sopenharmony_ci            # exception handler.
7077db96d56Sopenharmony_ci            self.assertIn('context', _context)
7087db96d56Sopenharmony_ci            self.assertIs(type(_context['context']['exception']),
7097db96d56Sopenharmony_ci                          ZeroDivisionError)
7107db96d56Sopenharmony_ci
7117db96d56Sopenharmony_ci    def test_set_task_factory_invalid(self):
7127db96d56Sopenharmony_ci        with self.assertRaisesRegex(
7137db96d56Sopenharmony_ci            TypeError, 'task factory must be a callable or None'):
7147db96d56Sopenharmony_ci
7157db96d56Sopenharmony_ci            self.loop.set_task_factory(1)
7167db96d56Sopenharmony_ci
7177db96d56Sopenharmony_ci        self.assertIsNone(self.loop.get_task_factory())
7187db96d56Sopenharmony_ci
7197db96d56Sopenharmony_ci    def test_set_task_factory(self):
7207db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
7217db96d56Sopenharmony_ci
7227db96d56Sopenharmony_ci        class MyTask(asyncio.Task):
7237db96d56Sopenharmony_ci            pass
7247db96d56Sopenharmony_ci
7257db96d56Sopenharmony_ci        async def coro():
7267db96d56Sopenharmony_ci            pass
7277db96d56Sopenharmony_ci
7287db96d56Sopenharmony_ci        factory = lambda loop, coro: MyTask(coro, loop=loop)
7297db96d56Sopenharmony_ci
7307db96d56Sopenharmony_ci        self.assertIsNone(self.loop.get_task_factory())
7317db96d56Sopenharmony_ci        self.loop.set_task_factory(factory)
7327db96d56Sopenharmony_ci        self.assertIs(self.loop.get_task_factory(), factory)
7337db96d56Sopenharmony_ci
7347db96d56Sopenharmony_ci        task = self.loop.create_task(coro())
7357db96d56Sopenharmony_ci        self.assertTrue(isinstance(task, MyTask))
7367db96d56Sopenharmony_ci        self.loop.run_until_complete(task)
7377db96d56Sopenharmony_ci
7387db96d56Sopenharmony_ci        self.loop.set_task_factory(None)
7397db96d56Sopenharmony_ci        self.assertIsNone(self.loop.get_task_factory())
7407db96d56Sopenharmony_ci
7417db96d56Sopenharmony_ci        task = self.loop.create_task(coro())
7427db96d56Sopenharmony_ci        self.assertTrue(isinstance(task, asyncio.Task))
7437db96d56Sopenharmony_ci        self.assertFalse(isinstance(task, MyTask))
7447db96d56Sopenharmony_ci        self.loop.run_until_complete(task)
7457db96d56Sopenharmony_ci
7467db96d56Sopenharmony_ci    def test_env_var_debug(self):
7477db96d56Sopenharmony_ci        code = '\n'.join((
7487db96d56Sopenharmony_ci            'import asyncio',
7497db96d56Sopenharmony_ci            'loop = asyncio.new_event_loop()',
7507db96d56Sopenharmony_ci            'print(loop.get_debug())'))
7517db96d56Sopenharmony_ci
7527db96d56Sopenharmony_ci        # Test with -E to not fail if the unit test was run with
7537db96d56Sopenharmony_ci        # PYTHONASYNCIODEBUG set to a non-empty string
7547db96d56Sopenharmony_ci        sts, stdout, stderr = assert_python_ok('-E', '-c', code)
7557db96d56Sopenharmony_ci        self.assertEqual(stdout.rstrip(), b'False')
7567db96d56Sopenharmony_ci
7577db96d56Sopenharmony_ci        sts, stdout, stderr = assert_python_ok('-c', code,
7587db96d56Sopenharmony_ci                                               PYTHONASYNCIODEBUG='',
7597db96d56Sopenharmony_ci                                               PYTHONDEVMODE='')
7607db96d56Sopenharmony_ci        self.assertEqual(stdout.rstrip(), b'False')
7617db96d56Sopenharmony_ci
7627db96d56Sopenharmony_ci        sts, stdout, stderr = assert_python_ok('-c', code,
7637db96d56Sopenharmony_ci                                               PYTHONASYNCIODEBUG='1',
7647db96d56Sopenharmony_ci                                               PYTHONDEVMODE='')
7657db96d56Sopenharmony_ci        self.assertEqual(stdout.rstrip(), b'True')
7667db96d56Sopenharmony_ci
7677db96d56Sopenharmony_ci        sts, stdout, stderr = assert_python_ok('-E', '-c', code,
7687db96d56Sopenharmony_ci                                               PYTHONASYNCIODEBUG='1')
7697db96d56Sopenharmony_ci        self.assertEqual(stdout.rstrip(), b'False')
7707db96d56Sopenharmony_ci
7717db96d56Sopenharmony_ci        # -X dev
7727db96d56Sopenharmony_ci        sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
7737db96d56Sopenharmony_ci                                               '-c', code)
7747db96d56Sopenharmony_ci        self.assertEqual(stdout.rstrip(), b'True')
7757db96d56Sopenharmony_ci
7767db96d56Sopenharmony_ci    def test_create_task(self):
7777db96d56Sopenharmony_ci        class MyTask(asyncio.Task):
7787db96d56Sopenharmony_ci            pass
7797db96d56Sopenharmony_ci
7807db96d56Sopenharmony_ci        async def test():
7817db96d56Sopenharmony_ci            pass
7827db96d56Sopenharmony_ci
7837db96d56Sopenharmony_ci        class EventLoop(base_events.BaseEventLoop):
7847db96d56Sopenharmony_ci            def create_task(self, coro):
7857db96d56Sopenharmony_ci                return MyTask(coro, loop=loop)
7867db96d56Sopenharmony_ci
7877db96d56Sopenharmony_ci        loop = EventLoop()
7887db96d56Sopenharmony_ci        self.set_event_loop(loop)
7897db96d56Sopenharmony_ci
7907db96d56Sopenharmony_ci        coro = test()
7917db96d56Sopenharmony_ci        task = asyncio.ensure_future(coro, loop=loop)
7927db96d56Sopenharmony_ci        self.assertIsInstance(task, MyTask)
7937db96d56Sopenharmony_ci
7947db96d56Sopenharmony_ci        # make warnings quiet
7957db96d56Sopenharmony_ci        task._log_destroy_pending = False
7967db96d56Sopenharmony_ci        coro.close()
7977db96d56Sopenharmony_ci
7987db96d56Sopenharmony_ci    def test_create_task_error_closes_coro(self):
7997db96d56Sopenharmony_ci        async def test():
8007db96d56Sopenharmony_ci            pass
8017db96d56Sopenharmony_ci        loop = asyncio.new_event_loop()
8027db96d56Sopenharmony_ci        loop.close()
8037db96d56Sopenharmony_ci        with warnings.catch_warnings(record=True) as w:
8047db96d56Sopenharmony_ci            with self.assertRaises(RuntimeError):
8057db96d56Sopenharmony_ci                asyncio.ensure_future(test(), loop=loop)
8067db96d56Sopenharmony_ci            self.assertEqual(len(w), 0)
8077db96d56Sopenharmony_ci
8087db96d56Sopenharmony_ci
8097db96d56Sopenharmony_ci    def test_create_named_task_with_default_factory(self):
8107db96d56Sopenharmony_ci        async def test():
8117db96d56Sopenharmony_ci            pass
8127db96d56Sopenharmony_ci
8137db96d56Sopenharmony_ci        loop = asyncio.new_event_loop()
8147db96d56Sopenharmony_ci        task = loop.create_task(test(), name='test_task')
8157db96d56Sopenharmony_ci        try:
8167db96d56Sopenharmony_ci            self.assertEqual(task.get_name(), 'test_task')
8177db96d56Sopenharmony_ci        finally:
8187db96d56Sopenharmony_ci            loop.run_until_complete(task)
8197db96d56Sopenharmony_ci            loop.close()
8207db96d56Sopenharmony_ci
8217db96d56Sopenharmony_ci    def test_create_named_task_with_custom_factory(self):
8227db96d56Sopenharmony_ci        def task_factory(loop, coro):
8237db96d56Sopenharmony_ci            return asyncio.Task(coro, loop=loop)
8247db96d56Sopenharmony_ci
8257db96d56Sopenharmony_ci        async def test():
8267db96d56Sopenharmony_ci            pass
8277db96d56Sopenharmony_ci
8287db96d56Sopenharmony_ci        loop = asyncio.new_event_loop()
8297db96d56Sopenharmony_ci        loop.set_task_factory(task_factory)
8307db96d56Sopenharmony_ci        task = loop.create_task(test(), name='test_task')
8317db96d56Sopenharmony_ci        try:
8327db96d56Sopenharmony_ci            self.assertEqual(task.get_name(), 'test_task')
8337db96d56Sopenharmony_ci        finally:
8347db96d56Sopenharmony_ci            loop.run_until_complete(task)
8357db96d56Sopenharmony_ci            loop.close()
8367db96d56Sopenharmony_ci
8377db96d56Sopenharmony_ci    def test_run_forever_keyboard_interrupt(self):
8387db96d56Sopenharmony_ci        # Python issue #22601: ensure that the temporary task created by
8397db96d56Sopenharmony_ci        # run_forever() consumes the KeyboardInterrupt and so don't log
8407db96d56Sopenharmony_ci        # a warning
8417db96d56Sopenharmony_ci        async def raise_keyboard_interrupt():
8427db96d56Sopenharmony_ci            raise KeyboardInterrupt
8437db96d56Sopenharmony_ci
8447db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
8457db96d56Sopenharmony_ci        self.loop.call_exception_handler = mock.Mock()
8467db96d56Sopenharmony_ci
8477db96d56Sopenharmony_ci        try:
8487db96d56Sopenharmony_ci            self.loop.run_until_complete(raise_keyboard_interrupt())
8497db96d56Sopenharmony_ci        except KeyboardInterrupt:
8507db96d56Sopenharmony_ci            pass
8517db96d56Sopenharmony_ci        self.loop.close()
8527db96d56Sopenharmony_ci        support.gc_collect()
8537db96d56Sopenharmony_ci
8547db96d56Sopenharmony_ci        self.assertFalse(self.loop.call_exception_handler.called)
8557db96d56Sopenharmony_ci
8567db96d56Sopenharmony_ci    def test_run_until_complete_baseexception(self):
8577db96d56Sopenharmony_ci        # Python issue #22429: run_until_complete() must not schedule a pending
8587db96d56Sopenharmony_ci        # call to stop() if the future raised a BaseException
8597db96d56Sopenharmony_ci        async def raise_keyboard_interrupt():
8607db96d56Sopenharmony_ci            raise KeyboardInterrupt
8617db96d56Sopenharmony_ci
8627db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
8637db96d56Sopenharmony_ci
8647db96d56Sopenharmony_ci        try:
8657db96d56Sopenharmony_ci            self.loop.run_until_complete(raise_keyboard_interrupt())
8667db96d56Sopenharmony_ci        except KeyboardInterrupt:
8677db96d56Sopenharmony_ci            pass
8687db96d56Sopenharmony_ci
8697db96d56Sopenharmony_ci        def func():
8707db96d56Sopenharmony_ci            self.loop.stop()
8717db96d56Sopenharmony_ci            func.called = True
8727db96d56Sopenharmony_ci        func.called = False
8737db96d56Sopenharmony_ci        try:
8747db96d56Sopenharmony_ci            self.loop.call_soon(func)
8757db96d56Sopenharmony_ci            self.loop.run_forever()
8767db96d56Sopenharmony_ci        except KeyboardInterrupt:
8777db96d56Sopenharmony_ci            pass
8787db96d56Sopenharmony_ci        self.assertTrue(func.called)
8797db96d56Sopenharmony_ci
8807db96d56Sopenharmony_ci    def test_single_selecter_event_callback_after_stopping(self):
8817db96d56Sopenharmony_ci        # Python issue #25593: A stopped event loop may cause event callbacks
8827db96d56Sopenharmony_ci        # to run more than once.
8837db96d56Sopenharmony_ci        event_sentinel = object()
8847db96d56Sopenharmony_ci        callcount = 0
8857db96d56Sopenharmony_ci        doer = None
8867db96d56Sopenharmony_ci
8877db96d56Sopenharmony_ci        def proc_events(event_list):
8887db96d56Sopenharmony_ci            nonlocal doer
8897db96d56Sopenharmony_ci            if event_sentinel in event_list:
8907db96d56Sopenharmony_ci                doer = self.loop.call_soon(do_event)
8917db96d56Sopenharmony_ci
8927db96d56Sopenharmony_ci        def do_event():
8937db96d56Sopenharmony_ci            nonlocal callcount
8947db96d56Sopenharmony_ci            callcount += 1
8957db96d56Sopenharmony_ci            self.loop.call_soon(clear_selector)
8967db96d56Sopenharmony_ci
8977db96d56Sopenharmony_ci        def clear_selector():
8987db96d56Sopenharmony_ci            doer.cancel()
8997db96d56Sopenharmony_ci            self.loop._selector.select.return_value = ()
9007db96d56Sopenharmony_ci
9017db96d56Sopenharmony_ci        self.loop._process_events = proc_events
9027db96d56Sopenharmony_ci        self.loop._selector.select.return_value = (event_sentinel,)
9037db96d56Sopenharmony_ci
9047db96d56Sopenharmony_ci        for i in range(1, 3):
9057db96d56Sopenharmony_ci            with self.subTest('Loop %d/2' % i):
9067db96d56Sopenharmony_ci                self.loop.call_soon(self.loop.stop)
9077db96d56Sopenharmony_ci                self.loop.run_forever()
9087db96d56Sopenharmony_ci                self.assertEqual(callcount, 1)
9097db96d56Sopenharmony_ci
9107db96d56Sopenharmony_ci    def test_run_once(self):
9117db96d56Sopenharmony_ci        # Simple test for test_utils.run_once().  It may seem strange
9127db96d56Sopenharmony_ci        # to have a test for this (the function isn't even used!) but
9137db96d56Sopenharmony_ci        # it's a de-factor standard API for library tests.  This tests
9147db96d56Sopenharmony_ci        # the idiom: loop.call_soon(loop.stop); loop.run_forever().
9157db96d56Sopenharmony_ci        count = 0
9167db96d56Sopenharmony_ci
9177db96d56Sopenharmony_ci        def callback():
9187db96d56Sopenharmony_ci            nonlocal count
9197db96d56Sopenharmony_ci            count += 1
9207db96d56Sopenharmony_ci
9217db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
9227db96d56Sopenharmony_ci        self.loop.call_soon(callback)
9237db96d56Sopenharmony_ci        test_utils.run_once(self.loop)
9247db96d56Sopenharmony_ci        self.assertEqual(count, 1)
9257db96d56Sopenharmony_ci
9267db96d56Sopenharmony_ci    def test_run_forever_pre_stopped(self):
9277db96d56Sopenharmony_ci        # Test that the old idiom for pre-stopping the loop works.
9287db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
9297db96d56Sopenharmony_ci        self.loop.stop()
9307db96d56Sopenharmony_ci        self.loop.run_forever()
9317db96d56Sopenharmony_ci        self.loop._selector.select.assert_called_once_with(0)
9327db96d56Sopenharmony_ci
9337db96d56Sopenharmony_ci    async def leave_unfinalized_asyncgen(self):
9347db96d56Sopenharmony_ci        # Create an async generator, iterate it partially, and leave it
9357db96d56Sopenharmony_ci        # to be garbage collected.
9367db96d56Sopenharmony_ci        # Used in async generator finalization tests.
9377db96d56Sopenharmony_ci        # Depends on implementation details of garbage collector. Changes
9387db96d56Sopenharmony_ci        # in gc may break this function.
9397db96d56Sopenharmony_ci        status = {'started': False,
9407db96d56Sopenharmony_ci                  'stopped': False,
9417db96d56Sopenharmony_ci                  'finalized': False}
9427db96d56Sopenharmony_ci
9437db96d56Sopenharmony_ci        async def agen():
9447db96d56Sopenharmony_ci            status['started'] = True
9457db96d56Sopenharmony_ci            try:
9467db96d56Sopenharmony_ci                for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']:
9477db96d56Sopenharmony_ci                    yield item
9487db96d56Sopenharmony_ci            finally:
9497db96d56Sopenharmony_ci                status['finalized'] = True
9507db96d56Sopenharmony_ci
9517db96d56Sopenharmony_ci        ag = agen()
9527db96d56Sopenharmony_ci        ai = ag.__aiter__()
9537db96d56Sopenharmony_ci
9547db96d56Sopenharmony_ci        async def iter_one():
9557db96d56Sopenharmony_ci            try:
9567db96d56Sopenharmony_ci                item = await ai.__anext__()
9577db96d56Sopenharmony_ci            except StopAsyncIteration:
9587db96d56Sopenharmony_ci                return
9597db96d56Sopenharmony_ci            if item == 'THREE':
9607db96d56Sopenharmony_ci                status['stopped'] = True
9617db96d56Sopenharmony_ci                return
9627db96d56Sopenharmony_ci            asyncio.create_task(iter_one())
9637db96d56Sopenharmony_ci
9647db96d56Sopenharmony_ci        asyncio.create_task(iter_one())
9657db96d56Sopenharmony_ci        return status
9667db96d56Sopenharmony_ci
9677db96d56Sopenharmony_ci    def test_asyncgen_finalization_by_gc(self):
9687db96d56Sopenharmony_ci        # Async generators should be finalized when garbage collected.
9697db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
9707db96d56Sopenharmony_ci        self.loop._write_to_self = mock.Mock()
9717db96d56Sopenharmony_ci        with support.disable_gc():
9727db96d56Sopenharmony_ci            status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
9737db96d56Sopenharmony_ci            while not status['stopped']:
9747db96d56Sopenharmony_ci                test_utils.run_briefly(self.loop)
9757db96d56Sopenharmony_ci            self.assertTrue(status['started'])
9767db96d56Sopenharmony_ci            self.assertTrue(status['stopped'])
9777db96d56Sopenharmony_ci            self.assertFalse(status['finalized'])
9787db96d56Sopenharmony_ci            support.gc_collect()
9797db96d56Sopenharmony_ci            test_utils.run_briefly(self.loop)
9807db96d56Sopenharmony_ci            self.assertTrue(status['finalized'])
9817db96d56Sopenharmony_ci
9827db96d56Sopenharmony_ci    def test_asyncgen_finalization_by_gc_in_other_thread(self):
9837db96d56Sopenharmony_ci        # Python issue 34769: If garbage collector runs in another
9847db96d56Sopenharmony_ci        # thread, async generators will not finalize in debug
9857db96d56Sopenharmony_ci        # mode.
9867db96d56Sopenharmony_ci        self.loop._process_events = mock.Mock()
9877db96d56Sopenharmony_ci        self.loop._write_to_self = mock.Mock()
9887db96d56Sopenharmony_ci        self.loop.set_debug(True)
9897db96d56Sopenharmony_ci        with support.disable_gc():
9907db96d56Sopenharmony_ci            status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
9917db96d56Sopenharmony_ci            while not status['stopped']:
9927db96d56Sopenharmony_ci                test_utils.run_briefly(self.loop)
9937db96d56Sopenharmony_ci            self.assertTrue(status['started'])
9947db96d56Sopenharmony_ci            self.assertTrue(status['stopped'])
9957db96d56Sopenharmony_ci            self.assertFalse(status['finalized'])
9967db96d56Sopenharmony_ci            self.loop.run_until_complete(
9977db96d56Sopenharmony_ci                self.loop.run_in_executor(None, support.gc_collect))
9987db96d56Sopenharmony_ci            test_utils.run_briefly(self.loop)
9997db96d56Sopenharmony_ci            self.assertTrue(status['finalized'])
10007db96d56Sopenharmony_ci
10017db96d56Sopenharmony_ci
10027db96d56Sopenharmony_ciclass MyProto(asyncio.Protocol):
10037db96d56Sopenharmony_ci    done = None
10047db96d56Sopenharmony_ci
10057db96d56Sopenharmony_ci    def __init__(self, create_future=False):
10067db96d56Sopenharmony_ci        self.state = 'INITIAL'
10077db96d56Sopenharmony_ci        self.nbytes = 0
10087db96d56Sopenharmony_ci        if create_future:
10097db96d56Sopenharmony_ci            self.done = asyncio.get_running_loop().create_future()
10107db96d56Sopenharmony_ci
10117db96d56Sopenharmony_ci    def _assert_state(self, *expected):
10127db96d56Sopenharmony_ci        if self.state not in expected:
10137db96d56Sopenharmony_ci            raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
10147db96d56Sopenharmony_ci
10157db96d56Sopenharmony_ci    def connection_made(self, transport):
10167db96d56Sopenharmony_ci        self.transport = transport
10177db96d56Sopenharmony_ci        self._assert_state('INITIAL')
10187db96d56Sopenharmony_ci        self.state = 'CONNECTED'
10197db96d56Sopenharmony_ci        transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n')
10207db96d56Sopenharmony_ci
10217db96d56Sopenharmony_ci    def data_received(self, data):
10227db96d56Sopenharmony_ci        self._assert_state('CONNECTED')
10237db96d56Sopenharmony_ci        self.nbytes += len(data)
10247db96d56Sopenharmony_ci
10257db96d56Sopenharmony_ci    def eof_received(self):
10267db96d56Sopenharmony_ci        self._assert_state('CONNECTED')
10277db96d56Sopenharmony_ci        self.state = 'EOF'
10287db96d56Sopenharmony_ci
10297db96d56Sopenharmony_ci    def connection_lost(self, exc):
10307db96d56Sopenharmony_ci        self._assert_state('CONNECTED', 'EOF')
10317db96d56Sopenharmony_ci        self.state = 'CLOSED'
10327db96d56Sopenharmony_ci        if self.done:
10337db96d56Sopenharmony_ci            self.done.set_result(None)
10347db96d56Sopenharmony_ci
10357db96d56Sopenharmony_ci
10367db96d56Sopenharmony_ciclass MyDatagramProto(asyncio.DatagramProtocol):
10377db96d56Sopenharmony_ci    done = None
10387db96d56Sopenharmony_ci
10397db96d56Sopenharmony_ci    def __init__(self, create_future=False, loop=None):
10407db96d56Sopenharmony_ci        self.state = 'INITIAL'
10417db96d56Sopenharmony_ci        self.nbytes = 0
10427db96d56Sopenharmony_ci        if create_future:
10437db96d56Sopenharmony_ci            self.done = loop.create_future()
10447db96d56Sopenharmony_ci
10457db96d56Sopenharmony_ci    def _assert_state(self, expected):
10467db96d56Sopenharmony_ci        if self.state != expected:
10477db96d56Sopenharmony_ci            raise AssertionError(f'state: {self.state!r}, expected: {expected!r}')
10487db96d56Sopenharmony_ci
10497db96d56Sopenharmony_ci    def connection_made(self, transport):
10507db96d56Sopenharmony_ci        self.transport = transport
10517db96d56Sopenharmony_ci        self._assert_state('INITIAL')
10527db96d56Sopenharmony_ci        self.state = 'INITIALIZED'
10537db96d56Sopenharmony_ci
10547db96d56Sopenharmony_ci    def datagram_received(self, data, addr):
10557db96d56Sopenharmony_ci        self._assert_state('INITIALIZED')
10567db96d56Sopenharmony_ci        self.nbytes += len(data)
10577db96d56Sopenharmony_ci
10587db96d56Sopenharmony_ci    def error_received(self, exc):
10597db96d56Sopenharmony_ci        self._assert_state('INITIALIZED')
10607db96d56Sopenharmony_ci
10617db96d56Sopenharmony_ci    def connection_lost(self, exc):
10627db96d56Sopenharmony_ci        self._assert_state('INITIALIZED')
10637db96d56Sopenharmony_ci        self.state = 'CLOSED'
10647db96d56Sopenharmony_ci        if self.done:
10657db96d56Sopenharmony_ci            self.done.set_result(None)
10667db96d56Sopenharmony_ci
10677db96d56Sopenharmony_ci
10687db96d56Sopenharmony_ciclass BaseEventLoopWithSelectorTests(test_utils.TestCase):
10697db96d56Sopenharmony_ci
10707db96d56Sopenharmony_ci    def setUp(self):
10717db96d56Sopenharmony_ci        super().setUp()
10727db96d56Sopenharmony_ci        self.loop = asyncio.SelectorEventLoop()
10737db96d56Sopenharmony_ci        self.set_event_loop(self.loop)
10747db96d56Sopenharmony_ci
10757db96d56Sopenharmony_ci    @mock.patch('socket.getnameinfo')
10767db96d56Sopenharmony_ci    def test_getnameinfo(self, m_gai):
10777db96d56Sopenharmony_ci        m_gai.side_effect = lambda *args: 42
10787db96d56Sopenharmony_ci        r = self.loop.run_until_complete(self.loop.getnameinfo(('abc', 123)))
10797db96d56Sopenharmony_ci        self.assertEqual(r, 42)
10807db96d56Sopenharmony_ci
10817db96d56Sopenharmony_ci    @patch_socket
10827db96d56Sopenharmony_ci    def test_create_connection_multiple_errors(self, m_socket):
10837db96d56Sopenharmony_ci
10847db96d56Sopenharmony_ci        class MyProto(asyncio.Protocol):
10857db96d56Sopenharmony_ci            pass
10867db96d56Sopenharmony_ci
10877db96d56Sopenharmony_ci        async def getaddrinfo(*args, **kw):
10887db96d56Sopenharmony_ci            return [(2, 1, 6, '', ('107.6.106.82', 80)),
10897db96d56Sopenharmony_ci                    (2, 1, 6, '', ('107.6.106.82', 80))]
10907db96d56Sopenharmony_ci
10917db96d56Sopenharmony_ci        def getaddrinfo_task(*args, **kwds):
10927db96d56Sopenharmony_ci            return self.loop.create_task(getaddrinfo(*args, **kwds))
10937db96d56Sopenharmony_ci
10947db96d56Sopenharmony_ci        idx = -1
10957db96d56Sopenharmony_ci        errors = ['err1', 'err2']
10967db96d56Sopenharmony_ci
10977db96d56Sopenharmony_ci        def _socket(*args, **kw):
10987db96d56Sopenharmony_ci            nonlocal idx, errors
10997db96d56Sopenharmony_ci            idx += 1
11007db96d56Sopenharmony_ci            raise OSError(errors[idx])
11017db96d56Sopenharmony_ci
11027db96d56Sopenharmony_ci        m_socket.socket = _socket
11037db96d56Sopenharmony_ci
11047db96d56Sopenharmony_ci        self.loop.getaddrinfo = getaddrinfo_task
11057db96d56Sopenharmony_ci
11067db96d56Sopenharmony_ci        coro = self.loop.create_connection(MyProto, 'example.com', 80)
11077db96d56Sopenharmony_ci        with self.assertRaises(OSError) as cm:
11087db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
11097db96d56Sopenharmony_ci
11107db96d56Sopenharmony_ci        self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2')
11117db96d56Sopenharmony_ci
11127db96d56Sopenharmony_ci    @patch_socket
11137db96d56Sopenharmony_ci    def test_create_connection_timeout(self, m_socket):
11147db96d56Sopenharmony_ci        # Ensure that the socket is closed on timeout
11157db96d56Sopenharmony_ci        sock = mock.Mock()
11167db96d56Sopenharmony_ci        m_socket.socket.return_value = sock
11177db96d56Sopenharmony_ci
11187db96d56Sopenharmony_ci        def getaddrinfo(*args, **kw):
11197db96d56Sopenharmony_ci            fut = self.loop.create_future()
11207db96d56Sopenharmony_ci            addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '',
11217db96d56Sopenharmony_ci                    ('127.0.0.1', 80))
11227db96d56Sopenharmony_ci            fut.set_result([addr])
11237db96d56Sopenharmony_ci            return fut
11247db96d56Sopenharmony_ci        self.loop.getaddrinfo = getaddrinfo
11257db96d56Sopenharmony_ci
11267db96d56Sopenharmony_ci        with mock.patch.object(self.loop, 'sock_connect',
11277db96d56Sopenharmony_ci                               side_effect=asyncio.TimeoutError):
11287db96d56Sopenharmony_ci            coro = self.loop.create_connection(MyProto, '127.0.0.1', 80)
11297db96d56Sopenharmony_ci            with self.assertRaises(asyncio.TimeoutError):
11307db96d56Sopenharmony_ci                self.loop.run_until_complete(coro)
11317db96d56Sopenharmony_ci            self.assertTrue(sock.close.called)
11327db96d56Sopenharmony_ci
11337db96d56Sopenharmony_ci    def test_create_connection_host_port_sock(self):
11347db96d56Sopenharmony_ci        coro = self.loop.create_connection(
11357db96d56Sopenharmony_ci            MyProto, 'example.com', 80, sock=object())
11367db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
11377db96d56Sopenharmony_ci
11387db96d56Sopenharmony_ci    def test_create_connection_wrong_sock(self):
11397db96d56Sopenharmony_ci        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
11407db96d56Sopenharmony_ci        with sock:
11417db96d56Sopenharmony_ci            coro = self.loop.create_connection(MyProto, sock=sock)
11427db96d56Sopenharmony_ci            with self.assertRaisesRegex(ValueError,
11437db96d56Sopenharmony_ci                                        'A Stream Socket was expected'):
11447db96d56Sopenharmony_ci                self.loop.run_until_complete(coro)
11457db96d56Sopenharmony_ci
11467db96d56Sopenharmony_ci    def test_create_server_wrong_sock(self):
11477db96d56Sopenharmony_ci        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
11487db96d56Sopenharmony_ci        with sock:
11497db96d56Sopenharmony_ci            coro = self.loop.create_server(MyProto, sock=sock)
11507db96d56Sopenharmony_ci            with self.assertRaisesRegex(ValueError,
11517db96d56Sopenharmony_ci                                        'A Stream Socket was expected'):
11527db96d56Sopenharmony_ci                self.loop.run_until_complete(coro)
11537db96d56Sopenharmony_ci
11547db96d56Sopenharmony_ci    def test_create_server_ssl_timeout_for_plain_socket(self):
11557db96d56Sopenharmony_ci        coro = self.loop.create_server(
11567db96d56Sopenharmony_ci            MyProto, 'example.com', 80, ssl_handshake_timeout=1)
11577db96d56Sopenharmony_ci        with self.assertRaisesRegex(
11587db96d56Sopenharmony_ci                ValueError,
11597db96d56Sopenharmony_ci                'ssl_handshake_timeout is only meaningful with ssl'):
11607db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
11617db96d56Sopenharmony_ci
11627db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
11637db96d56Sopenharmony_ci                         'no socket.SOCK_NONBLOCK (linux only)')
11647db96d56Sopenharmony_ci    def test_create_server_stream_bittype(self):
11657db96d56Sopenharmony_ci        sock = socket.socket(
11667db96d56Sopenharmony_ci            socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
11677db96d56Sopenharmony_ci        with sock:
11687db96d56Sopenharmony_ci            coro = self.loop.create_server(lambda: None, sock=sock)
11697db96d56Sopenharmony_ci            srv = self.loop.run_until_complete(coro)
11707db96d56Sopenharmony_ci            srv.close()
11717db96d56Sopenharmony_ci            self.loop.run_until_complete(srv.wait_closed())
11727db96d56Sopenharmony_ci
11737db96d56Sopenharmony_ci    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
11747db96d56Sopenharmony_ci    def test_create_server_ipv6(self):
11757db96d56Sopenharmony_ci        async def main():
11767db96d56Sopenharmony_ci            srv = await asyncio.start_server(lambda: None, '::1', 0)
11777db96d56Sopenharmony_ci            try:
11787db96d56Sopenharmony_ci                self.assertGreater(len(srv.sockets), 0)
11797db96d56Sopenharmony_ci            finally:
11807db96d56Sopenharmony_ci                srv.close()
11817db96d56Sopenharmony_ci                await srv.wait_closed()
11827db96d56Sopenharmony_ci
11837db96d56Sopenharmony_ci        try:
11847db96d56Sopenharmony_ci            self.loop.run_until_complete(main())
11857db96d56Sopenharmony_ci        except OSError as ex:
11867db96d56Sopenharmony_ci            if (hasattr(errno, 'EADDRNOTAVAIL') and
11877db96d56Sopenharmony_ci                    ex.errno == errno.EADDRNOTAVAIL):
11887db96d56Sopenharmony_ci                self.skipTest('failed to bind to ::1')
11897db96d56Sopenharmony_ci            else:
11907db96d56Sopenharmony_ci                raise
11917db96d56Sopenharmony_ci
11927db96d56Sopenharmony_ci    def test_create_datagram_endpoint_wrong_sock(self):
11937db96d56Sopenharmony_ci        sock = socket.socket(socket.AF_INET)
11947db96d56Sopenharmony_ci        with sock:
11957db96d56Sopenharmony_ci            coro = self.loop.create_datagram_endpoint(MyProto, sock=sock)
11967db96d56Sopenharmony_ci            with self.assertRaisesRegex(ValueError,
11977db96d56Sopenharmony_ci                                        'A UDP Socket was expected'):
11987db96d56Sopenharmony_ci                self.loop.run_until_complete(coro)
11997db96d56Sopenharmony_ci
12007db96d56Sopenharmony_ci    def test_create_connection_no_host_port_sock(self):
12017db96d56Sopenharmony_ci        coro = self.loop.create_connection(MyProto)
12027db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
12037db96d56Sopenharmony_ci
12047db96d56Sopenharmony_ci    def test_create_connection_no_getaddrinfo(self):
12057db96d56Sopenharmony_ci        async def getaddrinfo(*args, **kw):
12067db96d56Sopenharmony_ci            return []
12077db96d56Sopenharmony_ci
12087db96d56Sopenharmony_ci        def getaddrinfo_task(*args, **kwds):
12097db96d56Sopenharmony_ci            return self.loop.create_task(getaddrinfo(*args, **kwds))
12107db96d56Sopenharmony_ci
12117db96d56Sopenharmony_ci        self.loop.getaddrinfo = getaddrinfo_task
12127db96d56Sopenharmony_ci        coro = self.loop.create_connection(MyProto, 'example.com', 80)
12137db96d56Sopenharmony_ci        self.assertRaises(
12147db96d56Sopenharmony_ci            OSError, self.loop.run_until_complete, coro)
12157db96d56Sopenharmony_ci
12167db96d56Sopenharmony_ci    def test_create_connection_connect_err(self):
12177db96d56Sopenharmony_ci        async def getaddrinfo(*args, **kw):
12187db96d56Sopenharmony_ci            return [(2, 1, 6, '', ('107.6.106.82', 80))]
12197db96d56Sopenharmony_ci
12207db96d56Sopenharmony_ci        def getaddrinfo_task(*args, **kwds):
12217db96d56Sopenharmony_ci            return self.loop.create_task(getaddrinfo(*args, **kwds))
12227db96d56Sopenharmony_ci
12237db96d56Sopenharmony_ci        self.loop.getaddrinfo = getaddrinfo_task
12247db96d56Sopenharmony_ci        self.loop.sock_connect = mock.Mock()
12257db96d56Sopenharmony_ci        self.loop.sock_connect.side_effect = OSError
12267db96d56Sopenharmony_ci
12277db96d56Sopenharmony_ci        coro = self.loop.create_connection(MyProto, 'example.com', 80)
12287db96d56Sopenharmony_ci        self.assertRaises(
12297db96d56Sopenharmony_ci            OSError, self.loop.run_until_complete, coro)
12307db96d56Sopenharmony_ci
12317db96d56Sopenharmony_ci    def test_create_connection_multiple(self):
12327db96d56Sopenharmony_ci        async def getaddrinfo(*args, **kw):
12337db96d56Sopenharmony_ci            return [(2, 1, 6, '', ('0.0.0.1', 80)),
12347db96d56Sopenharmony_ci                    (2, 1, 6, '', ('0.0.0.2', 80))]
12357db96d56Sopenharmony_ci
12367db96d56Sopenharmony_ci        def getaddrinfo_task(*args, **kwds):
12377db96d56Sopenharmony_ci            return self.loop.create_task(getaddrinfo(*args, **kwds))
12387db96d56Sopenharmony_ci
12397db96d56Sopenharmony_ci        self.loop.getaddrinfo = getaddrinfo_task
12407db96d56Sopenharmony_ci        self.loop.sock_connect = mock.Mock()
12417db96d56Sopenharmony_ci        self.loop.sock_connect.side_effect = OSError
12427db96d56Sopenharmony_ci
12437db96d56Sopenharmony_ci        coro = self.loop.create_connection(
12447db96d56Sopenharmony_ci            MyProto, 'example.com', 80, family=socket.AF_INET)
12457db96d56Sopenharmony_ci        with self.assertRaises(OSError):
12467db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
12477db96d56Sopenharmony_ci
12487db96d56Sopenharmony_ci    @patch_socket
12497db96d56Sopenharmony_ci    def test_create_connection_multiple_errors_local_addr(self, m_socket):
12507db96d56Sopenharmony_ci
12517db96d56Sopenharmony_ci        def bind(addr):
12527db96d56Sopenharmony_ci            if addr[0] == '0.0.0.1':
12537db96d56Sopenharmony_ci                err = OSError('Err')
12547db96d56Sopenharmony_ci                err.strerror = 'Err'
12557db96d56Sopenharmony_ci                raise err
12567db96d56Sopenharmony_ci
12577db96d56Sopenharmony_ci        m_socket.socket.return_value.bind = bind
12587db96d56Sopenharmony_ci
12597db96d56Sopenharmony_ci        async def getaddrinfo(*args, **kw):
12607db96d56Sopenharmony_ci            return [(2, 1, 6, '', ('0.0.0.1', 80)),
12617db96d56Sopenharmony_ci                    (2, 1, 6, '', ('0.0.0.2', 80))]
12627db96d56Sopenharmony_ci
12637db96d56Sopenharmony_ci        def getaddrinfo_task(*args, **kwds):
12647db96d56Sopenharmony_ci            return self.loop.create_task(getaddrinfo(*args, **kwds))
12657db96d56Sopenharmony_ci
12667db96d56Sopenharmony_ci        self.loop.getaddrinfo = getaddrinfo_task
12677db96d56Sopenharmony_ci        self.loop.sock_connect = mock.Mock()
12687db96d56Sopenharmony_ci        self.loop.sock_connect.side_effect = OSError('Err2')
12697db96d56Sopenharmony_ci
12707db96d56Sopenharmony_ci        coro = self.loop.create_connection(
12717db96d56Sopenharmony_ci            MyProto, 'example.com', 80, family=socket.AF_INET,
12727db96d56Sopenharmony_ci            local_addr=(None, 8080))
12737db96d56Sopenharmony_ci        with self.assertRaises(OSError) as cm:
12747db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
12757db96d56Sopenharmony_ci
12767db96d56Sopenharmony_ci        self.assertTrue(str(cm.exception).startswith('Multiple exceptions: '))
12777db96d56Sopenharmony_ci        self.assertTrue(m_socket.socket.return_value.close.called)
12787db96d56Sopenharmony_ci
12797db96d56Sopenharmony_ci    def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton):
12807db96d56Sopenharmony_ci        # Test the fallback code, even if this system has inet_pton.
12817db96d56Sopenharmony_ci        if not allow_inet_pton:
12827db96d56Sopenharmony_ci            del m_socket.inet_pton
12837db96d56Sopenharmony_ci
12847db96d56Sopenharmony_ci        m_socket.getaddrinfo = socket.getaddrinfo
12857db96d56Sopenharmony_ci        sock = m_socket.socket.return_value
12867db96d56Sopenharmony_ci
12877db96d56Sopenharmony_ci        self.loop._add_reader = mock.Mock()
12887db96d56Sopenharmony_ci        self.loop._add_writer = mock.Mock()
12897db96d56Sopenharmony_ci
12907db96d56Sopenharmony_ci        coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80)
12917db96d56Sopenharmony_ci        t, p = self.loop.run_until_complete(coro)
12927db96d56Sopenharmony_ci        try:
12937db96d56Sopenharmony_ci            sock.connect.assert_called_with(('1.2.3.4', 80))
12947db96d56Sopenharmony_ci            _, kwargs = m_socket.socket.call_args
12957db96d56Sopenharmony_ci            self.assertEqual(kwargs['family'], m_socket.AF_INET)
12967db96d56Sopenharmony_ci            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
12977db96d56Sopenharmony_ci        finally:
12987db96d56Sopenharmony_ci            t.close()
12997db96d56Sopenharmony_ci            test_utils.run_briefly(self.loop)  # allow transport to close
13007db96d56Sopenharmony_ci
13017db96d56Sopenharmony_ci        if socket_helper.IPV6_ENABLED:
13027db96d56Sopenharmony_ci            sock.family = socket.AF_INET6
13037db96d56Sopenharmony_ci            coro = self.loop.create_connection(asyncio.Protocol, '::1', 80)
13047db96d56Sopenharmony_ci            t, p = self.loop.run_until_complete(coro)
13057db96d56Sopenharmony_ci            try:
13067db96d56Sopenharmony_ci                # Without inet_pton we use getaddrinfo, which transforms
13077db96d56Sopenharmony_ci                # ('::1', 80) to ('::1', 80, 0, 0). The last 0s are flow info,
13087db96d56Sopenharmony_ci                # scope id.
13097db96d56Sopenharmony_ci                [address] = sock.connect.call_args[0]
13107db96d56Sopenharmony_ci                host, port = address[:2]
13117db96d56Sopenharmony_ci                self.assertRegex(host, r'::(0\.)*1')
13127db96d56Sopenharmony_ci                self.assertEqual(port, 80)
13137db96d56Sopenharmony_ci                _, kwargs = m_socket.socket.call_args
13147db96d56Sopenharmony_ci                self.assertEqual(kwargs['family'], m_socket.AF_INET6)
13157db96d56Sopenharmony_ci                self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
13167db96d56Sopenharmony_ci            finally:
13177db96d56Sopenharmony_ci                t.close()
13187db96d56Sopenharmony_ci                test_utils.run_briefly(self.loop)  # allow transport to close
13197db96d56Sopenharmony_ci
13207db96d56Sopenharmony_ci    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support')
13217db96d56Sopenharmony_ci    @unittest.skipIf(sys.platform.startswith('aix'),
13227db96d56Sopenharmony_ci                    "bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX")
13237db96d56Sopenharmony_ci    @patch_socket
13247db96d56Sopenharmony_ci    def test_create_connection_ipv6_scope(self, m_socket):
13257db96d56Sopenharmony_ci        m_socket.getaddrinfo = socket.getaddrinfo
13267db96d56Sopenharmony_ci        sock = m_socket.socket.return_value
13277db96d56Sopenharmony_ci        sock.family = socket.AF_INET6
13287db96d56Sopenharmony_ci
13297db96d56Sopenharmony_ci        self.loop._add_reader = mock.Mock()
13307db96d56Sopenharmony_ci        self.loop._add_writer = mock.Mock()
13317db96d56Sopenharmony_ci
13327db96d56Sopenharmony_ci        coro = self.loop.create_connection(asyncio.Protocol, 'fe80::1%1', 80)
13337db96d56Sopenharmony_ci        t, p = self.loop.run_until_complete(coro)
13347db96d56Sopenharmony_ci        try:
13357db96d56Sopenharmony_ci            sock.connect.assert_called_with(('fe80::1', 80, 0, 1))
13367db96d56Sopenharmony_ci            _, kwargs = m_socket.socket.call_args
13377db96d56Sopenharmony_ci            self.assertEqual(kwargs['family'], m_socket.AF_INET6)
13387db96d56Sopenharmony_ci            self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
13397db96d56Sopenharmony_ci        finally:
13407db96d56Sopenharmony_ci            t.close()
13417db96d56Sopenharmony_ci            test_utils.run_briefly(self.loop)  # allow transport to close
13427db96d56Sopenharmony_ci
13437db96d56Sopenharmony_ci    @patch_socket
13447db96d56Sopenharmony_ci    def test_create_connection_ip_addr(self, m_socket):
13457db96d56Sopenharmony_ci        self._test_create_connection_ip_addr(m_socket, True)
13467db96d56Sopenharmony_ci
13477db96d56Sopenharmony_ci    @patch_socket
13487db96d56Sopenharmony_ci    def test_create_connection_no_inet_pton(self, m_socket):
13497db96d56Sopenharmony_ci        self._test_create_connection_ip_addr(m_socket, False)
13507db96d56Sopenharmony_ci
13517db96d56Sopenharmony_ci    @patch_socket
13527db96d56Sopenharmony_ci    def test_create_connection_service_name(self, m_socket):
13537db96d56Sopenharmony_ci        m_socket.getaddrinfo = socket.getaddrinfo
13547db96d56Sopenharmony_ci        sock = m_socket.socket.return_value
13557db96d56Sopenharmony_ci
13567db96d56Sopenharmony_ci        self.loop._add_reader = mock.Mock()
13577db96d56Sopenharmony_ci        self.loop._add_writer = mock.Mock()
13587db96d56Sopenharmony_ci
13597db96d56Sopenharmony_ci        for service, port in ('http', 80), (b'http', 80):
13607db96d56Sopenharmony_ci            coro = self.loop.create_connection(asyncio.Protocol,
13617db96d56Sopenharmony_ci                                               '127.0.0.1', service)
13627db96d56Sopenharmony_ci
13637db96d56Sopenharmony_ci            t, p = self.loop.run_until_complete(coro)
13647db96d56Sopenharmony_ci            try:
13657db96d56Sopenharmony_ci                sock.connect.assert_called_with(('127.0.0.1', port))
13667db96d56Sopenharmony_ci                _, kwargs = m_socket.socket.call_args
13677db96d56Sopenharmony_ci                self.assertEqual(kwargs['family'], m_socket.AF_INET)
13687db96d56Sopenharmony_ci                self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM)
13697db96d56Sopenharmony_ci            finally:
13707db96d56Sopenharmony_ci                t.close()
13717db96d56Sopenharmony_ci                test_utils.run_briefly(self.loop)  # allow transport to close
13727db96d56Sopenharmony_ci
13737db96d56Sopenharmony_ci        for service in 'nonsense', b'nonsense':
13747db96d56Sopenharmony_ci            coro = self.loop.create_connection(asyncio.Protocol,
13757db96d56Sopenharmony_ci                                               '127.0.0.1', service)
13767db96d56Sopenharmony_ci
13777db96d56Sopenharmony_ci            with self.assertRaises(OSError):
13787db96d56Sopenharmony_ci                self.loop.run_until_complete(coro)
13797db96d56Sopenharmony_ci
13807db96d56Sopenharmony_ci    def test_create_connection_no_local_addr(self):
13817db96d56Sopenharmony_ci        async def getaddrinfo(host, *args, **kw):
13827db96d56Sopenharmony_ci            if host == 'example.com':
13837db96d56Sopenharmony_ci                return [(2, 1, 6, '', ('107.6.106.82', 80)),
13847db96d56Sopenharmony_ci                        (2, 1, 6, '', ('107.6.106.82', 80))]
13857db96d56Sopenharmony_ci            else:
13867db96d56Sopenharmony_ci                return []
13877db96d56Sopenharmony_ci
13887db96d56Sopenharmony_ci        def getaddrinfo_task(*args, **kwds):
13897db96d56Sopenharmony_ci            return self.loop.create_task(getaddrinfo(*args, **kwds))
13907db96d56Sopenharmony_ci        self.loop.getaddrinfo = getaddrinfo_task
13917db96d56Sopenharmony_ci
13927db96d56Sopenharmony_ci        coro = self.loop.create_connection(
13937db96d56Sopenharmony_ci            MyProto, 'example.com', 80, family=socket.AF_INET,
13947db96d56Sopenharmony_ci            local_addr=(None, 8080))
13957db96d56Sopenharmony_ci        self.assertRaises(
13967db96d56Sopenharmony_ci            OSError, self.loop.run_until_complete, coro)
13977db96d56Sopenharmony_ci
13987db96d56Sopenharmony_ci    @patch_socket
13997db96d56Sopenharmony_ci    def test_create_connection_bluetooth(self, m_socket):
14007db96d56Sopenharmony_ci        # See http://bugs.python.org/issue27136, fallback to getaddrinfo when
14017db96d56Sopenharmony_ci        # we can't recognize an address is resolved, e.g. a Bluetooth address.
14027db96d56Sopenharmony_ci        addr = ('00:01:02:03:04:05', 1)
14037db96d56Sopenharmony_ci
14047db96d56Sopenharmony_ci        def getaddrinfo(host, port, *args, **kw):
14057db96d56Sopenharmony_ci            self.assertEqual((host, port), addr)
14067db96d56Sopenharmony_ci            return [(999, 1, 999, '', (addr, 1))]
14077db96d56Sopenharmony_ci
14087db96d56Sopenharmony_ci        m_socket.getaddrinfo = getaddrinfo
14097db96d56Sopenharmony_ci        sock = m_socket.socket()
14107db96d56Sopenharmony_ci        coro = self.loop.sock_connect(sock, addr)
14117db96d56Sopenharmony_ci        self.loop.run_until_complete(coro)
14127db96d56Sopenharmony_ci
14137db96d56Sopenharmony_ci    def test_create_connection_ssl_server_hostname_default(self):
14147db96d56Sopenharmony_ci        self.loop.getaddrinfo = mock.Mock()
14157db96d56Sopenharmony_ci
14167db96d56Sopenharmony_ci        def mock_getaddrinfo(*args, **kwds):
14177db96d56Sopenharmony_ci            f = self.loop.create_future()
14187db96d56Sopenharmony_ci            f.set_result([(socket.AF_INET, socket.SOCK_STREAM,
14197db96d56Sopenharmony_ci                           socket.SOL_TCP, '', ('1.2.3.4', 80))])
14207db96d56Sopenharmony_ci            return f
14217db96d56Sopenharmony_ci
14227db96d56Sopenharmony_ci        self.loop.getaddrinfo.side_effect = mock_getaddrinfo
14237db96d56Sopenharmony_ci        self.loop.sock_connect = mock.Mock()
14247db96d56Sopenharmony_ci        self.loop.sock_connect.return_value = self.loop.create_future()
14257db96d56Sopenharmony_ci        self.loop.sock_connect.return_value.set_result(None)
14267db96d56Sopenharmony_ci        self.loop._make_ssl_transport = mock.Mock()
14277db96d56Sopenharmony_ci
14287db96d56Sopenharmony_ci        class _SelectorTransportMock:
14297db96d56Sopenharmony_ci            _sock = None
14307db96d56Sopenharmony_ci
14317db96d56Sopenharmony_ci            def get_extra_info(self, key):
14327db96d56Sopenharmony_ci                return mock.Mock()
14337db96d56Sopenharmony_ci
14347db96d56Sopenharmony_ci            def close(self):
14357db96d56Sopenharmony_ci                self._sock.close()
14367db96d56Sopenharmony_ci
14377db96d56Sopenharmony_ci        def mock_make_ssl_transport(sock, protocol, sslcontext, waiter,
14387db96d56Sopenharmony_ci                                    **kwds):
14397db96d56Sopenharmony_ci            waiter.set_result(None)
14407db96d56Sopenharmony_ci            transport = _SelectorTransportMock()
14417db96d56Sopenharmony_ci            transport._sock = sock
14427db96d56Sopenharmony_ci            return transport
14437db96d56Sopenharmony_ci
14447db96d56Sopenharmony_ci        self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport
14457db96d56Sopenharmony_ci        ANY = mock.ANY
14467db96d56Sopenharmony_ci        handshake_timeout = object()
14477db96d56Sopenharmony_ci        shutdown_timeout = object()
14487db96d56Sopenharmony_ci        # First try the default server_hostname.
14497db96d56Sopenharmony_ci        self.loop._make_ssl_transport.reset_mock()
14507db96d56Sopenharmony_ci        coro = self.loop.create_connection(
14517db96d56Sopenharmony_ci                MyProto, 'python.org', 80, ssl=True,
14527db96d56Sopenharmony_ci                ssl_handshake_timeout=handshake_timeout,
14537db96d56Sopenharmony_ci                ssl_shutdown_timeout=shutdown_timeout)
14547db96d56Sopenharmony_ci        transport, _ = self.loop.run_until_complete(coro)
14557db96d56Sopenharmony_ci        transport.close()
14567db96d56Sopenharmony_ci        self.loop._make_ssl_transport.assert_called_with(
14577db96d56Sopenharmony_ci            ANY, ANY, ANY, ANY,
14587db96d56Sopenharmony_ci            server_side=False,
14597db96d56Sopenharmony_ci            server_hostname='python.org',
14607db96d56Sopenharmony_ci            ssl_handshake_timeout=handshake_timeout,
14617db96d56Sopenharmony_ci            ssl_shutdown_timeout=shutdown_timeout)
14627db96d56Sopenharmony_ci        # Next try an explicit server_hostname.
14637db96d56Sopenharmony_ci        self.loop._make_ssl_transport.reset_mock()
14647db96d56Sopenharmony_ci        coro = self.loop.create_connection(
14657db96d56Sopenharmony_ci                MyProto, 'python.org', 80, ssl=True,
14667db96d56Sopenharmony_ci                server_hostname='perl.com',
14677db96d56Sopenharmony_ci                ssl_handshake_timeout=handshake_timeout,
14687db96d56Sopenharmony_ci                ssl_shutdown_timeout=shutdown_timeout)
14697db96d56Sopenharmony_ci        transport, _ = self.loop.run_until_complete(coro)
14707db96d56Sopenharmony_ci        transport.close()
14717db96d56Sopenharmony_ci        self.loop._make_ssl_transport.assert_called_with(
14727db96d56Sopenharmony_ci            ANY, ANY, ANY, ANY,
14737db96d56Sopenharmony_ci            server_side=False,
14747db96d56Sopenharmony_ci            server_hostname='perl.com',
14757db96d56Sopenharmony_ci            ssl_handshake_timeout=handshake_timeout,
14767db96d56Sopenharmony_ci            ssl_shutdown_timeout=shutdown_timeout)
14777db96d56Sopenharmony_ci        # Finally try an explicit empty server_hostname.
14787db96d56Sopenharmony_ci        self.loop._make_ssl_transport.reset_mock()
14797db96d56Sopenharmony_ci        coro = self.loop.create_connection(
14807db96d56Sopenharmony_ci                MyProto, 'python.org', 80, ssl=True,
14817db96d56Sopenharmony_ci                server_hostname='',
14827db96d56Sopenharmony_ci                ssl_handshake_timeout=handshake_timeout,
14837db96d56Sopenharmony_ci                ssl_shutdown_timeout=shutdown_timeout)
14847db96d56Sopenharmony_ci        transport, _ = self.loop.run_until_complete(coro)
14857db96d56Sopenharmony_ci        transport.close()
14867db96d56Sopenharmony_ci        self.loop._make_ssl_transport.assert_called_with(
14877db96d56Sopenharmony_ci                ANY, ANY, ANY, ANY,
14887db96d56Sopenharmony_ci                server_side=False,
14897db96d56Sopenharmony_ci                server_hostname='',
14907db96d56Sopenharmony_ci                ssl_handshake_timeout=handshake_timeout,
14917db96d56Sopenharmony_ci                ssl_shutdown_timeout=shutdown_timeout)
14927db96d56Sopenharmony_ci
14937db96d56Sopenharmony_ci    def test_create_connection_no_ssl_server_hostname_errors(self):
14947db96d56Sopenharmony_ci        # When not using ssl, server_hostname must be None.
14957db96d56Sopenharmony_ci        coro = self.loop.create_connection(MyProto, 'python.org', 80,
14967db96d56Sopenharmony_ci                                           server_hostname='')
14977db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
14987db96d56Sopenharmony_ci        coro = self.loop.create_connection(MyProto, 'python.org', 80,
14997db96d56Sopenharmony_ci                                           server_hostname='python.org')
15007db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
15017db96d56Sopenharmony_ci
15027db96d56Sopenharmony_ci    def test_create_connection_ssl_server_hostname_errors(self):
15037db96d56Sopenharmony_ci        # When using ssl, server_hostname may be None if host is non-empty.
15047db96d56Sopenharmony_ci        coro = self.loop.create_connection(MyProto, '', 80, ssl=True)
15057db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
15067db96d56Sopenharmony_ci        coro = self.loop.create_connection(MyProto, None, 80, ssl=True)
15077db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
15087db96d56Sopenharmony_ci        sock = socket.socket()
15097db96d56Sopenharmony_ci        coro = self.loop.create_connection(MyProto, None, None,
15107db96d56Sopenharmony_ci                                           ssl=True, sock=sock)
15117db96d56Sopenharmony_ci        self.addCleanup(sock.close)
15127db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
15137db96d56Sopenharmony_ci
15147db96d56Sopenharmony_ci    def test_create_connection_ssl_timeout_for_plain_socket(self):
15157db96d56Sopenharmony_ci        coro = self.loop.create_connection(
15167db96d56Sopenharmony_ci            MyProto, 'example.com', 80, ssl_handshake_timeout=1)
15177db96d56Sopenharmony_ci        with self.assertRaisesRegex(
15187db96d56Sopenharmony_ci                ValueError,
15197db96d56Sopenharmony_ci                'ssl_handshake_timeout is only meaningful with ssl'):
15207db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
15217db96d56Sopenharmony_ci
15227db96d56Sopenharmony_ci    def test_create_server_empty_host(self):
15237db96d56Sopenharmony_ci        # if host is empty string use None instead
15247db96d56Sopenharmony_ci        host = object()
15257db96d56Sopenharmony_ci
15267db96d56Sopenharmony_ci        async def getaddrinfo(*args, **kw):
15277db96d56Sopenharmony_ci            nonlocal host
15287db96d56Sopenharmony_ci            host = args[0]
15297db96d56Sopenharmony_ci            return []
15307db96d56Sopenharmony_ci
15317db96d56Sopenharmony_ci        def getaddrinfo_task(*args, **kwds):
15327db96d56Sopenharmony_ci            return self.loop.create_task(getaddrinfo(*args, **kwds))
15337db96d56Sopenharmony_ci
15347db96d56Sopenharmony_ci        self.loop.getaddrinfo = getaddrinfo_task
15357db96d56Sopenharmony_ci        fut = self.loop.create_server(MyProto, '', 0)
15367db96d56Sopenharmony_ci        self.assertRaises(OSError, self.loop.run_until_complete, fut)
15377db96d56Sopenharmony_ci        self.assertIsNone(host)
15387db96d56Sopenharmony_ci
15397db96d56Sopenharmony_ci    def test_create_server_host_port_sock(self):
15407db96d56Sopenharmony_ci        fut = self.loop.create_server(
15417db96d56Sopenharmony_ci            MyProto, '0.0.0.0', 0, sock=object())
15427db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
15437db96d56Sopenharmony_ci
15447db96d56Sopenharmony_ci    def test_create_server_no_host_port_sock(self):
15457db96d56Sopenharmony_ci        fut = self.loop.create_server(MyProto)
15467db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
15477db96d56Sopenharmony_ci
15487db96d56Sopenharmony_ci    def test_create_server_no_getaddrinfo(self):
15497db96d56Sopenharmony_ci        getaddrinfo = self.loop.getaddrinfo = mock.Mock()
15507db96d56Sopenharmony_ci        getaddrinfo.return_value = self.loop.create_future()
15517db96d56Sopenharmony_ci        getaddrinfo.return_value.set_result(None)
15527db96d56Sopenharmony_ci
15537db96d56Sopenharmony_ci        f = self.loop.create_server(MyProto, 'python.org', 0)
15547db96d56Sopenharmony_ci        self.assertRaises(OSError, self.loop.run_until_complete, f)
15557db96d56Sopenharmony_ci
15567db96d56Sopenharmony_ci    @patch_socket
15577db96d56Sopenharmony_ci    def test_create_server_nosoreuseport(self, m_socket):
15587db96d56Sopenharmony_ci        m_socket.getaddrinfo = socket.getaddrinfo
15597db96d56Sopenharmony_ci        del m_socket.SO_REUSEPORT
15607db96d56Sopenharmony_ci        m_socket.socket.return_value = mock.Mock()
15617db96d56Sopenharmony_ci
15627db96d56Sopenharmony_ci        f = self.loop.create_server(
15637db96d56Sopenharmony_ci            MyProto, '0.0.0.0', 0, reuse_port=True)
15647db96d56Sopenharmony_ci
15657db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, f)
15667db96d56Sopenharmony_ci
15677db96d56Sopenharmony_ci    @patch_socket
15687db96d56Sopenharmony_ci    def test_create_server_soreuseport_only_defined(self, m_socket):
15697db96d56Sopenharmony_ci        m_socket.getaddrinfo = socket.getaddrinfo
15707db96d56Sopenharmony_ci        m_socket.socket.return_value = mock.Mock()
15717db96d56Sopenharmony_ci        m_socket.SO_REUSEPORT = -1
15727db96d56Sopenharmony_ci
15737db96d56Sopenharmony_ci        f = self.loop.create_server(
15747db96d56Sopenharmony_ci            MyProto, '0.0.0.0', 0, reuse_port=True)
15757db96d56Sopenharmony_ci
15767db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, f)
15777db96d56Sopenharmony_ci
15787db96d56Sopenharmony_ci    @patch_socket
15797db96d56Sopenharmony_ci    def test_create_server_cant_bind(self, m_socket):
15807db96d56Sopenharmony_ci
15817db96d56Sopenharmony_ci        class Err(OSError):
15827db96d56Sopenharmony_ci            strerror = 'error'
15837db96d56Sopenharmony_ci
15847db96d56Sopenharmony_ci        m_socket.getaddrinfo.return_value = [
15857db96d56Sopenharmony_ci            (2, 1, 6, '', ('127.0.0.1', 10100))]
15867db96d56Sopenharmony_ci        m_sock = m_socket.socket.return_value = mock.Mock()
15877db96d56Sopenharmony_ci        m_sock.bind.side_effect = Err
15887db96d56Sopenharmony_ci
15897db96d56Sopenharmony_ci        fut = self.loop.create_server(MyProto, '0.0.0.0', 0)
15907db96d56Sopenharmony_ci        self.assertRaises(OSError, self.loop.run_until_complete, fut)
15917db96d56Sopenharmony_ci        self.assertTrue(m_sock.close.called)
15927db96d56Sopenharmony_ci
15937db96d56Sopenharmony_ci    @patch_socket
15947db96d56Sopenharmony_ci    def test_create_datagram_endpoint_no_addrinfo(self, m_socket):
15957db96d56Sopenharmony_ci        m_socket.getaddrinfo.return_value = []
15967db96d56Sopenharmony_ci
15977db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
15987db96d56Sopenharmony_ci            MyDatagramProto, local_addr=('localhost', 0))
15997db96d56Sopenharmony_ci        self.assertRaises(
16007db96d56Sopenharmony_ci            OSError, self.loop.run_until_complete, coro)
16017db96d56Sopenharmony_ci
16027db96d56Sopenharmony_ci    def test_create_datagram_endpoint_addr_error(self):
16037db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
16047db96d56Sopenharmony_ci            MyDatagramProto, local_addr='localhost')
16057db96d56Sopenharmony_ci        self.assertRaises(
16067db96d56Sopenharmony_ci            TypeError, self.loop.run_until_complete, coro)
16077db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
16087db96d56Sopenharmony_ci            MyDatagramProto, local_addr=('localhost', 1, 2, 3))
16097db96d56Sopenharmony_ci        self.assertRaises(
16107db96d56Sopenharmony_ci            TypeError, self.loop.run_until_complete, coro)
16117db96d56Sopenharmony_ci
16127db96d56Sopenharmony_ci    def test_create_datagram_endpoint_connect_err(self):
16137db96d56Sopenharmony_ci        self.loop.sock_connect = mock.Mock()
16147db96d56Sopenharmony_ci        self.loop.sock_connect.side_effect = OSError
16157db96d56Sopenharmony_ci
16167db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
16177db96d56Sopenharmony_ci            asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0))
16187db96d56Sopenharmony_ci        self.assertRaises(
16197db96d56Sopenharmony_ci            OSError, self.loop.run_until_complete, coro)
16207db96d56Sopenharmony_ci
16217db96d56Sopenharmony_ci    def test_create_datagram_endpoint_allow_broadcast(self):
16227db96d56Sopenharmony_ci        protocol = MyDatagramProto(create_future=True, loop=self.loop)
16237db96d56Sopenharmony_ci        self.loop.sock_connect = sock_connect = mock.Mock()
16247db96d56Sopenharmony_ci        sock_connect.return_value = []
16257db96d56Sopenharmony_ci
16267db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
16277db96d56Sopenharmony_ci            lambda: protocol,
16287db96d56Sopenharmony_ci            remote_addr=('127.0.0.1', 0),
16297db96d56Sopenharmony_ci            allow_broadcast=True)
16307db96d56Sopenharmony_ci
16317db96d56Sopenharmony_ci        transport, _ = self.loop.run_until_complete(coro)
16327db96d56Sopenharmony_ci        self.assertFalse(sock_connect.called)
16337db96d56Sopenharmony_ci
16347db96d56Sopenharmony_ci        transport.close()
16357db96d56Sopenharmony_ci        self.loop.run_until_complete(protocol.done)
16367db96d56Sopenharmony_ci        self.assertEqual('CLOSED', protocol.state)
16377db96d56Sopenharmony_ci
16387db96d56Sopenharmony_ci    @patch_socket
16397db96d56Sopenharmony_ci    def test_create_datagram_endpoint_socket_err(self, m_socket):
16407db96d56Sopenharmony_ci        m_socket.getaddrinfo = socket.getaddrinfo
16417db96d56Sopenharmony_ci        m_socket.socket.side_effect = OSError
16427db96d56Sopenharmony_ci
16437db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
16447db96d56Sopenharmony_ci            asyncio.DatagramProtocol, family=socket.AF_INET)
16457db96d56Sopenharmony_ci        self.assertRaises(
16467db96d56Sopenharmony_ci            OSError, self.loop.run_until_complete, coro)
16477db96d56Sopenharmony_ci
16487db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
16497db96d56Sopenharmony_ci            asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0))
16507db96d56Sopenharmony_ci        self.assertRaises(
16517db96d56Sopenharmony_ci            OSError, self.loop.run_until_complete, coro)
16527db96d56Sopenharmony_ci
16537db96d56Sopenharmony_ci    @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled')
16547db96d56Sopenharmony_ci    def test_create_datagram_endpoint_no_matching_family(self):
16557db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
16567db96d56Sopenharmony_ci            asyncio.DatagramProtocol,
16577db96d56Sopenharmony_ci            remote_addr=('127.0.0.1', 0), local_addr=('::1', 0))
16587db96d56Sopenharmony_ci        self.assertRaises(
16597db96d56Sopenharmony_ci            ValueError, self.loop.run_until_complete, coro)
16607db96d56Sopenharmony_ci
16617db96d56Sopenharmony_ci    @patch_socket
16627db96d56Sopenharmony_ci    def test_create_datagram_endpoint_setblk_err(self, m_socket):
16637db96d56Sopenharmony_ci        m_socket.socket.return_value.setblocking.side_effect = OSError
16647db96d56Sopenharmony_ci
16657db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
16667db96d56Sopenharmony_ci            asyncio.DatagramProtocol, family=socket.AF_INET)
16677db96d56Sopenharmony_ci        self.assertRaises(
16687db96d56Sopenharmony_ci            OSError, self.loop.run_until_complete, coro)
16697db96d56Sopenharmony_ci        self.assertTrue(
16707db96d56Sopenharmony_ci            m_socket.socket.return_value.close.called)
16717db96d56Sopenharmony_ci
16727db96d56Sopenharmony_ci    def test_create_datagram_endpoint_noaddr_nofamily(self):
16737db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
16747db96d56Sopenharmony_ci            asyncio.DatagramProtocol)
16757db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
16767db96d56Sopenharmony_ci
16777db96d56Sopenharmony_ci    @patch_socket
16787db96d56Sopenharmony_ci    def test_create_datagram_endpoint_cant_bind(self, m_socket):
16797db96d56Sopenharmony_ci        class Err(OSError):
16807db96d56Sopenharmony_ci            pass
16817db96d56Sopenharmony_ci
16827db96d56Sopenharmony_ci        m_socket.getaddrinfo = socket.getaddrinfo
16837db96d56Sopenharmony_ci        m_sock = m_socket.socket.return_value = mock.Mock()
16847db96d56Sopenharmony_ci        m_sock.bind.side_effect = Err
16857db96d56Sopenharmony_ci
16867db96d56Sopenharmony_ci        fut = self.loop.create_datagram_endpoint(
16877db96d56Sopenharmony_ci            MyDatagramProto,
16887db96d56Sopenharmony_ci            local_addr=('127.0.0.1', 0), family=socket.AF_INET)
16897db96d56Sopenharmony_ci        self.assertRaises(Err, self.loop.run_until_complete, fut)
16907db96d56Sopenharmony_ci        self.assertTrue(m_sock.close.called)
16917db96d56Sopenharmony_ci
16927db96d56Sopenharmony_ci    def test_create_datagram_endpoint_sock(self):
16937db96d56Sopenharmony_ci        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
16947db96d56Sopenharmony_ci        sock.bind(('127.0.0.1', 0))
16957db96d56Sopenharmony_ci        fut = self.loop.create_datagram_endpoint(
16967db96d56Sopenharmony_ci            lambda: MyDatagramProto(create_future=True, loop=self.loop),
16977db96d56Sopenharmony_ci            sock=sock)
16987db96d56Sopenharmony_ci        transport, protocol = self.loop.run_until_complete(fut)
16997db96d56Sopenharmony_ci        transport.close()
17007db96d56Sopenharmony_ci        self.loop.run_until_complete(protocol.done)
17017db96d56Sopenharmony_ci        self.assertEqual('CLOSED', protocol.state)
17027db96d56Sopenharmony_ci
17037db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
17047db96d56Sopenharmony_ci    def test_create_datagram_endpoint_sock_unix(self):
17057db96d56Sopenharmony_ci        fut = self.loop.create_datagram_endpoint(
17067db96d56Sopenharmony_ci            lambda: MyDatagramProto(create_future=True, loop=self.loop),
17077db96d56Sopenharmony_ci            family=socket.AF_UNIX)
17087db96d56Sopenharmony_ci        transport, protocol = self.loop.run_until_complete(fut)
17097db96d56Sopenharmony_ci        self.assertEqual(transport._sock.family, socket.AF_UNIX)
17107db96d56Sopenharmony_ci        transport.close()
17117db96d56Sopenharmony_ci        self.loop.run_until_complete(protocol.done)
17127db96d56Sopenharmony_ci        self.assertEqual('CLOSED', protocol.state)
17137db96d56Sopenharmony_ci
17147db96d56Sopenharmony_ci    @socket_helper.skip_unless_bind_unix_socket
17157db96d56Sopenharmony_ci    def test_create_datagram_endpoint_existing_sock_unix(self):
17167db96d56Sopenharmony_ci        with test_utils.unix_socket_path() as path:
17177db96d56Sopenharmony_ci            sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM)
17187db96d56Sopenharmony_ci            sock.bind(path)
17197db96d56Sopenharmony_ci            sock.close()
17207db96d56Sopenharmony_ci
17217db96d56Sopenharmony_ci            coro = self.loop.create_datagram_endpoint(
17227db96d56Sopenharmony_ci                lambda: MyDatagramProto(create_future=True, loop=self.loop),
17237db96d56Sopenharmony_ci                path, family=socket.AF_UNIX)
17247db96d56Sopenharmony_ci            transport, protocol = self.loop.run_until_complete(coro)
17257db96d56Sopenharmony_ci            transport.close()
17267db96d56Sopenharmony_ci            self.loop.run_until_complete(protocol.done)
17277db96d56Sopenharmony_ci
17287db96d56Sopenharmony_ci    def test_create_datagram_endpoint_sock_sockopts(self):
17297db96d56Sopenharmony_ci        class FakeSock:
17307db96d56Sopenharmony_ci            type = socket.SOCK_DGRAM
17317db96d56Sopenharmony_ci
17327db96d56Sopenharmony_ci        fut = self.loop.create_datagram_endpoint(
17337db96d56Sopenharmony_ci            MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock())
17347db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
17357db96d56Sopenharmony_ci
17367db96d56Sopenharmony_ci        fut = self.loop.create_datagram_endpoint(
17377db96d56Sopenharmony_ci            MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock())
17387db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
17397db96d56Sopenharmony_ci
17407db96d56Sopenharmony_ci        fut = self.loop.create_datagram_endpoint(
17417db96d56Sopenharmony_ci            MyDatagramProto, family=1, sock=FakeSock())
17427db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
17437db96d56Sopenharmony_ci
17447db96d56Sopenharmony_ci        fut = self.loop.create_datagram_endpoint(
17457db96d56Sopenharmony_ci            MyDatagramProto, proto=1, sock=FakeSock())
17467db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
17477db96d56Sopenharmony_ci
17487db96d56Sopenharmony_ci        fut = self.loop.create_datagram_endpoint(
17497db96d56Sopenharmony_ci            MyDatagramProto, flags=1, sock=FakeSock())
17507db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
17517db96d56Sopenharmony_ci
17527db96d56Sopenharmony_ci        fut = self.loop.create_datagram_endpoint(
17537db96d56Sopenharmony_ci            MyDatagramProto, reuse_port=True, sock=FakeSock())
17547db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
17557db96d56Sopenharmony_ci
17567db96d56Sopenharmony_ci        fut = self.loop.create_datagram_endpoint(
17577db96d56Sopenharmony_ci            MyDatagramProto, allow_broadcast=True, sock=FakeSock())
17587db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, fut)
17597db96d56Sopenharmony_ci
17607db96d56Sopenharmony_ci    @unittest.skipIf(sys.platform == 'vxworks',
17617db96d56Sopenharmony_ci                    "SO_BROADCAST is enabled by default on VxWorks")
17627db96d56Sopenharmony_ci    def test_create_datagram_endpoint_sockopts(self):
17637db96d56Sopenharmony_ci        # Socket options should not be applied unless asked for.
17647db96d56Sopenharmony_ci        # SO_REUSEPORT is not available on all platforms.
17657db96d56Sopenharmony_ci
17667db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
17677db96d56Sopenharmony_ci            lambda: MyDatagramProto(create_future=True, loop=self.loop),
17687db96d56Sopenharmony_ci            local_addr=('127.0.0.1', 0))
17697db96d56Sopenharmony_ci        transport, protocol = self.loop.run_until_complete(coro)
17707db96d56Sopenharmony_ci        sock = transport.get_extra_info('socket')
17717db96d56Sopenharmony_ci
17727db96d56Sopenharmony_ci        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
17737db96d56Sopenharmony_ci
17747db96d56Sopenharmony_ci        if reuseport_supported:
17757db96d56Sopenharmony_ci            self.assertFalse(
17767db96d56Sopenharmony_ci                sock.getsockopt(
17777db96d56Sopenharmony_ci                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
17787db96d56Sopenharmony_ci        self.assertFalse(
17797db96d56Sopenharmony_ci            sock.getsockopt(
17807db96d56Sopenharmony_ci                socket.SOL_SOCKET, socket.SO_BROADCAST))
17817db96d56Sopenharmony_ci
17827db96d56Sopenharmony_ci        transport.close()
17837db96d56Sopenharmony_ci        self.loop.run_until_complete(protocol.done)
17847db96d56Sopenharmony_ci        self.assertEqual('CLOSED', protocol.state)
17857db96d56Sopenharmony_ci
17867db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
17877db96d56Sopenharmony_ci            lambda: MyDatagramProto(create_future=True, loop=self.loop),
17887db96d56Sopenharmony_ci            local_addr=('127.0.0.1', 0),
17897db96d56Sopenharmony_ci            reuse_port=reuseport_supported,
17907db96d56Sopenharmony_ci            allow_broadcast=True)
17917db96d56Sopenharmony_ci        transport, protocol = self.loop.run_until_complete(coro)
17927db96d56Sopenharmony_ci        sock = transport.get_extra_info('socket')
17937db96d56Sopenharmony_ci
17947db96d56Sopenharmony_ci        self.assertFalse(
17957db96d56Sopenharmony_ci            sock.getsockopt(
17967db96d56Sopenharmony_ci                socket.SOL_SOCKET, socket.SO_REUSEADDR))
17977db96d56Sopenharmony_ci        if reuseport_supported:
17987db96d56Sopenharmony_ci            self.assertTrue(
17997db96d56Sopenharmony_ci                sock.getsockopt(
18007db96d56Sopenharmony_ci                    socket.SOL_SOCKET, socket.SO_REUSEPORT))
18017db96d56Sopenharmony_ci        self.assertTrue(
18027db96d56Sopenharmony_ci            sock.getsockopt(
18037db96d56Sopenharmony_ci                socket.SOL_SOCKET, socket.SO_BROADCAST))
18047db96d56Sopenharmony_ci
18057db96d56Sopenharmony_ci        transport.close()
18067db96d56Sopenharmony_ci        self.loop.run_until_complete(protocol.done)
18077db96d56Sopenharmony_ci        self.assertEqual('CLOSED', protocol.state)
18087db96d56Sopenharmony_ci
18097db96d56Sopenharmony_ci    @patch_socket
18107db96d56Sopenharmony_ci    def test_create_datagram_endpoint_nosoreuseport(self, m_socket):
18117db96d56Sopenharmony_ci        del m_socket.SO_REUSEPORT
18127db96d56Sopenharmony_ci        m_socket.socket.return_value = mock.Mock()
18137db96d56Sopenharmony_ci
18147db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
18157db96d56Sopenharmony_ci            lambda: MyDatagramProto(loop=self.loop),
18167db96d56Sopenharmony_ci            local_addr=('127.0.0.1', 0),
18177db96d56Sopenharmony_ci            reuse_port=True)
18187db96d56Sopenharmony_ci
18197db96d56Sopenharmony_ci        self.assertRaises(ValueError, self.loop.run_until_complete, coro)
18207db96d56Sopenharmony_ci
18217db96d56Sopenharmony_ci    @patch_socket
18227db96d56Sopenharmony_ci    def test_create_datagram_endpoint_ip_addr(self, m_socket):
18237db96d56Sopenharmony_ci        def getaddrinfo(*args, **kw):
18247db96d56Sopenharmony_ci            self.fail('should not have called getaddrinfo')
18257db96d56Sopenharmony_ci
18267db96d56Sopenharmony_ci        m_socket.getaddrinfo = getaddrinfo
18277db96d56Sopenharmony_ci        m_socket.socket.return_value.bind = bind = mock.Mock()
18287db96d56Sopenharmony_ci        self.loop._add_reader = mock.Mock()
18297db96d56Sopenharmony_ci
18307db96d56Sopenharmony_ci        reuseport_supported = hasattr(socket, 'SO_REUSEPORT')
18317db96d56Sopenharmony_ci        coro = self.loop.create_datagram_endpoint(
18327db96d56Sopenharmony_ci            lambda: MyDatagramProto(loop=self.loop),
18337db96d56Sopenharmony_ci            local_addr=('1.2.3.4', 0),
18347db96d56Sopenharmony_ci            reuse_port=reuseport_supported)
18357db96d56Sopenharmony_ci
18367db96d56Sopenharmony_ci        t, p = self.loop.run_until_complete(coro)
18377db96d56Sopenharmony_ci        try:
18387db96d56Sopenharmony_ci            bind.assert_called_with(('1.2.3.4', 0))
18397db96d56Sopenharmony_ci            m_socket.socket.assert_called_with(family=m_socket.AF_INET,
18407db96d56Sopenharmony_ci                                               proto=m_socket.IPPROTO_UDP,
18417db96d56Sopenharmony_ci                                               type=m_socket.SOCK_DGRAM)
18427db96d56Sopenharmony_ci        finally:
18437db96d56Sopenharmony_ci            t.close()
18447db96d56Sopenharmony_ci            test_utils.run_briefly(self.loop)  # allow transport to close
18457db96d56Sopenharmony_ci
18467db96d56Sopenharmony_ci    def test_accept_connection_retry(self):
18477db96d56Sopenharmony_ci        sock = mock.Mock()
18487db96d56Sopenharmony_ci        sock.accept.side_effect = BlockingIOError()
18497db96d56Sopenharmony_ci
18507db96d56Sopenharmony_ci        self.loop._accept_connection(MyProto, sock)
18517db96d56Sopenharmony_ci        self.assertFalse(sock.close.called)
18527db96d56Sopenharmony_ci
18537db96d56Sopenharmony_ci    @mock.patch('asyncio.base_events.logger')
18547db96d56Sopenharmony_ci    def test_accept_connection_exception(self, m_log):
18557db96d56Sopenharmony_ci        sock = mock.Mock()
18567db96d56Sopenharmony_ci        sock.fileno.return_value = 10
18577db96d56Sopenharmony_ci        sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files')
18587db96d56Sopenharmony_ci        self.loop._remove_reader = mock.Mock()
18597db96d56Sopenharmony_ci        self.loop.call_later = mock.Mock()
18607db96d56Sopenharmony_ci
18617db96d56Sopenharmony_ci        self.loop._accept_connection(MyProto, sock)
18627db96d56Sopenharmony_ci        self.assertTrue(m_log.error.called)
18637db96d56Sopenharmony_ci        self.assertFalse(sock.close.called)
18647db96d56Sopenharmony_ci        self.loop._remove_reader.assert_called_with(10)
18657db96d56Sopenharmony_ci        self.loop.call_later.assert_called_with(
18667db96d56Sopenharmony_ci            constants.ACCEPT_RETRY_DELAY,
18677db96d56Sopenharmony_ci            # self.loop._start_serving
18687db96d56Sopenharmony_ci            mock.ANY,
18697db96d56Sopenharmony_ci            MyProto, sock, None, None, mock.ANY, mock.ANY, mock.ANY)
18707db96d56Sopenharmony_ci
18717db96d56Sopenharmony_ci    def test_call_coroutine(self):
18727db96d56Sopenharmony_ci        async def simple_coroutine():
18737db96d56Sopenharmony_ci            pass
18747db96d56Sopenharmony_ci
18757db96d56Sopenharmony_ci        self.loop.set_debug(True)
18767db96d56Sopenharmony_ci        coro_func = simple_coroutine
18777db96d56Sopenharmony_ci        coro_obj = coro_func()
18787db96d56Sopenharmony_ci        self.addCleanup(coro_obj.close)
18797db96d56Sopenharmony_ci        for func in (coro_func, coro_obj):
18807db96d56Sopenharmony_ci            with self.assertRaises(TypeError):
18817db96d56Sopenharmony_ci                self.loop.call_soon(func)
18827db96d56Sopenharmony_ci            with self.assertRaises(TypeError):
18837db96d56Sopenharmony_ci                self.loop.call_soon_threadsafe(func)
18847db96d56Sopenharmony_ci            with self.assertRaises(TypeError):
18857db96d56Sopenharmony_ci                self.loop.call_later(60, func)
18867db96d56Sopenharmony_ci            with self.assertRaises(TypeError):
18877db96d56Sopenharmony_ci                self.loop.call_at(self.loop.time() + 60, func)
18887db96d56Sopenharmony_ci            with self.assertRaises(TypeError):
18897db96d56Sopenharmony_ci                self.loop.run_until_complete(
18907db96d56Sopenharmony_ci                    self.loop.run_in_executor(None, func))
18917db96d56Sopenharmony_ci
18927db96d56Sopenharmony_ci    @mock.patch('asyncio.base_events.logger')
18937db96d56Sopenharmony_ci    def test_log_slow_callbacks(self, m_logger):
18947db96d56Sopenharmony_ci        def stop_loop_cb(loop):
18957db96d56Sopenharmony_ci            loop.stop()
18967db96d56Sopenharmony_ci
18977db96d56Sopenharmony_ci        async def stop_loop_coro(loop):
18987db96d56Sopenharmony_ci            loop.stop()
18997db96d56Sopenharmony_ci
19007db96d56Sopenharmony_ci        asyncio.set_event_loop(self.loop)
19017db96d56Sopenharmony_ci        self.loop.set_debug(True)
19027db96d56Sopenharmony_ci        self.loop.slow_callback_duration = 0.0
19037db96d56Sopenharmony_ci
19047db96d56Sopenharmony_ci        # slow callback
19057db96d56Sopenharmony_ci        self.loop.call_soon(stop_loop_cb, self.loop)
19067db96d56Sopenharmony_ci        self.loop.run_forever()
19077db96d56Sopenharmony_ci        fmt, *args = m_logger.warning.call_args[0]
19087db96d56Sopenharmony_ci        self.assertRegex(fmt % tuple(args),
19097db96d56Sopenharmony_ci                         "^Executing <Handle.*stop_loop_cb.*> "
19107db96d56Sopenharmony_ci                         "took .* seconds$")
19117db96d56Sopenharmony_ci
19127db96d56Sopenharmony_ci        # slow task
19137db96d56Sopenharmony_ci        asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop)
19147db96d56Sopenharmony_ci        self.loop.run_forever()
19157db96d56Sopenharmony_ci        fmt, *args = m_logger.warning.call_args[0]
19167db96d56Sopenharmony_ci        self.assertRegex(fmt % tuple(args),
19177db96d56Sopenharmony_ci                         "^Executing <Task.*stop_loop_coro.*> "
19187db96d56Sopenharmony_ci                         "took .* seconds$")
19197db96d56Sopenharmony_ci
19207db96d56Sopenharmony_ci
19217db96d56Sopenharmony_ciclass RunningLoopTests(unittest.TestCase):
19227db96d56Sopenharmony_ci
19237db96d56Sopenharmony_ci    def test_running_loop_within_a_loop(self):
19247db96d56Sopenharmony_ci        async def runner(loop):
19257db96d56Sopenharmony_ci            loop.run_forever()
19267db96d56Sopenharmony_ci
19277db96d56Sopenharmony_ci        loop = asyncio.new_event_loop()
19287db96d56Sopenharmony_ci        outer_loop = asyncio.new_event_loop()
19297db96d56Sopenharmony_ci        try:
19307db96d56Sopenharmony_ci            with self.assertRaisesRegex(RuntimeError,
19317db96d56Sopenharmony_ci                                        'while another loop is running'):
19327db96d56Sopenharmony_ci                outer_loop.run_until_complete(runner(loop))
19337db96d56Sopenharmony_ci        finally:
19347db96d56Sopenharmony_ci            loop.close()
19357db96d56Sopenharmony_ci            outer_loop.close()
19367db96d56Sopenharmony_ci
19377db96d56Sopenharmony_ci
19387db96d56Sopenharmony_ciclass BaseLoopSockSendfileTests(test_utils.TestCase):
19397db96d56Sopenharmony_ci
19407db96d56Sopenharmony_ci    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
19417db96d56Sopenharmony_ci
19427db96d56Sopenharmony_ci    class MyProto(asyncio.Protocol):
19437db96d56Sopenharmony_ci
19447db96d56Sopenharmony_ci        def __init__(self, loop):
19457db96d56Sopenharmony_ci            self.started = False
19467db96d56Sopenharmony_ci            self.closed = False
19477db96d56Sopenharmony_ci            self.data = bytearray()
19487db96d56Sopenharmony_ci            self.fut = loop.create_future()
19497db96d56Sopenharmony_ci            self.transport = None
19507db96d56Sopenharmony_ci
19517db96d56Sopenharmony_ci        def connection_made(self, transport):
19527db96d56Sopenharmony_ci            self.started = True
19537db96d56Sopenharmony_ci            self.transport = transport
19547db96d56Sopenharmony_ci
19557db96d56Sopenharmony_ci        def data_received(self, data):
19567db96d56Sopenharmony_ci            self.data.extend(data)
19577db96d56Sopenharmony_ci
19587db96d56Sopenharmony_ci        def connection_lost(self, exc):
19597db96d56Sopenharmony_ci            self.closed = True
19607db96d56Sopenharmony_ci            self.fut.set_result(None)
19617db96d56Sopenharmony_ci            self.transport = None
19627db96d56Sopenharmony_ci
19637db96d56Sopenharmony_ci        async def wait_closed(self):
19647db96d56Sopenharmony_ci            await self.fut
19657db96d56Sopenharmony_ci
19667db96d56Sopenharmony_ci    @classmethod
19677db96d56Sopenharmony_ci    def setUpClass(cls):
19687db96d56Sopenharmony_ci        cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE
19697db96d56Sopenharmony_ci        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 16
19707db96d56Sopenharmony_ci        with open(os_helper.TESTFN, 'wb') as fp:
19717db96d56Sopenharmony_ci            fp.write(cls.DATA)
19727db96d56Sopenharmony_ci        super().setUpClass()
19737db96d56Sopenharmony_ci
19747db96d56Sopenharmony_ci    @classmethod
19757db96d56Sopenharmony_ci    def tearDownClass(cls):
19767db96d56Sopenharmony_ci        constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize
19777db96d56Sopenharmony_ci        os_helper.unlink(os_helper.TESTFN)
19787db96d56Sopenharmony_ci        super().tearDownClass()
19797db96d56Sopenharmony_ci
19807db96d56Sopenharmony_ci    def setUp(self):
19817db96d56Sopenharmony_ci        from asyncio.selector_events import BaseSelectorEventLoop
19827db96d56Sopenharmony_ci        # BaseSelectorEventLoop() has no native implementation
19837db96d56Sopenharmony_ci        self.loop = BaseSelectorEventLoop()
19847db96d56Sopenharmony_ci        self.set_event_loop(self.loop)
19857db96d56Sopenharmony_ci        self.file = open(os_helper.TESTFN, 'rb')
19867db96d56Sopenharmony_ci        self.addCleanup(self.file.close)
19877db96d56Sopenharmony_ci        super().setUp()
19887db96d56Sopenharmony_ci
19897db96d56Sopenharmony_ci    def make_socket(self, blocking=False):
19907db96d56Sopenharmony_ci        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
19917db96d56Sopenharmony_ci        sock.setblocking(blocking)
19927db96d56Sopenharmony_ci        self.addCleanup(sock.close)
19937db96d56Sopenharmony_ci        return sock
19947db96d56Sopenharmony_ci
19957db96d56Sopenharmony_ci    def run_loop(self, coro):
19967db96d56Sopenharmony_ci        return self.loop.run_until_complete(coro)
19977db96d56Sopenharmony_ci
19987db96d56Sopenharmony_ci    def prepare(self):
19997db96d56Sopenharmony_ci        sock = self.make_socket()
20007db96d56Sopenharmony_ci        proto = self.MyProto(self.loop)
20017db96d56Sopenharmony_ci        server = self.run_loop(self.loop.create_server(
20027db96d56Sopenharmony_ci            lambda: proto, socket_helper.HOST, 0, family=socket.AF_INET))
20037db96d56Sopenharmony_ci        addr = server.sockets[0].getsockname()
20047db96d56Sopenharmony_ci
20057db96d56Sopenharmony_ci        for _ in range(10):
20067db96d56Sopenharmony_ci            try:
20077db96d56Sopenharmony_ci                self.run_loop(self.loop.sock_connect(sock, addr))
20087db96d56Sopenharmony_ci            except OSError:
20097db96d56Sopenharmony_ci                self.run_loop(asyncio.sleep(0.5))
20107db96d56Sopenharmony_ci                continue
20117db96d56Sopenharmony_ci            else:
20127db96d56Sopenharmony_ci                break
20137db96d56Sopenharmony_ci        else:
20147db96d56Sopenharmony_ci            # One last try, so we get the exception
20157db96d56Sopenharmony_ci            self.run_loop(self.loop.sock_connect(sock, addr))
20167db96d56Sopenharmony_ci
20177db96d56Sopenharmony_ci        def cleanup():
20187db96d56Sopenharmony_ci            server.close()
20197db96d56Sopenharmony_ci            self.run_loop(server.wait_closed())
20207db96d56Sopenharmony_ci            sock.close()
20217db96d56Sopenharmony_ci            if proto.transport is not None:
20227db96d56Sopenharmony_ci                proto.transport.close()
20237db96d56Sopenharmony_ci                self.run_loop(proto.wait_closed())
20247db96d56Sopenharmony_ci
20257db96d56Sopenharmony_ci        self.addCleanup(cleanup)
20267db96d56Sopenharmony_ci
20277db96d56Sopenharmony_ci        return sock, proto
20287db96d56Sopenharmony_ci
20297db96d56Sopenharmony_ci    def test__sock_sendfile_native_failure(self):
20307db96d56Sopenharmony_ci        sock, proto = self.prepare()
20317db96d56Sopenharmony_ci
20327db96d56Sopenharmony_ci        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
20337db96d56Sopenharmony_ci                                    "sendfile is not available"):
20347db96d56Sopenharmony_ci            self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
20357db96d56Sopenharmony_ci                                                          0, None))
20367db96d56Sopenharmony_ci
20377db96d56Sopenharmony_ci        self.assertEqual(proto.data, b'')
20387db96d56Sopenharmony_ci        self.assertEqual(self.file.tell(), 0)
20397db96d56Sopenharmony_ci
20407db96d56Sopenharmony_ci    def test_sock_sendfile_no_fallback(self):
20417db96d56Sopenharmony_ci        sock, proto = self.prepare()
20427db96d56Sopenharmony_ci
20437db96d56Sopenharmony_ci        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
20447db96d56Sopenharmony_ci                                    "sendfile is not available"):
20457db96d56Sopenharmony_ci            self.run_loop(self.loop.sock_sendfile(sock, self.file,
20467db96d56Sopenharmony_ci                                                  fallback=False))
20477db96d56Sopenharmony_ci
20487db96d56Sopenharmony_ci        self.assertEqual(self.file.tell(), 0)
20497db96d56Sopenharmony_ci        self.assertEqual(proto.data, b'')
20507db96d56Sopenharmony_ci
20517db96d56Sopenharmony_ci    def test_sock_sendfile_fallback(self):
20527db96d56Sopenharmony_ci        sock, proto = self.prepare()
20537db96d56Sopenharmony_ci
20547db96d56Sopenharmony_ci        ret = self.run_loop(self.loop.sock_sendfile(sock, self.file))
20557db96d56Sopenharmony_ci        sock.close()
20567db96d56Sopenharmony_ci        self.run_loop(proto.wait_closed())
20577db96d56Sopenharmony_ci
20587db96d56Sopenharmony_ci        self.assertEqual(ret, len(self.DATA))
20597db96d56Sopenharmony_ci        self.assertEqual(self.file.tell(), len(self.DATA))
20607db96d56Sopenharmony_ci        self.assertEqual(proto.data, self.DATA)
20617db96d56Sopenharmony_ci
20627db96d56Sopenharmony_ci    def test_sock_sendfile_fallback_offset_and_count(self):
20637db96d56Sopenharmony_ci        sock, proto = self.prepare()
20647db96d56Sopenharmony_ci
20657db96d56Sopenharmony_ci        ret = self.run_loop(self.loop.sock_sendfile(sock, self.file,
20667db96d56Sopenharmony_ci                                                    1000, 2000))
20677db96d56Sopenharmony_ci        sock.close()
20687db96d56Sopenharmony_ci        self.run_loop(proto.wait_closed())
20697db96d56Sopenharmony_ci
20707db96d56Sopenharmony_ci        self.assertEqual(ret, 2000)
20717db96d56Sopenharmony_ci        self.assertEqual(self.file.tell(), 3000)
20727db96d56Sopenharmony_ci        self.assertEqual(proto.data, self.DATA[1000:3000])
20737db96d56Sopenharmony_ci
20747db96d56Sopenharmony_ci    def test_blocking_socket(self):
20757db96d56Sopenharmony_ci        self.loop.set_debug(True)
20767db96d56Sopenharmony_ci        sock = self.make_socket(blocking=True)
20777db96d56Sopenharmony_ci        with self.assertRaisesRegex(ValueError, "must be non-blocking"):
20787db96d56Sopenharmony_ci            self.run_loop(self.loop.sock_sendfile(sock, self.file))
20797db96d56Sopenharmony_ci
20807db96d56Sopenharmony_ci    def test_nonbinary_file(self):
20817db96d56Sopenharmony_ci        sock = self.make_socket()
20827db96d56Sopenharmony_ci        with open(os_helper.TESTFN, encoding="utf-8") as f:
20837db96d56Sopenharmony_ci            with self.assertRaisesRegex(ValueError, "binary mode"):
20847db96d56Sopenharmony_ci                self.run_loop(self.loop.sock_sendfile(sock, f))
20857db96d56Sopenharmony_ci
20867db96d56Sopenharmony_ci    def test_nonstream_socket(self):
20877db96d56Sopenharmony_ci        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
20887db96d56Sopenharmony_ci        sock.setblocking(False)
20897db96d56Sopenharmony_ci        self.addCleanup(sock.close)
20907db96d56Sopenharmony_ci        with self.assertRaisesRegex(ValueError, "only SOCK_STREAM type"):
20917db96d56Sopenharmony_ci            self.run_loop(self.loop.sock_sendfile(sock, self.file))
20927db96d56Sopenharmony_ci
20937db96d56Sopenharmony_ci    def test_notint_count(self):
20947db96d56Sopenharmony_ci        sock = self.make_socket()
20957db96d56Sopenharmony_ci        with self.assertRaisesRegex(TypeError,
20967db96d56Sopenharmony_ci                                    "count must be a positive integer"):
20977db96d56Sopenharmony_ci            self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, 'count'))
20987db96d56Sopenharmony_ci
20997db96d56Sopenharmony_ci    def test_negative_count(self):
21007db96d56Sopenharmony_ci        sock = self.make_socket()
21017db96d56Sopenharmony_ci        with self.assertRaisesRegex(ValueError,
21027db96d56Sopenharmony_ci                                    "count must be a positive integer"):
21037db96d56Sopenharmony_ci            self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, -1))
21047db96d56Sopenharmony_ci
21057db96d56Sopenharmony_ci    def test_notint_offset(self):
21067db96d56Sopenharmony_ci        sock = self.make_socket()
21077db96d56Sopenharmony_ci        with self.assertRaisesRegex(TypeError,
21087db96d56Sopenharmony_ci                                    "offset must be a non-negative integer"):
21097db96d56Sopenharmony_ci            self.run_loop(self.loop.sock_sendfile(sock, self.file, 'offset'))
21107db96d56Sopenharmony_ci
21117db96d56Sopenharmony_ci    def test_negative_offset(self):
21127db96d56Sopenharmony_ci        sock = self.make_socket()
21137db96d56Sopenharmony_ci        with self.assertRaisesRegex(ValueError,
21147db96d56Sopenharmony_ci                                    "offset must be a non-negative integer"):
21157db96d56Sopenharmony_ci            self.run_loop(self.loop.sock_sendfile(sock, self.file, -1))
21167db96d56Sopenharmony_ci
21177db96d56Sopenharmony_ci
21187db96d56Sopenharmony_ciclass TestSelectorUtils(test_utils.TestCase):
21197db96d56Sopenharmony_ci    def check_set_nodelay(self, sock):
21207db96d56Sopenharmony_ci        opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
21217db96d56Sopenharmony_ci        self.assertFalse(opt)
21227db96d56Sopenharmony_ci
21237db96d56Sopenharmony_ci        base_events._set_nodelay(sock)
21247db96d56Sopenharmony_ci
21257db96d56Sopenharmony_ci        opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY)
21267db96d56Sopenharmony_ci        self.assertTrue(opt)
21277db96d56Sopenharmony_ci
21287db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(socket, 'TCP_NODELAY'),
21297db96d56Sopenharmony_ci                         'need socket.TCP_NODELAY')
21307db96d56Sopenharmony_ci    def test_set_nodelay(self):
21317db96d56Sopenharmony_ci        sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM,
21327db96d56Sopenharmony_ci                             proto=socket.IPPROTO_TCP)
21337db96d56Sopenharmony_ci        with sock:
21347db96d56Sopenharmony_ci            self.check_set_nodelay(sock)
21357db96d56Sopenharmony_ci
21367db96d56Sopenharmony_ci        sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM,
21377db96d56Sopenharmony_ci                             proto=socket.IPPROTO_TCP)
21387db96d56Sopenharmony_ci        with sock:
21397db96d56Sopenharmony_ci            sock.setblocking(False)
21407db96d56Sopenharmony_ci            self.check_set_nodelay(sock)
21417db96d56Sopenharmony_ci
21427db96d56Sopenharmony_ci
21437db96d56Sopenharmony_ci
21447db96d56Sopenharmony_ciif __name__ == '__main__':
21457db96d56Sopenharmony_ci    unittest.main()
2146