17db96d56Sopenharmony_ci"""Tests for base_events.py""" 27db96d56Sopenharmony_ci 37db96d56Sopenharmony_ciimport concurrent.futures 47db96d56Sopenharmony_ciimport errno 57db96d56Sopenharmony_ciimport math 67db96d56Sopenharmony_ciimport socket 77db96d56Sopenharmony_ciimport sys 87db96d56Sopenharmony_ciimport threading 97db96d56Sopenharmony_ciimport time 107db96d56Sopenharmony_ciimport unittest 117db96d56Sopenharmony_cifrom unittest import mock 127db96d56Sopenharmony_ci 137db96d56Sopenharmony_ciimport asyncio 147db96d56Sopenharmony_cifrom asyncio import base_events 157db96d56Sopenharmony_cifrom asyncio import constants 167db96d56Sopenharmony_cifrom test.test_asyncio import utils as test_utils 177db96d56Sopenharmony_cifrom test import support 187db96d56Sopenharmony_cifrom test.support.script_helper import assert_python_ok 197db96d56Sopenharmony_cifrom test.support import os_helper 207db96d56Sopenharmony_cifrom test.support import socket_helper 217db96d56Sopenharmony_ciimport warnings 227db96d56Sopenharmony_ci 237db96d56Sopenharmony_ciMOCK_ANY = mock.ANY 247db96d56Sopenharmony_ci 257db96d56Sopenharmony_ci 267db96d56Sopenharmony_cidef tearDownModule(): 277db96d56Sopenharmony_ci asyncio.set_event_loop_policy(None) 287db96d56Sopenharmony_ci 297db96d56Sopenharmony_ci 307db96d56Sopenharmony_cidef mock_socket_module(): 317db96d56Sopenharmony_ci m_socket = mock.MagicMock(spec=socket) 327db96d56Sopenharmony_ci for name in ( 337db96d56Sopenharmony_ci 'AF_INET', 'AF_INET6', 'AF_UNSPEC', 'IPPROTO_TCP', 'IPPROTO_UDP', 347db96d56Sopenharmony_ci 'SOCK_STREAM', 'SOCK_DGRAM', 'SOL_SOCKET', 'SO_REUSEADDR', 'inet_pton' 357db96d56Sopenharmony_ci ): 367db96d56Sopenharmony_ci if hasattr(socket, name): 377db96d56Sopenharmony_ci setattr(m_socket, name, getattr(socket, name)) 387db96d56Sopenharmony_ci else: 397db96d56Sopenharmony_ci delattr(m_socket, name) 407db96d56Sopenharmony_ci 417db96d56Sopenharmony_ci m_socket.socket = mock.MagicMock() 427db96d56Sopenharmony_ci m_socket.socket.return_value = test_utils.mock_nonblocking_socket() 437db96d56Sopenharmony_ci 447db96d56Sopenharmony_ci return m_socket 457db96d56Sopenharmony_ci 467db96d56Sopenharmony_ci 477db96d56Sopenharmony_cidef patch_socket(f): 487db96d56Sopenharmony_ci return mock.patch('asyncio.base_events.socket', 497db96d56Sopenharmony_ci new_callable=mock_socket_module)(f) 507db96d56Sopenharmony_ci 517db96d56Sopenharmony_ci 527db96d56Sopenharmony_ciclass BaseEventTests(test_utils.TestCase): 537db96d56Sopenharmony_ci 547db96d56Sopenharmony_ci def test_ipaddr_info(self): 557db96d56Sopenharmony_ci UNSPEC = socket.AF_UNSPEC 567db96d56Sopenharmony_ci INET = socket.AF_INET 577db96d56Sopenharmony_ci INET6 = socket.AF_INET6 587db96d56Sopenharmony_ci STREAM = socket.SOCK_STREAM 597db96d56Sopenharmony_ci DGRAM = socket.SOCK_DGRAM 607db96d56Sopenharmony_ci TCP = socket.IPPROTO_TCP 617db96d56Sopenharmony_ci UDP = socket.IPPROTO_UDP 627db96d56Sopenharmony_ci 637db96d56Sopenharmony_ci self.assertEqual( 647db96d56Sopenharmony_ci (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 657db96d56Sopenharmony_ci base_events._ipaddr_info('1.2.3.4', 1, INET, STREAM, TCP)) 667db96d56Sopenharmony_ci 677db96d56Sopenharmony_ci self.assertEqual( 687db96d56Sopenharmony_ci (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 697db96d56Sopenharmony_ci base_events._ipaddr_info(b'1.2.3.4', 1, INET, STREAM, TCP)) 707db96d56Sopenharmony_ci 717db96d56Sopenharmony_ci self.assertEqual( 727db96d56Sopenharmony_ci (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 737db96d56Sopenharmony_ci base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, TCP)) 747db96d56Sopenharmony_ci 757db96d56Sopenharmony_ci self.assertEqual( 767db96d56Sopenharmony_ci (INET, DGRAM, UDP, '', ('1.2.3.4', 1)), 777db96d56Sopenharmony_ci base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, UDP)) 787db96d56Sopenharmony_ci 797db96d56Sopenharmony_ci # Socket type STREAM implies TCP protocol. 807db96d56Sopenharmony_ci self.assertEqual( 817db96d56Sopenharmony_ci (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 827db96d56Sopenharmony_ci base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, STREAM, 0)) 837db96d56Sopenharmony_ci 847db96d56Sopenharmony_ci # Socket type DGRAM implies UDP protocol. 857db96d56Sopenharmony_ci self.assertEqual( 867db96d56Sopenharmony_ci (INET, DGRAM, UDP, '', ('1.2.3.4', 1)), 877db96d56Sopenharmony_ci base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, DGRAM, 0)) 887db96d56Sopenharmony_ci 897db96d56Sopenharmony_ci # No socket type. 907db96d56Sopenharmony_ci self.assertIsNone( 917db96d56Sopenharmony_ci base_events._ipaddr_info('1.2.3.4', 1, UNSPEC, 0, 0)) 927db96d56Sopenharmony_ci 937db96d56Sopenharmony_ci if socket_helper.IPV6_ENABLED: 947db96d56Sopenharmony_ci # IPv4 address with family IPv6. 957db96d56Sopenharmony_ci self.assertIsNone( 967db96d56Sopenharmony_ci base_events._ipaddr_info('1.2.3.4', 1, INET6, STREAM, TCP)) 977db96d56Sopenharmony_ci 987db96d56Sopenharmony_ci self.assertEqual( 997db96d56Sopenharmony_ci (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)), 1007db96d56Sopenharmony_ci base_events._ipaddr_info('::3', 1, INET6, STREAM, TCP)) 1017db96d56Sopenharmony_ci 1027db96d56Sopenharmony_ci self.assertEqual( 1037db96d56Sopenharmony_ci (INET6, STREAM, TCP, '', ('::3', 1, 0, 0)), 1047db96d56Sopenharmony_ci base_events._ipaddr_info('::3', 1, UNSPEC, STREAM, TCP)) 1057db96d56Sopenharmony_ci 1067db96d56Sopenharmony_ci # IPv6 address with family IPv4. 1077db96d56Sopenharmony_ci self.assertIsNone( 1087db96d56Sopenharmony_ci base_events._ipaddr_info('::3', 1, INET, STREAM, TCP)) 1097db96d56Sopenharmony_ci 1107db96d56Sopenharmony_ci # IPv6 address with zone index. 1117db96d56Sopenharmony_ci self.assertIsNone( 1127db96d56Sopenharmony_ci base_events._ipaddr_info('::3%lo0', 1, INET6, STREAM, TCP)) 1137db96d56Sopenharmony_ci 1147db96d56Sopenharmony_ci def test_port_parameter_types(self): 1157db96d56Sopenharmony_ci # Test obscure kinds of arguments for "port". 1167db96d56Sopenharmony_ci INET = socket.AF_INET 1177db96d56Sopenharmony_ci STREAM = socket.SOCK_STREAM 1187db96d56Sopenharmony_ci TCP = socket.IPPROTO_TCP 1197db96d56Sopenharmony_ci 1207db96d56Sopenharmony_ci self.assertEqual( 1217db96d56Sopenharmony_ci (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 1227db96d56Sopenharmony_ci base_events._ipaddr_info('1.2.3.4', None, INET, STREAM, TCP)) 1237db96d56Sopenharmony_ci 1247db96d56Sopenharmony_ci self.assertEqual( 1257db96d56Sopenharmony_ci (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 1267db96d56Sopenharmony_ci base_events._ipaddr_info('1.2.3.4', b'', INET, STREAM, TCP)) 1277db96d56Sopenharmony_ci 1287db96d56Sopenharmony_ci self.assertEqual( 1297db96d56Sopenharmony_ci (INET, STREAM, TCP, '', ('1.2.3.4', 0)), 1307db96d56Sopenharmony_ci base_events._ipaddr_info('1.2.3.4', '', INET, STREAM, TCP)) 1317db96d56Sopenharmony_ci 1327db96d56Sopenharmony_ci self.assertEqual( 1337db96d56Sopenharmony_ci (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 1347db96d56Sopenharmony_ci base_events._ipaddr_info('1.2.3.4', '1', INET, STREAM, TCP)) 1357db96d56Sopenharmony_ci 1367db96d56Sopenharmony_ci self.assertEqual( 1377db96d56Sopenharmony_ci (INET, STREAM, TCP, '', ('1.2.3.4', 1)), 1387db96d56Sopenharmony_ci base_events._ipaddr_info('1.2.3.4', b'1', INET, STREAM, TCP)) 1397db96d56Sopenharmony_ci 1407db96d56Sopenharmony_ci @patch_socket 1417db96d56Sopenharmony_ci def test_ipaddr_info_no_inet_pton(self, m_socket): 1427db96d56Sopenharmony_ci del m_socket.inet_pton 1437db96d56Sopenharmony_ci self.assertIsNone(base_events._ipaddr_info('1.2.3.4', 1, 1447db96d56Sopenharmony_ci socket.AF_INET, 1457db96d56Sopenharmony_ci socket.SOCK_STREAM, 1467db96d56Sopenharmony_ci socket.IPPROTO_TCP)) 1477db96d56Sopenharmony_ci 1487db96d56Sopenharmony_ci 1497db96d56Sopenharmony_ciclass BaseEventLoopTests(test_utils.TestCase): 1507db96d56Sopenharmony_ci 1517db96d56Sopenharmony_ci def setUp(self): 1527db96d56Sopenharmony_ci super().setUp() 1537db96d56Sopenharmony_ci self.loop = base_events.BaseEventLoop() 1547db96d56Sopenharmony_ci self.loop._selector = mock.Mock() 1557db96d56Sopenharmony_ci self.loop._selector.select.return_value = () 1567db96d56Sopenharmony_ci self.set_event_loop(self.loop) 1577db96d56Sopenharmony_ci 1587db96d56Sopenharmony_ci def test_not_implemented(self): 1597db96d56Sopenharmony_ci m = mock.Mock() 1607db96d56Sopenharmony_ci self.assertRaises( 1617db96d56Sopenharmony_ci NotImplementedError, 1627db96d56Sopenharmony_ci self.loop._make_socket_transport, m, m) 1637db96d56Sopenharmony_ci self.assertRaises( 1647db96d56Sopenharmony_ci NotImplementedError, 1657db96d56Sopenharmony_ci self.loop._make_ssl_transport, m, m, m, m) 1667db96d56Sopenharmony_ci self.assertRaises( 1677db96d56Sopenharmony_ci NotImplementedError, 1687db96d56Sopenharmony_ci self.loop._make_datagram_transport, m, m) 1697db96d56Sopenharmony_ci self.assertRaises( 1707db96d56Sopenharmony_ci NotImplementedError, self.loop._process_events, []) 1717db96d56Sopenharmony_ci self.assertRaises( 1727db96d56Sopenharmony_ci NotImplementedError, self.loop._write_to_self) 1737db96d56Sopenharmony_ci self.assertRaises( 1747db96d56Sopenharmony_ci NotImplementedError, 1757db96d56Sopenharmony_ci self.loop._make_read_pipe_transport, m, m) 1767db96d56Sopenharmony_ci self.assertRaises( 1777db96d56Sopenharmony_ci NotImplementedError, 1787db96d56Sopenharmony_ci self.loop._make_write_pipe_transport, m, m) 1797db96d56Sopenharmony_ci gen = self.loop._make_subprocess_transport(m, m, m, m, m, m, m) 1807db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 1817db96d56Sopenharmony_ci gen.send(None) 1827db96d56Sopenharmony_ci 1837db96d56Sopenharmony_ci def test_close(self): 1847db96d56Sopenharmony_ci self.assertFalse(self.loop.is_closed()) 1857db96d56Sopenharmony_ci self.loop.close() 1867db96d56Sopenharmony_ci self.assertTrue(self.loop.is_closed()) 1877db96d56Sopenharmony_ci 1887db96d56Sopenharmony_ci # it should be possible to call close() more than once 1897db96d56Sopenharmony_ci self.loop.close() 1907db96d56Sopenharmony_ci self.loop.close() 1917db96d56Sopenharmony_ci 1927db96d56Sopenharmony_ci # operation blocked when the loop is closed 1937db96d56Sopenharmony_ci f = self.loop.create_future() 1947db96d56Sopenharmony_ci self.assertRaises(RuntimeError, self.loop.run_forever) 1957db96d56Sopenharmony_ci self.assertRaises(RuntimeError, self.loop.run_until_complete, f) 1967db96d56Sopenharmony_ci 1977db96d56Sopenharmony_ci def test__add_callback_handle(self): 1987db96d56Sopenharmony_ci h = asyncio.Handle(lambda: False, (), self.loop, None) 1997db96d56Sopenharmony_ci 2007db96d56Sopenharmony_ci self.loop._add_callback(h) 2017db96d56Sopenharmony_ci self.assertFalse(self.loop._scheduled) 2027db96d56Sopenharmony_ci self.assertIn(h, self.loop._ready) 2037db96d56Sopenharmony_ci 2047db96d56Sopenharmony_ci def test__add_callback_cancelled_handle(self): 2057db96d56Sopenharmony_ci h = asyncio.Handle(lambda: False, (), self.loop, None) 2067db96d56Sopenharmony_ci h.cancel() 2077db96d56Sopenharmony_ci 2087db96d56Sopenharmony_ci self.loop._add_callback(h) 2097db96d56Sopenharmony_ci self.assertFalse(self.loop._scheduled) 2107db96d56Sopenharmony_ci self.assertFalse(self.loop._ready) 2117db96d56Sopenharmony_ci 2127db96d56Sopenharmony_ci def test_set_default_executor(self): 2137db96d56Sopenharmony_ci class DummyExecutor(concurrent.futures.ThreadPoolExecutor): 2147db96d56Sopenharmony_ci def submit(self, fn, *args, **kwargs): 2157db96d56Sopenharmony_ci raise NotImplementedError( 2167db96d56Sopenharmony_ci 'cannot submit into a dummy executor') 2177db96d56Sopenharmony_ci 2187db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 2197db96d56Sopenharmony_ci self.loop._write_to_self = mock.Mock() 2207db96d56Sopenharmony_ci 2217db96d56Sopenharmony_ci executor = DummyExecutor() 2227db96d56Sopenharmony_ci self.loop.set_default_executor(executor) 2237db96d56Sopenharmony_ci self.assertIs(executor, self.loop._default_executor) 2247db96d56Sopenharmony_ci 2257db96d56Sopenharmony_ci def test_set_default_executor_error(self): 2267db96d56Sopenharmony_ci executor = mock.Mock() 2277db96d56Sopenharmony_ci 2287db96d56Sopenharmony_ci msg = 'executor must be ThreadPoolExecutor instance' 2297db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, msg): 2307db96d56Sopenharmony_ci self.loop.set_default_executor(executor) 2317db96d56Sopenharmony_ci 2327db96d56Sopenharmony_ci self.assertIsNone(self.loop._default_executor) 2337db96d56Sopenharmony_ci 2347db96d56Sopenharmony_ci def test_call_soon(self): 2357db96d56Sopenharmony_ci def cb(): 2367db96d56Sopenharmony_ci pass 2377db96d56Sopenharmony_ci 2387db96d56Sopenharmony_ci h = self.loop.call_soon(cb) 2397db96d56Sopenharmony_ci self.assertEqual(h._callback, cb) 2407db96d56Sopenharmony_ci self.assertIsInstance(h, asyncio.Handle) 2417db96d56Sopenharmony_ci self.assertIn(h, self.loop._ready) 2427db96d56Sopenharmony_ci 2437db96d56Sopenharmony_ci def test_call_soon_non_callable(self): 2447db96d56Sopenharmony_ci self.loop.set_debug(True) 2457db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, 'a callable object'): 2467db96d56Sopenharmony_ci self.loop.call_soon(1) 2477db96d56Sopenharmony_ci 2487db96d56Sopenharmony_ci def test_call_later(self): 2497db96d56Sopenharmony_ci def cb(): 2507db96d56Sopenharmony_ci pass 2517db96d56Sopenharmony_ci 2527db96d56Sopenharmony_ci h = self.loop.call_later(10.0, cb) 2537db96d56Sopenharmony_ci self.assertIsInstance(h, asyncio.TimerHandle) 2547db96d56Sopenharmony_ci self.assertIn(h, self.loop._scheduled) 2557db96d56Sopenharmony_ci self.assertNotIn(h, self.loop._ready) 2567db96d56Sopenharmony_ci with self.assertRaises(TypeError, msg="delay must not be None"): 2577db96d56Sopenharmony_ci self.loop.call_later(None, cb) 2587db96d56Sopenharmony_ci 2597db96d56Sopenharmony_ci def test_call_later_negative_delays(self): 2607db96d56Sopenharmony_ci calls = [] 2617db96d56Sopenharmony_ci 2627db96d56Sopenharmony_ci def cb(arg): 2637db96d56Sopenharmony_ci calls.append(arg) 2647db96d56Sopenharmony_ci 2657db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 2667db96d56Sopenharmony_ci self.loop.call_later(-1, cb, 'a') 2677db96d56Sopenharmony_ci self.loop.call_later(-2, cb, 'b') 2687db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 2697db96d56Sopenharmony_ci self.assertEqual(calls, ['b', 'a']) 2707db96d56Sopenharmony_ci 2717db96d56Sopenharmony_ci def test_time_and_call_at(self): 2727db96d56Sopenharmony_ci def cb(): 2737db96d56Sopenharmony_ci self.loop.stop() 2747db96d56Sopenharmony_ci 2757db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 2767db96d56Sopenharmony_ci delay = 0.1 2777db96d56Sopenharmony_ci 2787db96d56Sopenharmony_ci when = self.loop.time() + delay 2797db96d56Sopenharmony_ci self.loop.call_at(when, cb) 2807db96d56Sopenharmony_ci t0 = self.loop.time() 2817db96d56Sopenharmony_ci self.loop.run_forever() 2827db96d56Sopenharmony_ci dt = self.loop.time() - t0 2837db96d56Sopenharmony_ci 2847db96d56Sopenharmony_ci # 50 ms: maximum granularity of the event loop 2857db96d56Sopenharmony_ci self.assertGreaterEqual(dt, delay - 0.050, dt) 2867db96d56Sopenharmony_ci # tolerate a difference of +800 ms because some Python buildbots 2877db96d56Sopenharmony_ci # are really slow 2887db96d56Sopenharmony_ci self.assertLessEqual(dt, 0.9, dt) 2897db96d56Sopenharmony_ci with self.assertRaises(TypeError, msg="when cannot be None"): 2907db96d56Sopenharmony_ci self.loop.call_at(None, cb) 2917db96d56Sopenharmony_ci 2927db96d56Sopenharmony_ci def check_thread(self, loop, debug): 2937db96d56Sopenharmony_ci def cb(): 2947db96d56Sopenharmony_ci pass 2957db96d56Sopenharmony_ci 2967db96d56Sopenharmony_ci loop.set_debug(debug) 2977db96d56Sopenharmony_ci if debug: 2987db96d56Sopenharmony_ci msg = ("Non-thread-safe operation invoked on an event loop other " 2997db96d56Sopenharmony_ci "than the current one") 3007db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, msg): 3017db96d56Sopenharmony_ci loop.call_soon(cb) 3027db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, msg): 3037db96d56Sopenharmony_ci loop.call_later(60, cb) 3047db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, msg): 3057db96d56Sopenharmony_ci loop.call_at(loop.time() + 60, cb) 3067db96d56Sopenharmony_ci else: 3077db96d56Sopenharmony_ci loop.call_soon(cb) 3087db96d56Sopenharmony_ci loop.call_later(60, cb) 3097db96d56Sopenharmony_ci loop.call_at(loop.time() + 60, cb) 3107db96d56Sopenharmony_ci 3117db96d56Sopenharmony_ci def test_check_thread(self): 3127db96d56Sopenharmony_ci def check_in_thread(loop, event, debug, create_loop, fut): 3137db96d56Sopenharmony_ci # wait until the event loop is running 3147db96d56Sopenharmony_ci event.wait() 3157db96d56Sopenharmony_ci 3167db96d56Sopenharmony_ci try: 3177db96d56Sopenharmony_ci if create_loop: 3187db96d56Sopenharmony_ci loop2 = base_events.BaseEventLoop() 3197db96d56Sopenharmony_ci try: 3207db96d56Sopenharmony_ci asyncio.set_event_loop(loop2) 3217db96d56Sopenharmony_ci self.check_thread(loop, debug) 3227db96d56Sopenharmony_ci finally: 3237db96d56Sopenharmony_ci asyncio.set_event_loop(None) 3247db96d56Sopenharmony_ci loop2.close() 3257db96d56Sopenharmony_ci else: 3267db96d56Sopenharmony_ci self.check_thread(loop, debug) 3277db96d56Sopenharmony_ci except Exception as exc: 3287db96d56Sopenharmony_ci loop.call_soon_threadsafe(fut.set_exception, exc) 3297db96d56Sopenharmony_ci else: 3307db96d56Sopenharmony_ci loop.call_soon_threadsafe(fut.set_result, None) 3317db96d56Sopenharmony_ci 3327db96d56Sopenharmony_ci def test_thread(loop, debug, create_loop=False): 3337db96d56Sopenharmony_ci event = threading.Event() 3347db96d56Sopenharmony_ci fut = loop.create_future() 3357db96d56Sopenharmony_ci loop.call_soon(event.set) 3367db96d56Sopenharmony_ci args = (loop, event, debug, create_loop, fut) 3377db96d56Sopenharmony_ci thread = threading.Thread(target=check_in_thread, args=args) 3387db96d56Sopenharmony_ci thread.start() 3397db96d56Sopenharmony_ci loop.run_until_complete(fut) 3407db96d56Sopenharmony_ci thread.join() 3417db96d56Sopenharmony_ci 3427db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 3437db96d56Sopenharmony_ci self.loop._write_to_self = mock.Mock() 3447db96d56Sopenharmony_ci 3457db96d56Sopenharmony_ci # raise RuntimeError if the thread has no event loop 3467db96d56Sopenharmony_ci test_thread(self.loop, True) 3477db96d56Sopenharmony_ci 3487db96d56Sopenharmony_ci # check disabled if debug mode is disabled 3497db96d56Sopenharmony_ci test_thread(self.loop, False) 3507db96d56Sopenharmony_ci 3517db96d56Sopenharmony_ci # raise RuntimeError if the event loop of the thread is not the called 3527db96d56Sopenharmony_ci # event loop 3537db96d56Sopenharmony_ci test_thread(self.loop, True, create_loop=True) 3547db96d56Sopenharmony_ci 3557db96d56Sopenharmony_ci # check disabled if debug mode is disabled 3567db96d56Sopenharmony_ci test_thread(self.loop, False, create_loop=True) 3577db96d56Sopenharmony_ci 3587db96d56Sopenharmony_ci def test__run_once(self): 3597db96d56Sopenharmony_ci h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (), 3607db96d56Sopenharmony_ci self.loop, None) 3617db96d56Sopenharmony_ci h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (), 3627db96d56Sopenharmony_ci self.loop, None) 3637db96d56Sopenharmony_ci 3647db96d56Sopenharmony_ci h1.cancel() 3657db96d56Sopenharmony_ci 3667db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 3677db96d56Sopenharmony_ci self.loop._scheduled.append(h1) 3687db96d56Sopenharmony_ci self.loop._scheduled.append(h2) 3697db96d56Sopenharmony_ci self.loop._run_once() 3707db96d56Sopenharmony_ci 3717db96d56Sopenharmony_ci t = self.loop._selector.select.call_args[0][0] 3727db96d56Sopenharmony_ci self.assertTrue(9.5 < t < 10.5, t) 3737db96d56Sopenharmony_ci self.assertEqual([h2], self.loop._scheduled) 3747db96d56Sopenharmony_ci self.assertTrue(self.loop._process_events.called) 3757db96d56Sopenharmony_ci 3767db96d56Sopenharmony_ci def test_set_debug(self): 3777db96d56Sopenharmony_ci self.loop.set_debug(True) 3787db96d56Sopenharmony_ci self.assertTrue(self.loop.get_debug()) 3797db96d56Sopenharmony_ci self.loop.set_debug(False) 3807db96d56Sopenharmony_ci self.assertFalse(self.loop.get_debug()) 3817db96d56Sopenharmony_ci 3827db96d56Sopenharmony_ci def test__run_once_schedule_handle(self): 3837db96d56Sopenharmony_ci handle = None 3847db96d56Sopenharmony_ci processed = False 3857db96d56Sopenharmony_ci 3867db96d56Sopenharmony_ci def cb(loop): 3877db96d56Sopenharmony_ci nonlocal processed, handle 3887db96d56Sopenharmony_ci processed = True 3897db96d56Sopenharmony_ci handle = loop.call_soon(lambda: True) 3907db96d56Sopenharmony_ci 3917db96d56Sopenharmony_ci h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,), 3927db96d56Sopenharmony_ci self.loop, None) 3937db96d56Sopenharmony_ci 3947db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 3957db96d56Sopenharmony_ci self.loop._scheduled.append(h) 3967db96d56Sopenharmony_ci self.loop._run_once() 3977db96d56Sopenharmony_ci 3987db96d56Sopenharmony_ci self.assertTrue(processed) 3997db96d56Sopenharmony_ci self.assertEqual([handle], list(self.loop._ready)) 4007db96d56Sopenharmony_ci 4017db96d56Sopenharmony_ci def test__run_once_cancelled_event_cleanup(self): 4027db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 4037db96d56Sopenharmony_ci 4047db96d56Sopenharmony_ci self.assertTrue( 4057db96d56Sopenharmony_ci 0 < base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION < 1.0) 4067db96d56Sopenharmony_ci 4077db96d56Sopenharmony_ci def cb(): 4087db96d56Sopenharmony_ci pass 4097db96d56Sopenharmony_ci 4107db96d56Sopenharmony_ci # Set up one "blocking" event that will not be cancelled to 4117db96d56Sopenharmony_ci # ensure later cancelled events do not make it to the head 4127db96d56Sopenharmony_ci # of the queue and get cleaned. 4137db96d56Sopenharmony_ci not_cancelled_count = 1 4147db96d56Sopenharmony_ci self.loop.call_later(3000, cb) 4157db96d56Sopenharmony_ci 4167db96d56Sopenharmony_ci # Add less than threshold (base_events._MIN_SCHEDULED_TIMER_HANDLES) 4177db96d56Sopenharmony_ci # cancelled handles, ensure they aren't removed 4187db96d56Sopenharmony_ci 4197db96d56Sopenharmony_ci cancelled_count = 2 4207db96d56Sopenharmony_ci for x in range(2): 4217db96d56Sopenharmony_ci h = self.loop.call_later(3600, cb) 4227db96d56Sopenharmony_ci h.cancel() 4237db96d56Sopenharmony_ci 4247db96d56Sopenharmony_ci # Add some cancelled events that will be at head and removed 4257db96d56Sopenharmony_ci cancelled_count += 2 4267db96d56Sopenharmony_ci for x in range(2): 4277db96d56Sopenharmony_ci h = self.loop.call_later(100, cb) 4287db96d56Sopenharmony_ci h.cancel() 4297db96d56Sopenharmony_ci 4307db96d56Sopenharmony_ci # This test is invalid if _MIN_SCHEDULED_TIMER_HANDLES is too low 4317db96d56Sopenharmony_ci self.assertLessEqual(cancelled_count + not_cancelled_count, 4327db96d56Sopenharmony_ci base_events._MIN_SCHEDULED_TIMER_HANDLES) 4337db96d56Sopenharmony_ci 4347db96d56Sopenharmony_ci self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) 4357db96d56Sopenharmony_ci 4367db96d56Sopenharmony_ci self.loop._run_once() 4377db96d56Sopenharmony_ci 4387db96d56Sopenharmony_ci cancelled_count -= 2 4397db96d56Sopenharmony_ci 4407db96d56Sopenharmony_ci self.assertEqual(self.loop._timer_cancelled_count, cancelled_count) 4417db96d56Sopenharmony_ci 4427db96d56Sopenharmony_ci self.assertEqual(len(self.loop._scheduled), 4437db96d56Sopenharmony_ci cancelled_count + not_cancelled_count) 4447db96d56Sopenharmony_ci 4457db96d56Sopenharmony_ci # Need enough events to pass _MIN_CANCELLED_TIMER_HANDLES_FRACTION 4467db96d56Sopenharmony_ci # so that deletion of cancelled events will occur on next _run_once 4477db96d56Sopenharmony_ci add_cancel_count = int(math.ceil( 4487db96d56Sopenharmony_ci base_events._MIN_SCHEDULED_TIMER_HANDLES * 4497db96d56Sopenharmony_ci base_events._MIN_CANCELLED_TIMER_HANDLES_FRACTION)) + 1 4507db96d56Sopenharmony_ci 4517db96d56Sopenharmony_ci add_not_cancel_count = max(base_events._MIN_SCHEDULED_TIMER_HANDLES - 4527db96d56Sopenharmony_ci add_cancel_count, 0) 4537db96d56Sopenharmony_ci 4547db96d56Sopenharmony_ci # Add some events that will not be cancelled 4557db96d56Sopenharmony_ci not_cancelled_count += add_not_cancel_count 4567db96d56Sopenharmony_ci for x in range(add_not_cancel_count): 4577db96d56Sopenharmony_ci self.loop.call_later(3600, cb) 4587db96d56Sopenharmony_ci 4597db96d56Sopenharmony_ci # Add enough cancelled events 4607db96d56Sopenharmony_ci cancelled_count += add_cancel_count 4617db96d56Sopenharmony_ci for x in range(add_cancel_count): 4627db96d56Sopenharmony_ci h = self.loop.call_later(3600, cb) 4637db96d56Sopenharmony_ci h.cancel() 4647db96d56Sopenharmony_ci 4657db96d56Sopenharmony_ci # Ensure all handles are still scheduled 4667db96d56Sopenharmony_ci self.assertEqual(len(self.loop._scheduled), 4677db96d56Sopenharmony_ci cancelled_count + not_cancelled_count) 4687db96d56Sopenharmony_ci 4697db96d56Sopenharmony_ci self.loop._run_once() 4707db96d56Sopenharmony_ci 4717db96d56Sopenharmony_ci # Ensure cancelled events were removed 4727db96d56Sopenharmony_ci self.assertEqual(len(self.loop._scheduled), not_cancelled_count) 4737db96d56Sopenharmony_ci 4747db96d56Sopenharmony_ci # Ensure only uncancelled events remain scheduled 4757db96d56Sopenharmony_ci self.assertTrue(all([not x._cancelled for x in self.loop._scheduled])) 4767db96d56Sopenharmony_ci 4777db96d56Sopenharmony_ci def test_run_until_complete_type_error(self): 4787db96d56Sopenharmony_ci self.assertRaises(TypeError, 4797db96d56Sopenharmony_ci self.loop.run_until_complete, 'blah') 4807db96d56Sopenharmony_ci 4817db96d56Sopenharmony_ci def test_run_until_complete_loop(self): 4827db96d56Sopenharmony_ci task = self.loop.create_future() 4837db96d56Sopenharmony_ci other_loop = self.new_test_loop() 4847db96d56Sopenharmony_ci self.addCleanup(other_loop.close) 4857db96d56Sopenharmony_ci self.assertRaises(ValueError, 4867db96d56Sopenharmony_ci other_loop.run_until_complete, task) 4877db96d56Sopenharmony_ci 4887db96d56Sopenharmony_ci def test_run_until_complete_loop_orphan_future_close_loop(self): 4897db96d56Sopenharmony_ci class ShowStopper(SystemExit): 4907db96d56Sopenharmony_ci pass 4917db96d56Sopenharmony_ci 4927db96d56Sopenharmony_ci async def foo(delay): 4937db96d56Sopenharmony_ci await asyncio.sleep(delay) 4947db96d56Sopenharmony_ci 4957db96d56Sopenharmony_ci def throw(): 4967db96d56Sopenharmony_ci raise ShowStopper 4977db96d56Sopenharmony_ci 4987db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 4997db96d56Sopenharmony_ci self.loop.call_soon(throw) 5007db96d56Sopenharmony_ci with self.assertRaises(ShowStopper): 5017db96d56Sopenharmony_ci self.loop.run_until_complete(foo(0.1)) 5027db96d56Sopenharmony_ci 5037db96d56Sopenharmony_ci # This call fails if run_until_complete does not clean up 5047db96d56Sopenharmony_ci # done-callback for the previous future. 5057db96d56Sopenharmony_ci self.loop.run_until_complete(foo(0.2)) 5067db96d56Sopenharmony_ci 5077db96d56Sopenharmony_ci def test_subprocess_exec_invalid_args(self): 5087db96d56Sopenharmony_ci args = [sys.executable, '-c', 'pass'] 5097db96d56Sopenharmony_ci 5107db96d56Sopenharmony_ci # missing program parameter (empty args) 5117db96d56Sopenharmony_ci self.assertRaises(TypeError, 5127db96d56Sopenharmony_ci self.loop.run_until_complete, self.loop.subprocess_exec, 5137db96d56Sopenharmony_ci asyncio.SubprocessProtocol) 5147db96d56Sopenharmony_ci 5157db96d56Sopenharmony_ci # expected multiple arguments, not a list 5167db96d56Sopenharmony_ci self.assertRaises(TypeError, 5177db96d56Sopenharmony_ci self.loop.run_until_complete, self.loop.subprocess_exec, 5187db96d56Sopenharmony_ci asyncio.SubprocessProtocol, args) 5197db96d56Sopenharmony_ci 5207db96d56Sopenharmony_ci # program arguments must be strings, not int 5217db96d56Sopenharmony_ci self.assertRaises(TypeError, 5227db96d56Sopenharmony_ci self.loop.run_until_complete, self.loop.subprocess_exec, 5237db96d56Sopenharmony_ci asyncio.SubprocessProtocol, sys.executable, 123) 5247db96d56Sopenharmony_ci 5257db96d56Sopenharmony_ci # universal_newlines, shell, bufsize must not be set 5267db96d56Sopenharmony_ci self.assertRaises(TypeError, 5277db96d56Sopenharmony_ci self.loop.run_until_complete, self.loop.subprocess_exec, 5287db96d56Sopenharmony_ci asyncio.SubprocessProtocol, *args, universal_newlines=True) 5297db96d56Sopenharmony_ci self.assertRaises(TypeError, 5307db96d56Sopenharmony_ci self.loop.run_until_complete, self.loop.subprocess_exec, 5317db96d56Sopenharmony_ci asyncio.SubprocessProtocol, *args, shell=True) 5327db96d56Sopenharmony_ci self.assertRaises(TypeError, 5337db96d56Sopenharmony_ci self.loop.run_until_complete, self.loop.subprocess_exec, 5347db96d56Sopenharmony_ci asyncio.SubprocessProtocol, *args, bufsize=4096) 5357db96d56Sopenharmony_ci 5367db96d56Sopenharmony_ci def test_subprocess_shell_invalid_args(self): 5377db96d56Sopenharmony_ci # expected a string, not an int or a list 5387db96d56Sopenharmony_ci self.assertRaises(TypeError, 5397db96d56Sopenharmony_ci self.loop.run_until_complete, self.loop.subprocess_shell, 5407db96d56Sopenharmony_ci asyncio.SubprocessProtocol, 123) 5417db96d56Sopenharmony_ci self.assertRaises(TypeError, 5427db96d56Sopenharmony_ci self.loop.run_until_complete, self.loop.subprocess_shell, 5437db96d56Sopenharmony_ci asyncio.SubprocessProtocol, [sys.executable, '-c', 'pass']) 5447db96d56Sopenharmony_ci 5457db96d56Sopenharmony_ci # universal_newlines, shell, bufsize must not be set 5467db96d56Sopenharmony_ci self.assertRaises(TypeError, 5477db96d56Sopenharmony_ci self.loop.run_until_complete, self.loop.subprocess_shell, 5487db96d56Sopenharmony_ci asyncio.SubprocessProtocol, 'exit 0', universal_newlines=True) 5497db96d56Sopenharmony_ci self.assertRaises(TypeError, 5507db96d56Sopenharmony_ci self.loop.run_until_complete, self.loop.subprocess_shell, 5517db96d56Sopenharmony_ci asyncio.SubprocessProtocol, 'exit 0', shell=True) 5527db96d56Sopenharmony_ci self.assertRaises(TypeError, 5537db96d56Sopenharmony_ci self.loop.run_until_complete, self.loop.subprocess_shell, 5547db96d56Sopenharmony_ci asyncio.SubprocessProtocol, 'exit 0', bufsize=4096) 5557db96d56Sopenharmony_ci 5567db96d56Sopenharmony_ci def test_default_exc_handler_callback(self): 5577db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 5587db96d56Sopenharmony_ci 5597db96d56Sopenharmony_ci def zero_error(fut): 5607db96d56Sopenharmony_ci fut.set_result(True) 5617db96d56Sopenharmony_ci 1/0 5627db96d56Sopenharmony_ci 5637db96d56Sopenharmony_ci # Test call_soon (events.Handle) 5647db96d56Sopenharmony_ci with mock.patch('asyncio.base_events.logger') as log: 5657db96d56Sopenharmony_ci fut = self.loop.create_future() 5667db96d56Sopenharmony_ci self.loop.call_soon(zero_error, fut) 5677db96d56Sopenharmony_ci fut.add_done_callback(lambda fut: self.loop.stop()) 5687db96d56Sopenharmony_ci self.loop.run_forever() 5697db96d56Sopenharmony_ci log.error.assert_called_with( 5707db96d56Sopenharmony_ci test_utils.MockPattern('Exception in callback.*zero'), 5717db96d56Sopenharmony_ci exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 5727db96d56Sopenharmony_ci 5737db96d56Sopenharmony_ci # Test call_later (events.TimerHandle) 5747db96d56Sopenharmony_ci with mock.patch('asyncio.base_events.logger') as log: 5757db96d56Sopenharmony_ci fut = self.loop.create_future() 5767db96d56Sopenharmony_ci self.loop.call_later(0.01, zero_error, fut) 5777db96d56Sopenharmony_ci fut.add_done_callback(lambda fut: self.loop.stop()) 5787db96d56Sopenharmony_ci self.loop.run_forever() 5797db96d56Sopenharmony_ci log.error.assert_called_with( 5807db96d56Sopenharmony_ci test_utils.MockPattern('Exception in callback.*zero'), 5817db96d56Sopenharmony_ci exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 5827db96d56Sopenharmony_ci 5837db96d56Sopenharmony_ci def test_default_exc_handler_coro(self): 5847db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 5857db96d56Sopenharmony_ci 5867db96d56Sopenharmony_ci async def zero_error_coro(): 5877db96d56Sopenharmony_ci await asyncio.sleep(0.01) 5887db96d56Sopenharmony_ci 1/0 5897db96d56Sopenharmony_ci 5907db96d56Sopenharmony_ci # Test Future.__del__ 5917db96d56Sopenharmony_ci with mock.patch('asyncio.base_events.logger') as log: 5927db96d56Sopenharmony_ci fut = asyncio.ensure_future(zero_error_coro(), loop=self.loop) 5937db96d56Sopenharmony_ci fut.add_done_callback(lambda *args: self.loop.stop()) 5947db96d56Sopenharmony_ci self.loop.run_forever() 5957db96d56Sopenharmony_ci fut = None # Trigger Future.__del__ or futures._TracebackLogger 5967db96d56Sopenharmony_ci support.gc_collect() 5977db96d56Sopenharmony_ci # Future.__del__ in logs error with an actual exception context 5987db96d56Sopenharmony_ci log.error.assert_called_with( 5997db96d56Sopenharmony_ci test_utils.MockPattern('.*exception was never retrieved'), 6007db96d56Sopenharmony_ci exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 6017db96d56Sopenharmony_ci 6027db96d56Sopenharmony_ci def test_set_exc_handler_invalid(self): 6037db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, 'A callable object or None'): 6047db96d56Sopenharmony_ci self.loop.set_exception_handler('spam') 6057db96d56Sopenharmony_ci 6067db96d56Sopenharmony_ci def test_set_exc_handler_custom(self): 6077db96d56Sopenharmony_ci def zero_error(): 6087db96d56Sopenharmony_ci 1/0 6097db96d56Sopenharmony_ci 6107db96d56Sopenharmony_ci def run_loop(): 6117db96d56Sopenharmony_ci handle = self.loop.call_soon(zero_error) 6127db96d56Sopenharmony_ci self.loop._run_once() 6137db96d56Sopenharmony_ci return handle 6147db96d56Sopenharmony_ci 6157db96d56Sopenharmony_ci self.loop.set_debug(True) 6167db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 6177db96d56Sopenharmony_ci 6187db96d56Sopenharmony_ci self.assertIsNone(self.loop.get_exception_handler()) 6197db96d56Sopenharmony_ci mock_handler = mock.Mock() 6207db96d56Sopenharmony_ci self.loop.set_exception_handler(mock_handler) 6217db96d56Sopenharmony_ci self.assertIs(self.loop.get_exception_handler(), mock_handler) 6227db96d56Sopenharmony_ci handle = run_loop() 6237db96d56Sopenharmony_ci mock_handler.assert_called_with(self.loop, { 6247db96d56Sopenharmony_ci 'exception': MOCK_ANY, 6257db96d56Sopenharmony_ci 'message': test_utils.MockPattern( 6267db96d56Sopenharmony_ci 'Exception in callback.*zero_error'), 6277db96d56Sopenharmony_ci 'handle': handle, 6287db96d56Sopenharmony_ci 'source_traceback': handle._source_traceback, 6297db96d56Sopenharmony_ci }) 6307db96d56Sopenharmony_ci mock_handler.reset_mock() 6317db96d56Sopenharmony_ci 6327db96d56Sopenharmony_ci self.loop.set_exception_handler(None) 6337db96d56Sopenharmony_ci with mock.patch('asyncio.base_events.logger') as log: 6347db96d56Sopenharmony_ci run_loop() 6357db96d56Sopenharmony_ci log.error.assert_called_with( 6367db96d56Sopenharmony_ci test_utils.MockPattern( 6377db96d56Sopenharmony_ci 'Exception in callback.*zero'), 6387db96d56Sopenharmony_ci exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY)) 6397db96d56Sopenharmony_ci 6407db96d56Sopenharmony_ci self.assertFalse(mock_handler.called) 6417db96d56Sopenharmony_ci 6427db96d56Sopenharmony_ci def test_set_exc_handler_broken(self): 6437db96d56Sopenharmony_ci def run_loop(): 6447db96d56Sopenharmony_ci def zero_error(): 6457db96d56Sopenharmony_ci 1/0 6467db96d56Sopenharmony_ci self.loop.call_soon(zero_error) 6477db96d56Sopenharmony_ci self.loop._run_once() 6487db96d56Sopenharmony_ci 6497db96d56Sopenharmony_ci def handler(loop, context): 6507db96d56Sopenharmony_ci raise AttributeError('spam') 6517db96d56Sopenharmony_ci 6527db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 6537db96d56Sopenharmony_ci 6547db96d56Sopenharmony_ci self.loop.set_exception_handler(handler) 6557db96d56Sopenharmony_ci 6567db96d56Sopenharmony_ci with mock.patch('asyncio.base_events.logger') as log: 6577db96d56Sopenharmony_ci run_loop() 6587db96d56Sopenharmony_ci log.error.assert_called_with( 6597db96d56Sopenharmony_ci test_utils.MockPattern( 6607db96d56Sopenharmony_ci 'Unhandled error in exception handler'), 6617db96d56Sopenharmony_ci exc_info=(AttributeError, MOCK_ANY, MOCK_ANY)) 6627db96d56Sopenharmony_ci 6637db96d56Sopenharmony_ci def test_default_exc_handler_broken(self): 6647db96d56Sopenharmony_ci _context = None 6657db96d56Sopenharmony_ci 6667db96d56Sopenharmony_ci class Loop(base_events.BaseEventLoop): 6677db96d56Sopenharmony_ci 6687db96d56Sopenharmony_ci _selector = mock.Mock() 6697db96d56Sopenharmony_ci _process_events = mock.Mock() 6707db96d56Sopenharmony_ci 6717db96d56Sopenharmony_ci def default_exception_handler(self, context): 6727db96d56Sopenharmony_ci nonlocal _context 6737db96d56Sopenharmony_ci _context = context 6747db96d56Sopenharmony_ci # Simulates custom buggy "default_exception_handler" 6757db96d56Sopenharmony_ci raise ValueError('spam') 6767db96d56Sopenharmony_ci 6777db96d56Sopenharmony_ci loop = Loop() 6787db96d56Sopenharmony_ci self.addCleanup(loop.close) 6797db96d56Sopenharmony_ci asyncio.set_event_loop(loop) 6807db96d56Sopenharmony_ci 6817db96d56Sopenharmony_ci def run_loop(): 6827db96d56Sopenharmony_ci def zero_error(): 6837db96d56Sopenharmony_ci 1/0 6847db96d56Sopenharmony_ci loop.call_soon(zero_error) 6857db96d56Sopenharmony_ci loop._run_once() 6867db96d56Sopenharmony_ci 6877db96d56Sopenharmony_ci with mock.patch('asyncio.base_events.logger') as log: 6887db96d56Sopenharmony_ci run_loop() 6897db96d56Sopenharmony_ci log.error.assert_called_with( 6907db96d56Sopenharmony_ci 'Exception in default exception handler', 6917db96d56Sopenharmony_ci exc_info=True) 6927db96d56Sopenharmony_ci 6937db96d56Sopenharmony_ci def custom_handler(loop, context): 6947db96d56Sopenharmony_ci raise ValueError('ham') 6957db96d56Sopenharmony_ci 6967db96d56Sopenharmony_ci _context = None 6977db96d56Sopenharmony_ci loop.set_exception_handler(custom_handler) 6987db96d56Sopenharmony_ci with mock.patch('asyncio.base_events.logger') as log: 6997db96d56Sopenharmony_ci run_loop() 7007db96d56Sopenharmony_ci log.error.assert_called_with( 7017db96d56Sopenharmony_ci test_utils.MockPattern('Exception in default exception.*' 7027db96d56Sopenharmony_ci 'while handling.*in custom'), 7037db96d56Sopenharmony_ci exc_info=True) 7047db96d56Sopenharmony_ci 7057db96d56Sopenharmony_ci # Check that original context was passed to default 7067db96d56Sopenharmony_ci # exception handler. 7077db96d56Sopenharmony_ci self.assertIn('context', _context) 7087db96d56Sopenharmony_ci self.assertIs(type(_context['context']['exception']), 7097db96d56Sopenharmony_ci ZeroDivisionError) 7107db96d56Sopenharmony_ci 7117db96d56Sopenharmony_ci def test_set_task_factory_invalid(self): 7127db96d56Sopenharmony_ci with self.assertRaisesRegex( 7137db96d56Sopenharmony_ci TypeError, 'task factory must be a callable or None'): 7147db96d56Sopenharmony_ci 7157db96d56Sopenharmony_ci self.loop.set_task_factory(1) 7167db96d56Sopenharmony_ci 7177db96d56Sopenharmony_ci self.assertIsNone(self.loop.get_task_factory()) 7187db96d56Sopenharmony_ci 7197db96d56Sopenharmony_ci def test_set_task_factory(self): 7207db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 7217db96d56Sopenharmony_ci 7227db96d56Sopenharmony_ci class MyTask(asyncio.Task): 7237db96d56Sopenharmony_ci pass 7247db96d56Sopenharmony_ci 7257db96d56Sopenharmony_ci async def coro(): 7267db96d56Sopenharmony_ci pass 7277db96d56Sopenharmony_ci 7287db96d56Sopenharmony_ci factory = lambda loop, coro: MyTask(coro, loop=loop) 7297db96d56Sopenharmony_ci 7307db96d56Sopenharmony_ci self.assertIsNone(self.loop.get_task_factory()) 7317db96d56Sopenharmony_ci self.loop.set_task_factory(factory) 7327db96d56Sopenharmony_ci self.assertIs(self.loop.get_task_factory(), factory) 7337db96d56Sopenharmony_ci 7347db96d56Sopenharmony_ci task = self.loop.create_task(coro()) 7357db96d56Sopenharmony_ci self.assertTrue(isinstance(task, MyTask)) 7367db96d56Sopenharmony_ci self.loop.run_until_complete(task) 7377db96d56Sopenharmony_ci 7387db96d56Sopenharmony_ci self.loop.set_task_factory(None) 7397db96d56Sopenharmony_ci self.assertIsNone(self.loop.get_task_factory()) 7407db96d56Sopenharmony_ci 7417db96d56Sopenharmony_ci task = self.loop.create_task(coro()) 7427db96d56Sopenharmony_ci self.assertTrue(isinstance(task, asyncio.Task)) 7437db96d56Sopenharmony_ci self.assertFalse(isinstance(task, MyTask)) 7447db96d56Sopenharmony_ci self.loop.run_until_complete(task) 7457db96d56Sopenharmony_ci 7467db96d56Sopenharmony_ci def test_env_var_debug(self): 7477db96d56Sopenharmony_ci code = '\n'.join(( 7487db96d56Sopenharmony_ci 'import asyncio', 7497db96d56Sopenharmony_ci 'loop = asyncio.new_event_loop()', 7507db96d56Sopenharmony_ci 'print(loop.get_debug())')) 7517db96d56Sopenharmony_ci 7527db96d56Sopenharmony_ci # Test with -E to not fail if the unit test was run with 7537db96d56Sopenharmony_ci # PYTHONASYNCIODEBUG set to a non-empty string 7547db96d56Sopenharmony_ci sts, stdout, stderr = assert_python_ok('-E', '-c', code) 7557db96d56Sopenharmony_ci self.assertEqual(stdout.rstrip(), b'False') 7567db96d56Sopenharmony_ci 7577db96d56Sopenharmony_ci sts, stdout, stderr = assert_python_ok('-c', code, 7587db96d56Sopenharmony_ci PYTHONASYNCIODEBUG='', 7597db96d56Sopenharmony_ci PYTHONDEVMODE='') 7607db96d56Sopenharmony_ci self.assertEqual(stdout.rstrip(), b'False') 7617db96d56Sopenharmony_ci 7627db96d56Sopenharmony_ci sts, stdout, stderr = assert_python_ok('-c', code, 7637db96d56Sopenharmony_ci PYTHONASYNCIODEBUG='1', 7647db96d56Sopenharmony_ci PYTHONDEVMODE='') 7657db96d56Sopenharmony_ci self.assertEqual(stdout.rstrip(), b'True') 7667db96d56Sopenharmony_ci 7677db96d56Sopenharmony_ci sts, stdout, stderr = assert_python_ok('-E', '-c', code, 7687db96d56Sopenharmony_ci PYTHONASYNCIODEBUG='1') 7697db96d56Sopenharmony_ci self.assertEqual(stdout.rstrip(), b'False') 7707db96d56Sopenharmony_ci 7717db96d56Sopenharmony_ci # -X dev 7727db96d56Sopenharmony_ci sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev', 7737db96d56Sopenharmony_ci '-c', code) 7747db96d56Sopenharmony_ci self.assertEqual(stdout.rstrip(), b'True') 7757db96d56Sopenharmony_ci 7767db96d56Sopenharmony_ci def test_create_task(self): 7777db96d56Sopenharmony_ci class MyTask(asyncio.Task): 7787db96d56Sopenharmony_ci pass 7797db96d56Sopenharmony_ci 7807db96d56Sopenharmony_ci async def test(): 7817db96d56Sopenharmony_ci pass 7827db96d56Sopenharmony_ci 7837db96d56Sopenharmony_ci class EventLoop(base_events.BaseEventLoop): 7847db96d56Sopenharmony_ci def create_task(self, coro): 7857db96d56Sopenharmony_ci return MyTask(coro, loop=loop) 7867db96d56Sopenharmony_ci 7877db96d56Sopenharmony_ci loop = EventLoop() 7887db96d56Sopenharmony_ci self.set_event_loop(loop) 7897db96d56Sopenharmony_ci 7907db96d56Sopenharmony_ci coro = test() 7917db96d56Sopenharmony_ci task = asyncio.ensure_future(coro, loop=loop) 7927db96d56Sopenharmony_ci self.assertIsInstance(task, MyTask) 7937db96d56Sopenharmony_ci 7947db96d56Sopenharmony_ci # make warnings quiet 7957db96d56Sopenharmony_ci task._log_destroy_pending = False 7967db96d56Sopenharmony_ci coro.close() 7977db96d56Sopenharmony_ci 7987db96d56Sopenharmony_ci def test_create_task_error_closes_coro(self): 7997db96d56Sopenharmony_ci async def test(): 8007db96d56Sopenharmony_ci pass 8017db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 8027db96d56Sopenharmony_ci loop.close() 8037db96d56Sopenharmony_ci with warnings.catch_warnings(record=True) as w: 8047db96d56Sopenharmony_ci with self.assertRaises(RuntimeError): 8057db96d56Sopenharmony_ci asyncio.ensure_future(test(), loop=loop) 8067db96d56Sopenharmony_ci self.assertEqual(len(w), 0) 8077db96d56Sopenharmony_ci 8087db96d56Sopenharmony_ci 8097db96d56Sopenharmony_ci def test_create_named_task_with_default_factory(self): 8107db96d56Sopenharmony_ci async def test(): 8117db96d56Sopenharmony_ci pass 8127db96d56Sopenharmony_ci 8137db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 8147db96d56Sopenharmony_ci task = loop.create_task(test(), name='test_task') 8157db96d56Sopenharmony_ci try: 8167db96d56Sopenharmony_ci self.assertEqual(task.get_name(), 'test_task') 8177db96d56Sopenharmony_ci finally: 8187db96d56Sopenharmony_ci loop.run_until_complete(task) 8197db96d56Sopenharmony_ci loop.close() 8207db96d56Sopenharmony_ci 8217db96d56Sopenharmony_ci def test_create_named_task_with_custom_factory(self): 8227db96d56Sopenharmony_ci def task_factory(loop, coro): 8237db96d56Sopenharmony_ci return asyncio.Task(coro, loop=loop) 8247db96d56Sopenharmony_ci 8257db96d56Sopenharmony_ci async def test(): 8267db96d56Sopenharmony_ci pass 8277db96d56Sopenharmony_ci 8287db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 8297db96d56Sopenharmony_ci loop.set_task_factory(task_factory) 8307db96d56Sopenharmony_ci task = loop.create_task(test(), name='test_task') 8317db96d56Sopenharmony_ci try: 8327db96d56Sopenharmony_ci self.assertEqual(task.get_name(), 'test_task') 8337db96d56Sopenharmony_ci finally: 8347db96d56Sopenharmony_ci loop.run_until_complete(task) 8357db96d56Sopenharmony_ci loop.close() 8367db96d56Sopenharmony_ci 8377db96d56Sopenharmony_ci def test_run_forever_keyboard_interrupt(self): 8387db96d56Sopenharmony_ci # Python issue #22601: ensure that the temporary task created by 8397db96d56Sopenharmony_ci # run_forever() consumes the KeyboardInterrupt and so don't log 8407db96d56Sopenharmony_ci # a warning 8417db96d56Sopenharmony_ci async def raise_keyboard_interrupt(): 8427db96d56Sopenharmony_ci raise KeyboardInterrupt 8437db96d56Sopenharmony_ci 8447db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 8457db96d56Sopenharmony_ci self.loop.call_exception_handler = mock.Mock() 8467db96d56Sopenharmony_ci 8477db96d56Sopenharmony_ci try: 8487db96d56Sopenharmony_ci self.loop.run_until_complete(raise_keyboard_interrupt()) 8497db96d56Sopenharmony_ci except KeyboardInterrupt: 8507db96d56Sopenharmony_ci pass 8517db96d56Sopenharmony_ci self.loop.close() 8527db96d56Sopenharmony_ci support.gc_collect() 8537db96d56Sopenharmony_ci 8547db96d56Sopenharmony_ci self.assertFalse(self.loop.call_exception_handler.called) 8557db96d56Sopenharmony_ci 8567db96d56Sopenharmony_ci def test_run_until_complete_baseexception(self): 8577db96d56Sopenharmony_ci # Python issue #22429: run_until_complete() must not schedule a pending 8587db96d56Sopenharmony_ci # call to stop() if the future raised a BaseException 8597db96d56Sopenharmony_ci async def raise_keyboard_interrupt(): 8607db96d56Sopenharmony_ci raise KeyboardInterrupt 8617db96d56Sopenharmony_ci 8627db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 8637db96d56Sopenharmony_ci 8647db96d56Sopenharmony_ci try: 8657db96d56Sopenharmony_ci self.loop.run_until_complete(raise_keyboard_interrupt()) 8667db96d56Sopenharmony_ci except KeyboardInterrupt: 8677db96d56Sopenharmony_ci pass 8687db96d56Sopenharmony_ci 8697db96d56Sopenharmony_ci def func(): 8707db96d56Sopenharmony_ci self.loop.stop() 8717db96d56Sopenharmony_ci func.called = True 8727db96d56Sopenharmony_ci func.called = False 8737db96d56Sopenharmony_ci try: 8747db96d56Sopenharmony_ci self.loop.call_soon(func) 8757db96d56Sopenharmony_ci self.loop.run_forever() 8767db96d56Sopenharmony_ci except KeyboardInterrupt: 8777db96d56Sopenharmony_ci pass 8787db96d56Sopenharmony_ci self.assertTrue(func.called) 8797db96d56Sopenharmony_ci 8807db96d56Sopenharmony_ci def test_single_selecter_event_callback_after_stopping(self): 8817db96d56Sopenharmony_ci # Python issue #25593: A stopped event loop may cause event callbacks 8827db96d56Sopenharmony_ci # to run more than once. 8837db96d56Sopenharmony_ci event_sentinel = object() 8847db96d56Sopenharmony_ci callcount = 0 8857db96d56Sopenharmony_ci doer = None 8867db96d56Sopenharmony_ci 8877db96d56Sopenharmony_ci def proc_events(event_list): 8887db96d56Sopenharmony_ci nonlocal doer 8897db96d56Sopenharmony_ci if event_sentinel in event_list: 8907db96d56Sopenharmony_ci doer = self.loop.call_soon(do_event) 8917db96d56Sopenharmony_ci 8927db96d56Sopenharmony_ci def do_event(): 8937db96d56Sopenharmony_ci nonlocal callcount 8947db96d56Sopenharmony_ci callcount += 1 8957db96d56Sopenharmony_ci self.loop.call_soon(clear_selector) 8967db96d56Sopenharmony_ci 8977db96d56Sopenharmony_ci def clear_selector(): 8987db96d56Sopenharmony_ci doer.cancel() 8997db96d56Sopenharmony_ci self.loop._selector.select.return_value = () 9007db96d56Sopenharmony_ci 9017db96d56Sopenharmony_ci self.loop._process_events = proc_events 9027db96d56Sopenharmony_ci self.loop._selector.select.return_value = (event_sentinel,) 9037db96d56Sopenharmony_ci 9047db96d56Sopenharmony_ci for i in range(1, 3): 9057db96d56Sopenharmony_ci with self.subTest('Loop %d/2' % i): 9067db96d56Sopenharmony_ci self.loop.call_soon(self.loop.stop) 9077db96d56Sopenharmony_ci self.loop.run_forever() 9087db96d56Sopenharmony_ci self.assertEqual(callcount, 1) 9097db96d56Sopenharmony_ci 9107db96d56Sopenharmony_ci def test_run_once(self): 9117db96d56Sopenharmony_ci # Simple test for test_utils.run_once(). It may seem strange 9127db96d56Sopenharmony_ci # to have a test for this (the function isn't even used!) but 9137db96d56Sopenharmony_ci # it's a de-factor standard API for library tests. This tests 9147db96d56Sopenharmony_ci # the idiom: loop.call_soon(loop.stop); loop.run_forever(). 9157db96d56Sopenharmony_ci count = 0 9167db96d56Sopenharmony_ci 9177db96d56Sopenharmony_ci def callback(): 9187db96d56Sopenharmony_ci nonlocal count 9197db96d56Sopenharmony_ci count += 1 9207db96d56Sopenharmony_ci 9217db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 9227db96d56Sopenharmony_ci self.loop.call_soon(callback) 9237db96d56Sopenharmony_ci test_utils.run_once(self.loop) 9247db96d56Sopenharmony_ci self.assertEqual(count, 1) 9257db96d56Sopenharmony_ci 9267db96d56Sopenharmony_ci def test_run_forever_pre_stopped(self): 9277db96d56Sopenharmony_ci # Test that the old idiom for pre-stopping the loop works. 9287db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 9297db96d56Sopenharmony_ci self.loop.stop() 9307db96d56Sopenharmony_ci self.loop.run_forever() 9317db96d56Sopenharmony_ci self.loop._selector.select.assert_called_once_with(0) 9327db96d56Sopenharmony_ci 9337db96d56Sopenharmony_ci async def leave_unfinalized_asyncgen(self): 9347db96d56Sopenharmony_ci # Create an async generator, iterate it partially, and leave it 9357db96d56Sopenharmony_ci # to be garbage collected. 9367db96d56Sopenharmony_ci # Used in async generator finalization tests. 9377db96d56Sopenharmony_ci # Depends on implementation details of garbage collector. Changes 9387db96d56Sopenharmony_ci # in gc may break this function. 9397db96d56Sopenharmony_ci status = {'started': False, 9407db96d56Sopenharmony_ci 'stopped': False, 9417db96d56Sopenharmony_ci 'finalized': False} 9427db96d56Sopenharmony_ci 9437db96d56Sopenharmony_ci async def agen(): 9447db96d56Sopenharmony_ci status['started'] = True 9457db96d56Sopenharmony_ci try: 9467db96d56Sopenharmony_ci for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']: 9477db96d56Sopenharmony_ci yield item 9487db96d56Sopenharmony_ci finally: 9497db96d56Sopenharmony_ci status['finalized'] = True 9507db96d56Sopenharmony_ci 9517db96d56Sopenharmony_ci ag = agen() 9527db96d56Sopenharmony_ci ai = ag.__aiter__() 9537db96d56Sopenharmony_ci 9547db96d56Sopenharmony_ci async def iter_one(): 9557db96d56Sopenharmony_ci try: 9567db96d56Sopenharmony_ci item = await ai.__anext__() 9577db96d56Sopenharmony_ci except StopAsyncIteration: 9587db96d56Sopenharmony_ci return 9597db96d56Sopenharmony_ci if item == 'THREE': 9607db96d56Sopenharmony_ci status['stopped'] = True 9617db96d56Sopenharmony_ci return 9627db96d56Sopenharmony_ci asyncio.create_task(iter_one()) 9637db96d56Sopenharmony_ci 9647db96d56Sopenharmony_ci asyncio.create_task(iter_one()) 9657db96d56Sopenharmony_ci return status 9667db96d56Sopenharmony_ci 9677db96d56Sopenharmony_ci def test_asyncgen_finalization_by_gc(self): 9687db96d56Sopenharmony_ci # Async generators should be finalized when garbage collected. 9697db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 9707db96d56Sopenharmony_ci self.loop._write_to_self = mock.Mock() 9717db96d56Sopenharmony_ci with support.disable_gc(): 9727db96d56Sopenharmony_ci status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) 9737db96d56Sopenharmony_ci while not status['stopped']: 9747db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 9757db96d56Sopenharmony_ci self.assertTrue(status['started']) 9767db96d56Sopenharmony_ci self.assertTrue(status['stopped']) 9777db96d56Sopenharmony_ci self.assertFalse(status['finalized']) 9787db96d56Sopenharmony_ci support.gc_collect() 9797db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 9807db96d56Sopenharmony_ci self.assertTrue(status['finalized']) 9817db96d56Sopenharmony_ci 9827db96d56Sopenharmony_ci def test_asyncgen_finalization_by_gc_in_other_thread(self): 9837db96d56Sopenharmony_ci # Python issue 34769: If garbage collector runs in another 9847db96d56Sopenharmony_ci # thread, async generators will not finalize in debug 9857db96d56Sopenharmony_ci # mode. 9867db96d56Sopenharmony_ci self.loop._process_events = mock.Mock() 9877db96d56Sopenharmony_ci self.loop._write_to_self = mock.Mock() 9887db96d56Sopenharmony_ci self.loop.set_debug(True) 9897db96d56Sopenharmony_ci with support.disable_gc(): 9907db96d56Sopenharmony_ci status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen()) 9917db96d56Sopenharmony_ci while not status['stopped']: 9927db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 9937db96d56Sopenharmony_ci self.assertTrue(status['started']) 9947db96d56Sopenharmony_ci self.assertTrue(status['stopped']) 9957db96d56Sopenharmony_ci self.assertFalse(status['finalized']) 9967db96d56Sopenharmony_ci self.loop.run_until_complete( 9977db96d56Sopenharmony_ci self.loop.run_in_executor(None, support.gc_collect)) 9987db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 9997db96d56Sopenharmony_ci self.assertTrue(status['finalized']) 10007db96d56Sopenharmony_ci 10017db96d56Sopenharmony_ci 10027db96d56Sopenharmony_ciclass MyProto(asyncio.Protocol): 10037db96d56Sopenharmony_ci done = None 10047db96d56Sopenharmony_ci 10057db96d56Sopenharmony_ci def __init__(self, create_future=False): 10067db96d56Sopenharmony_ci self.state = 'INITIAL' 10077db96d56Sopenharmony_ci self.nbytes = 0 10087db96d56Sopenharmony_ci if create_future: 10097db96d56Sopenharmony_ci self.done = asyncio.get_running_loop().create_future() 10107db96d56Sopenharmony_ci 10117db96d56Sopenharmony_ci def _assert_state(self, *expected): 10127db96d56Sopenharmony_ci if self.state not in expected: 10137db96d56Sopenharmony_ci raise AssertionError(f'state: {self.state!r}, expected: {expected!r}') 10147db96d56Sopenharmony_ci 10157db96d56Sopenharmony_ci def connection_made(self, transport): 10167db96d56Sopenharmony_ci self.transport = transport 10177db96d56Sopenharmony_ci self._assert_state('INITIAL') 10187db96d56Sopenharmony_ci self.state = 'CONNECTED' 10197db96d56Sopenharmony_ci transport.write(b'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n') 10207db96d56Sopenharmony_ci 10217db96d56Sopenharmony_ci def data_received(self, data): 10227db96d56Sopenharmony_ci self._assert_state('CONNECTED') 10237db96d56Sopenharmony_ci self.nbytes += len(data) 10247db96d56Sopenharmony_ci 10257db96d56Sopenharmony_ci def eof_received(self): 10267db96d56Sopenharmony_ci self._assert_state('CONNECTED') 10277db96d56Sopenharmony_ci self.state = 'EOF' 10287db96d56Sopenharmony_ci 10297db96d56Sopenharmony_ci def connection_lost(self, exc): 10307db96d56Sopenharmony_ci self._assert_state('CONNECTED', 'EOF') 10317db96d56Sopenharmony_ci self.state = 'CLOSED' 10327db96d56Sopenharmony_ci if self.done: 10337db96d56Sopenharmony_ci self.done.set_result(None) 10347db96d56Sopenharmony_ci 10357db96d56Sopenharmony_ci 10367db96d56Sopenharmony_ciclass MyDatagramProto(asyncio.DatagramProtocol): 10377db96d56Sopenharmony_ci done = None 10387db96d56Sopenharmony_ci 10397db96d56Sopenharmony_ci def __init__(self, create_future=False, loop=None): 10407db96d56Sopenharmony_ci self.state = 'INITIAL' 10417db96d56Sopenharmony_ci self.nbytes = 0 10427db96d56Sopenharmony_ci if create_future: 10437db96d56Sopenharmony_ci self.done = loop.create_future() 10447db96d56Sopenharmony_ci 10457db96d56Sopenharmony_ci def _assert_state(self, expected): 10467db96d56Sopenharmony_ci if self.state != expected: 10477db96d56Sopenharmony_ci raise AssertionError(f'state: {self.state!r}, expected: {expected!r}') 10487db96d56Sopenharmony_ci 10497db96d56Sopenharmony_ci def connection_made(self, transport): 10507db96d56Sopenharmony_ci self.transport = transport 10517db96d56Sopenharmony_ci self._assert_state('INITIAL') 10527db96d56Sopenharmony_ci self.state = 'INITIALIZED' 10537db96d56Sopenharmony_ci 10547db96d56Sopenharmony_ci def datagram_received(self, data, addr): 10557db96d56Sopenharmony_ci self._assert_state('INITIALIZED') 10567db96d56Sopenharmony_ci self.nbytes += len(data) 10577db96d56Sopenharmony_ci 10587db96d56Sopenharmony_ci def error_received(self, exc): 10597db96d56Sopenharmony_ci self._assert_state('INITIALIZED') 10607db96d56Sopenharmony_ci 10617db96d56Sopenharmony_ci def connection_lost(self, exc): 10627db96d56Sopenharmony_ci self._assert_state('INITIALIZED') 10637db96d56Sopenharmony_ci self.state = 'CLOSED' 10647db96d56Sopenharmony_ci if self.done: 10657db96d56Sopenharmony_ci self.done.set_result(None) 10667db96d56Sopenharmony_ci 10677db96d56Sopenharmony_ci 10687db96d56Sopenharmony_ciclass BaseEventLoopWithSelectorTests(test_utils.TestCase): 10697db96d56Sopenharmony_ci 10707db96d56Sopenharmony_ci def setUp(self): 10717db96d56Sopenharmony_ci super().setUp() 10727db96d56Sopenharmony_ci self.loop = asyncio.SelectorEventLoop() 10737db96d56Sopenharmony_ci self.set_event_loop(self.loop) 10747db96d56Sopenharmony_ci 10757db96d56Sopenharmony_ci @mock.patch('socket.getnameinfo') 10767db96d56Sopenharmony_ci def test_getnameinfo(self, m_gai): 10777db96d56Sopenharmony_ci m_gai.side_effect = lambda *args: 42 10787db96d56Sopenharmony_ci r = self.loop.run_until_complete(self.loop.getnameinfo(('abc', 123))) 10797db96d56Sopenharmony_ci self.assertEqual(r, 42) 10807db96d56Sopenharmony_ci 10817db96d56Sopenharmony_ci @patch_socket 10827db96d56Sopenharmony_ci def test_create_connection_multiple_errors(self, m_socket): 10837db96d56Sopenharmony_ci 10847db96d56Sopenharmony_ci class MyProto(asyncio.Protocol): 10857db96d56Sopenharmony_ci pass 10867db96d56Sopenharmony_ci 10877db96d56Sopenharmony_ci async def getaddrinfo(*args, **kw): 10887db96d56Sopenharmony_ci return [(2, 1, 6, '', ('107.6.106.82', 80)), 10897db96d56Sopenharmony_ci (2, 1, 6, '', ('107.6.106.82', 80))] 10907db96d56Sopenharmony_ci 10917db96d56Sopenharmony_ci def getaddrinfo_task(*args, **kwds): 10927db96d56Sopenharmony_ci return self.loop.create_task(getaddrinfo(*args, **kwds)) 10937db96d56Sopenharmony_ci 10947db96d56Sopenharmony_ci idx = -1 10957db96d56Sopenharmony_ci errors = ['err1', 'err2'] 10967db96d56Sopenharmony_ci 10977db96d56Sopenharmony_ci def _socket(*args, **kw): 10987db96d56Sopenharmony_ci nonlocal idx, errors 10997db96d56Sopenharmony_ci idx += 1 11007db96d56Sopenharmony_ci raise OSError(errors[idx]) 11017db96d56Sopenharmony_ci 11027db96d56Sopenharmony_ci m_socket.socket = _socket 11037db96d56Sopenharmony_ci 11047db96d56Sopenharmony_ci self.loop.getaddrinfo = getaddrinfo_task 11057db96d56Sopenharmony_ci 11067db96d56Sopenharmony_ci coro = self.loop.create_connection(MyProto, 'example.com', 80) 11077db96d56Sopenharmony_ci with self.assertRaises(OSError) as cm: 11087db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 11097db96d56Sopenharmony_ci 11107db96d56Sopenharmony_ci self.assertEqual(str(cm.exception), 'Multiple exceptions: err1, err2') 11117db96d56Sopenharmony_ci 11127db96d56Sopenharmony_ci @patch_socket 11137db96d56Sopenharmony_ci def test_create_connection_timeout(self, m_socket): 11147db96d56Sopenharmony_ci # Ensure that the socket is closed on timeout 11157db96d56Sopenharmony_ci sock = mock.Mock() 11167db96d56Sopenharmony_ci m_socket.socket.return_value = sock 11177db96d56Sopenharmony_ci 11187db96d56Sopenharmony_ci def getaddrinfo(*args, **kw): 11197db96d56Sopenharmony_ci fut = self.loop.create_future() 11207db96d56Sopenharmony_ci addr = (socket.AF_INET, socket.SOCK_STREAM, 0, '', 11217db96d56Sopenharmony_ci ('127.0.0.1', 80)) 11227db96d56Sopenharmony_ci fut.set_result([addr]) 11237db96d56Sopenharmony_ci return fut 11247db96d56Sopenharmony_ci self.loop.getaddrinfo = getaddrinfo 11257db96d56Sopenharmony_ci 11267db96d56Sopenharmony_ci with mock.patch.object(self.loop, 'sock_connect', 11277db96d56Sopenharmony_ci side_effect=asyncio.TimeoutError): 11287db96d56Sopenharmony_ci coro = self.loop.create_connection(MyProto, '127.0.0.1', 80) 11297db96d56Sopenharmony_ci with self.assertRaises(asyncio.TimeoutError): 11307db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 11317db96d56Sopenharmony_ci self.assertTrue(sock.close.called) 11327db96d56Sopenharmony_ci 11337db96d56Sopenharmony_ci def test_create_connection_host_port_sock(self): 11347db96d56Sopenharmony_ci coro = self.loop.create_connection( 11357db96d56Sopenharmony_ci MyProto, 'example.com', 80, sock=object()) 11367db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, coro) 11377db96d56Sopenharmony_ci 11387db96d56Sopenharmony_ci def test_create_connection_wrong_sock(self): 11397db96d56Sopenharmony_ci sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 11407db96d56Sopenharmony_ci with sock: 11417db96d56Sopenharmony_ci coro = self.loop.create_connection(MyProto, sock=sock) 11427db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, 11437db96d56Sopenharmony_ci 'A Stream Socket was expected'): 11447db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 11457db96d56Sopenharmony_ci 11467db96d56Sopenharmony_ci def test_create_server_wrong_sock(self): 11477db96d56Sopenharmony_ci sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 11487db96d56Sopenharmony_ci with sock: 11497db96d56Sopenharmony_ci coro = self.loop.create_server(MyProto, sock=sock) 11507db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, 11517db96d56Sopenharmony_ci 'A Stream Socket was expected'): 11527db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 11537db96d56Sopenharmony_ci 11547db96d56Sopenharmony_ci def test_create_server_ssl_timeout_for_plain_socket(self): 11557db96d56Sopenharmony_ci coro = self.loop.create_server( 11567db96d56Sopenharmony_ci MyProto, 'example.com', 80, ssl_handshake_timeout=1) 11577db96d56Sopenharmony_ci with self.assertRaisesRegex( 11587db96d56Sopenharmony_ci ValueError, 11597db96d56Sopenharmony_ci 'ssl_handshake_timeout is only meaningful with ssl'): 11607db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 11617db96d56Sopenharmony_ci 11627db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), 11637db96d56Sopenharmony_ci 'no socket.SOCK_NONBLOCK (linux only)') 11647db96d56Sopenharmony_ci def test_create_server_stream_bittype(self): 11657db96d56Sopenharmony_ci sock = socket.socket( 11667db96d56Sopenharmony_ci socket.AF_INET, socket.SOCK_STREAM | socket.SOCK_NONBLOCK) 11677db96d56Sopenharmony_ci with sock: 11687db96d56Sopenharmony_ci coro = self.loop.create_server(lambda: None, sock=sock) 11697db96d56Sopenharmony_ci srv = self.loop.run_until_complete(coro) 11707db96d56Sopenharmony_ci srv.close() 11717db96d56Sopenharmony_ci self.loop.run_until_complete(srv.wait_closed()) 11727db96d56Sopenharmony_ci 11737db96d56Sopenharmony_ci @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support') 11747db96d56Sopenharmony_ci def test_create_server_ipv6(self): 11757db96d56Sopenharmony_ci async def main(): 11767db96d56Sopenharmony_ci srv = await asyncio.start_server(lambda: None, '::1', 0) 11777db96d56Sopenharmony_ci try: 11787db96d56Sopenharmony_ci self.assertGreater(len(srv.sockets), 0) 11797db96d56Sopenharmony_ci finally: 11807db96d56Sopenharmony_ci srv.close() 11817db96d56Sopenharmony_ci await srv.wait_closed() 11827db96d56Sopenharmony_ci 11837db96d56Sopenharmony_ci try: 11847db96d56Sopenharmony_ci self.loop.run_until_complete(main()) 11857db96d56Sopenharmony_ci except OSError as ex: 11867db96d56Sopenharmony_ci if (hasattr(errno, 'EADDRNOTAVAIL') and 11877db96d56Sopenharmony_ci ex.errno == errno.EADDRNOTAVAIL): 11887db96d56Sopenharmony_ci self.skipTest('failed to bind to ::1') 11897db96d56Sopenharmony_ci else: 11907db96d56Sopenharmony_ci raise 11917db96d56Sopenharmony_ci 11927db96d56Sopenharmony_ci def test_create_datagram_endpoint_wrong_sock(self): 11937db96d56Sopenharmony_ci sock = socket.socket(socket.AF_INET) 11947db96d56Sopenharmony_ci with sock: 11957db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint(MyProto, sock=sock) 11967db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, 11977db96d56Sopenharmony_ci 'A UDP Socket was expected'): 11987db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 11997db96d56Sopenharmony_ci 12007db96d56Sopenharmony_ci def test_create_connection_no_host_port_sock(self): 12017db96d56Sopenharmony_ci coro = self.loop.create_connection(MyProto) 12027db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, coro) 12037db96d56Sopenharmony_ci 12047db96d56Sopenharmony_ci def test_create_connection_no_getaddrinfo(self): 12057db96d56Sopenharmony_ci async def getaddrinfo(*args, **kw): 12067db96d56Sopenharmony_ci return [] 12077db96d56Sopenharmony_ci 12087db96d56Sopenharmony_ci def getaddrinfo_task(*args, **kwds): 12097db96d56Sopenharmony_ci return self.loop.create_task(getaddrinfo(*args, **kwds)) 12107db96d56Sopenharmony_ci 12117db96d56Sopenharmony_ci self.loop.getaddrinfo = getaddrinfo_task 12127db96d56Sopenharmony_ci coro = self.loop.create_connection(MyProto, 'example.com', 80) 12137db96d56Sopenharmony_ci self.assertRaises( 12147db96d56Sopenharmony_ci OSError, self.loop.run_until_complete, coro) 12157db96d56Sopenharmony_ci 12167db96d56Sopenharmony_ci def test_create_connection_connect_err(self): 12177db96d56Sopenharmony_ci async def getaddrinfo(*args, **kw): 12187db96d56Sopenharmony_ci return [(2, 1, 6, '', ('107.6.106.82', 80))] 12197db96d56Sopenharmony_ci 12207db96d56Sopenharmony_ci def getaddrinfo_task(*args, **kwds): 12217db96d56Sopenharmony_ci return self.loop.create_task(getaddrinfo(*args, **kwds)) 12227db96d56Sopenharmony_ci 12237db96d56Sopenharmony_ci self.loop.getaddrinfo = getaddrinfo_task 12247db96d56Sopenharmony_ci self.loop.sock_connect = mock.Mock() 12257db96d56Sopenharmony_ci self.loop.sock_connect.side_effect = OSError 12267db96d56Sopenharmony_ci 12277db96d56Sopenharmony_ci coro = self.loop.create_connection(MyProto, 'example.com', 80) 12287db96d56Sopenharmony_ci self.assertRaises( 12297db96d56Sopenharmony_ci OSError, self.loop.run_until_complete, coro) 12307db96d56Sopenharmony_ci 12317db96d56Sopenharmony_ci def test_create_connection_multiple(self): 12327db96d56Sopenharmony_ci async def getaddrinfo(*args, **kw): 12337db96d56Sopenharmony_ci return [(2, 1, 6, '', ('0.0.0.1', 80)), 12347db96d56Sopenharmony_ci (2, 1, 6, '', ('0.0.0.2', 80))] 12357db96d56Sopenharmony_ci 12367db96d56Sopenharmony_ci def getaddrinfo_task(*args, **kwds): 12377db96d56Sopenharmony_ci return self.loop.create_task(getaddrinfo(*args, **kwds)) 12387db96d56Sopenharmony_ci 12397db96d56Sopenharmony_ci self.loop.getaddrinfo = getaddrinfo_task 12407db96d56Sopenharmony_ci self.loop.sock_connect = mock.Mock() 12417db96d56Sopenharmony_ci self.loop.sock_connect.side_effect = OSError 12427db96d56Sopenharmony_ci 12437db96d56Sopenharmony_ci coro = self.loop.create_connection( 12447db96d56Sopenharmony_ci MyProto, 'example.com', 80, family=socket.AF_INET) 12457db96d56Sopenharmony_ci with self.assertRaises(OSError): 12467db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 12477db96d56Sopenharmony_ci 12487db96d56Sopenharmony_ci @patch_socket 12497db96d56Sopenharmony_ci def test_create_connection_multiple_errors_local_addr(self, m_socket): 12507db96d56Sopenharmony_ci 12517db96d56Sopenharmony_ci def bind(addr): 12527db96d56Sopenharmony_ci if addr[0] == '0.0.0.1': 12537db96d56Sopenharmony_ci err = OSError('Err') 12547db96d56Sopenharmony_ci err.strerror = 'Err' 12557db96d56Sopenharmony_ci raise err 12567db96d56Sopenharmony_ci 12577db96d56Sopenharmony_ci m_socket.socket.return_value.bind = bind 12587db96d56Sopenharmony_ci 12597db96d56Sopenharmony_ci async def getaddrinfo(*args, **kw): 12607db96d56Sopenharmony_ci return [(2, 1, 6, '', ('0.0.0.1', 80)), 12617db96d56Sopenharmony_ci (2, 1, 6, '', ('0.0.0.2', 80))] 12627db96d56Sopenharmony_ci 12637db96d56Sopenharmony_ci def getaddrinfo_task(*args, **kwds): 12647db96d56Sopenharmony_ci return self.loop.create_task(getaddrinfo(*args, **kwds)) 12657db96d56Sopenharmony_ci 12667db96d56Sopenharmony_ci self.loop.getaddrinfo = getaddrinfo_task 12677db96d56Sopenharmony_ci self.loop.sock_connect = mock.Mock() 12687db96d56Sopenharmony_ci self.loop.sock_connect.side_effect = OSError('Err2') 12697db96d56Sopenharmony_ci 12707db96d56Sopenharmony_ci coro = self.loop.create_connection( 12717db96d56Sopenharmony_ci MyProto, 'example.com', 80, family=socket.AF_INET, 12727db96d56Sopenharmony_ci local_addr=(None, 8080)) 12737db96d56Sopenharmony_ci with self.assertRaises(OSError) as cm: 12747db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 12757db96d56Sopenharmony_ci 12767db96d56Sopenharmony_ci self.assertTrue(str(cm.exception).startswith('Multiple exceptions: ')) 12777db96d56Sopenharmony_ci self.assertTrue(m_socket.socket.return_value.close.called) 12787db96d56Sopenharmony_ci 12797db96d56Sopenharmony_ci def _test_create_connection_ip_addr(self, m_socket, allow_inet_pton): 12807db96d56Sopenharmony_ci # Test the fallback code, even if this system has inet_pton. 12817db96d56Sopenharmony_ci if not allow_inet_pton: 12827db96d56Sopenharmony_ci del m_socket.inet_pton 12837db96d56Sopenharmony_ci 12847db96d56Sopenharmony_ci m_socket.getaddrinfo = socket.getaddrinfo 12857db96d56Sopenharmony_ci sock = m_socket.socket.return_value 12867db96d56Sopenharmony_ci 12877db96d56Sopenharmony_ci self.loop._add_reader = mock.Mock() 12887db96d56Sopenharmony_ci self.loop._add_writer = mock.Mock() 12897db96d56Sopenharmony_ci 12907db96d56Sopenharmony_ci coro = self.loop.create_connection(asyncio.Protocol, '1.2.3.4', 80) 12917db96d56Sopenharmony_ci t, p = self.loop.run_until_complete(coro) 12927db96d56Sopenharmony_ci try: 12937db96d56Sopenharmony_ci sock.connect.assert_called_with(('1.2.3.4', 80)) 12947db96d56Sopenharmony_ci _, kwargs = m_socket.socket.call_args 12957db96d56Sopenharmony_ci self.assertEqual(kwargs['family'], m_socket.AF_INET) 12967db96d56Sopenharmony_ci self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 12977db96d56Sopenharmony_ci finally: 12987db96d56Sopenharmony_ci t.close() 12997db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) # allow transport to close 13007db96d56Sopenharmony_ci 13017db96d56Sopenharmony_ci if socket_helper.IPV6_ENABLED: 13027db96d56Sopenharmony_ci sock.family = socket.AF_INET6 13037db96d56Sopenharmony_ci coro = self.loop.create_connection(asyncio.Protocol, '::1', 80) 13047db96d56Sopenharmony_ci t, p = self.loop.run_until_complete(coro) 13057db96d56Sopenharmony_ci try: 13067db96d56Sopenharmony_ci # Without inet_pton we use getaddrinfo, which transforms 13077db96d56Sopenharmony_ci # ('::1', 80) to ('::1', 80, 0, 0). The last 0s are flow info, 13087db96d56Sopenharmony_ci # scope id. 13097db96d56Sopenharmony_ci [address] = sock.connect.call_args[0] 13107db96d56Sopenharmony_ci host, port = address[:2] 13117db96d56Sopenharmony_ci self.assertRegex(host, r'::(0\.)*1') 13127db96d56Sopenharmony_ci self.assertEqual(port, 80) 13137db96d56Sopenharmony_ci _, kwargs = m_socket.socket.call_args 13147db96d56Sopenharmony_ci self.assertEqual(kwargs['family'], m_socket.AF_INET6) 13157db96d56Sopenharmony_ci self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 13167db96d56Sopenharmony_ci finally: 13177db96d56Sopenharmony_ci t.close() 13187db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) # allow transport to close 13197db96d56Sopenharmony_ci 13207db96d56Sopenharmony_ci @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'no IPv6 support') 13217db96d56Sopenharmony_ci @unittest.skipIf(sys.platform.startswith('aix'), 13227db96d56Sopenharmony_ci "bpo-25545: IPv6 scope id and getaddrinfo() behave differently on AIX") 13237db96d56Sopenharmony_ci @patch_socket 13247db96d56Sopenharmony_ci def test_create_connection_ipv6_scope(self, m_socket): 13257db96d56Sopenharmony_ci m_socket.getaddrinfo = socket.getaddrinfo 13267db96d56Sopenharmony_ci sock = m_socket.socket.return_value 13277db96d56Sopenharmony_ci sock.family = socket.AF_INET6 13287db96d56Sopenharmony_ci 13297db96d56Sopenharmony_ci self.loop._add_reader = mock.Mock() 13307db96d56Sopenharmony_ci self.loop._add_writer = mock.Mock() 13317db96d56Sopenharmony_ci 13327db96d56Sopenharmony_ci coro = self.loop.create_connection(asyncio.Protocol, 'fe80::1%1', 80) 13337db96d56Sopenharmony_ci t, p = self.loop.run_until_complete(coro) 13347db96d56Sopenharmony_ci try: 13357db96d56Sopenharmony_ci sock.connect.assert_called_with(('fe80::1', 80, 0, 1)) 13367db96d56Sopenharmony_ci _, kwargs = m_socket.socket.call_args 13377db96d56Sopenharmony_ci self.assertEqual(kwargs['family'], m_socket.AF_INET6) 13387db96d56Sopenharmony_ci self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 13397db96d56Sopenharmony_ci finally: 13407db96d56Sopenharmony_ci t.close() 13417db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) # allow transport to close 13427db96d56Sopenharmony_ci 13437db96d56Sopenharmony_ci @patch_socket 13447db96d56Sopenharmony_ci def test_create_connection_ip_addr(self, m_socket): 13457db96d56Sopenharmony_ci self._test_create_connection_ip_addr(m_socket, True) 13467db96d56Sopenharmony_ci 13477db96d56Sopenharmony_ci @patch_socket 13487db96d56Sopenharmony_ci def test_create_connection_no_inet_pton(self, m_socket): 13497db96d56Sopenharmony_ci self._test_create_connection_ip_addr(m_socket, False) 13507db96d56Sopenharmony_ci 13517db96d56Sopenharmony_ci @patch_socket 13527db96d56Sopenharmony_ci def test_create_connection_service_name(self, m_socket): 13537db96d56Sopenharmony_ci m_socket.getaddrinfo = socket.getaddrinfo 13547db96d56Sopenharmony_ci sock = m_socket.socket.return_value 13557db96d56Sopenharmony_ci 13567db96d56Sopenharmony_ci self.loop._add_reader = mock.Mock() 13577db96d56Sopenharmony_ci self.loop._add_writer = mock.Mock() 13587db96d56Sopenharmony_ci 13597db96d56Sopenharmony_ci for service, port in ('http', 80), (b'http', 80): 13607db96d56Sopenharmony_ci coro = self.loop.create_connection(asyncio.Protocol, 13617db96d56Sopenharmony_ci '127.0.0.1', service) 13627db96d56Sopenharmony_ci 13637db96d56Sopenharmony_ci t, p = self.loop.run_until_complete(coro) 13647db96d56Sopenharmony_ci try: 13657db96d56Sopenharmony_ci sock.connect.assert_called_with(('127.0.0.1', port)) 13667db96d56Sopenharmony_ci _, kwargs = m_socket.socket.call_args 13677db96d56Sopenharmony_ci self.assertEqual(kwargs['family'], m_socket.AF_INET) 13687db96d56Sopenharmony_ci self.assertEqual(kwargs['type'], m_socket.SOCK_STREAM) 13697db96d56Sopenharmony_ci finally: 13707db96d56Sopenharmony_ci t.close() 13717db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) # allow transport to close 13727db96d56Sopenharmony_ci 13737db96d56Sopenharmony_ci for service in 'nonsense', b'nonsense': 13747db96d56Sopenharmony_ci coro = self.loop.create_connection(asyncio.Protocol, 13757db96d56Sopenharmony_ci '127.0.0.1', service) 13767db96d56Sopenharmony_ci 13777db96d56Sopenharmony_ci with self.assertRaises(OSError): 13787db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 13797db96d56Sopenharmony_ci 13807db96d56Sopenharmony_ci def test_create_connection_no_local_addr(self): 13817db96d56Sopenharmony_ci async def getaddrinfo(host, *args, **kw): 13827db96d56Sopenharmony_ci if host == 'example.com': 13837db96d56Sopenharmony_ci return [(2, 1, 6, '', ('107.6.106.82', 80)), 13847db96d56Sopenharmony_ci (2, 1, 6, '', ('107.6.106.82', 80))] 13857db96d56Sopenharmony_ci else: 13867db96d56Sopenharmony_ci return [] 13877db96d56Sopenharmony_ci 13887db96d56Sopenharmony_ci def getaddrinfo_task(*args, **kwds): 13897db96d56Sopenharmony_ci return self.loop.create_task(getaddrinfo(*args, **kwds)) 13907db96d56Sopenharmony_ci self.loop.getaddrinfo = getaddrinfo_task 13917db96d56Sopenharmony_ci 13927db96d56Sopenharmony_ci coro = self.loop.create_connection( 13937db96d56Sopenharmony_ci MyProto, 'example.com', 80, family=socket.AF_INET, 13947db96d56Sopenharmony_ci local_addr=(None, 8080)) 13957db96d56Sopenharmony_ci self.assertRaises( 13967db96d56Sopenharmony_ci OSError, self.loop.run_until_complete, coro) 13977db96d56Sopenharmony_ci 13987db96d56Sopenharmony_ci @patch_socket 13997db96d56Sopenharmony_ci def test_create_connection_bluetooth(self, m_socket): 14007db96d56Sopenharmony_ci # See http://bugs.python.org/issue27136, fallback to getaddrinfo when 14017db96d56Sopenharmony_ci # we can't recognize an address is resolved, e.g. a Bluetooth address. 14027db96d56Sopenharmony_ci addr = ('00:01:02:03:04:05', 1) 14037db96d56Sopenharmony_ci 14047db96d56Sopenharmony_ci def getaddrinfo(host, port, *args, **kw): 14057db96d56Sopenharmony_ci self.assertEqual((host, port), addr) 14067db96d56Sopenharmony_ci return [(999, 1, 999, '', (addr, 1))] 14077db96d56Sopenharmony_ci 14087db96d56Sopenharmony_ci m_socket.getaddrinfo = getaddrinfo 14097db96d56Sopenharmony_ci sock = m_socket.socket() 14107db96d56Sopenharmony_ci coro = self.loop.sock_connect(sock, addr) 14117db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 14127db96d56Sopenharmony_ci 14137db96d56Sopenharmony_ci def test_create_connection_ssl_server_hostname_default(self): 14147db96d56Sopenharmony_ci self.loop.getaddrinfo = mock.Mock() 14157db96d56Sopenharmony_ci 14167db96d56Sopenharmony_ci def mock_getaddrinfo(*args, **kwds): 14177db96d56Sopenharmony_ci f = self.loop.create_future() 14187db96d56Sopenharmony_ci f.set_result([(socket.AF_INET, socket.SOCK_STREAM, 14197db96d56Sopenharmony_ci socket.SOL_TCP, '', ('1.2.3.4', 80))]) 14207db96d56Sopenharmony_ci return f 14217db96d56Sopenharmony_ci 14227db96d56Sopenharmony_ci self.loop.getaddrinfo.side_effect = mock_getaddrinfo 14237db96d56Sopenharmony_ci self.loop.sock_connect = mock.Mock() 14247db96d56Sopenharmony_ci self.loop.sock_connect.return_value = self.loop.create_future() 14257db96d56Sopenharmony_ci self.loop.sock_connect.return_value.set_result(None) 14267db96d56Sopenharmony_ci self.loop._make_ssl_transport = mock.Mock() 14277db96d56Sopenharmony_ci 14287db96d56Sopenharmony_ci class _SelectorTransportMock: 14297db96d56Sopenharmony_ci _sock = None 14307db96d56Sopenharmony_ci 14317db96d56Sopenharmony_ci def get_extra_info(self, key): 14327db96d56Sopenharmony_ci return mock.Mock() 14337db96d56Sopenharmony_ci 14347db96d56Sopenharmony_ci def close(self): 14357db96d56Sopenharmony_ci self._sock.close() 14367db96d56Sopenharmony_ci 14377db96d56Sopenharmony_ci def mock_make_ssl_transport(sock, protocol, sslcontext, waiter, 14387db96d56Sopenharmony_ci **kwds): 14397db96d56Sopenharmony_ci waiter.set_result(None) 14407db96d56Sopenharmony_ci transport = _SelectorTransportMock() 14417db96d56Sopenharmony_ci transport._sock = sock 14427db96d56Sopenharmony_ci return transport 14437db96d56Sopenharmony_ci 14447db96d56Sopenharmony_ci self.loop._make_ssl_transport.side_effect = mock_make_ssl_transport 14457db96d56Sopenharmony_ci ANY = mock.ANY 14467db96d56Sopenharmony_ci handshake_timeout = object() 14477db96d56Sopenharmony_ci shutdown_timeout = object() 14487db96d56Sopenharmony_ci # First try the default server_hostname. 14497db96d56Sopenharmony_ci self.loop._make_ssl_transport.reset_mock() 14507db96d56Sopenharmony_ci coro = self.loop.create_connection( 14517db96d56Sopenharmony_ci MyProto, 'python.org', 80, ssl=True, 14527db96d56Sopenharmony_ci ssl_handshake_timeout=handshake_timeout, 14537db96d56Sopenharmony_ci ssl_shutdown_timeout=shutdown_timeout) 14547db96d56Sopenharmony_ci transport, _ = self.loop.run_until_complete(coro) 14557db96d56Sopenharmony_ci transport.close() 14567db96d56Sopenharmony_ci self.loop._make_ssl_transport.assert_called_with( 14577db96d56Sopenharmony_ci ANY, ANY, ANY, ANY, 14587db96d56Sopenharmony_ci server_side=False, 14597db96d56Sopenharmony_ci server_hostname='python.org', 14607db96d56Sopenharmony_ci ssl_handshake_timeout=handshake_timeout, 14617db96d56Sopenharmony_ci ssl_shutdown_timeout=shutdown_timeout) 14627db96d56Sopenharmony_ci # Next try an explicit server_hostname. 14637db96d56Sopenharmony_ci self.loop._make_ssl_transport.reset_mock() 14647db96d56Sopenharmony_ci coro = self.loop.create_connection( 14657db96d56Sopenharmony_ci MyProto, 'python.org', 80, ssl=True, 14667db96d56Sopenharmony_ci server_hostname='perl.com', 14677db96d56Sopenharmony_ci ssl_handshake_timeout=handshake_timeout, 14687db96d56Sopenharmony_ci ssl_shutdown_timeout=shutdown_timeout) 14697db96d56Sopenharmony_ci transport, _ = self.loop.run_until_complete(coro) 14707db96d56Sopenharmony_ci transport.close() 14717db96d56Sopenharmony_ci self.loop._make_ssl_transport.assert_called_with( 14727db96d56Sopenharmony_ci ANY, ANY, ANY, ANY, 14737db96d56Sopenharmony_ci server_side=False, 14747db96d56Sopenharmony_ci server_hostname='perl.com', 14757db96d56Sopenharmony_ci ssl_handshake_timeout=handshake_timeout, 14767db96d56Sopenharmony_ci ssl_shutdown_timeout=shutdown_timeout) 14777db96d56Sopenharmony_ci # Finally try an explicit empty server_hostname. 14787db96d56Sopenharmony_ci self.loop._make_ssl_transport.reset_mock() 14797db96d56Sopenharmony_ci coro = self.loop.create_connection( 14807db96d56Sopenharmony_ci MyProto, 'python.org', 80, ssl=True, 14817db96d56Sopenharmony_ci server_hostname='', 14827db96d56Sopenharmony_ci ssl_handshake_timeout=handshake_timeout, 14837db96d56Sopenharmony_ci ssl_shutdown_timeout=shutdown_timeout) 14847db96d56Sopenharmony_ci transport, _ = self.loop.run_until_complete(coro) 14857db96d56Sopenharmony_ci transport.close() 14867db96d56Sopenharmony_ci self.loop._make_ssl_transport.assert_called_with( 14877db96d56Sopenharmony_ci ANY, ANY, ANY, ANY, 14887db96d56Sopenharmony_ci server_side=False, 14897db96d56Sopenharmony_ci server_hostname='', 14907db96d56Sopenharmony_ci ssl_handshake_timeout=handshake_timeout, 14917db96d56Sopenharmony_ci ssl_shutdown_timeout=shutdown_timeout) 14927db96d56Sopenharmony_ci 14937db96d56Sopenharmony_ci def test_create_connection_no_ssl_server_hostname_errors(self): 14947db96d56Sopenharmony_ci # When not using ssl, server_hostname must be None. 14957db96d56Sopenharmony_ci coro = self.loop.create_connection(MyProto, 'python.org', 80, 14967db96d56Sopenharmony_ci server_hostname='') 14977db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, coro) 14987db96d56Sopenharmony_ci coro = self.loop.create_connection(MyProto, 'python.org', 80, 14997db96d56Sopenharmony_ci server_hostname='python.org') 15007db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, coro) 15017db96d56Sopenharmony_ci 15027db96d56Sopenharmony_ci def test_create_connection_ssl_server_hostname_errors(self): 15037db96d56Sopenharmony_ci # When using ssl, server_hostname may be None if host is non-empty. 15047db96d56Sopenharmony_ci coro = self.loop.create_connection(MyProto, '', 80, ssl=True) 15057db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, coro) 15067db96d56Sopenharmony_ci coro = self.loop.create_connection(MyProto, None, 80, ssl=True) 15077db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, coro) 15087db96d56Sopenharmony_ci sock = socket.socket() 15097db96d56Sopenharmony_ci coro = self.loop.create_connection(MyProto, None, None, 15107db96d56Sopenharmony_ci ssl=True, sock=sock) 15117db96d56Sopenharmony_ci self.addCleanup(sock.close) 15127db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, coro) 15137db96d56Sopenharmony_ci 15147db96d56Sopenharmony_ci def test_create_connection_ssl_timeout_for_plain_socket(self): 15157db96d56Sopenharmony_ci coro = self.loop.create_connection( 15167db96d56Sopenharmony_ci MyProto, 'example.com', 80, ssl_handshake_timeout=1) 15177db96d56Sopenharmony_ci with self.assertRaisesRegex( 15187db96d56Sopenharmony_ci ValueError, 15197db96d56Sopenharmony_ci 'ssl_handshake_timeout is only meaningful with ssl'): 15207db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 15217db96d56Sopenharmony_ci 15227db96d56Sopenharmony_ci def test_create_server_empty_host(self): 15237db96d56Sopenharmony_ci # if host is empty string use None instead 15247db96d56Sopenharmony_ci host = object() 15257db96d56Sopenharmony_ci 15267db96d56Sopenharmony_ci async def getaddrinfo(*args, **kw): 15277db96d56Sopenharmony_ci nonlocal host 15287db96d56Sopenharmony_ci host = args[0] 15297db96d56Sopenharmony_ci return [] 15307db96d56Sopenharmony_ci 15317db96d56Sopenharmony_ci def getaddrinfo_task(*args, **kwds): 15327db96d56Sopenharmony_ci return self.loop.create_task(getaddrinfo(*args, **kwds)) 15337db96d56Sopenharmony_ci 15347db96d56Sopenharmony_ci self.loop.getaddrinfo = getaddrinfo_task 15357db96d56Sopenharmony_ci fut = self.loop.create_server(MyProto, '', 0) 15367db96d56Sopenharmony_ci self.assertRaises(OSError, self.loop.run_until_complete, fut) 15377db96d56Sopenharmony_ci self.assertIsNone(host) 15387db96d56Sopenharmony_ci 15397db96d56Sopenharmony_ci def test_create_server_host_port_sock(self): 15407db96d56Sopenharmony_ci fut = self.loop.create_server( 15417db96d56Sopenharmony_ci MyProto, '0.0.0.0', 0, sock=object()) 15427db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, fut) 15437db96d56Sopenharmony_ci 15447db96d56Sopenharmony_ci def test_create_server_no_host_port_sock(self): 15457db96d56Sopenharmony_ci fut = self.loop.create_server(MyProto) 15467db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, fut) 15477db96d56Sopenharmony_ci 15487db96d56Sopenharmony_ci def test_create_server_no_getaddrinfo(self): 15497db96d56Sopenharmony_ci getaddrinfo = self.loop.getaddrinfo = mock.Mock() 15507db96d56Sopenharmony_ci getaddrinfo.return_value = self.loop.create_future() 15517db96d56Sopenharmony_ci getaddrinfo.return_value.set_result(None) 15527db96d56Sopenharmony_ci 15537db96d56Sopenharmony_ci f = self.loop.create_server(MyProto, 'python.org', 0) 15547db96d56Sopenharmony_ci self.assertRaises(OSError, self.loop.run_until_complete, f) 15557db96d56Sopenharmony_ci 15567db96d56Sopenharmony_ci @patch_socket 15577db96d56Sopenharmony_ci def test_create_server_nosoreuseport(self, m_socket): 15587db96d56Sopenharmony_ci m_socket.getaddrinfo = socket.getaddrinfo 15597db96d56Sopenharmony_ci del m_socket.SO_REUSEPORT 15607db96d56Sopenharmony_ci m_socket.socket.return_value = mock.Mock() 15617db96d56Sopenharmony_ci 15627db96d56Sopenharmony_ci f = self.loop.create_server( 15637db96d56Sopenharmony_ci MyProto, '0.0.0.0', 0, reuse_port=True) 15647db96d56Sopenharmony_ci 15657db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, f) 15667db96d56Sopenharmony_ci 15677db96d56Sopenharmony_ci @patch_socket 15687db96d56Sopenharmony_ci def test_create_server_soreuseport_only_defined(self, m_socket): 15697db96d56Sopenharmony_ci m_socket.getaddrinfo = socket.getaddrinfo 15707db96d56Sopenharmony_ci m_socket.socket.return_value = mock.Mock() 15717db96d56Sopenharmony_ci m_socket.SO_REUSEPORT = -1 15727db96d56Sopenharmony_ci 15737db96d56Sopenharmony_ci f = self.loop.create_server( 15747db96d56Sopenharmony_ci MyProto, '0.0.0.0', 0, reuse_port=True) 15757db96d56Sopenharmony_ci 15767db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, f) 15777db96d56Sopenharmony_ci 15787db96d56Sopenharmony_ci @patch_socket 15797db96d56Sopenharmony_ci def test_create_server_cant_bind(self, m_socket): 15807db96d56Sopenharmony_ci 15817db96d56Sopenharmony_ci class Err(OSError): 15827db96d56Sopenharmony_ci strerror = 'error' 15837db96d56Sopenharmony_ci 15847db96d56Sopenharmony_ci m_socket.getaddrinfo.return_value = [ 15857db96d56Sopenharmony_ci (2, 1, 6, '', ('127.0.0.1', 10100))] 15867db96d56Sopenharmony_ci m_sock = m_socket.socket.return_value = mock.Mock() 15877db96d56Sopenharmony_ci m_sock.bind.side_effect = Err 15887db96d56Sopenharmony_ci 15897db96d56Sopenharmony_ci fut = self.loop.create_server(MyProto, '0.0.0.0', 0) 15907db96d56Sopenharmony_ci self.assertRaises(OSError, self.loop.run_until_complete, fut) 15917db96d56Sopenharmony_ci self.assertTrue(m_sock.close.called) 15927db96d56Sopenharmony_ci 15937db96d56Sopenharmony_ci @patch_socket 15947db96d56Sopenharmony_ci def test_create_datagram_endpoint_no_addrinfo(self, m_socket): 15957db96d56Sopenharmony_ci m_socket.getaddrinfo.return_value = [] 15967db96d56Sopenharmony_ci 15977db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 15987db96d56Sopenharmony_ci MyDatagramProto, local_addr=('localhost', 0)) 15997db96d56Sopenharmony_ci self.assertRaises( 16007db96d56Sopenharmony_ci OSError, self.loop.run_until_complete, coro) 16017db96d56Sopenharmony_ci 16027db96d56Sopenharmony_ci def test_create_datagram_endpoint_addr_error(self): 16037db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 16047db96d56Sopenharmony_ci MyDatagramProto, local_addr='localhost') 16057db96d56Sopenharmony_ci self.assertRaises( 16067db96d56Sopenharmony_ci TypeError, self.loop.run_until_complete, coro) 16077db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 16087db96d56Sopenharmony_ci MyDatagramProto, local_addr=('localhost', 1, 2, 3)) 16097db96d56Sopenharmony_ci self.assertRaises( 16107db96d56Sopenharmony_ci TypeError, self.loop.run_until_complete, coro) 16117db96d56Sopenharmony_ci 16127db96d56Sopenharmony_ci def test_create_datagram_endpoint_connect_err(self): 16137db96d56Sopenharmony_ci self.loop.sock_connect = mock.Mock() 16147db96d56Sopenharmony_ci self.loop.sock_connect.side_effect = OSError 16157db96d56Sopenharmony_ci 16167db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 16177db96d56Sopenharmony_ci asyncio.DatagramProtocol, remote_addr=('127.0.0.1', 0)) 16187db96d56Sopenharmony_ci self.assertRaises( 16197db96d56Sopenharmony_ci OSError, self.loop.run_until_complete, coro) 16207db96d56Sopenharmony_ci 16217db96d56Sopenharmony_ci def test_create_datagram_endpoint_allow_broadcast(self): 16227db96d56Sopenharmony_ci protocol = MyDatagramProto(create_future=True, loop=self.loop) 16237db96d56Sopenharmony_ci self.loop.sock_connect = sock_connect = mock.Mock() 16247db96d56Sopenharmony_ci sock_connect.return_value = [] 16257db96d56Sopenharmony_ci 16267db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 16277db96d56Sopenharmony_ci lambda: protocol, 16287db96d56Sopenharmony_ci remote_addr=('127.0.0.1', 0), 16297db96d56Sopenharmony_ci allow_broadcast=True) 16307db96d56Sopenharmony_ci 16317db96d56Sopenharmony_ci transport, _ = self.loop.run_until_complete(coro) 16327db96d56Sopenharmony_ci self.assertFalse(sock_connect.called) 16337db96d56Sopenharmony_ci 16347db96d56Sopenharmony_ci transport.close() 16357db96d56Sopenharmony_ci self.loop.run_until_complete(protocol.done) 16367db96d56Sopenharmony_ci self.assertEqual('CLOSED', protocol.state) 16377db96d56Sopenharmony_ci 16387db96d56Sopenharmony_ci @patch_socket 16397db96d56Sopenharmony_ci def test_create_datagram_endpoint_socket_err(self, m_socket): 16407db96d56Sopenharmony_ci m_socket.getaddrinfo = socket.getaddrinfo 16417db96d56Sopenharmony_ci m_socket.socket.side_effect = OSError 16427db96d56Sopenharmony_ci 16437db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 16447db96d56Sopenharmony_ci asyncio.DatagramProtocol, family=socket.AF_INET) 16457db96d56Sopenharmony_ci self.assertRaises( 16467db96d56Sopenharmony_ci OSError, self.loop.run_until_complete, coro) 16477db96d56Sopenharmony_ci 16487db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 16497db96d56Sopenharmony_ci asyncio.DatagramProtocol, local_addr=('127.0.0.1', 0)) 16507db96d56Sopenharmony_ci self.assertRaises( 16517db96d56Sopenharmony_ci OSError, self.loop.run_until_complete, coro) 16527db96d56Sopenharmony_ci 16537db96d56Sopenharmony_ci @unittest.skipUnless(socket_helper.IPV6_ENABLED, 'IPv6 not supported or enabled') 16547db96d56Sopenharmony_ci def test_create_datagram_endpoint_no_matching_family(self): 16557db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 16567db96d56Sopenharmony_ci asyncio.DatagramProtocol, 16577db96d56Sopenharmony_ci remote_addr=('127.0.0.1', 0), local_addr=('::1', 0)) 16587db96d56Sopenharmony_ci self.assertRaises( 16597db96d56Sopenharmony_ci ValueError, self.loop.run_until_complete, coro) 16607db96d56Sopenharmony_ci 16617db96d56Sopenharmony_ci @patch_socket 16627db96d56Sopenharmony_ci def test_create_datagram_endpoint_setblk_err(self, m_socket): 16637db96d56Sopenharmony_ci m_socket.socket.return_value.setblocking.side_effect = OSError 16647db96d56Sopenharmony_ci 16657db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 16667db96d56Sopenharmony_ci asyncio.DatagramProtocol, family=socket.AF_INET) 16677db96d56Sopenharmony_ci self.assertRaises( 16687db96d56Sopenharmony_ci OSError, self.loop.run_until_complete, coro) 16697db96d56Sopenharmony_ci self.assertTrue( 16707db96d56Sopenharmony_ci m_socket.socket.return_value.close.called) 16717db96d56Sopenharmony_ci 16727db96d56Sopenharmony_ci def test_create_datagram_endpoint_noaddr_nofamily(self): 16737db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 16747db96d56Sopenharmony_ci asyncio.DatagramProtocol) 16757db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, coro) 16767db96d56Sopenharmony_ci 16777db96d56Sopenharmony_ci @patch_socket 16787db96d56Sopenharmony_ci def test_create_datagram_endpoint_cant_bind(self, m_socket): 16797db96d56Sopenharmony_ci class Err(OSError): 16807db96d56Sopenharmony_ci pass 16817db96d56Sopenharmony_ci 16827db96d56Sopenharmony_ci m_socket.getaddrinfo = socket.getaddrinfo 16837db96d56Sopenharmony_ci m_sock = m_socket.socket.return_value = mock.Mock() 16847db96d56Sopenharmony_ci m_sock.bind.side_effect = Err 16857db96d56Sopenharmony_ci 16867db96d56Sopenharmony_ci fut = self.loop.create_datagram_endpoint( 16877db96d56Sopenharmony_ci MyDatagramProto, 16887db96d56Sopenharmony_ci local_addr=('127.0.0.1', 0), family=socket.AF_INET) 16897db96d56Sopenharmony_ci self.assertRaises(Err, self.loop.run_until_complete, fut) 16907db96d56Sopenharmony_ci self.assertTrue(m_sock.close.called) 16917db96d56Sopenharmony_ci 16927db96d56Sopenharmony_ci def test_create_datagram_endpoint_sock(self): 16937db96d56Sopenharmony_ci sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 16947db96d56Sopenharmony_ci sock.bind(('127.0.0.1', 0)) 16957db96d56Sopenharmony_ci fut = self.loop.create_datagram_endpoint( 16967db96d56Sopenharmony_ci lambda: MyDatagramProto(create_future=True, loop=self.loop), 16977db96d56Sopenharmony_ci sock=sock) 16987db96d56Sopenharmony_ci transport, protocol = self.loop.run_until_complete(fut) 16997db96d56Sopenharmony_ci transport.close() 17007db96d56Sopenharmony_ci self.loop.run_until_complete(protocol.done) 17017db96d56Sopenharmony_ci self.assertEqual('CLOSED', protocol.state) 17027db96d56Sopenharmony_ci 17037db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets') 17047db96d56Sopenharmony_ci def test_create_datagram_endpoint_sock_unix(self): 17057db96d56Sopenharmony_ci fut = self.loop.create_datagram_endpoint( 17067db96d56Sopenharmony_ci lambda: MyDatagramProto(create_future=True, loop=self.loop), 17077db96d56Sopenharmony_ci family=socket.AF_UNIX) 17087db96d56Sopenharmony_ci transport, protocol = self.loop.run_until_complete(fut) 17097db96d56Sopenharmony_ci self.assertEqual(transport._sock.family, socket.AF_UNIX) 17107db96d56Sopenharmony_ci transport.close() 17117db96d56Sopenharmony_ci self.loop.run_until_complete(protocol.done) 17127db96d56Sopenharmony_ci self.assertEqual('CLOSED', protocol.state) 17137db96d56Sopenharmony_ci 17147db96d56Sopenharmony_ci @socket_helper.skip_unless_bind_unix_socket 17157db96d56Sopenharmony_ci def test_create_datagram_endpoint_existing_sock_unix(self): 17167db96d56Sopenharmony_ci with test_utils.unix_socket_path() as path: 17177db96d56Sopenharmony_ci sock = socket.socket(socket.AF_UNIX, type=socket.SOCK_DGRAM) 17187db96d56Sopenharmony_ci sock.bind(path) 17197db96d56Sopenharmony_ci sock.close() 17207db96d56Sopenharmony_ci 17217db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 17227db96d56Sopenharmony_ci lambda: MyDatagramProto(create_future=True, loop=self.loop), 17237db96d56Sopenharmony_ci path, family=socket.AF_UNIX) 17247db96d56Sopenharmony_ci transport, protocol = self.loop.run_until_complete(coro) 17257db96d56Sopenharmony_ci transport.close() 17267db96d56Sopenharmony_ci self.loop.run_until_complete(protocol.done) 17277db96d56Sopenharmony_ci 17287db96d56Sopenharmony_ci def test_create_datagram_endpoint_sock_sockopts(self): 17297db96d56Sopenharmony_ci class FakeSock: 17307db96d56Sopenharmony_ci type = socket.SOCK_DGRAM 17317db96d56Sopenharmony_ci 17327db96d56Sopenharmony_ci fut = self.loop.create_datagram_endpoint( 17337db96d56Sopenharmony_ci MyDatagramProto, local_addr=('127.0.0.1', 0), sock=FakeSock()) 17347db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, fut) 17357db96d56Sopenharmony_ci 17367db96d56Sopenharmony_ci fut = self.loop.create_datagram_endpoint( 17377db96d56Sopenharmony_ci MyDatagramProto, remote_addr=('127.0.0.1', 0), sock=FakeSock()) 17387db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, fut) 17397db96d56Sopenharmony_ci 17407db96d56Sopenharmony_ci fut = self.loop.create_datagram_endpoint( 17417db96d56Sopenharmony_ci MyDatagramProto, family=1, sock=FakeSock()) 17427db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, fut) 17437db96d56Sopenharmony_ci 17447db96d56Sopenharmony_ci fut = self.loop.create_datagram_endpoint( 17457db96d56Sopenharmony_ci MyDatagramProto, proto=1, sock=FakeSock()) 17467db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, fut) 17477db96d56Sopenharmony_ci 17487db96d56Sopenharmony_ci fut = self.loop.create_datagram_endpoint( 17497db96d56Sopenharmony_ci MyDatagramProto, flags=1, sock=FakeSock()) 17507db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, fut) 17517db96d56Sopenharmony_ci 17527db96d56Sopenharmony_ci fut = self.loop.create_datagram_endpoint( 17537db96d56Sopenharmony_ci MyDatagramProto, reuse_port=True, sock=FakeSock()) 17547db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, fut) 17557db96d56Sopenharmony_ci 17567db96d56Sopenharmony_ci fut = self.loop.create_datagram_endpoint( 17577db96d56Sopenharmony_ci MyDatagramProto, allow_broadcast=True, sock=FakeSock()) 17587db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, fut) 17597db96d56Sopenharmony_ci 17607db96d56Sopenharmony_ci @unittest.skipIf(sys.platform == 'vxworks', 17617db96d56Sopenharmony_ci "SO_BROADCAST is enabled by default on VxWorks") 17627db96d56Sopenharmony_ci def test_create_datagram_endpoint_sockopts(self): 17637db96d56Sopenharmony_ci # Socket options should not be applied unless asked for. 17647db96d56Sopenharmony_ci # SO_REUSEPORT is not available on all platforms. 17657db96d56Sopenharmony_ci 17667db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 17677db96d56Sopenharmony_ci lambda: MyDatagramProto(create_future=True, loop=self.loop), 17687db96d56Sopenharmony_ci local_addr=('127.0.0.1', 0)) 17697db96d56Sopenharmony_ci transport, protocol = self.loop.run_until_complete(coro) 17707db96d56Sopenharmony_ci sock = transport.get_extra_info('socket') 17717db96d56Sopenharmony_ci 17727db96d56Sopenharmony_ci reuseport_supported = hasattr(socket, 'SO_REUSEPORT') 17737db96d56Sopenharmony_ci 17747db96d56Sopenharmony_ci if reuseport_supported: 17757db96d56Sopenharmony_ci self.assertFalse( 17767db96d56Sopenharmony_ci sock.getsockopt( 17777db96d56Sopenharmony_ci socket.SOL_SOCKET, socket.SO_REUSEPORT)) 17787db96d56Sopenharmony_ci self.assertFalse( 17797db96d56Sopenharmony_ci sock.getsockopt( 17807db96d56Sopenharmony_ci socket.SOL_SOCKET, socket.SO_BROADCAST)) 17817db96d56Sopenharmony_ci 17827db96d56Sopenharmony_ci transport.close() 17837db96d56Sopenharmony_ci self.loop.run_until_complete(protocol.done) 17847db96d56Sopenharmony_ci self.assertEqual('CLOSED', protocol.state) 17857db96d56Sopenharmony_ci 17867db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 17877db96d56Sopenharmony_ci lambda: MyDatagramProto(create_future=True, loop=self.loop), 17887db96d56Sopenharmony_ci local_addr=('127.0.0.1', 0), 17897db96d56Sopenharmony_ci reuse_port=reuseport_supported, 17907db96d56Sopenharmony_ci allow_broadcast=True) 17917db96d56Sopenharmony_ci transport, protocol = self.loop.run_until_complete(coro) 17927db96d56Sopenharmony_ci sock = transport.get_extra_info('socket') 17937db96d56Sopenharmony_ci 17947db96d56Sopenharmony_ci self.assertFalse( 17957db96d56Sopenharmony_ci sock.getsockopt( 17967db96d56Sopenharmony_ci socket.SOL_SOCKET, socket.SO_REUSEADDR)) 17977db96d56Sopenharmony_ci if reuseport_supported: 17987db96d56Sopenharmony_ci self.assertTrue( 17997db96d56Sopenharmony_ci sock.getsockopt( 18007db96d56Sopenharmony_ci socket.SOL_SOCKET, socket.SO_REUSEPORT)) 18017db96d56Sopenharmony_ci self.assertTrue( 18027db96d56Sopenharmony_ci sock.getsockopt( 18037db96d56Sopenharmony_ci socket.SOL_SOCKET, socket.SO_BROADCAST)) 18047db96d56Sopenharmony_ci 18057db96d56Sopenharmony_ci transport.close() 18067db96d56Sopenharmony_ci self.loop.run_until_complete(protocol.done) 18077db96d56Sopenharmony_ci self.assertEqual('CLOSED', protocol.state) 18087db96d56Sopenharmony_ci 18097db96d56Sopenharmony_ci @patch_socket 18107db96d56Sopenharmony_ci def test_create_datagram_endpoint_nosoreuseport(self, m_socket): 18117db96d56Sopenharmony_ci del m_socket.SO_REUSEPORT 18127db96d56Sopenharmony_ci m_socket.socket.return_value = mock.Mock() 18137db96d56Sopenharmony_ci 18147db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 18157db96d56Sopenharmony_ci lambda: MyDatagramProto(loop=self.loop), 18167db96d56Sopenharmony_ci local_addr=('127.0.0.1', 0), 18177db96d56Sopenharmony_ci reuse_port=True) 18187db96d56Sopenharmony_ci 18197db96d56Sopenharmony_ci self.assertRaises(ValueError, self.loop.run_until_complete, coro) 18207db96d56Sopenharmony_ci 18217db96d56Sopenharmony_ci @patch_socket 18227db96d56Sopenharmony_ci def test_create_datagram_endpoint_ip_addr(self, m_socket): 18237db96d56Sopenharmony_ci def getaddrinfo(*args, **kw): 18247db96d56Sopenharmony_ci self.fail('should not have called getaddrinfo') 18257db96d56Sopenharmony_ci 18267db96d56Sopenharmony_ci m_socket.getaddrinfo = getaddrinfo 18277db96d56Sopenharmony_ci m_socket.socket.return_value.bind = bind = mock.Mock() 18287db96d56Sopenharmony_ci self.loop._add_reader = mock.Mock() 18297db96d56Sopenharmony_ci 18307db96d56Sopenharmony_ci reuseport_supported = hasattr(socket, 'SO_REUSEPORT') 18317db96d56Sopenharmony_ci coro = self.loop.create_datagram_endpoint( 18327db96d56Sopenharmony_ci lambda: MyDatagramProto(loop=self.loop), 18337db96d56Sopenharmony_ci local_addr=('1.2.3.4', 0), 18347db96d56Sopenharmony_ci reuse_port=reuseport_supported) 18357db96d56Sopenharmony_ci 18367db96d56Sopenharmony_ci t, p = self.loop.run_until_complete(coro) 18377db96d56Sopenharmony_ci try: 18387db96d56Sopenharmony_ci bind.assert_called_with(('1.2.3.4', 0)) 18397db96d56Sopenharmony_ci m_socket.socket.assert_called_with(family=m_socket.AF_INET, 18407db96d56Sopenharmony_ci proto=m_socket.IPPROTO_UDP, 18417db96d56Sopenharmony_ci type=m_socket.SOCK_DGRAM) 18427db96d56Sopenharmony_ci finally: 18437db96d56Sopenharmony_ci t.close() 18447db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) # allow transport to close 18457db96d56Sopenharmony_ci 18467db96d56Sopenharmony_ci def test_accept_connection_retry(self): 18477db96d56Sopenharmony_ci sock = mock.Mock() 18487db96d56Sopenharmony_ci sock.accept.side_effect = BlockingIOError() 18497db96d56Sopenharmony_ci 18507db96d56Sopenharmony_ci self.loop._accept_connection(MyProto, sock) 18517db96d56Sopenharmony_ci self.assertFalse(sock.close.called) 18527db96d56Sopenharmony_ci 18537db96d56Sopenharmony_ci @mock.patch('asyncio.base_events.logger') 18547db96d56Sopenharmony_ci def test_accept_connection_exception(self, m_log): 18557db96d56Sopenharmony_ci sock = mock.Mock() 18567db96d56Sopenharmony_ci sock.fileno.return_value = 10 18577db96d56Sopenharmony_ci sock.accept.side_effect = OSError(errno.EMFILE, 'Too many open files') 18587db96d56Sopenharmony_ci self.loop._remove_reader = mock.Mock() 18597db96d56Sopenharmony_ci self.loop.call_later = mock.Mock() 18607db96d56Sopenharmony_ci 18617db96d56Sopenharmony_ci self.loop._accept_connection(MyProto, sock) 18627db96d56Sopenharmony_ci self.assertTrue(m_log.error.called) 18637db96d56Sopenharmony_ci self.assertFalse(sock.close.called) 18647db96d56Sopenharmony_ci self.loop._remove_reader.assert_called_with(10) 18657db96d56Sopenharmony_ci self.loop.call_later.assert_called_with( 18667db96d56Sopenharmony_ci constants.ACCEPT_RETRY_DELAY, 18677db96d56Sopenharmony_ci # self.loop._start_serving 18687db96d56Sopenharmony_ci mock.ANY, 18697db96d56Sopenharmony_ci MyProto, sock, None, None, mock.ANY, mock.ANY, mock.ANY) 18707db96d56Sopenharmony_ci 18717db96d56Sopenharmony_ci def test_call_coroutine(self): 18727db96d56Sopenharmony_ci async def simple_coroutine(): 18737db96d56Sopenharmony_ci pass 18747db96d56Sopenharmony_ci 18757db96d56Sopenharmony_ci self.loop.set_debug(True) 18767db96d56Sopenharmony_ci coro_func = simple_coroutine 18777db96d56Sopenharmony_ci coro_obj = coro_func() 18787db96d56Sopenharmony_ci self.addCleanup(coro_obj.close) 18797db96d56Sopenharmony_ci for func in (coro_func, coro_obj): 18807db96d56Sopenharmony_ci with self.assertRaises(TypeError): 18817db96d56Sopenharmony_ci self.loop.call_soon(func) 18827db96d56Sopenharmony_ci with self.assertRaises(TypeError): 18837db96d56Sopenharmony_ci self.loop.call_soon_threadsafe(func) 18847db96d56Sopenharmony_ci with self.assertRaises(TypeError): 18857db96d56Sopenharmony_ci self.loop.call_later(60, func) 18867db96d56Sopenharmony_ci with self.assertRaises(TypeError): 18877db96d56Sopenharmony_ci self.loop.call_at(self.loop.time() + 60, func) 18887db96d56Sopenharmony_ci with self.assertRaises(TypeError): 18897db96d56Sopenharmony_ci self.loop.run_until_complete( 18907db96d56Sopenharmony_ci self.loop.run_in_executor(None, func)) 18917db96d56Sopenharmony_ci 18927db96d56Sopenharmony_ci @mock.patch('asyncio.base_events.logger') 18937db96d56Sopenharmony_ci def test_log_slow_callbacks(self, m_logger): 18947db96d56Sopenharmony_ci def stop_loop_cb(loop): 18957db96d56Sopenharmony_ci loop.stop() 18967db96d56Sopenharmony_ci 18977db96d56Sopenharmony_ci async def stop_loop_coro(loop): 18987db96d56Sopenharmony_ci loop.stop() 18997db96d56Sopenharmony_ci 19007db96d56Sopenharmony_ci asyncio.set_event_loop(self.loop) 19017db96d56Sopenharmony_ci self.loop.set_debug(True) 19027db96d56Sopenharmony_ci self.loop.slow_callback_duration = 0.0 19037db96d56Sopenharmony_ci 19047db96d56Sopenharmony_ci # slow callback 19057db96d56Sopenharmony_ci self.loop.call_soon(stop_loop_cb, self.loop) 19067db96d56Sopenharmony_ci self.loop.run_forever() 19077db96d56Sopenharmony_ci fmt, *args = m_logger.warning.call_args[0] 19087db96d56Sopenharmony_ci self.assertRegex(fmt % tuple(args), 19097db96d56Sopenharmony_ci "^Executing <Handle.*stop_loop_cb.*> " 19107db96d56Sopenharmony_ci "took .* seconds$") 19117db96d56Sopenharmony_ci 19127db96d56Sopenharmony_ci # slow task 19137db96d56Sopenharmony_ci asyncio.ensure_future(stop_loop_coro(self.loop), loop=self.loop) 19147db96d56Sopenharmony_ci self.loop.run_forever() 19157db96d56Sopenharmony_ci fmt, *args = m_logger.warning.call_args[0] 19167db96d56Sopenharmony_ci self.assertRegex(fmt % tuple(args), 19177db96d56Sopenharmony_ci "^Executing <Task.*stop_loop_coro.*> " 19187db96d56Sopenharmony_ci "took .* seconds$") 19197db96d56Sopenharmony_ci 19207db96d56Sopenharmony_ci 19217db96d56Sopenharmony_ciclass RunningLoopTests(unittest.TestCase): 19227db96d56Sopenharmony_ci 19237db96d56Sopenharmony_ci def test_running_loop_within_a_loop(self): 19247db96d56Sopenharmony_ci async def runner(loop): 19257db96d56Sopenharmony_ci loop.run_forever() 19267db96d56Sopenharmony_ci 19277db96d56Sopenharmony_ci loop = asyncio.new_event_loop() 19287db96d56Sopenharmony_ci outer_loop = asyncio.new_event_loop() 19297db96d56Sopenharmony_ci try: 19307db96d56Sopenharmony_ci with self.assertRaisesRegex(RuntimeError, 19317db96d56Sopenharmony_ci 'while another loop is running'): 19327db96d56Sopenharmony_ci outer_loop.run_until_complete(runner(loop)) 19337db96d56Sopenharmony_ci finally: 19347db96d56Sopenharmony_ci loop.close() 19357db96d56Sopenharmony_ci outer_loop.close() 19367db96d56Sopenharmony_ci 19377db96d56Sopenharmony_ci 19387db96d56Sopenharmony_ciclass BaseLoopSockSendfileTests(test_utils.TestCase): 19397db96d56Sopenharmony_ci 19407db96d56Sopenharmony_ci DATA = b"12345abcde" * 16 * 1024 # 160 KiB 19417db96d56Sopenharmony_ci 19427db96d56Sopenharmony_ci class MyProto(asyncio.Protocol): 19437db96d56Sopenharmony_ci 19447db96d56Sopenharmony_ci def __init__(self, loop): 19457db96d56Sopenharmony_ci self.started = False 19467db96d56Sopenharmony_ci self.closed = False 19477db96d56Sopenharmony_ci self.data = bytearray() 19487db96d56Sopenharmony_ci self.fut = loop.create_future() 19497db96d56Sopenharmony_ci self.transport = None 19507db96d56Sopenharmony_ci 19517db96d56Sopenharmony_ci def connection_made(self, transport): 19527db96d56Sopenharmony_ci self.started = True 19537db96d56Sopenharmony_ci self.transport = transport 19547db96d56Sopenharmony_ci 19557db96d56Sopenharmony_ci def data_received(self, data): 19567db96d56Sopenharmony_ci self.data.extend(data) 19577db96d56Sopenharmony_ci 19587db96d56Sopenharmony_ci def connection_lost(self, exc): 19597db96d56Sopenharmony_ci self.closed = True 19607db96d56Sopenharmony_ci self.fut.set_result(None) 19617db96d56Sopenharmony_ci self.transport = None 19627db96d56Sopenharmony_ci 19637db96d56Sopenharmony_ci async def wait_closed(self): 19647db96d56Sopenharmony_ci await self.fut 19657db96d56Sopenharmony_ci 19667db96d56Sopenharmony_ci @classmethod 19677db96d56Sopenharmony_ci def setUpClass(cls): 19687db96d56Sopenharmony_ci cls.__old_bufsize = constants.SENDFILE_FALLBACK_READBUFFER_SIZE 19697db96d56Sopenharmony_ci constants.SENDFILE_FALLBACK_READBUFFER_SIZE = 1024 * 16 19707db96d56Sopenharmony_ci with open(os_helper.TESTFN, 'wb') as fp: 19717db96d56Sopenharmony_ci fp.write(cls.DATA) 19727db96d56Sopenharmony_ci super().setUpClass() 19737db96d56Sopenharmony_ci 19747db96d56Sopenharmony_ci @classmethod 19757db96d56Sopenharmony_ci def tearDownClass(cls): 19767db96d56Sopenharmony_ci constants.SENDFILE_FALLBACK_READBUFFER_SIZE = cls.__old_bufsize 19777db96d56Sopenharmony_ci os_helper.unlink(os_helper.TESTFN) 19787db96d56Sopenharmony_ci super().tearDownClass() 19797db96d56Sopenharmony_ci 19807db96d56Sopenharmony_ci def setUp(self): 19817db96d56Sopenharmony_ci from asyncio.selector_events import BaseSelectorEventLoop 19827db96d56Sopenharmony_ci # BaseSelectorEventLoop() has no native implementation 19837db96d56Sopenharmony_ci self.loop = BaseSelectorEventLoop() 19847db96d56Sopenharmony_ci self.set_event_loop(self.loop) 19857db96d56Sopenharmony_ci self.file = open(os_helper.TESTFN, 'rb') 19867db96d56Sopenharmony_ci self.addCleanup(self.file.close) 19877db96d56Sopenharmony_ci super().setUp() 19887db96d56Sopenharmony_ci 19897db96d56Sopenharmony_ci def make_socket(self, blocking=False): 19907db96d56Sopenharmony_ci sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 19917db96d56Sopenharmony_ci sock.setblocking(blocking) 19927db96d56Sopenharmony_ci self.addCleanup(sock.close) 19937db96d56Sopenharmony_ci return sock 19947db96d56Sopenharmony_ci 19957db96d56Sopenharmony_ci def run_loop(self, coro): 19967db96d56Sopenharmony_ci return self.loop.run_until_complete(coro) 19977db96d56Sopenharmony_ci 19987db96d56Sopenharmony_ci def prepare(self): 19997db96d56Sopenharmony_ci sock = self.make_socket() 20007db96d56Sopenharmony_ci proto = self.MyProto(self.loop) 20017db96d56Sopenharmony_ci server = self.run_loop(self.loop.create_server( 20027db96d56Sopenharmony_ci lambda: proto, socket_helper.HOST, 0, family=socket.AF_INET)) 20037db96d56Sopenharmony_ci addr = server.sockets[0].getsockname() 20047db96d56Sopenharmony_ci 20057db96d56Sopenharmony_ci for _ in range(10): 20067db96d56Sopenharmony_ci try: 20077db96d56Sopenharmony_ci self.run_loop(self.loop.sock_connect(sock, addr)) 20087db96d56Sopenharmony_ci except OSError: 20097db96d56Sopenharmony_ci self.run_loop(asyncio.sleep(0.5)) 20107db96d56Sopenharmony_ci continue 20117db96d56Sopenharmony_ci else: 20127db96d56Sopenharmony_ci break 20137db96d56Sopenharmony_ci else: 20147db96d56Sopenharmony_ci # One last try, so we get the exception 20157db96d56Sopenharmony_ci self.run_loop(self.loop.sock_connect(sock, addr)) 20167db96d56Sopenharmony_ci 20177db96d56Sopenharmony_ci def cleanup(): 20187db96d56Sopenharmony_ci server.close() 20197db96d56Sopenharmony_ci self.run_loop(server.wait_closed()) 20207db96d56Sopenharmony_ci sock.close() 20217db96d56Sopenharmony_ci if proto.transport is not None: 20227db96d56Sopenharmony_ci proto.transport.close() 20237db96d56Sopenharmony_ci self.run_loop(proto.wait_closed()) 20247db96d56Sopenharmony_ci 20257db96d56Sopenharmony_ci self.addCleanup(cleanup) 20267db96d56Sopenharmony_ci 20277db96d56Sopenharmony_ci return sock, proto 20287db96d56Sopenharmony_ci 20297db96d56Sopenharmony_ci def test__sock_sendfile_native_failure(self): 20307db96d56Sopenharmony_ci sock, proto = self.prepare() 20317db96d56Sopenharmony_ci 20327db96d56Sopenharmony_ci with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 20337db96d56Sopenharmony_ci "sendfile is not available"): 20347db96d56Sopenharmony_ci self.run_loop(self.loop._sock_sendfile_native(sock, self.file, 20357db96d56Sopenharmony_ci 0, None)) 20367db96d56Sopenharmony_ci 20377db96d56Sopenharmony_ci self.assertEqual(proto.data, b'') 20387db96d56Sopenharmony_ci self.assertEqual(self.file.tell(), 0) 20397db96d56Sopenharmony_ci 20407db96d56Sopenharmony_ci def test_sock_sendfile_no_fallback(self): 20417db96d56Sopenharmony_ci sock, proto = self.prepare() 20427db96d56Sopenharmony_ci 20437db96d56Sopenharmony_ci with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 20447db96d56Sopenharmony_ci "sendfile is not available"): 20457db96d56Sopenharmony_ci self.run_loop(self.loop.sock_sendfile(sock, self.file, 20467db96d56Sopenharmony_ci fallback=False)) 20477db96d56Sopenharmony_ci 20487db96d56Sopenharmony_ci self.assertEqual(self.file.tell(), 0) 20497db96d56Sopenharmony_ci self.assertEqual(proto.data, b'') 20507db96d56Sopenharmony_ci 20517db96d56Sopenharmony_ci def test_sock_sendfile_fallback(self): 20527db96d56Sopenharmony_ci sock, proto = self.prepare() 20537db96d56Sopenharmony_ci 20547db96d56Sopenharmony_ci ret = self.run_loop(self.loop.sock_sendfile(sock, self.file)) 20557db96d56Sopenharmony_ci sock.close() 20567db96d56Sopenharmony_ci self.run_loop(proto.wait_closed()) 20577db96d56Sopenharmony_ci 20587db96d56Sopenharmony_ci self.assertEqual(ret, len(self.DATA)) 20597db96d56Sopenharmony_ci self.assertEqual(self.file.tell(), len(self.DATA)) 20607db96d56Sopenharmony_ci self.assertEqual(proto.data, self.DATA) 20617db96d56Sopenharmony_ci 20627db96d56Sopenharmony_ci def test_sock_sendfile_fallback_offset_and_count(self): 20637db96d56Sopenharmony_ci sock, proto = self.prepare() 20647db96d56Sopenharmony_ci 20657db96d56Sopenharmony_ci ret = self.run_loop(self.loop.sock_sendfile(sock, self.file, 20667db96d56Sopenharmony_ci 1000, 2000)) 20677db96d56Sopenharmony_ci sock.close() 20687db96d56Sopenharmony_ci self.run_loop(proto.wait_closed()) 20697db96d56Sopenharmony_ci 20707db96d56Sopenharmony_ci self.assertEqual(ret, 2000) 20717db96d56Sopenharmony_ci self.assertEqual(self.file.tell(), 3000) 20727db96d56Sopenharmony_ci self.assertEqual(proto.data, self.DATA[1000:3000]) 20737db96d56Sopenharmony_ci 20747db96d56Sopenharmony_ci def test_blocking_socket(self): 20757db96d56Sopenharmony_ci self.loop.set_debug(True) 20767db96d56Sopenharmony_ci sock = self.make_socket(blocking=True) 20777db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, "must be non-blocking"): 20787db96d56Sopenharmony_ci self.run_loop(self.loop.sock_sendfile(sock, self.file)) 20797db96d56Sopenharmony_ci 20807db96d56Sopenharmony_ci def test_nonbinary_file(self): 20817db96d56Sopenharmony_ci sock = self.make_socket() 20827db96d56Sopenharmony_ci with open(os_helper.TESTFN, encoding="utf-8") as f: 20837db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, "binary mode"): 20847db96d56Sopenharmony_ci self.run_loop(self.loop.sock_sendfile(sock, f)) 20857db96d56Sopenharmony_ci 20867db96d56Sopenharmony_ci def test_nonstream_socket(self): 20877db96d56Sopenharmony_ci sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 20887db96d56Sopenharmony_ci sock.setblocking(False) 20897db96d56Sopenharmony_ci self.addCleanup(sock.close) 20907db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, "only SOCK_STREAM type"): 20917db96d56Sopenharmony_ci self.run_loop(self.loop.sock_sendfile(sock, self.file)) 20927db96d56Sopenharmony_ci 20937db96d56Sopenharmony_ci def test_notint_count(self): 20947db96d56Sopenharmony_ci sock = self.make_socket() 20957db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, 20967db96d56Sopenharmony_ci "count must be a positive integer"): 20977db96d56Sopenharmony_ci self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, 'count')) 20987db96d56Sopenharmony_ci 20997db96d56Sopenharmony_ci def test_negative_count(self): 21007db96d56Sopenharmony_ci sock = self.make_socket() 21017db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, 21027db96d56Sopenharmony_ci "count must be a positive integer"): 21037db96d56Sopenharmony_ci self.run_loop(self.loop.sock_sendfile(sock, self.file, 0, -1)) 21047db96d56Sopenharmony_ci 21057db96d56Sopenharmony_ci def test_notint_offset(self): 21067db96d56Sopenharmony_ci sock = self.make_socket() 21077db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, 21087db96d56Sopenharmony_ci "offset must be a non-negative integer"): 21097db96d56Sopenharmony_ci self.run_loop(self.loop.sock_sendfile(sock, self.file, 'offset')) 21107db96d56Sopenharmony_ci 21117db96d56Sopenharmony_ci def test_negative_offset(self): 21127db96d56Sopenharmony_ci sock = self.make_socket() 21137db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, 21147db96d56Sopenharmony_ci "offset must be a non-negative integer"): 21157db96d56Sopenharmony_ci self.run_loop(self.loop.sock_sendfile(sock, self.file, -1)) 21167db96d56Sopenharmony_ci 21177db96d56Sopenharmony_ci 21187db96d56Sopenharmony_ciclass TestSelectorUtils(test_utils.TestCase): 21197db96d56Sopenharmony_ci def check_set_nodelay(self, sock): 21207db96d56Sopenharmony_ci opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) 21217db96d56Sopenharmony_ci self.assertFalse(opt) 21227db96d56Sopenharmony_ci 21237db96d56Sopenharmony_ci base_events._set_nodelay(sock) 21247db96d56Sopenharmony_ci 21257db96d56Sopenharmony_ci opt = sock.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) 21267db96d56Sopenharmony_ci self.assertTrue(opt) 21277db96d56Sopenharmony_ci 21287db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(socket, 'TCP_NODELAY'), 21297db96d56Sopenharmony_ci 'need socket.TCP_NODELAY') 21307db96d56Sopenharmony_ci def test_set_nodelay(self): 21317db96d56Sopenharmony_ci sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, 21327db96d56Sopenharmony_ci proto=socket.IPPROTO_TCP) 21337db96d56Sopenharmony_ci with sock: 21347db96d56Sopenharmony_ci self.check_set_nodelay(sock) 21357db96d56Sopenharmony_ci 21367db96d56Sopenharmony_ci sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM, 21377db96d56Sopenharmony_ci proto=socket.IPPROTO_TCP) 21387db96d56Sopenharmony_ci with sock: 21397db96d56Sopenharmony_ci sock.setblocking(False) 21407db96d56Sopenharmony_ci self.check_set_nodelay(sock) 21417db96d56Sopenharmony_ci 21427db96d56Sopenharmony_ci 21437db96d56Sopenharmony_ci 21447db96d56Sopenharmony_ciif __name__ == '__main__': 21457db96d56Sopenharmony_ci unittest.main() 2146