17db96d56Sopenharmony_ci"""Tests for unix_events.py.""" 27db96d56Sopenharmony_ci 37db96d56Sopenharmony_ciimport contextlib 47db96d56Sopenharmony_ciimport errno 57db96d56Sopenharmony_ciimport io 67db96d56Sopenharmony_ciimport os 77db96d56Sopenharmony_ciimport pathlib 87db96d56Sopenharmony_ciimport signal 97db96d56Sopenharmony_ciimport socket 107db96d56Sopenharmony_ciimport stat 117db96d56Sopenharmony_ciimport sys 127db96d56Sopenharmony_ciimport tempfile 137db96d56Sopenharmony_ciimport threading 147db96d56Sopenharmony_ciimport unittest 157db96d56Sopenharmony_cifrom unittest import mock 167db96d56Sopenharmony_cifrom test.support import os_helper 177db96d56Sopenharmony_cifrom test.support import socket_helper 187db96d56Sopenharmony_ci 197db96d56Sopenharmony_ciif sys.platform == 'win32': 207db96d56Sopenharmony_ci raise unittest.SkipTest('UNIX only') 217db96d56Sopenharmony_ci 227db96d56Sopenharmony_ci 237db96d56Sopenharmony_ciimport asyncio 247db96d56Sopenharmony_cifrom asyncio import log 257db96d56Sopenharmony_cifrom asyncio import unix_events 267db96d56Sopenharmony_cifrom test.test_asyncio import utils as test_utils 277db96d56Sopenharmony_ci 287db96d56Sopenharmony_ci 297db96d56Sopenharmony_cidef tearDownModule(): 307db96d56Sopenharmony_ci asyncio.set_event_loop_policy(None) 317db96d56Sopenharmony_ci 327db96d56Sopenharmony_ci 337db96d56Sopenharmony_ciMOCK_ANY = mock.ANY 347db96d56Sopenharmony_ci 357db96d56Sopenharmony_ci 367db96d56Sopenharmony_cidef EXITCODE(exitcode): 377db96d56Sopenharmony_ci return 32768 + exitcode 387db96d56Sopenharmony_ci 397db96d56Sopenharmony_ci 407db96d56Sopenharmony_cidef SIGNAL(signum): 417db96d56Sopenharmony_ci if not 1 <= signum <= 68: 427db96d56Sopenharmony_ci raise AssertionError(f'invalid signum {signum}') 437db96d56Sopenharmony_ci return 32768 - signum 447db96d56Sopenharmony_ci 457db96d56Sopenharmony_ci 467db96d56Sopenharmony_cidef close_pipe_transport(transport): 477db96d56Sopenharmony_ci # Don't call transport.close() because the event loop and the selector 487db96d56Sopenharmony_ci # are mocked 497db96d56Sopenharmony_ci if transport._pipe is None: 507db96d56Sopenharmony_ci return 517db96d56Sopenharmony_ci transport._pipe.close() 527db96d56Sopenharmony_ci transport._pipe = None 537db96d56Sopenharmony_ci 547db96d56Sopenharmony_ci 557db96d56Sopenharmony_ci@unittest.skipUnless(signal, 'Signals are not supported') 567db96d56Sopenharmony_ciclass SelectorEventLoopSignalTests(test_utils.TestCase): 577db96d56Sopenharmony_ci 587db96d56Sopenharmony_ci def setUp(self): 597db96d56Sopenharmony_ci super().setUp() 607db96d56Sopenharmony_ci self.loop = asyncio.SelectorEventLoop() 617db96d56Sopenharmony_ci self.set_event_loop(self.loop) 627db96d56Sopenharmony_ci 637db96d56Sopenharmony_ci def test_check_signal(self): 647db96d56Sopenharmony_ci self.assertRaises( 657db96d56Sopenharmony_ci TypeError, self.loop._check_signal, '1') 667db96d56Sopenharmony_ci self.assertRaises( 677db96d56Sopenharmony_ci ValueError, self.loop._check_signal, signal.NSIG + 1) 687db96d56Sopenharmony_ci 697db96d56Sopenharmony_ci def test_handle_signal_no_handler(self): 707db96d56Sopenharmony_ci self.loop._handle_signal(signal.NSIG + 1) 717db96d56Sopenharmony_ci 727db96d56Sopenharmony_ci def test_handle_signal_cancelled_handler(self): 737db96d56Sopenharmony_ci h = asyncio.Handle(mock.Mock(), (), 747db96d56Sopenharmony_ci loop=mock.Mock()) 757db96d56Sopenharmony_ci h.cancel() 767db96d56Sopenharmony_ci self.loop._signal_handlers[signal.NSIG + 1] = h 777db96d56Sopenharmony_ci self.loop.remove_signal_handler = mock.Mock() 787db96d56Sopenharmony_ci self.loop._handle_signal(signal.NSIG + 1) 797db96d56Sopenharmony_ci self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1) 807db96d56Sopenharmony_ci 817db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.signal') 827db96d56Sopenharmony_ci def test_add_signal_handler_setup_error(self, m_signal): 837db96d56Sopenharmony_ci m_signal.NSIG = signal.NSIG 847db96d56Sopenharmony_ci m_signal.valid_signals = signal.valid_signals 857db96d56Sopenharmony_ci m_signal.set_wakeup_fd.side_effect = ValueError 867db96d56Sopenharmony_ci 877db96d56Sopenharmony_ci self.assertRaises( 887db96d56Sopenharmony_ci RuntimeError, 897db96d56Sopenharmony_ci self.loop.add_signal_handler, 907db96d56Sopenharmony_ci signal.SIGINT, lambda: True) 917db96d56Sopenharmony_ci 927db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.signal') 937db96d56Sopenharmony_ci def test_add_signal_handler_coroutine_error(self, m_signal): 947db96d56Sopenharmony_ci m_signal.NSIG = signal.NSIG 957db96d56Sopenharmony_ci 967db96d56Sopenharmony_ci async def simple_coroutine(): 977db96d56Sopenharmony_ci pass 987db96d56Sopenharmony_ci 997db96d56Sopenharmony_ci # callback must not be a coroutine function 1007db96d56Sopenharmony_ci coro_func = simple_coroutine 1017db96d56Sopenharmony_ci coro_obj = coro_func() 1027db96d56Sopenharmony_ci self.addCleanup(coro_obj.close) 1037db96d56Sopenharmony_ci for func in (coro_func, coro_obj): 1047db96d56Sopenharmony_ci self.assertRaisesRegex( 1057db96d56Sopenharmony_ci TypeError, 'coroutines cannot be used with add_signal_handler', 1067db96d56Sopenharmony_ci self.loop.add_signal_handler, 1077db96d56Sopenharmony_ci signal.SIGINT, func) 1087db96d56Sopenharmony_ci 1097db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.signal') 1107db96d56Sopenharmony_ci def test_add_signal_handler(self, m_signal): 1117db96d56Sopenharmony_ci m_signal.NSIG = signal.NSIG 1127db96d56Sopenharmony_ci m_signal.valid_signals = signal.valid_signals 1137db96d56Sopenharmony_ci 1147db96d56Sopenharmony_ci cb = lambda: True 1157db96d56Sopenharmony_ci self.loop.add_signal_handler(signal.SIGHUP, cb) 1167db96d56Sopenharmony_ci h = self.loop._signal_handlers.get(signal.SIGHUP) 1177db96d56Sopenharmony_ci self.assertIsInstance(h, asyncio.Handle) 1187db96d56Sopenharmony_ci self.assertEqual(h._callback, cb) 1197db96d56Sopenharmony_ci 1207db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.signal') 1217db96d56Sopenharmony_ci def test_add_signal_handler_install_error(self, m_signal): 1227db96d56Sopenharmony_ci m_signal.NSIG = signal.NSIG 1237db96d56Sopenharmony_ci m_signal.valid_signals = signal.valid_signals 1247db96d56Sopenharmony_ci 1257db96d56Sopenharmony_ci def set_wakeup_fd(fd): 1267db96d56Sopenharmony_ci if fd == -1: 1277db96d56Sopenharmony_ci raise ValueError() 1287db96d56Sopenharmony_ci m_signal.set_wakeup_fd = set_wakeup_fd 1297db96d56Sopenharmony_ci 1307db96d56Sopenharmony_ci class Err(OSError): 1317db96d56Sopenharmony_ci errno = errno.EFAULT 1327db96d56Sopenharmony_ci m_signal.signal.side_effect = Err 1337db96d56Sopenharmony_ci 1347db96d56Sopenharmony_ci self.assertRaises( 1357db96d56Sopenharmony_ci Err, 1367db96d56Sopenharmony_ci self.loop.add_signal_handler, 1377db96d56Sopenharmony_ci signal.SIGINT, lambda: True) 1387db96d56Sopenharmony_ci 1397db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.signal') 1407db96d56Sopenharmony_ci @mock.patch('asyncio.base_events.logger') 1417db96d56Sopenharmony_ci def test_add_signal_handler_install_error2(self, m_logging, m_signal): 1427db96d56Sopenharmony_ci m_signal.NSIG = signal.NSIG 1437db96d56Sopenharmony_ci m_signal.valid_signals = signal.valid_signals 1447db96d56Sopenharmony_ci 1457db96d56Sopenharmony_ci class Err(OSError): 1467db96d56Sopenharmony_ci errno = errno.EINVAL 1477db96d56Sopenharmony_ci m_signal.signal.side_effect = Err 1487db96d56Sopenharmony_ci 1497db96d56Sopenharmony_ci self.loop._signal_handlers[signal.SIGHUP] = lambda: True 1507db96d56Sopenharmony_ci self.assertRaises( 1517db96d56Sopenharmony_ci RuntimeError, 1527db96d56Sopenharmony_ci self.loop.add_signal_handler, 1537db96d56Sopenharmony_ci signal.SIGINT, lambda: True) 1547db96d56Sopenharmony_ci self.assertFalse(m_logging.info.called) 1557db96d56Sopenharmony_ci self.assertEqual(1, m_signal.set_wakeup_fd.call_count) 1567db96d56Sopenharmony_ci 1577db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.signal') 1587db96d56Sopenharmony_ci @mock.patch('asyncio.base_events.logger') 1597db96d56Sopenharmony_ci def test_add_signal_handler_install_error3(self, m_logging, m_signal): 1607db96d56Sopenharmony_ci class Err(OSError): 1617db96d56Sopenharmony_ci errno = errno.EINVAL 1627db96d56Sopenharmony_ci m_signal.signal.side_effect = Err 1637db96d56Sopenharmony_ci m_signal.NSIG = signal.NSIG 1647db96d56Sopenharmony_ci m_signal.valid_signals = signal.valid_signals 1657db96d56Sopenharmony_ci 1667db96d56Sopenharmony_ci self.assertRaises( 1677db96d56Sopenharmony_ci RuntimeError, 1687db96d56Sopenharmony_ci self.loop.add_signal_handler, 1697db96d56Sopenharmony_ci signal.SIGINT, lambda: True) 1707db96d56Sopenharmony_ci self.assertFalse(m_logging.info.called) 1717db96d56Sopenharmony_ci self.assertEqual(2, m_signal.set_wakeup_fd.call_count) 1727db96d56Sopenharmony_ci 1737db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.signal') 1747db96d56Sopenharmony_ci def test_remove_signal_handler(self, m_signal): 1757db96d56Sopenharmony_ci m_signal.NSIG = signal.NSIG 1767db96d56Sopenharmony_ci m_signal.valid_signals = signal.valid_signals 1777db96d56Sopenharmony_ci 1787db96d56Sopenharmony_ci self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 1797db96d56Sopenharmony_ci 1807db96d56Sopenharmony_ci self.assertTrue( 1817db96d56Sopenharmony_ci self.loop.remove_signal_handler(signal.SIGHUP)) 1827db96d56Sopenharmony_ci self.assertTrue(m_signal.set_wakeup_fd.called) 1837db96d56Sopenharmony_ci self.assertTrue(m_signal.signal.called) 1847db96d56Sopenharmony_ci self.assertEqual( 1857db96d56Sopenharmony_ci (signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0]) 1867db96d56Sopenharmony_ci 1877db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.signal') 1887db96d56Sopenharmony_ci def test_remove_signal_handler_2(self, m_signal): 1897db96d56Sopenharmony_ci m_signal.NSIG = signal.NSIG 1907db96d56Sopenharmony_ci m_signal.SIGINT = signal.SIGINT 1917db96d56Sopenharmony_ci m_signal.valid_signals = signal.valid_signals 1927db96d56Sopenharmony_ci 1937db96d56Sopenharmony_ci self.loop.add_signal_handler(signal.SIGINT, lambda: True) 1947db96d56Sopenharmony_ci self.loop._signal_handlers[signal.SIGHUP] = object() 1957db96d56Sopenharmony_ci m_signal.set_wakeup_fd.reset_mock() 1967db96d56Sopenharmony_ci 1977db96d56Sopenharmony_ci self.assertTrue( 1987db96d56Sopenharmony_ci self.loop.remove_signal_handler(signal.SIGINT)) 1997db96d56Sopenharmony_ci self.assertFalse(m_signal.set_wakeup_fd.called) 2007db96d56Sopenharmony_ci self.assertTrue(m_signal.signal.called) 2017db96d56Sopenharmony_ci self.assertEqual( 2027db96d56Sopenharmony_ci (signal.SIGINT, m_signal.default_int_handler), 2037db96d56Sopenharmony_ci m_signal.signal.call_args[0]) 2047db96d56Sopenharmony_ci 2057db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.signal') 2067db96d56Sopenharmony_ci @mock.patch('asyncio.base_events.logger') 2077db96d56Sopenharmony_ci def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal): 2087db96d56Sopenharmony_ci m_signal.NSIG = signal.NSIG 2097db96d56Sopenharmony_ci m_signal.valid_signals = signal.valid_signals 2107db96d56Sopenharmony_ci self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 2117db96d56Sopenharmony_ci 2127db96d56Sopenharmony_ci m_signal.set_wakeup_fd.side_effect = ValueError 2137db96d56Sopenharmony_ci 2147db96d56Sopenharmony_ci self.loop.remove_signal_handler(signal.SIGHUP) 2157db96d56Sopenharmony_ci self.assertTrue(m_logging.info) 2167db96d56Sopenharmony_ci 2177db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.signal') 2187db96d56Sopenharmony_ci def test_remove_signal_handler_error(self, m_signal): 2197db96d56Sopenharmony_ci m_signal.NSIG = signal.NSIG 2207db96d56Sopenharmony_ci m_signal.valid_signals = signal.valid_signals 2217db96d56Sopenharmony_ci self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 2227db96d56Sopenharmony_ci 2237db96d56Sopenharmony_ci m_signal.signal.side_effect = OSError 2247db96d56Sopenharmony_ci 2257db96d56Sopenharmony_ci self.assertRaises( 2267db96d56Sopenharmony_ci OSError, self.loop.remove_signal_handler, signal.SIGHUP) 2277db96d56Sopenharmony_ci 2287db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.signal') 2297db96d56Sopenharmony_ci def test_remove_signal_handler_error2(self, m_signal): 2307db96d56Sopenharmony_ci m_signal.NSIG = signal.NSIG 2317db96d56Sopenharmony_ci m_signal.valid_signals = signal.valid_signals 2327db96d56Sopenharmony_ci self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 2337db96d56Sopenharmony_ci 2347db96d56Sopenharmony_ci class Err(OSError): 2357db96d56Sopenharmony_ci errno = errno.EINVAL 2367db96d56Sopenharmony_ci m_signal.signal.side_effect = Err 2377db96d56Sopenharmony_ci 2387db96d56Sopenharmony_ci self.assertRaises( 2397db96d56Sopenharmony_ci RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP) 2407db96d56Sopenharmony_ci 2417db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.signal') 2427db96d56Sopenharmony_ci def test_close(self, m_signal): 2437db96d56Sopenharmony_ci m_signal.NSIG = signal.NSIG 2447db96d56Sopenharmony_ci m_signal.valid_signals = signal.valid_signals 2457db96d56Sopenharmony_ci 2467db96d56Sopenharmony_ci self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 2477db96d56Sopenharmony_ci self.loop.add_signal_handler(signal.SIGCHLD, lambda: True) 2487db96d56Sopenharmony_ci 2497db96d56Sopenharmony_ci self.assertEqual(len(self.loop._signal_handlers), 2) 2507db96d56Sopenharmony_ci 2517db96d56Sopenharmony_ci m_signal.set_wakeup_fd.reset_mock() 2527db96d56Sopenharmony_ci 2537db96d56Sopenharmony_ci self.loop.close() 2547db96d56Sopenharmony_ci 2557db96d56Sopenharmony_ci self.assertEqual(len(self.loop._signal_handlers), 0) 2567db96d56Sopenharmony_ci m_signal.set_wakeup_fd.assert_called_once_with(-1) 2577db96d56Sopenharmony_ci 2587db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.sys') 2597db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.signal') 2607db96d56Sopenharmony_ci def test_close_on_finalizing(self, m_signal, m_sys): 2617db96d56Sopenharmony_ci m_signal.NSIG = signal.NSIG 2627db96d56Sopenharmony_ci m_signal.valid_signals = signal.valid_signals 2637db96d56Sopenharmony_ci self.loop.add_signal_handler(signal.SIGHUP, lambda: True) 2647db96d56Sopenharmony_ci 2657db96d56Sopenharmony_ci self.assertEqual(len(self.loop._signal_handlers), 1) 2667db96d56Sopenharmony_ci m_sys.is_finalizing.return_value = True 2677db96d56Sopenharmony_ci m_signal.signal.reset_mock() 2687db96d56Sopenharmony_ci 2697db96d56Sopenharmony_ci with self.assertWarnsRegex(ResourceWarning, 2707db96d56Sopenharmony_ci "skipping signal handlers removal"): 2717db96d56Sopenharmony_ci self.loop.close() 2727db96d56Sopenharmony_ci 2737db96d56Sopenharmony_ci self.assertEqual(len(self.loop._signal_handlers), 0) 2747db96d56Sopenharmony_ci self.assertFalse(m_signal.signal.called) 2757db96d56Sopenharmony_ci 2767db96d56Sopenharmony_ci 2777db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 2787db96d56Sopenharmony_ci 'UNIX Sockets are not supported') 2797db96d56Sopenharmony_ciclass SelectorEventLoopUnixSocketTests(test_utils.TestCase): 2807db96d56Sopenharmony_ci 2817db96d56Sopenharmony_ci def setUp(self): 2827db96d56Sopenharmony_ci super().setUp() 2837db96d56Sopenharmony_ci self.loop = asyncio.SelectorEventLoop() 2847db96d56Sopenharmony_ci self.set_event_loop(self.loop) 2857db96d56Sopenharmony_ci 2867db96d56Sopenharmony_ci @socket_helper.skip_unless_bind_unix_socket 2877db96d56Sopenharmony_ci def test_create_unix_server_existing_path_sock(self): 2887db96d56Sopenharmony_ci with test_utils.unix_socket_path() as path: 2897db96d56Sopenharmony_ci sock = socket.socket(socket.AF_UNIX) 2907db96d56Sopenharmony_ci sock.bind(path) 2917db96d56Sopenharmony_ci sock.listen(1) 2927db96d56Sopenharmony_ci sock.close() 2937db96d56Sopenharmony_ci 2947db96d56Sopenharmony_ci coro = self.loop.create_unix_server(lambda: None, path) 2957db96d56Sopenharmony_ci srv = self.loop.run_until_complete(coro) 2967db96d56Sopenharmony_ci srv.close() 2977db96d56Sopenharmony_ci self.loop.run_until_complete(srv.wait_closed()) 2987db96d56Sopenharmony_ci 2997db96d56Sopenharmony_ci @socket_helper.skip_unless_bind_unix_socket 3007db96d56Sopenharmony_ci def test_create_unix_server_pathlib(self): 3017db96d56Sopenharmony_ci with test_utils.unix_socket_path() as path: 3027db96d56Sopenharmony_ci path = pathlib.Path(path) 3037db96d56Sopenharmony_ci srv_coro = self.loop.create_unix_server(lambda: None, path) 3047db96d56Sopenharmony_ci srv = self.loop.run_until_complete(srv_coro) 3057db96d56Sopenharmony_ci srv.close() 3067db96d56Sopenharmony_ci self.loop.run_until_complete(srv.wait_closed()) 3077db96d56Sopenharmony_ci 3087db96d56Sopenharmony_ci def test_create_unix_connection_pathlib(self): 3097db96d56Sopenharmony_ci with test_utils.unix_socket_path() as path: 3107db96d56Sopenharmony_ci path = pathlib.Path(path) 3117db96d56Sopenharmony_ci coro = self.loop.create_unix_connection(lambda: None, path) 3127db96d56Sopenharmony_ci with self.assertRaises(FileNotFoundError): 3137db96d56Sopenharmony_ci # If pathlib.Path wasn't supported, the exception would be 3147db96d56Sopenharmony_ci # different. 3157db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 3167db96d56Sopenharmony_ci 3177db96d56Sopenharmony_ci def test_create_unix_server_existing_path_nonsock(self): 3187db96d56Sopenharmony_ci with tempfile.NamedTemporaryFile() as file: 3197db96d56Sopenharmony_ci coro = self.loop.create_unix_server(lambda: None, file.name) 3207db96d56Sopenharmony_ci with self.assertRaisesRegex(OSError, 3217db96d56Sopenharmony_ci 'Address.*is already in use'): 3227db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 3237db96d56Sopenharmony_ci 3247db96d56Sopenharmony_ci def test_create_unix_server_ssl_bool(self): 3257db96d56Sopenharmony_ci coro = self.loop.create_unix_server(lambda: None, path='spam', 3267db96d56Sopenharmony_ci ssl=True) 3277db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, 3287db96d56Sopenharmony_ci 'ssl argument must be an SSLContext'): 3297db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 3307db96d56Sopenharmony_ci 3317db96d56Sopenharmony_ci def test_create_unix_server_nopath_nosock(self): 3327db96d56Sopenharmony_ci coro = self.loop.create_unix_server(lambda: None, path=None) 3337db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, 3347db96d56Sopenharmony_ci 'path was not specified, and no sock'): 3357db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 3367db96d56Sopenharmony_ci 3377db96d56Sopenharmony_ci def test_create_unix_server_path_inetsock(self): 3387db96d56Sopenharmony_ci sock = socket.socket() 3397db96d56Sopenharmony_ci with sock: 3407db96d56Sopenharmony_ci coro = self.loop.create_unix_server(lambda: None, path=None, 3417db96d56Sopenharmony_ci sock=sock) 3427db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, 3437db96d56Sopenharmony_ci 'A UNIX Domain Stream.*was expected'): 3447db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 3457db96d56Sopenharmony_ci 3467db96d56Sopenharmony_ci def test_create_unix_server_path_dgram(self): 3477db96d56Sopenharmony_ci sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) 3487db96d56Sopenharmony_ci with sock: 3497db96d56Sopenharmony_ci coro = self.loop.create_unix_server(lambda: None, path=None, 3507db96d56Sopenharmony_ci sock=sock) 3517db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, 3527db96d56Sopenharmony_ci 'A UNIX Domain Stream.*was expected'): 3537db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 3547db96d56Sopenharmony_ci 3557db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'), 3567db96d56Sopenharmony_ci 'no socket.SOCK_NONBLOCK (linux only)') 3577db96d56Sopenharmony_ci @socket_helper.skip_unless_bind_unix_socket 3587db96d56Sopenharmony_ci def test_create_unix_server_path_stream_bittype(self): 3597db96d56Sopenharmony_ci sock = socket.socket( 3607db96d56Sopenharmony_ci socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK) 3617db96d56Sopenharmony_ci with tempfile.NamedTemporaryFile() as file: 3627db96d56Sopenharmony_ci fn = file.name 3637db96d56Sopenharmony_ci try: 3647db96d56Sopenharmony_ci with sock: 3657db96d56Sopenharmony_ci sock.bind(fn) 3667db96d56Sopenharmony_ci coro = self.loop.create_unix_server(lambda: None, path=None, 3677db96d56Sopenharmony_ci sock=sock) 3687db96d56Sopenharmony_ci srv = self.loop.run_until_complete(coro) 3697db96d56Sopenharmony_ci srv.close() 3707db96d56Sopenharmony_ci self.loop.run_until_complete(srv.wait_closed()) 3717db96d56Sopenharmony_ci finally: 3727db96d56Sopenharmony_ci os.unlink(fn) 3737db96d56Sopenharmony_ci 3747db96d56Sopenharmony_ci def test_create_unix_server_ssl_timeout_with_plain_sock(self): 3757db96d56Sopenharmony_ci coro = self.loop.create_unix_server(lambda: None, path='spam', 3767db96d56Sopenharmony_ci ssl_handshake_timeout=1) 3777db96d56Sopenharmony_ci with self.assertRaisesRegex( 3787db96d56Sopenharmony_ci ValueError, 3797db96d56Sopenharmony_ci 'ssl_handshake_timeout is only meaningful with ssl'): 3807db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 3817db96d56Sopenharmony_ci 3827db96d56Sopenharmony_ci def test_create_unix_connection_path_inetsock(self): 3837db96d56Sopenharmony_ci sock = socket.socket() 3847db96d56Sopenharmony_ci with sock: 3857db96d56Sopenharmony_ci coro = self.loop.create_unix_connection(lambda: None, 3867db96d56Sopenharmony_ci sock=sock) 3877db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, 3887db96d56Sopenharmony_ci 'A UNIX Domain Stream.*was expected'): 3897db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 3907db96d56Sopenharmony_ci 3917db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.socket') 3927db96d56Sopenharmony_ci def test_create_unix_server_bind_error(self, m_socket): 3937db96d56Sopenharmony_ci # Ensure that the socket is closed on any bind error 3947db96d56Sopenharmony_ci sock = mock.Mock() 3957db96d56Sopenharmony_ci m_socket.socket.return_value = sock 3967db96d56Sopenharmony_ci 3977db96d56Sopenharmony_ci sock.bind.side_effect = OSError 3987db96d56Sopenharmony_ci coro = self.loop.create_unix_server(lambda: None, path="/test") 3997db96d56Sopenharmony_ci with self.assertRaises(OSError): 4007db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 4017db96d56Sopenharmony_ci self.assertTrue(sock.close.called) 4027db96d56Sopenharmony_ci 4037db96d56Sopenharmony_ci sock.bind.side_effect = MemoryError 4047db96d56Sopenharmony_ci coro = self.loop.create_unix_server(lambda: None, path="/test") 4057db96d56Sopenharmony_ci with self.assertRaises(MemoryError): 4067db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 4077db96d56Sopenharmony_ci self.assertTrue(sock.close.called) 4087db96d56Sopenharmony_ci 4097db96d56Sopenharmony_ci def test_create_unix_connection_path_sock(self): 4107db96d56Sopenharmony_ci coro = self.loop.create_unix_connection( 4117db96d56Sopenharmony_ci lambda: None, os.devnull, sock=object()) 4127db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, 'path and sock can not be'): 4137db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 4147db96d56Sopenharmony_ci 4157db96d56Sopenharmony_ci def test_create_unix_connection_nopath_nosock(self): 4167db96d56Sopenharmony_ci coro = self.loop.create_unix_connection( 4177db96d56Sopenharmony_ci lambda: None, None) 4187db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, 4197db96d56Sopenharmony_ci 'no path and sock were specified'): 4207db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 4217db96d56Sopenharmony_ci 4227db96d56Sopenharmony_ci def test_create_unix_connection_nossl_serverhost(self): 4237db96d56Sopenharmony_ci coro = self.loop.create_unix_connection( 4247db96d56Sopenharmony_ci lambda: None, os.devnull, server_hostname='spam') 4257db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, 4267db96d56Sopenharmony_ci 'server_hostname is only meaningful'): 4277db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 4287db96d56Sopenharmony_ci 4297db96d56Sopenharmony_ci def test_create_unix_connection_ssl_noserverhost(self): 4307db96d56Sopenharmony_ci coro = self.loop.create_unix_connection( 4317db96d56Sopenharmony_ci lambda: None, os.devnull, ssl=True) 4327db96d56Sopenharmony_ci 4337db96d56Sopenharmony_ci with self.assertRaisesRegex( 4347db96d56Sopenharmony_ci ValueError, 'you have to pass server_hostname when using ssl'): 4357db96d56Sopenharmony_ci 4367db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 4377db96d56Sopenharmony_ci 4387db96d56Sopenharmony_ci def test_create_unix_connection_ssl_timeout_with_plain_sock(self): 4397db96d56Sopenharmony_ci coro = self.loop.create_unix_connection(lambda: None, path='spam', 4407db96d56Sopenharmony_ci ssl_handshake_timeout=1) 4417db96d56Sopenharmony_ci with self.assertRaisesRegex( 4427db96d56Sopenharmony_ci ValueError, 4437db96d56Sopenharmony_ci 'ssl_handshake_timeout is only meaningful with ssl'): 4447db96d56Sopenharmony_ci self.loop.run_until_complete(coro) 4457db96d56Sopenharmony_ci 4467db96d56Sopenharmony_ci 4477db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(os, 'sendfile'), 4487db96d56Sopenharmony_ci 'sendfile is not supported') 4497db96d56Sopenharmony_ciclass SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase): 4507db96d56Sopenharmony_ci DATA = b"12345abcde" * 16 * 1024 # 160 KiB 4517db96d56Sopenharmony_ci 4527db96d56Sopenharmony_ci class MyProto(asyncio.Protocol): 4537db96d56Sopenharmony_ci 4547db96d56Sopenharmony_ci def __init__(self, loop): 4557db96d56Sopenharmony_ci self.started = False 4567db96d56Sopenharmony_ci self.closed = False 4577db96d56Sopenharmony_ci self.data = bytearray() 4587db96d56Sopenharmony_ci self.fut = loop.create_future() 4597db96d56Sopenharmony_ci self.transport = None 4607db96d56Sopenharmony_ci self._ready = loop.create_future() 4617db96d56Sopenharmony_ci 4627db96d56Sopenharmony_ci def connection_made(self, transport): 4637db96d56Sopenharmony_ci self.started = True 4647db96d56Sopenharmony_ci self.transport = transport 4657db96d56Sopenharmony_ci self._ready.set_result(None) 4667db96d56Sopenharmony_ci 4677db96d56Sopenharmony_ci def data_received(self, data): 4687db96d56Sopenharmony_ci self.data.extend(data) 4697db96d56Sopenharmony_ci 4707db96d56Sopenharmony_ci def connection_lost(self, exc): 4717db96d56Sopenharmony_ci self.closed = True 4727db96d56Sopenharmony_ci self.fut.set_result(None) 4737db96d56Sopenharmony_ci 4747db96d56Sopenharmony_ci async def wait_closed(self): 4757db96d56Sopenharmony_ci await self.fut 4767db96d56Sopenharmony_ci 4777db96d56Sopenharmony_ci @classmethod 4787db96d56Sopenharmony_ci def setUpClass(cls): 4797db96d56Sopenharmony_ci with open(os_helper.TESTFN, 'wb') as fp: 4807db96d56Sopenharmony_ci fp.write(cls.DATA) 4817db96d56Sopenharmony_ci super().setUpClass() 4827db96d56Sopenharmony_ci 4837db96d56Sopenharmony_ci @classmethod 4847db96d56Sopenharmony_ci def tearDownClass(cls): 4857db96d56Sopenharmony_ci os_helper.unlink(os_helper.TESTFN) 4867db96d56Sopenharmony_ci super().tearDownClass() 4877db96d56Sopenharmony_ci 4887db96d56Sopenharmony_ci def setUp(self): 4897db96d56Sopenharmony_ci self.loop = asyncio.new_event_loop() 4907db96d56Sopenharmony_ci self.set_event_loop(self.loop) 4917db96d56Sopenharmony_ci self.file = open(os_helper.TESTFN, 'rb') 4927db96d56Sopenharmony_ci self.addCleanup(self.file.close) 4937db96d56Sopenharmony_ci super().setUp() 4947db96d56Sopenharmony_ci 4957db96d56Sopenharmony_ci def make_socket(self, cleanup=True): 4967db96d56Sopenharmony_ci sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 4977db96d56Sopenharmony_ci sock.setblocking(False) 4987db96d56Sopenharmony_ci sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024) 4997db96d56Sopenharmony_ci sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024) 5007db96d56Sopenharmony_ci if cleanup: 5017db96d56Sopenharmony_ci self.addCleanup(sock.close) 5027db96d56Sopenharmony_ci return sock 5037db96d56Sopenharmony_ci 5047db96d56Sopenharmony_ci def run_loop(self, coro): 5057db96d56Sopenharmony_ci return self.loop.run_until_complete(coro) 5067db96d56Sopenharmony_ci 5077db96d56Sopenharmony_ci def prepare(self): 5087db96d56Sopenharmony_ci sock = self.make_socket() 5097db96d56Sopenharmony_ci proto = self.MyProto(self.loop) 5107db96d56Sopenharmony_ci port = socket_helper.find_unused_port() 5117db96d56Sopenharmony_ci srv_sock = self.make_socket(cleanup=False) 5127db96d56Sopenharmony_ci srv_sock.bind((socket_helper.HOST, port)) 5137db96d56Sopenharmony_ci server = self.run_loop(self.loop.create_server( 5147db96d56Sopenharmony_ci lambda: proto, sock=srv_sock)) 5157db96d56Sopenharmony_ci self.run_loop(self.loop.sock_connect(sock, (socket_helper.HOST, port))) 5167db96d56Sopenharmony_ci self.run_loop(proto._ready) 5177db96d56Sopenharmony_ci 5187db96d56Sopenharmony_ci def cleanup(): 5197db96d56Sopenharmony_ci proto.transport.close() 5207db96d56Sopenharmony_ci self.run_loop(proto.wait_closed()) 5217db96d56Sopenharmony_ci 5227db96d56Sopenharmony_ci server.close() 5237db96d56Sopenharmony_ci self.run_loop(server.wait_closed()) 5247db96d56Sopenharmony_ci 5257db96d56Sopenharmony_ci self.addCleanup(cleanup) 5267db96d56Sopenharmony_ci 5277db96d56Sopenharmony_ci return sock, proto 5287db96d56Sopenharmony_ci 5297db96d56Sopenharmony_ci def test_sock_sendfile_not_available(self): 5307db96d56Sopenharmony_ci sock, proto = self.prepare() 5317db96d56Sopenharmony_ci with mock.patch('asyncio.unix_events.os', spec=[]): 5327db96d56Sopenharmony_ci with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 5337db96d56Sopenharmony_ci "os[.]sendfile[(][)] is not available"): 5347db96d56Sopenharmony_ci self.run_loop(self.loop._sock_sendfile_native(sock, self.file, 5357db96d56Sopenharmony_ci 0, None)) 5367db96d56Sopenharmony_ci self.assertEqual(self.file.tell(), 0) 5377db96d56Sopenharmony_ci 5387db96d56Sopenharmony_ci def test_sock_sendfile_not_a_file(self): 5397db96d56Sopenharmony_ci sock, proto = self.prepare() 5407db96d56Sopenharmony_ci f = object() 5417db96d56Sopenharmony_ci with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 5427db96d56Sopenharmony_ci "not a regular file"): 5437db96d56Sopenharmony_ci self.run_loop(self.loop._sock_sendfile_native(sock, f, 5447db96d56Sopenharmony_ci 0, None)) 5457db96d56Sopenharmony_ci self.assertEqual(self.file.tell(), 0) 5467db96d56Sopenharmony_ci 5477db96d56Sopenharmony_ci def test_sock_sendfile_iobuffer(self): 5487db96d56Sopenharmony_ci sock, proto = self.prepare() 5497db96d56Sopenharmony_ci f = io.BytesIO() 5507db96d56Sopenharmony_ci with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 5517db96d56Sopenharmony_ci "not a regular file"): 5527db96d56Sopenharmony_ci self.run_loop(self.loop._sock_sendfile_native(sock, f, 5537db96d56Sopenharmony_ci 0, None)) 5547db96d56Sopenharmony_ci self.assertEqual(self.file.tell(), 0) 5557db96d56Sopenharmony_ci 5567db96d56Sopenharmony_ci def test_sock_sendfile_not_regular_file(self): 5577db96d56Sopenharmony_ci sock, proto = self.prepare() 5587db96d56Sopenharmony_ci f = mock.Mock() 5597db96d56Sopenharmony_ci f.fileno.return_value = -1 5607db96d56Sopenharmony_ci with self.assertRaisesRegex(asyncio.SendfileNotAvailableError, 5617db96d56Sopenharmony_ci "not a regular file"): 5627db96d56Sopenharmony_ci self.run_loop(self.loop._sock_sendfile_native(sock, f, 5637db96d56Sopenharmony_ci 0, None)) 5647db96d56Sopenharmony_ci self.assertEqual(self.file.tell(), 0) 5657db96d56Sopenharmony_ci 5667db96d56Sopenharmony_ci def test_sock_sendfile_cancel1(self): 5677db96d56Sopenharmony_ci sock, proto = self.prepare() 5687db96d56Sopenharmony_ci 5697db96d56Sopenharmony_ci fut = self.loop.create_future() 5707db96d56Sopenharmony_ci fileno = self.file.fileno() 5717db96d56Sopenharmony_ci self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 5727db96d56Sopenharmony_ci 0, None, len(self.DATA), 0) 5737db96d56Sopenharmony_ci fut.cancel() 5747db96d56Sopenharmony_ci with contextlib.suppress(asyncio.CancelledError): 5757db96d56Sopenharmony_ci self.run_loop(fut) 5767db96d56Sopenharmony_ci with self.assertRaises(KeyError): 5777db96d56Sopenharmony_ci self.loop._selector.get_key(sock) 5787db96d56Sopenharmony_ci 5797db96d56Sopenharmony_ci def test_sock_sendfile_cancel2(self): 5807db96d56Sopenharmony_ci sock, proto = self.prepare() 5817db96d56Sopenharmony_ci 5827db96d56Sopenharmony_ci fut = self.loop.create_future() 5837db96d56Sopenharmony_ci fileno = self.file.fileno() 5847db96d56Sopenharmony_ci self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 5857db96d56Sopenharmony_ci 0, None, len(self.DATA), 0) 5867db96d56Sopenharmony_ci fut.cancel() 5877db96d56Sopenharmony_ci self.loop._sock_sendfile_native_impl(fut, sock.fileno(), sock, fileno, 5887db96d56Sopenharmony_ci 0, None, len(self.DATA), 0) 5897db96d56Sopenharmony_ci with self.assertRaises(KeyError): 5907db96d56Sopenharmony_ci self.loop._selector.get_key(sock) 5917db96d56Sopenharmony_ci 5927db96d56Sopenharmony_ci def test_sock_sendfile_blocking_error(self): 5937db96d56Sopenharmony_ci sock, proto = self.prepare() 5947db96d56Sopenharmony_ci 5957db96d56Sopenharmony_ci fileno = self.file.fileno() 5967db96d56Sopenharmony_ci fut = mock.Mock() 5977db96d56Sopenharmony_ci fut.cancelled.return_value = False 5987db96d56Sopenharmony_ci with mock.patch('os.sendfile', side_effect=BlockingIOError()): 5997db96d56Sopenharmony_ci self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 6007db96d56Sopenharmony_ci 0, None, len(self.DATA), 0) 6017db96d56Sopenharmony_ci key = self.loop._selector.get_key(sock) 6027db96d56Sopenharmony_ci self.assertIsNotNone(key) 6037db96d56Sopenharmony_ci fut.add_done_callback.assert_called_once_with(mock.ANY) 6047db96d56Sopenharmony_ci 6057db96d56Sopenharmony_ci def test_sock_sendfile_os_error_first_call(self): 6067db96d56Sopenharmony_ci sock, proto = self.prepare() 6077db96d56Sopenharmony_ci 6087db96d56Sopenharmony_ci fileno = self.file.fileno() 6097db96d56Sopenharmony_ci fut = self.loop.create_future() 6107db96d56Sopenharmony_ci with mock.patch('os.sendfile', side_effect=OSError()): 6117db96d56Sopenharmony_ci self.loop._sock_sendfile_native_impl(fut, None, sock, fileno, 6127db96d56Sopenharmony_ci 0, None, len(self.DATA), 0) 6137db96d56Sopenharmony_ci with self.assertRaises(KeyError): 6147db96d56Sopenharmony_ci self.loop._selector.get_key(sock) 6157db96d56Sopenharmony_ci exc = fut.exception() 6167db96d56Sopenharmony_ci self.assertIsInstance(exc, asyncio.SendfileNotAvailableError) 6177db96d56Sopenharmony_ci self.assertEqual(0, self.file.tell()) 6187db96d56Sopenharmony_ci 6197db96d56Sopenharmony_ci def test_sock_sendfile_os_error_next_call(self): 6207db96d56Sopenharmony_ci sock, proto = self.prepare() 6217db96d56Sopenharmony_ci 6227db96d56Sopenharmony_ci fileno = self.file.fileno() 6237db96d56Sopenharmony_ci fut = self.loop.create_future() 6247db96d56Sopenharmony_ci err = OSError() 6257db96d56Sopenharmony_ci with mock.patch('os.sendfile', side_effect=err): 6267db96d56Sopenharmony_ci self.loop._sock_sendfile_native_impl(fut, sock.fileno(), 6277db96d56Sopenharmony_ci sock, fileno, 6287db96d56Sopenharmony_ci 1000, None, len(self.DATA), 6297db96d56Sopenharmony_ci 1000) 6307db96d56Sopenharmony_ci with self.assertRaises(KeyError): 6317db96d56Sopenharmony_ci self.loop._selector.get_key(sock) 6327db96d56Sopenharmony_ci exc = fut.exception() 6337db96d56Sopenharmony_ci self.assertIs(exc, err) 6347db96d56Sopenharmony_ci self.assertEqual(1000, self.file.tell()) 6357db96d56Sopenharmony_ci 6367db96d56Sopenharmony_ci def test_sock_sendfile_exception(self): 6377db96d56Sopenharmony_ci sock, proto = self.prepare() 6387db96d56Sopenharmony_ci 6397db96d56Sopenharmony_ci fileno = self.file.fileno() 6407db96d56Sopenharmony_ci fut = self.loop.create_future() 6417db96d56Sopenharmony_ci err = asyncio.SendfileNotAvailableError() 6427db96d56Sopenharmony_ci with mock.patch('os.sendfile', side_effect=err): 6437db96d56Sopenharmony_ci self.loop._sock_sendfile_native_impl(fut, sock.fileno(), 6447db96d56Sopenharmony_ci sock, fileno, 6457db96d56Sopenharmony_ci 1000, None, len(self.DATA), 6467db96d56Sopenharmony_ci 1000) 6477db96d56Sopenharmony_ci with self.assertRaises(KeyError): 6487db96d56Sopenharmony_ci self.loop._selector.get_key(sock) 6497db96d56Sopenharmony_ci exc = fut.exception() 6507db96d56Sopenharmony_ci self.assertIs(exc, err) 6517db96d56Sopenharmony_ci self.assertEqual(1000, self.file.tell()) 6527db96d56Sopenharmony_ci 6537db96d56Sopenharmony_ci 6547db96d56Sopenharmony_ciclass UnixReadPipeTransportTests(test_utils.TestCase): 6557db96d56Sopenharmony_ci 6567db96d56Sopenharmony_ci def setUp(self): 6577db96d56Sopenharmony_ci super().setUp() 6587db96d56Sopenharmony_ci self.loop = self.new_test_loop() 6597db96d56Sopenharmony_ci self.protocol = test_utils.make_test_protocol(asyncio.Protocol) 6607db96d56Sopenharmony_ci self.pipe = mock.Mock(spec_set=io.RawIOBase) 6617db96d56Sopenharmony_ci self.pipe.fileno.return_value = 5 6627db96d56Sopenharmony_ci 6637db96d56Sopenharmony_ci blocking_patcher = mock.patch('os.set_blocking') 6647db96d56Sopenharmony_ci blocking_patcher.start() 6657db96d56Sopenharmony_ci self.addCleanup(blocking_patcher.stop) 6667db96d56Sopenharmony_ci 6677db96d56Sopenharmony_ci fstat_patcher = mock.patch('os.fstat') 6687db96d56Sopenharmony_ci m_fstat = fstat_patcher.start() 6697db96d56Sopenharmony_ci st = mock.Mock() 6707db96d56Sopenharmony_ci st.st_mode = stat.S_IFIFO 6717db96d56Sopenharmony_ci m_fstat.return_value = st 6727db96d56Sopenharmony_ci self.addCleanup(fstat_patcher.stop) 6737db96d56Sopenharmony_ci 6747db96d56Sopenharmony_ci def read_pipe_transport(self, waiter=None): 6757db96d56Sopenharmony_ci transport = unix_events._UnixReadPipeTransport(self.loop, self.pipe, 6767db96d56Sopenharmony_ci self.protocol, 6777db96d56Sopenharmony_ci waiter=waiter) 6787db96d56Sopenharmony_ci self.addCleanup(close_pipe_transport, transport) 6797db96d56Sopenharmony_ci return transport 6807db96d56Sopenharmony_ci 6817db96d56Sopenharmony_ci def test_ctor(self): 6827db96d56Sopenharmony_ci waiter = self.loop.create_future() 6837db96d56Sopenharmony_ci tr = self.read_pipe_transport(waiter=waiter) 6847db96d56Sopenharmony_ci self.loop.run_until_complete(waiter) 6857db96d56Sopenharmony_ci 6867db96d56Sopenharmony_ci self.protocol.connection_made.assert_called_with(tr) 6877db96d56Sopenharmony_ci self.loop.assert_reader(5, tr._read_ready) 6887db96d56Sopenharmony_ci self.assertIsNone(waiter.result()) 6897db96d56Sopenharmony_ci 6907db96d56Sopenharmony_ci @mock.patch('os.read') 6917db96d56Sopenharmony_ci def test__read_ready(self, m_read): 6927db96d56Sopenharmony_ci tr = self.read_pipe_transport() 6937db96d56Sopenharmony_ci m_read.return_value = b'data' 6947db96d56Sopenharmony_ci tr._read_ready() 6957db96d56Sopenharmony_ci 6967db96d56Sopenharmony_ci m_read.assert_called_with(5, tr.max_size) 6977db96d56Sopenharmony_ci self.protocol.data_received.assert_called_with(b'data') 6987db96d56Sopenharmony_ci 6997db96d56Sopenharmony_ci @mock.patch('os.read') 7007db96d56Sopenharmony_ci def test__read_ready_eof(self, m_read): 7017db96d56Sopenharmony_ci tr = self.read_pipe_transport() 7027db96d56Sopenharmony_ci m_read.return_value = b'' 7037db96d56Sopenharmony_ci tr._read_ready() 7047db96d56Sopenharmony_ci 7057db96d56Sopenharmony_ci m_read.assert_called_with(5, tr.max_size) 7067db96d56Sopenharmony_ci self.assertFalse(self.loop.readers) 7077db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 7087db96d56Sopenharmony_ci self.protocol.eof_received.assert_called_with() 7097db96d56Sopenharmony_ci self.protocol.connection_lost.assert_called_with(None) 7107db96d56Sopenharmony_ci 7117db96d56Sopenharmony_ci @mock.patch('os.read') 7127db96d56Sopenharmony_ci def test__read_ready_blocked(self, m_read): 7137db96d56Sopenharmony_ci tr = self.read_pipe_transport() 7147db96d56Sopenharmony_ci m_read.side_effect = BlockingIOError 7157db96d56Sopenharmony_ci tr._read_ready() 7167db96d56Sopenharmony_ci 7177db96d56Sopenharmony_ci m_read.assert_called_with(5, tr.max_size) 7187db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 7197db96d56Sopenharmony_ci self.assertFalse(self.protocol.data_received.called) 7207db96d56Sopenharmony_ci 7217db96d56Sopenharmony_ci @mock.patch('asyncio.log.logger.error') 7227db96d56Sopenharmony_ci @mock.patch('os.read') 7237db96d56Sopenharmony_ci def test__read_ready_error(self, m_read, m_logexc): 7247db96d56Sopenharmony_ci tr = self.read_pipe_transport() 7257db96d56Sopenharmony_ci err = OSError() 7267db96d56Sopenharmony_ci m_read.side_effect = err 7277db96d56Sopenharmony_ci tr._close = mock.Mock() 7287db96d56Sopenharmony_ci tr._read_ready() 7297db96d56Sopenharmony_ci 7307db96d56Sopenharmony_ci m_read.assert_called_with(5, tr.max_size) 7317db96d56Sopenharmony_ci tr._close.assert_called_with(err) 7327db96d56Sopenharmony_ci m_logexc.assert_called_with( 7337db96d56Sopenharmony_ci test_utils.MockPattern( 7347db96d56Sopenharmony_ci 'Fatal read error on pipe transport' 7357db96d56Sopenharmony_ci '\nprotocol:.*\ntransport:.*'), 7367db96d56Sopenharmony_ci exc_info=(OSError, MOCK_ANY, MOCK_ANY)) 7377db96d56Sopenharmony_ci 7387db96d56Sopenharmony_ci @mock.patch('os.read') 7397db96d56Sopenharmony_ci def test_pause_reading(self, m_read): 7407db96d56Sopenharmony_ci tr = self.read_pipe_transport() 7417db96d56Sopenharmony_ci m = mock.Mock() 7427db96d56Sopenharmony_ci self.loop.add_reader(5, m) 7437db96d56Sopenharmony_ci tr.pause_reading() 7447db96d56Sopenharmony_ci self.assertFalse(self.loop.readers) 7457db96d56Sopenharmony_ci 7467db96d56Sopenharmony_ci @mock.patch('os.read') 7477db96d56Sopenharmony_ci def test_resume_reading(self, m_read): 7487db96d56Sopenharmony_ci tr = self.read_pipe_transport() 7497db96d56Sopenharmony_ci tr.pause_reading() 7507db96d56Sopenharmony_ci tr.resume_reading() 7517db96d56Sopenharmony_ci self.loop.assert_reader(5, tr._read_ready) 7527db96d56Sopenharmony_ci 7537db96d56Sopenharmony_ci @mock.patch('os.read') 7547db96d56Sopenharmony_ci def test_close(self, m_read): 7557db96d56Sopenharmony_ci tr = self.read_pipe_transport() 7567db96d56Sopenharmony_ci tr._close = mock.Mock() 7577db96d56Sopenharmony_ci tr.close() 7587db96d56Sopenharmony_ci tr._close.assert_called_with(None) 7597db96d56Sopenharmony_ci 7607db96d56Sopenharmony_ci @mock.patch('os.read') 7617db96d56Sopenharmony_ci def test_close_already_closing(self, m_read): 7627db96d56Sopenharmony_ci tr = self.read_pipe_transport() 7637db96d56Sopenharmony_ci tr._closing = True 7647db96d56Sopenharmony_ci tr._close = mock.Mock() 7657db96d56Sopenharmony_ci tr.close() 7667db96d56Sopenharmony_ci self.assertFalse(tr._close.called) 7677db96d56Sopenharmony_ci 7687db96d56Sopenharmony_ci @mock.patch('os.read') 7697db96d56Sopenharmony_ci def test__close(self, m_read): 7707db96d56Sopenharmony_ci tr = self.read_pipe_transport() 7717db96d56Sopenharmony_ci err = object() 7727db96d56Sopenharmony_ci tr._close(err) 7737db96d56Sopenharmony_ci self.assertTrue(tr.is_closing()) 7747db96d56Sopenharmony_ci self.assertFalse(self.loop.readers) 7757db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 7767db96d56Sopenharmony_ci self.protocol.connection_lost.assert_called_with(err) 7777db96d56Sopenharmony_ci 7787db96d56Sopenharmony_ci def test__call_connection_lost(self): 7797db96d56Sopenharmony_ci tr = self.read_pipe_transport() 7807db96d56Sopenharmony_ci self.assertIsNotNone(tr._protocol) 7817db96d56Sopenharmony_ci self.assertIsNotNone(tr._loop) 7827db96d56Sopenharmony_ci 7837db96d56Sopenharmony_ci err = None 7847db96d56Sopenharmony_ci tr._call_connection_lost(err) 7857db96d56Sopenharmony_ci self.protocol.connection_lost.assert_called_with(err) 7867db96d56Sopenharmony_ci self.pipe.close.assert_called_with() 7877db96d56Sopenharmony_ci 7887db96d56Sopenharmony_ci self.assertIsNone(tr._protocol) 7897db96d56Sopenharmony_ci self.assertIsNone(tr._loop) 7907db96d56Sopenharmony_ci 7917db96d56Sopenharmony_ci def test__call_connection_lost_with_err(self): 7927db96d56Sopenharmony_ci tr = self.read_pipe_transport() 7937db96d56Sopenharmony_ci self.assertIsNotNone(tr._protocol) 7947db96d56Sopenharmony_ci self.assertIsNotNone(tr._loop) 7957db96d56Sopenharmony_ci 7967db96d56Sopenharmony_ci err = OSError() 7977db96d56Sopenharmony_ci tr._call_connection_lost(err) 7987db96d56Sopenharmony_ci self.protocol.connection_lost.assert_called_with(err) 7997db96d56Sopenharmony_ci self.pipe.close.assert_called_with() 8007db96d56Sopenharmony_ci 8017db96d56Sopenharmony_ci self.assertIsNone(tr._protocol) 8027db96d56Sopenharmony_ci self.assertIsNone(tr._loop) 8037db96d56Sopenharmony_ci 8047db96d56Sopenharmony_ci def test_pause_reading_on_closed_pipe(self): 8057db96d56Sopenharmony_ci tr = self.read_pipe_transport() 8067db96d56Sopenharmony_ci tr.close() 8077db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 8087db96d56Sopenharmony_ci self.assertIsNone(tr._loop) 8097db96d56Sopenharmony_ci tr.pause_reading() 8107db96d56Sopenharmony_ci 8117db96d56Sopenharmony_ci def test_pause_reading_on_paused_pipe(self): 8127db96d56Sopenharmony_ci tr = self.read_pipe_transport() 8137db96d56Sopenharmony_ci tr.pause_reading() 8147db96d56Sopenharmony_ci # the second call should do nothing 8157db96d56Sopenharmony_ci tr.pause_reading() 8167db96d56Sopenharmony_ci 8177db96d56Sopenharmony_ci def test_resume_reading_on_closed_pipe(self): 8187db96d56Sopenharmony_ci tr = self.read_pipe_transport() 8197db96d56Sopenharmony_ci tr.close() 8207db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 8217db96d56Sopenharmony_ci self.assertIsNone(tr._loop) 8227db96d56Sopenharmony_ci tr.resume_reading() 8237db96d56Sopenharmony_ci 8247db96d56Sopenharmony_ci def test_resume_reading_on_paused_pipe(self): 8257db96d56Sopenharmony_ci tr = self.read_pipe_transport() 8267db96d56Sopenharmony_ci # the pipe is not paused 8277db96d56Sopenharmony_ci # resuming should do nothing 8287db96d56Sopenharmony_ci tr.resume_reading() 8297db96d56Sopenharmony_ci 8307db96d56Sopenharmony_ci 8317db96d56Sopenharmony_ciclass UnixWritePipeTransportTests(test_utils.TestCase): 8327db96d56Sopenharmony_ci 8337db96d56Sopenharmony_ci def setUp(self): 8347db96d56Sopenharmony_ci super().setUp() 8357db96d56Sopenharmony_ci self.loop = self.new_test_loop() 8367db96d56Sopenharmony_ci self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol) 8377db96d56Sopenharmony_ci self.pipe = mock.Mock(spec_set=io.RawIOBase) 8387db96d56Sopenharmony_ci self.pipe.fileno.return_value = 5 8397db96d56Sopenharmony_ci 8407db96d56Sopenharmony_ci blocking_patcher = mock.patch('os.set_blocking') 8417db96d56Sopenharmony_ci blocking_patcher.start() 8427db96d56Sopenharmony_ci self.addCleanup(blocking_patcher.stop) 8437db96d56Sopenharmony_ci 8447db96d56Sopenharmony_ci fstat_patcher = mock.patch('os.fstat') 8457db96d56Sopenharmony_ci m_fstat = fstat_patcher.start() 8467db96d56Sopenharmony_ci st = mock.Mock() 8477db96d56Sopenharmony_ci st.st_mode = stat.S_IFSOCK 8487db96d56Sopenharmony_ci m_fstat.return_value = st 8497db96d56Sopenharmony_ci self.addCleanup(fstat_patcher.stop) 8507db96d56Sopenharmony_ci 8517db96d56Sopenharmony_ci def write_pipe_transport(self, waiter=None): 8527db96d56Sopenharmony_ci transport = unix_events._UnixWritePipeTransport(self.loop, self.pipe, 8537db96d56Sopenharmony_ci self.protocol, 8547db96d56Sopenharmony_ci waiter=waiter) 8557db96d56Sopenharmony_ci self.addCleanup(close_pipe_transport, transport) 8567db96d56Sopenharmony_ci return transport 8577db96d56Sopenharmony_ci 8587db96d56Sopenharmony_ci def test_ctor(self): 8597db96d56Sopenharmony_ci waiter = self.loop.create_future() 8607db96d56Sopenharmony_ci tr = self.write_pipe_transport(waiter=waiter) 8617db96d56Sopenharmony_ci self.loop.run_until_complete(waiter) 8627db96d56Sopenharmony_ci 8637db96d56Sopenharmony_ci self.protocol.connection_made.assert_called_with(tr) 8647db96d56Sopenharmony_ci self.loop.assert_reader(5, tr._read_ready) 8657db96d56Sopenharmony_ci self.assertEqual(None, waiter.result()) 8667db96d56Sopenharmony_ci 8677db96d56Sopenharmony_ci def test_can_write_eof(self): 8687db96d56Sopenharmony_ci tr = self.write_pipe_transport() 8697db96d56Sopenharmony_ci self.assertTrue(tr.can_write_eof()) 8707db96d56Sopenharmony_ci 8717db96d56Sopenharmony_ci @mock.patch('os.write') 8727db96d56Sopenharmony_ci def test_write(self, m_write): 8737db96d56Sopenharmony_ci tr = self.write_pipe_transport() 8747db96d56Sopenharmony_ci m_write.return_value = 4 8757db96d56Sopenharmony_ci tr.write(b'data') 8767db96d56Sopenharmony_ci m_write.assert_called_with(5, b'data') 8777db96d56Sopenharmony_ci self.assertFalse(self.loop.writers) 8787db96d56Sopenharmony_ci self.assertEqual(bytearray(), tr._buffer) 8797db96d56Sopenharmony_ci 8807db96d56Sopenharmony_ci @mock.patch('os.write') 8817db96d56Sopenharmony_ci def test_write_no_data(self, m_write): 8827db96d56Sopenharmony_ci tr = self.write_pipe_transport() 8837db96d56Sopenharmony_ci tr.write(b'') 8847db96d56Sopenharmony_ci self.assertFalse(m_write.called) 8857db96d56Sopenharmony_ci self.assertFalse(self.loop.writers) 8867db96d56Sopenharmony_ci self.assertEqual(bytearray(b''), tr._buffer) 8877db96d56Sopenharmony_ci 8887db96d56Sopenharmony_ci @mock.patch('os.write') 8897db96d56Sopenharmony_ci def test_write_partial(self, m_write): 8907db96d56Sopenharmony_ci tr = self.write_pipe_transport() 8917db96d56Sopenharmony_ci m_write.return_value = 2 8927db96d56Sopenharmony_ci tr.write(b'data') 8937db96d56Sopenharmony_ci self.loop.assert_writer(5, tr._write_ready) 8947db96d56Sopenharmony_ci self.assertEqual(bytearray(b'ta'), tr._buffer) 8957db96d56Sopenharmony_ci 8967db96d56Sopenharmony_ci @mock.patch('os.write') 8977db96d56Sopenharmony_ci def test_write_buffer(self, m_write): 8987db96d56Sopenharmony_ci tr = self.write_pipe_transport() 8997db96d56Sopenharmony_ci self.loop.add_writer(5, tr._write_ready) 9007db96d56Sopenharmony_ci tr._buffer = bytearray(b'previous') 9017db96d56Sopenharmony_ci tr.write(b'data') 9027db96d56Sopenharmony_ci self.assertFalse(m_write.called) 9037db96d56Sopenharmony_ci self.loop.assert_writer(5, tr._write_ready) 9047db96d56Sopenharmony_ci self.assertEqual(bytearray(b'previousdata'), tr._buffer) 9057db96d56Sopenharmony_ci 9067db96d56Sopenharmony_ci @mock.patch('os.write') 9077db96d56Sopenharmony_ci def test_write_again(self, m_write): 9087db96d56Sopenharmony_ci tr = self.write_pipe_transport() 9097db96d56Sopenharmony_ci m_write.side_effect = BlockingIOError() 9107db96d56Sopenharmony_ci tr.write(b'data') 9117db96d56Sopenharmony_ci m_write.assert_called_with(5, bytearray(b'data')) 9127db96d56Sopenharmony_ci self.loop.assert_writer(5, tr._write_ready) 9137db96d56Sopenharmony_ci self.assertEqual(bytearray(b'data'), tr._buffer) 9147db96d56Sopenharmony_ci 9157db96d56Sopenharmony_ci @mock.patch('asyncio.unix_events.logger') 9167db96d56Sopenharmony_ci @mock.patch('os.write') 9177db96d56Sopenharmony_ci def test_write_err(self, m_write, m_log): 9187db96d56Sopenharmony_ci tr = self.write_pipe_transport() 9197db96d56Sopenharmony_ci err = OSError() 9207db96d56Sopenharmony_ci m_write.side_effect = err 9217db96d56Sopenharmony_ci tr._fatal_error = mock.Mock() 9227db96d56Sopenharmony_ci tr.write(b'data') 9237db96d56Sopenharmony_ci m_write.assert_called_with(5, b'data') 9247db96d56Sopenharmony_ci self.assertFalse(self.loop.writers) 9257db96d56Sopenharmony_ci self.assertEqual(bytearray(), tr._buffer) 9267db96d56Sopenharmony_ci tr._fatal_error.assert_called_with( 9277db96d56Sopenharmony_ci err, 9287db96d56Sopenharmony_ci 'Fatal write error on pipe transport') 9297db96d56Sopenharmony_ci self.assertEqual(1, tr._conn_lost) 9307db96d56Sopenharmony_ci 9317db96d56Sopenharmony_ci tr.write(b'data') 9327db96d56Sopenharmony_ci self.assertEqual(2, tr._conn_lost) 9337db96d56Sopenharmony_ci tr.write(b'data') 9347db96d56Sopenharmony_ci tr.write(b'data') 9357db96d56Sopenharmony_ci tr.write(b'data') 9367db96d56Sopenharmony_ci tr.write(b'data') 9377db96d56Sopenharmony_ci # This is a bit overspecified. :-( 9387db96d56Sopenharmony_ci m_log.warning.assert_called_with( 9397db96d56Sopenharmony_ci 'pipe closed by peer or os.write(pipe, data) raised exception.') 9407db96d56Sopenharmony_ci tr.close() 9417db96d56Sopenharmony_ci 9427db96d56Sopenharmony_ci @mock.patch('os.write') 9437db96d56Sopenharmony_ci def test_write_close(self, m_write): 9447db96d56Sopenharmony_ci tr = self.write_pipe_transport() 9457db96d56Sopenharmony_ci tr._read_ready() # pipe was closed by peer 9467db96d56Sopenharmony_ci 9477db96d56Sopenharmony_ci tr.write(b'data') 9487db96d56Sopenharmony_ci self.assertEqual(tr._conn_lost, 1) 9497db96d56Sopenharmony_ci tr.write(b'data') 9507db96d56Sopenharmony_ci self.assertEqual(tr._conn_lost, 2) 9517db96d56Sopenharmony_ci 9527db96d56Sopenharmony_ci def test__read_ready(self): 9537db96d56Sopenharmony_ci tr = self.write_pipe_transport() 9547db96d56Sopenharmony_ci tr._read_ready() 9557db96d56Sopenharmony_ci self.assertFalse(self.loop.readers) 9567db96d56Sopenharmony_ci self.assertFalse(self.loop.writers) 9577db96d56Sopenharmony_ci self.assertTrue(tr.is_closing()) 9587db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 9597db96d56Sopenharmony_ci self.protocol.connection_lost.assert_called_with(None) 9607db96d56Sopenharmony_ci 9617db96d56Sopenharmony_ci @mock.patch('os.write') 9627db96d56Sopenharmony_ci def test__write_ready(self, m_write): 9637db96d56Sopenharmony_ci tr = self.write_pipe_transport() 9647db96d56Sopenharmony_ci self.loop.add_writer(5, tr._write_ready) 9657db96d56Sopenharmony_ci tr._buffer = bytearray(b'data') 9667db96d56Sopenharmony_ci m_write.return_value = 4 9677db96d56Sopenharmony_ci tr._write_ready() 9687db96d56Sopenharmony_ci self.assertFalse(self.loop.writers) 9697db96d56Sopenharmony_ci self.assertEqual(bytearray(), tr._buffer) 9707db96d56Sopenharmony_ci 9717db96d56Sopenharmony_ci @mock.patch('os.write') 9727db96d56Sopenharmony_ci def test__write_ready_partial(self, m_write): 9737db96d56Sopenharmony_ci tr = self.write_pipe_transport() 9747db96d56Sopenharmony_ci self.loop.add_writer(5, tr._write_ready) 9757db96d56Sopenharmony_ci tr._buffer = bytearray(b'data') 9767db96d56Sopenharmony_ci m_write.return_value = 3 9777db96d56Sopenharmony_ci tr._write_ready() 9787db96d56Sopenharmony_ci self.loop.assert_writer(5, tr._write_ready) 9797db96d56Sopenharmony_ci self.assertEqual(bytearray(b'a'), tr._buffer) 9807db96d56Sopenharmony_ci 9817db96d56Sopenharmony_ci @mock.patch('os.write') 9827db96d56Sopenharmony_ci def test__write_ready_again(self, m_write): 9837db96d56Sopenharmony_ci tr = self.write_pipe_transport() 9847db96d56Sopenharmony_ci self.loop.add_writer(5, tr._write_ready) 9857db96d56Sopenharmony_ci tr._buffer = bytearray(b'data') 9867db96d56Sopenharmony_ci m_write.side_effect = BlockingIOError() 9877db96d56Sopenharmony_ci tr._write_ready() 9887db96d56Sopenharmony_ci m_write.assert_called_with(5, bytearray(b'data')) 9897db96d56Sopenharmony_ci self.loop.assert_writer(5, tr._write_ready) 9907db96d56Sopenharmony_ci self.assertEqual(bytearray(b'data'), tr._buffer) 9917db96d56Sopenharmony_ci 9927db96d56Sopenharmony_ci @mock.patch('os.write') 9937db96d56Sopenharmony_ci def test__write_ready_empty(self, m_write): 9947db96d56Sopenharmony_ci tr = self.write_pipe_transport() 9957db96d56Sopenharmony_ci self.loop.add_writer(5, tr._write_ready) 9967db96d56Sopenharmony_ci tr._buffer = bytearray(b'data') 9977db96d56Sopenharmony_ci m_write.return_value = 0 9987db96d56Sopenharmony_ci tr._write_ready() 9997db96d56Sopenharmony_ci m_write.assert_called_with(5, bytearray(b'data')) 10007db96d56Sopenharmony_ci self.loop.assert_writer(5, tr._write_ready) 10017db96d56Sopenharmony_ci self.assertEqual(bytearray(b'data'), tr._buffer) 10027db96d56Sopenharmony_ci 10037db96d56Sopenharmony_ci @mock.patch('asyncio.log.logger.error') 10047db96d56Sopenharmony_ci @mock.patch('os.write') 10057db96d56Sopenharmony_ci def test__write_ready_err(self, m_write, m_logexc): 10067db96d56Sopenharmony_ci tr = self.write_pipe_transport() 10077db96d56Sopenharmony_ci self.loop.add_writer(5, tr._write_ready) 10087db96d56Sopenharmony_ci tr._buffer = bytearray(b'data') 10097db96d56Sopenharmony_ci m_write.side_effect = err = OSError() 10107db96d56Sopenharmony_ci tr._write_ready() 10117db96d56Sopenharmony_ci self.assertFalse(self.loop.writers) 10127db96d56Sopenharmony_ci self.assertFalse(self.loop.readers) 10137db96d56Sopenharmony_ci self.assertEqual(bytearray(), tr._buffer) 10147db96d56Sopenharmony_ci self.assertTrue(tr.is_closing()) 10157db96d56Sopenharmony_ci m_logexc.assert_not_called() 10167db96d56Sopenharmony_ci self.assertEqual(1, tr._conn_lost) 10177db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 10187db96d56Sopenharmony_ci self.protocol.connection_lost.assert_called_with(err) 10197db96d56Sopenharmony_ci 10207db96d56Sopenharmony_ci @mock.patch('os.write') 10217db96d56Sopenharmony_ci def test__write_ready_closing(self, m_write): 10227db96d56Sopenharmony_ci tr = self.write_pipe_transport() 10237db96d56Sopenharmony_ci self.loop.add_writer(5, tr._write_ready) 10247db96d56Sopenharmony_ci tr._closing = True 10257db96d56Sopenharmony_ci tr._buffer = bytearray(b'data') 10267db96d56Sopenharmony_ci m_write.return_value = 4 10277db96d56Sopenharmony_ci tr._write_ready() 10287db96d56Sopenharmony_ci self.assertFalse(self.loop.writers) 10297db96d56Sopenharmony_ci self.assertFalse(self.loop.readers) 10307db96d56Sopenharmony_ci self.assertEqual(bytearray(), tr._buffer) 10317db96d56Sopenharmony_ci self.protocol.connection_lost.assert_called_with(None) 10327db96d56Sopenharmony_ci self.pipe.close.assert_called_with() 10337db96d56Sopenharmony_ci 10347db96d56Sopenharmony_ci @mock.patch('os.write') 10357db96d56Sopenharmony_ci def test_abort(self, m_write): 10367db96d56Sopenharmony_ci tr = self.write_pipe_transport() 10377db96d56Sopenharmony_ci self.loop.add_writer(5, tr._write_ready) 10387db96d56Sopenharmony_ci self.loop.add_reader(5, tr._read_ready) 10397db96d56Sopenharmony_ci tr._buffer = [b'da', b'ta'] 10407db96d56Sopenharmony_ci tr.abort() 10417db96d56Sopenharmony_ci self.assertFalse(m_write.called) 10427db96d56Sopenharmony_ci self.assertFalse(self.loop.readers) 10437db96d56Sopenharmony_ci self.assertFalse(self.loop.writers) 10447db96d56Sopenharmony_ci self.assertEqual([], tr._buffer) 10457db96d56Sopenharmony_ci self.assertTrue(tr.is_closing()) 10467db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 10477db96d56Sopenharmony_ci self.protocol.connection_lost.assert_called_with(None) 10487db96d56Sopenharmony_ci 10497db96d56Sopenharmony_ci def test__call_connection_lost(self): 10507db96d56Sopenharmony_ci tr = self.write_pipe_transport() 10517db96d56Sopenharmony_ci self.assertIsNotNone(tr._protocol) 10527db96d56Sopenharmony_ci self.assertIsNotNone(tr._loop) 10537db96d56Sopenharmony_ci 10547db96d56Sopenharmony_ci err = None 10557db96d56Sopenharmony_ci tr._call_connection_lost(err) 10567db96d56Sopenharmony_ci self.protocol.connection_lost.assert_called_with(err) 10577db96d56Sopenharmony_ci self.pipe.close.assert_called_with() 10587db96d56Sopenharmony_ci 10597db96d56Sopenharmony_ci self.assertIsNone(tr._protocol) 10607db96d56Sopenharmony_ci self.assertIsNone(tr._loop) 10617db96d56Sopenharmony_ci 10627db96d56Sopenharmony_ci def test__call_connection_lost_with_err(self): 10637db96d56Sopenharmony_ci tr = self.write_pipe_transport() 10647db96d56Sopenharmony_ci self.assertIsNotNone(tr._protocol) 10657db96d56Sopenharmony_ci self.assertIsNotNone(tr._loop) 10667db96d56Sopenharmony_ci 10677db96d56Sopenharmony_ci err = OSError() 10687db96d56Sopenharmony_ci tr._call_connection_lost(err) 10697db96d56Sopenharmony_ci self.protocol.connection_lost.assert_called_with(err) 10707db96d56Sopenharmony_ci self.pipe.close.assert_called_with() 10717db96d56Sopenharmony_ci 10727db96d56Sopenharmony_ci self.assertIsNone(tr._protocol) 10737db96d56Sopenharmony_ci self.assertIsNone(tr._loop) 10747db96d56Sopenharmony_ci 10757db96d56Sopenharmony_ci def test_close(self): 10767db96d56Sopenharmony_ci tr = self.write_pipe_transport() 10777db96d56Sopenharmony_ci tr.write_eof = mock.Mock() 10787db96d56Sopenharmony_ci tr.close() 10797db96d56Sopenharmony_ci tr.write_eof.assert_called_with() 10807db96d56Sopenharmony_ci 10817db96d56Sopenharmony_ci # closing the transport twice must not fail 10827db96d56Sopenharmony_ci tr.close() 10837db96d56Sopenharmony_ci 10847db96d56Sopenharmony_ci def test_close_closing(self): 10857db96d56Sopenharmony_ci tr = self.write_pipe_transport() 10867db96d56Sopenharmony_ci tr.write_eof = mock.Mock() 10877db96d56Sopenharmony_ci tr._closing = True 10887db96d56Sopenharmony_ci tr.close() 10897db96d56Sopenharmony_ci self.assertFalse(tr.write_eof.called) 10907db96d56Sopenharmony_ci 10917db96d56Sopenharmony_ci def test_write_eof(self): 10927db96d56Sopenharmony_ci tr = self.write_pipe_transport() 10937db96d56Sopenharmony_ci tr.write_eof() 10947db96d56Sopenharmony_ci self.assertTrue(tr.is_closing()) 10957db96d56Sopenharmony_ci self.assertFalse(self.loop.readers) 10967db96d56Sopenharmony_ci test_utils.run_briefly(self.loop) 10977db96d56Sopenharmony_ci self.protocol.connection_lost.assert_called_with(None) 10987db96d56Sopenharmony_ci 10997db96d56Sopenharmony_ci def test_write_eof_pending(self): 11007db96d56Sopenharmony_ci tr = self.write_pipe_transport() 11017db96d56Sopenharmony_ci tr._buffer = [b'data'] 11027db96d56Sopenharmony_ci tr.write_eof() 11037db96d56Sopenharmony_ci self.assertTrue(tr.is_closing()) 11047db96d56Sopenharmony_ci self.assertFalse(self.protocol.connection_lost.called) 11057db96d56Sopenharmony_ci 11067db96d56Sopenharmony_ci 11077db96d56Sopenharmony_ciclass AbstractChildWatcherTests(unittest.TestCase): 11087db96d56Sopenharmony_ci 11097db96d56Sopenharmony_ci def test_not_implemented(self): 11107db96d56Sopenharmony_ci f = mock.Mock() 11117db96d56Sopenharmony_ci watcher = asyncio.AbstractChildWatcher() 11127db96d56Sopenharmony_ci self.assertRaises( 11137db96d56Sopenharmony_ci NotImplementedError, watcher.add_child_handler, f, f) 11147db96d56Sopenharmony_ci self.assertRaises( 11157db96d56Sopenharmony_ci NotImplementedError, watcher.remove_child_handler, f) 11167db96d56Sopenharmony_ci self.assertRaises( 11177db96d56Sopenharmony_ci NotImplementedError, watcher.attach_loop, f) 11187db96d56Sopenharmony_ci self.assertRaises( 11197db96d56Sopenharmony_ci NotImplementedError, watcher.close) 11207db96d56Sopenharmony_ci self.assertRaises( 11217db96d56Sopenharmony_ci NotImplementedError, watcher.is_active) 11227db96d56Sopenharmony_ci self.assertRaises( 11237db96d56Sopenharmony_ci NotImplementedError, watcher.__enter__) 11247db96d56Sopenharmony_ci self.assertRaises( 11257db96d56Sopenharmony_ci NotImplementedError, watcher.__exit__, f, f, f) 11267db96d56Sopenharmony_ci 11277db96d56Sopenharmony_ci 11287db96d56Sopenharmony_ciclass BaseChildWatcherTests(unittest.TestCase): 11297db96d56Sopenharmony_ci 11307db96d56Sopenharmony_ci def test_not_implemented(self): 11317db96d56Sopenharmony_ci f = mock.Mock() 11327db96d56Sopenharmony_ci watcher = unix_events.BaseChildWatcher() 11337db96d56Sopenharmony_ci self.assertRaises( 11347db96d56Sopenharmony_ci NotImplementedError, watcher._do_waitpid, f) 11357db96d56Sopenharmony_ci 11367db96d56Sopenharmony_ci 11377db96d56Sopenharmony_ciclass ChildWatcherTestsMixin: 11387db96d56Sopenharmony_ci 11397db96d56Sopenharmony_ci ignore_warnings = mock.patch.object(log.logger, "warning") 11407db96d56Sopenharmony_ci 11417db96d56Sopenharmony_ci def setUp(self): 11427db96d56Sopenharmony_ci super().setUp() 11437db96d56Sopenharmony_ci self.loop = self.new_test_loop() 11447db96d56Sopenharmony_ci self.running = False 11457db96d56Sopenharmony_ci self.zombies = {} 11467db96d56Sopenharmony_ci 11477db96d56Sopenharmony_ci with mock.patch.object( 11487db96d56Sopenharmony_ci self.loop, "add_signal_handler") as self.m_add_signal_handler: 11497db96d56Sopenharmony_ci self.watcher = self.create_watcher() 11507db96d56Sopenharmony_ci self.watcher.attach_loop(self.loop) 11517db96d56Sopenharmony_ci 11527db96d56Sopenharmony_ci def waitpid(self, pid, flags): 11537db96d56Sopenharmony_ci if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1: 11547db96d56Sopenharmony_ci self.assertGreater(pid, 0) 11557db96d56Sopenharmony_ci try: 11567db96d56Sopenharmony_ci if pid < 0: 11577db96d56Sopenharmony_ci return self.zombies.popitem() 11587db96d56Sopenharmony_ci else: 11597db96d56Sopenharmony_ci return pid, self.zombies.pop(pid) 11607db96d56Sopenharmony_ci except KeyError: 11617db96d56Sopenharmony_ci pass 11627db96d56Sopenharmony_ci if self.running: 11637db96d56Sopenharmony_ci return 0, 0 11647db96d56Sopenharmony_ci else: 11657db96d56Sopenharmony_ci raise ChildProcessError() 11667db96d56Sopenharmony_ci 11677db96d56Sopenharmony_ci def add_zombie(self, pid, status): 11687db96d56Sopenharmony_ci self.zombies[pid] = status 11697db96d56Sopenharmony_ci 11707db96d56Sopenharmony_ci def waitstatus_to_exitcode(self, status): 11717db96d56Sopenharmony_ci if status > 32768: 11727db96d56Sopenharmony_ci return status - 32768 11737db96d56Sopenharmony_ci elif 32700 < status < 32768: 11747db96d56Sopenharmony_ci return status - 32768 11757db96d56Sopenharmony_ci else: 11767db96d56Sopenharmony_ci return status 11777db96d56Sopenharmony_ci 11787db96d56Sopenharmony_ci def test_create_watcher(self): 11797db96d56Sopenharmony_ci self.m_add_signal_handler.assert_called_once_with( 11807db96d56Sopenharmony_ci signal.SIGCHLD, self.watcher._sig_chld) 11817db96d56Sopenharmony_ci 11827db96d56Sopenharmony_ci def waitpid_mocks(func): 11837db96d56Sopenharmony_ci def wrapped_func(self): 11847db96d56Sopenharmony_ci def patch(target, wrapper): 11857db96d56Sopenharmony_ci return mock.patch(target, wraps=wrapper, 11867db96d56Sopenharmony_ci new_callable=mock.Mock) 11877db96d56Sopenharmony_ci 11887db96d56Sopenharmony_ci with patch('asyncio.unix_events.waitstatus_to_exitcode', self.waitstatus_to_exitcode), \ 11897db96d56Sopenharmony_ci patch('os.waitpid', self.waitpid) as m_waitpid: 11907db96d56Sopenharmony_ci func(self, m_waitpid) 11917db96d56Sopenharmony_ci return wrapped_func 11927db96d56Sopenharmony_ci 11937db96d56Sopenharmony_ci @waitpid_mocks 11947db96d56Sopenharmony_ci def test_sigchld(self, m_waitpid): 11957db96d56Sopenharmony_ci # register a child 11967db96d56Sopenharmony_ci callback = mock.Mock() 11977db96d56Sopenharmony_ci 11987db96d56Sopenharmony_ci with self.watcher: 11997db96d56Sopenharmony_ci self.running = True 12007db96d56Sopenharmony_ci self.watcher.add_child_handler(42, callback, 9, 10, 14) 12017db96d56Sopenharmony_ci 12027db96d56Sopenharmony_ci self.assertFalse(callback.called) 12037db96d56Sopenharmony_ci 12047db96d56Sopenharmony_ci # child is running 12057db96d56Sopenharmony_ci self.watcher._sig_chld() 12067db96d56Sopenharmony_ci 12077db96d56Sopenharmony_ci self.assertFalse(callback.called) 12087db96d56Sopenharmony_ci 12097db96d56Sopenharmony_ci # child terminates (returncode 12) 12107db96d56Sopenharmony_ci self.running = False 12117db96d56Sopenharmony_ci self.add_zombie(42, EXITCODE(12)) 12127db96d56Sopenharmony_ci self.watcher._sig_chld() 12137db96d56Sopenharmony_ci 12147db96d56Sopenharmony_ci callback.assert_called_once_with(42, 12, 9, 10, 14) 12157db96d56Sopenharmony_ci 12167db96d56Sopenharmony_ci callback.reset_mock() 12177db96d56Sopenharmony_ci 12187db96d56Sopenharmony_ci # ensure that the child is effectively reaped 12197db96d56Sopenharmony_ci self.add_zombie(42, EXITCODE(13)) 12207db96d56Sopenharmony_ci with self.ignore_warnings: 12217db96d56Sopenharmony_ci self.watcher._sig_chld() 12227db96d56Sopenharmony_ci 12237db96d56Sopenharmony_ci self.assertFalse(callback.called) 12247db96d56Sopenharmony_ci 12257db96d56Sopenharmony_ci # sigchld called again 12267db96d56Sopenharmony_ci self.zombies.clear() 12277db96d56Sopenharmony_ci self.watcher._sig_chld() 12287db96d56Sopenharmony_ci 12297db96d56Sopenharmony_ci self.assertFalse(callback.called) 12307db96d56Sopenharmony_ci 12317db96d56Sopenharmony_ci @waitpid_mocks 12327db96d56Sopenharmony_ci def test_sigchld_two_children(self, m_waitpid): 12337db96d56Sopenharmony_ci callback1 = mock.Mock() 12347db96d56Sopenharmony_ci callback2 = mock.Mock() 12357db96d56Sopenharmony_ci 12367db96d56Sopenharmony_ci # register child 1 12377db96d56Sopenharmony_ci with self.watcher: 12387db96d56Sopenharmony_ci self.running = True 12397db96d56Sopenharmony_ci self.watcher.add_child_handler(43, callback1, 7, 8) 12407db96d56Sopenharmony_ci 12417db96d56Sopenharmony_ci self.assertFalse(callback1.called) 12427db96d56Sopenharmony_ci self.assertFalse(callback2.called) 12437db96d56Sopenharmony_ci 12447db96d56Sopenharmony_ci # register child 2 12457db96d56Sopenharmony_ci with self.watcher: 12467db96d56Sopenharmony_ci self.watcher.add_child_handler(44, callback2, 147, 18) 12477db96d56Sopenharmony_ci 12487db96d56Sopenharmony_ci self.assertFalse(callback1.called) 12497db96d56Sopenharmony_ci self.assertFalse(callback2.called) 12507db96d56Sopenharmony_ci 12517db96d56Sopenharmony_ci # children are running 12527db96d56Sopenharmony_ci self.watcher._sig_chld() 12537db96d56Sopenharmony_ci 12547db96d56Sopenharmony_ci self.assertFalse(callback1.called) 12557db96d56Sopenharmony_ci self.assertFalse(callback2.called) 12567db96d56Sopenharmony_ci 12577db96d56Sopenharmony_ci # child 1 terminates (signal 3) 12587db96d56Sopenharmony_ci self.add_zombie(43, SIGNAL(3)) 12597db96d56Sopenharmony_ci self.watcher._sig_chld() 12607db96d56Sopenharmony_ci 12617db96d56Sopenharmony_ci callback1.assert_called_once_with(43, -3, 7, 8) 12627db96d56Sopenharmony_ci self.assertFalse(callback2.called) 12637db96d56Sopenharmony_ci 12647db96d56Sopenharmony_ci callback1.reset_mock() 12657db96d56Sopenharmony_ci 12667db96d56Sopenharmony_ci # child 2 still running 12677db96d56Sopenharmony_ci self.watcher._sig_chld() 12687db96d56Sopenharmony_ci 12697db96d56Sopenharmony_ci self.assertFalse(callback1.called) 12707db96d56Sopenharmony_ci self.assertFalse(callback2.called) 12717db96d56Sopenharmony_ci 12727db96d56Sopenharmony_ci # child 2 terminates (code 108) 12737db96d56Sopenharmony_ci self.add_zombie(44, EXITCODE(108)) 12747db96d56Sopenharmony_ci self.running = False 12757db96d56Sopenharmony_ci self.watcher._sig_chld() 12767db96d56Sopenharmony_ci 12777db96d56Sopenharmony_ci callback2.assert_called_once_with(44, 108, 147, 18) 12787db96d56Sopenharmony_ci self.assertFalse(callback1.called) 12797db96d56Sopenharmony_ci 12807db96d56Sopenharmony_ci callback2.reset_mock() 12817db96d56Sopenharmony_ci 12827db96d56Sopenharmony_ci # ensure that the children are effectively reaped 12837db96d56Sopenharmony_ci self.add_zombie(43, EXITCODE(14)) 12847db96d56Sopenharmony_ci self.add_zombie(44, EXITCODE(15)) 12857db96d56Sopenharmony_ci with self.ignore_warnings: 12867db96d56Sopenharmony_ci self.watcher._sig_chld() 12877db96d56Sopenharmony_ci 12887db96d56Sopenharmony_ci self.assertFalse(callback1.called) 12897db96d56Sopenharmony_ci self.assertFalse(callback2.called) 12907db96d56Sopenharmony_ci 12917db96d56Sopenharmony_ci # sigchld called again 12927db96d56Sopenharmony_ci self.zombies.clear() 12937db96d56Sopenharmony_ci self.watcher._sig_chld() 12947db96d56Sopenharmony_ci 12957db96d56Sopenharmony_ci self.assertFalse(callback1.called) 12967db96d56Sopenharmony_ci self.assertFalse(callback2.called) 12977db96d56Sopenharmony_ci 12987db96d56Sopenharmony_ci @waitpid_mocks 12997db96d56Sopenharmony_ci def test_sigchld_two_children_terminating_together(self, m_waitpid): 13007db96d56Sopenharmony_ci callback1 = mock.Mock() 13017db96d56Sopenharmony_ci callback2 = mock.Mock() 13027db96d56Sopenharmony_ci 13037db96d56Sopenharmony_ci # register child 1 13047db96d56Sopenharmony_ci with self.watcher: 13057db96d56Sopenharmony_ci self.running = True 13067db96d56Sopenharmony_ci self.watcher.add_child_handler(45, callback1, 17, 8) 13077db96d56Sopenharmony_ci 13087db96d56Sopenharmony_ci self.assertFalse(callback1.called) 13097db96d56Sopenharmony_ci self.assertFalse(callback2.called) 13107db96d56Sopenharmony_ci 13117db96d56Sopenharmony_ci # register child 2 13127db96d56Sopenharmony_ci with self.watcher: 13137db96d56Sopenharmony_ci self.watcher.add_child_handler(46, callback2, 1147, 18) 13147db96d56Sopenharmony_ci 13157db96d56Sopenharmony_ci self.assertFalse(callback1.called) 13167db96d56Sopenharmony_ci self.assertFalse(callback2.called) 13177db96d56Sopenharmony_ci 13187db96d56Sopenharmony_ci # children are running 13197db96d56Sopenharmony_ci self.watcher._sig_chld() 13207db96d56Sopenharmony_ci 13217db96d56Sopenharmony_ci self.assertFalse(callback1.called) 13227db96d56Sopenharmony_ci self.assertFalse(callback2.called) 13237db96d56Sopenharmony_ci 13247db96d56Sopenharmony_ci # child 1 terminates (code 78) 13257db96d56Sopenharmony_ci # child 2 terminates (signal 5) 13267db96d56Sopenharmony_ci self.add_zombie(45, EXITCODE(78)) 13277db96d56Sopenharmony_ci self.add_zombie(46, SIGNAL(5)) 13287db96d56Sopenharmony_ci self.running = False 13297db96d56Sopenharmony_ci self.watcher._sig_chld() 13307db96d56Sopenharmony_ci 13317db96d56Sopenharmony_ci callback1.assert_called_once_with(45, 78, 17, 8) 13327db96d56Sopenharmony_ci callback2.assert_called_once_with(46, -5, 1147, 18) 13337db96d56Sopenharmony_ci 13347db96d56Sopenharmony_ci callback1.reset_mock() 13357db96d56Sopenharmony_ci callback2.reset_mock() 13367db96d56Sopenharmony_ci 13377db96d56Sopenharmony_ci # ensure that the children are effectively reaped 13387db96d56Sopenharmony_ci self.add_zombie(45, EXITCODE(14)) 13397db96d56Sopenharmony_ci self.add_zombie(46, EXITCODE(15)) 13407db96d56Sopenharmony_ci with self.ignore_warnings: 13417db96d56Sopenharmony_ci self.watcher._sig_chld() 13427db96d56Sopenharmony_ci 13437db96d56Sopenharmony_ci self.assertFalse(callback1.called) 13447db96d56Sopenharmony_ci self.assertFalse(callback2.called) 13457db96d56Sopenharmony_ci 13467db96d56Sopenharmony_ci @waitpid_mocks 13477db96d56Sopenharmony_ci def test_sigchld_race_condition(self, m_waitpid): 13487db96d56Sopenharmony_ci # register a child 13497db96d56Sopenharmony_ci callback = mock.Mock() 13507db96d56Sopenharmony_ci 13517db96d56Sopenharmony_ci with self.watcher: 13527db96d56Sopenharmony_ci # child terminates before being registered 13537db96d56Sopenharmony_ci self.add_zombie(50, EXITCODE(4)) 13547db96d56Sopenharmony_ci self.watcher._sig_chld() 13557db96d56Sopenharmony_ci 13567db96d56Sopenharmony_ci self.watcher.add_child_handler(50, callback, 1, 12) 13577db96d56Sopenharmony_ci 13587db96d56Sopenharmony_ci callback.assert_called_once_with(50, 4, 1, 12) 13597db96d56Sopenharmony_ci callback.reset_mock() 13607db96d56Sopenharmony_ci 13617db96d56Sopenharmony_ci # ensure that the child is effectively reaped 13627db96d56Sopenharmony_ci self.add_zombie(50, SIGNAL(1)) 13637db96d56Sopenharmony_ci with self.ignore_warnings: 13647db96d56Sopenharmony_ci self.watcher._sig_chld() 13657db96d56Sopenharmony_ci 13667db96d56Sopenharmony_ci self.assertFalse(callback.called) 13677db96d56Sopenharmony_ci 13687db96d56Sopenharmony_ci @waitpid_mocks 13697db96d56Sopenharmony_ci def test_sigchld_replace_handler(self, m_waitpid): 13707db96d56Sopenharmony_ci callback1 = mock.Mock() 13717db96d56Sopenharmony_ci callback2 = mock.Mock() 13727db96d56Sopenharmony_ci 13737db96d56Sopenharmony_ci # register a child 13747db96d56Sopenharmony_ci with self.watcher: 13757db96d56Sopenharmony_ci self.running = True 13767db96d56Sopenharmony_ci self.watcher.add_child_handler(51, callback1, 19) 13777db96d56Sopenharmony_ci 13787db96d56Sopenharmony_ci self.assertFalse(callback1.called) 13797db96d56Sopenharmony_ci self.assertFalse(callback2.called) 13807db96d56Sopenharmony_ci 13817db96d56Sopenharmony_ci # register the same child again 13827db96d56Sopenharmony_ci with self.watcher: 13837db96d56Sopenharmony_ci self.watcher.add_child_handler(51, callback2, 21) 13847db96d56Sopenharmony_ci 13857db96d56Sopenharmony_ci self.assertFalse(callback1.called) 13867db96d56Sopenharmony_ci self.assertFalse(callback2.called) 13877db96d56Sopenharmony_ci 13887db96d56Sopenharmony_ci # child terminates (signal 8) 13897db96d56Sopenharmony_ci self.running = False 13907db96d56Sopenharmony_ci self.add_zombie(51, SIGNAL(8)) 13917db96d56Sopenharmony_ci self.watcher._sig_chld() 13927db96d56Sopenharmony_ci 13937db96d56Sopenharmony_ci callback2.assert_called_once_with(51, -8, 21) 13947db96d56Sopenharmony_ci self.assertFalse(callback1.called) 13957db96d56Sopenharmony_ci 13967db96d56Sopenharmony_ci callback2.reset_mock() 13977db96d56Sopenharmony_ci 13987db96d56Sopenharmony_ci # ensure that the child is effectively reaped 13997db96d56Sopenharmony_ci self.add_zombie(51, EXITCODE(13)) 14007db96d56Sopenharmony_ci with self.ignore_warnings: 14017db96d56Sopenharmony_ci self.watcher._sig_chld() 14027db96d56Sopenharmony_ci 14037db96d56Sopenharmony_ci self.assertFalse(callback1.called) 14047db96d56Sopenharmony_ci self.assertFalse(callback2.called) 14057db96d56Sopenharmony_ci 14067db96d56Sopenharmony_ci @waitpid_mocks 14077db96d56Sopenharmony_ci def test_sigchld_remove_handler(self, m_waitpid): 14087db96d56Sopenharmony_ci callback = mock.Mock() 14097db96d56Sopenharmony_ci 14107db96d56Sopenharmony_ci # register a child 14117db96d56Sopenharmony_ci with self.watcher: 14127db96d56Sopenharmony_ci self.running = True 14137db96d56Sopenharmony_ci self.watcher.add_child_handler(52, callback, 1984) 14147db96d56Sopenharmony_ci 14157db96d56Sopenharmony_ci self.assertFalse(callback.called) 14167db96d56Sopenharmony_ci 14177db96d56Sopenharmony_ci # unregister the child 14187db96d56Sopenharmony_ci self.watcher.remove_child_handler(52) 14197db96d56Sopenharmony_ci 14207db96d56Sopenharmony_ci self.assertFalse(callback.called) 14217db96d56Sopenharmony_ci 14227db96d56Sopenharmony_ci # child terminates (code 99) 14237db96d56Sopenharmony_ci self.running = False 14247db96d56Sopenharmony_ci self.add_zombie(52, EXITCODE(99)) 14257db96d56Sopenharmony_ci with self.ignore_warnings: 14267db96d56Sopenharmony_ci self.watcher._sig_chld() 14277db96d56Sopenharmony_ci 14287db96d56Sopenharmony_ci self.assertFalse(callback.called) 14297db96d56Sopenharmony_ci 14307db96d56Sopenharmony_ci @waitpid_mocks 14317db96d56Sopenharmony_ci def test_sigchld_unknown_status(self, m_waitpid): 14327db96d56Sopenharmony_ci callback = mock.Mock() 14337db96d56Sopenharmony_ci 14347db96d56Sopenharmony_ci # register a child 14357db96d56Sopenharmony_ci with self.watcher: 14367db96d56Sopenharmony_ci self.running = True 14377db96d56Sopenharmony_ci self.watcher.add_child_handler(53, callback, -19) 14387db96d56Sopenharmony_ci 14397db96d56Sopenharmony_ci self.assertFalse(callback.called) 14407db96d56Sopenharmony_ci 14417db96d56Sopenharmony_ci # terminate with unknown status 14427db96d56Sopenharmony_ci self.zombies[53] = 1178 14437db96d56Sopenharmony_ci self.running = False 14447db96d56Sopenharmony_ci self.watcher._sig_chld() 14457db96d56Sopenharmony_ci 14467db96d56Sopenharmony_ci callback.assert_called_once_with(53, 1178, -19) 14477db96d56Sopenharmony_ci 14487db96d56Sopenharmony_ci callback.reset_mock() 14497db96d56Sopenharmony_ci 14507db96d56Sopenharmony_ci # ensure that the child is effectively reaped 14517db96d56Sopenharmony_ci self.add_zombie(53, EXITCODE(101)) 14527db96d56Sopenharmony_ci with self.ignore_warnings: 14537db96d56Sopenharmony_ci self.watcher._sig_chld() 14547db96d56Sopenharmony_ci 14557db96d56Sopenharmony_ci self.assertFalse(callback.called) 14567db96d56Sopenharmony_ci 14577db96d56Sopenharmony_ci @waitpid_mocks 14587db96d56Sopenharmony_ci def test_remove_child_handler(self, m_waitpid): 14597db96d56Sopenharmony_ci callback1 = mock.Mock() 14607db96d56Sopenharmony_ci callback2 = mock.Mock() 14617db96d56Sopenharmony_ci callback3 = mock.Mock() 14627db96d56Sopenharmony_ci 14637db96d56Sopenharmony_ci # register children 14647db96d56Sopenharmony_ci with self.watcher: 14657db96d56Sopenharmony_ci self.running = True 14667db96d56Sopenharmony_ci self.watcher.add_child_handler(54, callback1, 1) 14677db96d56Sopenharmony_ci self.watcher.add_child_handler(55, callback2, 2) 14687db96d56Sopenharmony_ci self.watcher.add_child_handler(56, callback3, 3) 14697db96d56Sopenharmony_ci 14707db96d56Sopenharmony_ci # remove child handler 1 14717db96d56Sopenharmony_ci self.assertTrue(self.watcher.remove_child_handler(54)) 14727db96d56Sopenharmony_ci 14737db96d56Sopenharmony_ci # remove child handler 2 multiple times 14747db96d56Sopenharmony_ci self.assertTrue(self.watcher.remove_child_handler(55)) 14757db96d56Sopenharmony_ci self.assertFalse(self.watcher.remove_child_handler(55)) 14767db96d56Sopenharmony_ci self.assertFalse(self.watcher.remove_child_handler(55)) 14777db96d56Sopenharmony_ci 14787db96d56Sopenharmony_ci # all children terminate 14797db96d56Sopenharmony_ci self.add_zombie(54, EXITCODE(0)) 14807db96d56Sopenharmony_ci self.add_zombie(55, EXITCODE(1)) 14817db96d56Sopenharmony_ci self.add_zombie(56, EXITCODE(2)) 14827db96d56Sopenharmony_ci self.running = False 14837db96d56Sopenharmony_ci with self.ignore_warnings: 14847db96d56Sopenharmony_ci self.watcher._sig_chld() 14857db96d56Sopenharmony_ci 14867db96d56Sopenharmony_ci self.assertFalse(callback1.called) 14877db96d56Sopenharmony_ci self.assertFalse(callback2.called) 14887db96d56Sopenharmony_ci callback3.assert_called_once_with(56, 2, 3) 14897db96d56Sopenharmony_ci 14907db96d56Sopenharmony_ci @waitpid_mocks 14917db96d56Sopenharmony_ci def test_sigchld_unhandled_exception(self, m_waitpid): 14927db96d56Sopenharmony_ci callback = mock.Mock() 14937db96d56Sopenharmony_ci 14947db96d56Sopenharmony_ci # register a child 14957db96d56Sopenharmony_ci with self.watcher: 14967db96d56Sopenharmony_ci self.running = True 14977db96d56Sopenharmony_ci self.watcher.add_child_handler(57, callback) 14987db96d56Sopenharmony_ci 14997db96d56Sopenharmony_ci # raise an exception 15007db96d56Sopenharmony_ci m_waitpid.side_effect = ValueError 15017db96d56Sopenharmony_ci 15027db96d56Sopenharmony_ci with mock.patch.object(log.logger, 15037db96d56Sopenharmony_ci 'error') as m_error: 15047db96d56Sopenharmony_ci 15057db96d56Sopenharmony_ci self.assertEqual(self.watcher._sig_chld(), None) 15067db96d56Sopenharmony_ci self.assertTrue(m_error.called) 15077db96d56Sopenharmony_ci 15087db96d56Sopenharmony_ci @waitpid_mocks 15097db96d56Sopenharmony_ci def test_sigchld_child_reaped_elsewhere(self, m_waitpid): 15107db96d56Sopenharmony_ci # register a child 15117db96d56Sopenharmony_ci callback = mock.Mock() 15127db96d56Sopenharmony_ci 15137db96d56Sopenharmony_ci with self.watcher: 15147db96d56Sopenharmony_ci self.running = True 15157db96d56Sopenharmony_ci self.watcher.add_child_handler(58, callback) 15167db96d56Sopenharmony_ci 15177db96d56Sopenharmony_ci self.assertFalse(callback.called) 15187db96d56Sopenharmony_ci 15197db96d56Sopenharmony_ci # child terminates 15207db96d56Sopenharmony_ci self.running = False 15217db96d56Sopenharmony_ci self.add_zombie(58, EXITCODE(4)) 15227db96d56Sopenharmony_ci 15237db96d56Sopenharmony_ci # waitpid is called elsewhere 15247db96d56Sopenharmony_ci os.waitpid(58, os.WNOHANG) 15257db96d56Sopenharmony_ci 15267db96d56Sopenharmony_ci m_waitpid.reset_mock() 15277db96d56Sopenharmony_ci 15287db96d56Sopenharmony_ci # sigchld 15297db96d56Sopenharmony_ci with self.ignore_warnings: 15307db96d56Sopenharmony_ci self.watcher._sig_chld() 15317db96d56Sopenharmony_ci 15327db96d56Sopenharmony_ci if isinstance(self.watcher, asyncio.FastChildWatcher): 15337db96d56Sopenharmony_ci # here the FastChildWatcher enters a deadlock 15347db96d56Sopenharmony_ci # (there is no way to prevent it) 15357db96d56Sopenharmony_ci self.assertFalse(callback.called) 15367db96d56Sopenharmony_ci else: 15377db96d56Sopenharmony_ci callback.assert_called_once_with(58, 255) 15387db96d56Sopenharmony_ci 15397db96d56Sopenharmony_ci @waitpid_mocks 15407db96d56Sopenharmony_ci def test_sigchld_unknown_pid_during_registration(self, m_waitpid): 15417db96d56Sopenharmony_ci # register two children 15427db96d56Sopenharmony_ci callback1 = mock.Mock() 15437db96d56Sopenharmony_ci callback2 = mock.Mock() 15447db96d56Sopenharmony_ci 15457db96d56Sopenharmony_ci with self.ignore_warnings, self.watcher: 15467db96d56Sopenharmony_ci self.running = True 15477db96d56Sopenharmony_ci # child 1 terminates 15487db96d56Sopenharmony_ci self.add_zombie(591, EXITCODE(7)) 15497db96d56Sopenharmony_ci # an unknown child terminates 15507db96d56Sopenharmony_ci self.add_zombie(593, EXITCODE(17)) 15517db96d56Sopenharmony_ci 15527db96d56Sopenharmony_ci self.watcher._sig_chld() 15537db96d56Sopenharmony_ci 15547db96d56Sopenharmony_ci self.watcher.add_child_handler(591, callback1) 15557db96d56Sopenharmony_ci self.watcher.add_child_handler(592, callback2) 15567db96d56Sopenharmony_ci 15577db96d56Sopenharmony_ci callback1.assert_called_once_with(591, 7) 15587db96d56Sopenharmony_ci self.assertFalse(callback2.called) 15597db96d56Sopenharmony_ci 15607db96d56Sopenharmony_ci @waitpid_mocks 15617db96d56Sopenharmony_ci def test_set_loop(self, m_waitpid): 15627db96d56Sopenharmony_ci # register a child 15637db96d56Sopenharmony_ci callback = mock.Mock() 15647db96d56Sopenharmony_ci 15657db96d56Sopenharmony_ci with self.watcher: 15667db96d56Sopenharmony_ci self.running = True 15677db96d56Sopenharmony_ci self.watcher.add_child_handler(60, callback) 15687db96d56Sopenharmony_ci 15697db96d56Sopenharmony_ci # attach a new loop 15707db96d56Sopenharmony_ci old_loop = self.loop 15717db96d56Sopenharmony_ci self.loop = self.new_test_loop() 15727db96d56Sopenharmony_ci patch = mock.patch.object 15737db96d56Sopenharmony_ci 15747db96d56Sopenharmony_ci with patch(old_loop, "remove_signal_handler") as m_old_remove, \ 15757db96d56Sopenharmony_ci patch(self.loop, "add_signal_handler") as m_new_add: 15767db96d56Sopenharmony_ci 15777db96d56Sopenharmony_ci self.watcher.attach_loop(self.loop) 15787db96d56Sopenharmony_ci 15797db96d56Sopenharmony_ci m_old_remove.assert_called_once_with( 15807db96d56Sopenharmony_ci signal.SIGCHLD) 15817db96d56Sopenharmony_ci m_new_add.assert_called_once_with( 15827db96d56Sopenharmony_ci signal.SIGCHLD, self.watcher._sig_chld) 15837db96d56Sopenharmony_ci 15847db96d56Sopenharmony_ci # child terminates 15857db96d56Sopenharmony_ci self.running = False 15867db96d56Sopenharmony_ci self.add_zombie(60, EXITCODE(9)) 15877db96d56Sopenharmony_ci self.watcher._sig_chld() 15887db96d56Sopenharmony_ci 15897db96d56Sopenharmony_ci callback.assert_called_once_with(60, 9) 15907db96d56Sopenharmony_ci 15917db96d56Sopenharmony_ci @waitpid_mocks 15927db96d56Sopenharmony_ci def test_set_loop_race_condition(self, m_waitpid): 15937db96d56Sopenharmony_ci # register 3 children 15947db96d56Sopenharmony_ci callback1 = mock.Mock() 15957db96d56Sopenharmony_ci callback2 = mock.Mock() 15967db96d56Sopenharmony_ci callback3 = mock.Mock() 15977db96d56Sopenharmony_ci 15987db96d56Sopenharmony_ci with self.watcher: 15997db96d56Sopenharmony_ci self.running = True 16007db96d56Sopenharmony_ci self.watcher.add_child_handler(61, callback1) 16017db96d56Sopenharmony_ci self.watcher.add_child_handler(62, callback2) 16027db96d56Sopenharmony_ci self.watcher.add_child_handler(622, callback3) 16037db96d56Sopenharmony_ci 16047db96d56Sopenharmony_ci # detach the loop 16057db96d56Sopenharmony_ci old_loop = self.loop 16067db96d56Sopenharmony_ci self.loop = None 16077db96d56Sopenharmony_ci 16087db96d56Sopenharmony_ci with mock.patch.object( 16097db96d56Sopenharmony_ci old_loop, "remove_signal_handler") as m_remove_signal_handler: 16107db96d56Sopenharmony_ci 16117db96d56Sopenharmony_ci with self.assertWarnsRegex( 16127db96d56Sopenharmony_ci RuntimeWarning, 'A loop is being detached'): 16137db96d56Sopenharmony_ci self.watcher.attach_loop(None) 16147db96d56Sopenharmony_ci 16157db96d56Sopenharmony_ci m_remove_signal_handler.assert_called_once_with( 16167db96d56Sopenharmony_ci signal.SIGCHLD) 16177db96d56Sopenharmony_ci 16187db96d56Sopenharmony_ci # child 1 & 2 terminate 16197db96d56Sopenharmony_ci self.add_zombie(61, EXITCODE(11)) 16207db96d56Sopenharmony_ci self.add_zombie(62, SIGNAL(5)) 16217db96d56Sopenharmony_ci 16227db96d56Sopenharmony_ci # SIGCHLD was not caught 16237db96d56Sopenharmony_ci self.assertFalse(callback1.called) 16247db96d56Sopenharmony_ci self.assertFalse(callback2.called) 16257db96d56Sopenharmony_ci self.assertFalse(callback3.called) 16267db96d56Sopenharmony_ci 16277db96d56Sopenharmony_ci # attach a new loop 16287db96d56Sopenharmony_ci self.loop = self.new_test_loop() 16297db96d56Sopenharmony_ci 16307db96d56Sopenharmony_ci with mock.patch.object( 16317db96d56Sopenharmony_ci self.loop, "add_signal_handler") as m_add_signal_handler: 16327db96d56Sopenharmony_ci 16337db96d56Sopenharmony_ci self.watcher.attach_loop(self.loop) 16347db96d56Sopenharmony_ci 16357db96d56Sopenharmony_ci m_add_signal_handler.assert_called_once_with( 16367db96d56Sopenharmony_ci signal.SIGCHLD, self.watcher._sig_chld) 16377db96d56Sopenharmony_ci callback1.assert_called_once_with(61, 11) # race condition! 16387db96d56Sopenharmony_ci callback2.assert_called_once_with(62, -5) # race condition! 16397db96d56Sopenharmony_ci self.assertFalse(callback3.called) 16407db96d56Sopenharmony_ci 16417db96d56Sopenharmony_ci callback1.reset_mock() 16427db96d56Sopenharmony_ci callback2.reset_mock() 16437db96d56Sopenharmony_ci 16447db96d56Sopenharmony_ci # child 3 terminates 16457db96d56Sopenharmony_ci self.running = False 16467db96d56Sopenharmony_ci self.add_zombie(622, EXITCODE(19)) 16477db96d56Sopenharmony_ci self.watcher._sig_chld() 16487db96d56Sopenharmony_ci 16497db96d56Sopenharmony_ci self.assertFalse(callback1.called) 16507db96d56Sopenharmony_ci self.assertFalse(callback2.called) 16517db96d56Sopenharmony_ci callback3.assert_called_once_with(622, 19) 16527db96d56Sopenharmony_ci 16537db96d56Sopenharmony_ci @waitpid_mocks 16547db96d56Sopenharmony_ci def test_close(self, m_waitpid): 16557db96d56Sopenharmony_ci # register two children 16567db96d56Sopenharmony_ci callback1 = mock.Mock() 16577db96d56Sopenharmony_ci 16587db96d56Sopenharmony_ci with self.watcher: 16597db96d56Sopenharmony_ci self.running = True 16607db96d56Sopenharmony_ci # child 1 terminates 16617db96d56Sopenharmony_ci self.add_zombie(63, EXITCODE(9)) 16627db96d56Sopenharmony_ci # other child terminates 16637db96d56Sopenharmony_ci self.add_zombie(65, EXITCODE(18)) 16647db96d56Sopenharmony_ci self.watcher._sig_chld() 16657db96d56Sopenharmony_ci 16667db96d56Sopenharmony_ci self.watcher.add_child_handler(63, callback1) 16677db96d56Sopenharmony_ci self.watcher.add_child_handler(64, callback1) 16687db96d56Sopenharmony_ci 16697db96d56Sopenharmony_ci self.assertEqual(len(self.watcher._callbacks), 1) 16707db96d56Sopenharmony_ci if isinstance(self.watcher, asyncio.FastChildWatcher): 16717db96d56Sopenharmony_ci self.assertEqual(len(self.watcher._zombies), 1) 16727db96d56Sopenharmony_ci 16737db96d56Sopenharmony_ci with mock.patch.object( 16747db96d56Sopenharmony_ci self.loop, 16757db96d56Sopenharmony_ci "remove_signal_handler") as m_remove_signal_handler: 16767db96d56Sopenharmony_ci 16777db96d56Sopenharmony_ci self.watcher.close() 16787db96d56Sopenharmony_ci 16797db96d56Sopenharmony_ci m_remove_signal_handler.assert_called_once_with( 16807db96d56Sopenharmony_ci signal.SIGCHLD) 16817db96d56Sopenharmony_ci self.assertFalse(self.watcher._callbacks) 16827db96d56Sopenharmony_ci if isinstance(self.watcher, asyncio.FastChildWatcher): 16837db96d56Sopenharmony_ci self.assertFalse(self.watcher._zombies) 16847db96d56Sopenharmony_ci 16857db96d56Sopenharmony_ci 16867db96d56Sopenharmony_ciclass SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase): 16877db96d56Sopenharmony_ci def create_watcher(self): 16887db96d56Sopenharmony_ci return asyncio.SafeChildWatcher() 16897db96d56Sopenharmony_ci 16907db96d56Sopenharmony_ci 16917db96d56Sopenharmony_ciclass FastChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase): 16927db96d56Sopenharmony_ci def create_watcher(self): 16937db96d56Sopenharmony_ci return asyncio.FastChildWatcher() 16947db96d56Sopenharmony_ci 16957db96d56Sopenharmony_ci 16967db96d56Sopenharmony_ciclass PolicyTests(unittest.TestCase): 16977db96d56Sopenharmony_ci 16987db96d56Sopenharmony_ci def create_policy(self): 16997db96d56Sopenharmony_ci return asyncio.DefaultEventLoopPolicy() 17007db96d56Sopenharmony_ci 17017db96d56Sopenharmony_ci def test_get_default_child_watcher(self): 17027db96d56Sopenharmony_ci policy = self.create_policy() 17037db96d56Sopenharmony_ci self.assertIsNone(policy._watcher) 17047db96d56Sopenharmony_ci 17057db96d56Sopenharmony_ci watcher = policy.get_child_watcher() 17067db96d56Sopenharmony_ci self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher) 17077db96d56Sopenharmony_ci 17087db96d56Sopenharmony_ci self.assertIs(policy._watcher, watcher) 17097db96d56Sopenharmony_ci 17107db96d56Sopenharmony_ci self.assertIs(watcher, policy.get_child_watcher()) 17117db96d56Sopenharmony_ci 17127db96d56Sopenharmony_ci def test_get_child_watcher_after_set(self): 17137db96d56Sopenharmony_ci policy = self.create_policy() 17147db96d56Sopenharmony_ci watcher = asyncio.FastChildWatcher() 17157db96d56Sopenharmony_ci 17167db96d56Sopenharmony_ci policy.set_child_watcher(watcher) 17177db96d56Sopenharmony_ci self.assertIs(policy._watcher, watcher) 17187db96d56Sopenharmony_ci self.assertIs(watcher, policy.get_child_watcher()) 17197db96d56Sopenharmony_ci 17207db96d56Sopenharmony_ci def test_get_child_watcher_thread(self): 17217db96d56Sopenharmony_ci 17227db96d56Sopenharmony_ci def f(): 17237db96d56Sopenharmony_ci policy.set_event_loop(policy.new_event_loop()) 17247db96d56Sopenharmony_ci 17257db96d56Sopenharmony_ci self.assertIsInstance(policy.get_event_loop(), 17267db96d56Sopenharmony_ci asyncio.AbstractEventLoop) 17277db96d56Sopenharmony_ci watcher = policy.get_child_watcher() 17287db96d56Sopenharmony_ci 17297db96d56Sopenharmony_ci self.assertIsInstance(watcher, asyncio.SafeChildWatcher) 17307db96d56Sopenharmony_ci self.assertIsNone(watcher._loop) 17317db96d56Sopenharmony_ci 17327db96d56Sopenharmony_ci policy.get_event_loop().close() 17337db96d56Sopenharmony_ci 17347db96d56Sopenharmony_ci policy = self.create_policy() 17357db96d56Sopenharmony_ci policy.set_child_watcher(asyncio.SafeChildWatcher()) 17367db96d56Sopenharmony_ci 17377db96d56Sopenharmony_ci th = threading.Thread(target=f) 17387db96d56Sopenharmony_ci th.start() 17397db96d56Sopenharmony_ci th.join() 17407db96d56Sopenharmony_ci 17417db96d56Sopenharmony_ci def test_child_watcher_replace_mainloop_existing(self): 17427db96d56Sopenharmony_ci policy = self.create_policy() 17437db96d56Sopenharmony_ci loop = policy.new_event_loop() 17447db96d56Sopenharmony_ci policy.set_event_loop(loop) 17457db96d56Sopenharmony_ci 17467db96d56Sopenharmony_ci # Explicitly setup SafeChildWatcher, 17477db96d56Sopenharmony_ci # default ThreadedChildWatcher has no _loop property 17487db96d56Sopenharmony_ci watcher = asyncio.SafeChildWatcher() 17497db96d56Sopenharmony_ci policy.set_child_watcher(watcher) 17507db96d56Sopenharmony_ci watcher.attach_loop(loop) 17517db96d56Sopenharmony_ci 17527db96d56Sopenharmony_ci self.assertIs(watcher._loop, loop) 17537db96d56Sopenharmony_ci 17547db96d56Sopenharmony_ci new_loop = policy.new_event_loop() 17557db96d56Sopenharmony_ci policy.set_event_loop(new_loop) 17567db96d56Sopenharmony_ci 17577db96d56Sopenharmony_ci self.assertIs(watcher._loop, new_loop) 17587db96d56Sopenharmony_ci 17597db96d56Sopenharmony_ci policy.set_event_loop(None) 17607db96d56Sopenharmony_ci 17617db96d56Sopenharmony_ci self.assertIs(watcher._loop, None) 17627db96d56Sopenharmony_ci 17637db96d56Sopenharmony_ci loop.close() 17647db96d56Sopenharmony_ci new_loop.close() 17657db96d56Sopenharmony_ci 17667db96d56Sopenharmony_ci 17677db96d56Sopenharmony_ciclass TestFunctional(unittest.TestCase): 17687db96d56Sopenharmony_ci 17697db96d56Sopenharmony_ci def setUp(self): 17707db96d56Sopenharmony_ci self.loop = asyncio.new_event_loop() 17717db96d56Sopenharmony_ci asyncio.set_event_loop(self.loop) 17727db96d56Sopenharmony_ci 17737db96d56Sopenharmony_ci def tearDown(self): 17747db96d56Sopenharmony_ci self.loop.close() 17757db96d56Sopenharmony_ci asyncio.set_event_loop(None) 17767db96d56Sopenharmony_ci 17777db96d56Sopenharmony_ci def test_add_reader_invalid_argument(self): 17787db96d56Sopenharmony_ci def assert_raises(): 17797db96d56Sopenharmony_ci return self.assertRaisesRegex(ValueError, r'Invalid file object') 17807db96d56Sopenharmony_ci 17817db96d56Sopenharmony_ci cb = lambda: None 17827db96d56Sopenharmony_ci 17837db96d56Sopenharmony_ci with assert_raises(): 17847db96d56Sopenharmony_ci self.loop.add_reader(object(), cb) 17857db96d56Sopenharmony_ci with assert_raises(): 17867db96d56Sopenharmony_ci self.loop.add_writer(object(), cb) 17877db96d56Sopenharmony_ci 17887db96d56Sopenharmony_ci with assert_raises(): 17897db96d56Sopenharmony_ci self.loop.remove_reader(object()) 17907db96d56Sopenharmony_ci with assert_raises(): 17917db96d56Sopenharmony_ci self.loop.remove_writer(object()) 17927db96d56Sopenharmony_ci 17937db96d56Sopenharmony_ci def test_add_reader_or_writer_transport_fd(self): 17947db96d56Sopenharmony_ci def assert_raises(): 17957db96d56Sopenharmony_ci return self.assertRaisesRegex( 17967db96d56Sopenharmony_ci RuntimeError, 17977db96d56Sopenharmony_ci r'File descriptor .* is used by transport') 17987db96d56Sopenharmony_ci 17997db96d56Sopenharmony_ci async def runner(): 18007db96d56Sopenharmony_ci tr, pr = await self.loop.create_connection( 18017db96d56Sopenharmony_ci lambda: asyncio.Protocol(), sock=rsock) 18027db96d56Sopenharmony_ci 18037db96d56Sopenharmony_ci try: 18047db96d56Sopenharmony_ci cb = lambda: None 18057db96d56Sopenharmony_ci 18067db96d56Sopenharmony_ci with assert_raises(): 18077db96d56Sopenharmony_ci self.loop.add_reader(rsock, cb) 18087db96d56Sopenharmony_ci with assert_raises(): 18097db96d56Sopenharmony_ci self.loop.add_reader(rsock.fileno(), cb) 18107db96d56Sopenharmony_ci 18117db96d56Sopenharmony_ci with assert_raises(): 18127db96d56Sopenharmony_ci self.loop.remove_reader(rsock) 18137db96d56Sopenharmony_ci with assert_raises(): 18147db96d56Sopenharmony_ci self.loop.remove_reader(rsock.fileno()) 18157db96d56Sopenharmony_ci 18167db96d56Sopenharmony_ci with assert_raises(): 18177db96d56Sopenharmony_ci self.loop.add_writer(rsock, cb) 18187db96d56Sopenharmony_ci with assert_raises(): 18197db96d56Sopenharmony_ci self.loop.add_writer(rsock.fileno(), cb) 18207db96d56Sopenharmony_ci 18217db96d56Sopenharmony_ci with assert_raises(): 18227db96d56Sopenharmony_ci self.loop.remove_writer(rsock) 18237db96d56Sopenharmony_ci with assert_raises(): 18247db96d56Sopenharmony_ci self.loop.remove_writer(rsock.fileno()) 18257db96d56Sopenharmony_ci 18267db96d56Sopenharmony_ci finally: 18277db96d56Sopenharmony_ci tr.close() 18287db96d56Sopenharmony_ci 18297db96d56Sopenharmony_ci rsock, wsock = socket.socketpair() 18307db96d56Sopenharmony_ci try: 18317db96d56Sopenharmony_ci self.loop.run_until_complete(runner()) 18327db96d56Sopenharmony_ci finally: 18337db96d56Sopenharmony_ci rsock.close() 18347db96d56Sopenharmony_ci wsock.close() 18357db96d56Sopenharmony_ci 18367db96d56Sopenharmony_ci 18377db96d56Sopenharmony_ciif __name__ == '__main__': 18387db96d56Sopenharmony_ci unittest.main() 1839