17db96d56Sopenharmony_ci"""Tests for unix_events.py."""
27db96d56Sopenharmony_ci
37db96d56Sopenharmony_ciimport contextlib
47db96d56Sopenharmony_ciimport errno
57db96d56Sopenharmony_ciimport io
67db96d56Sopenharmony_ciimport os
77db96d56Sopenharmony_ciimport pathlib
87db96d56Sopenharmony_ciimport signal
97db96d56Sopenharmony_ciimport socket
107db96d56Sopenharmony_ciimport stat
117db96d56Sopenharmony_ciimport sys
127db96d56Sopenharmony_ciimport tempfile
137db96d56Sopenharmony_ciimport threading
147db96d56Sopenharmony_ciimport unittest
157db96d56Sopenharmony_cifrom unittest import mock
167db96d56Sopenharmony_cifrom test.support import os_helper
177db96d56Sopenharmony_cifrom test.support import socket_helper
187db96d56Sopenharmony_ci
197db96d56Sopenharmony_ciif sys.platform == 'win32':
207db96d56Sopenharmony_ci    raise unittest.SkipTest('UNIX only')
217db96d56Sopenharmony_ci
227db96d56Sopenharmony_ci
237db96d56Sopenharmony_ciimport asyncio
247db96d56Sopenharmony_cifrom asyncio import log
257db96d56Sopenharmony_cifrom asyncio import unix_events
267db96d56Sopenharmony_cifrom test.test_asyncio import utils as test_utils
277db96d56Sopenharmony_ci
287db96d56Sopenharmony_ci
297db96d56Sopenharmony_cidef tearDownModule():
307db96d56Sopenharmony_ci    asyncio.set_event_loop_policy(None)
317db96d56Sopenharmony_ci
327db96d56Sopenharmony_ci
337db96d56Sopenharmony_ciMOCK_ANY = mock.ANY
347db96d56Sopenharmony_ci
357db96d56Sopenharmony_ci
367db96d56Sopenharmony_cidef EXITCODE(exitcode):
377db96d56Sopenharmony_ci    return 32768 + exitcode
387db96d56Sopenharmony_ci
397db96d56Sopenharmony_ci
407db96d56Sopenharmony_cidef SIGNAL(signum):
417db96d56Sopenharmony_ci    if not 1 <= signum <= 68:
427db96d56Sopenharmony_ci        raise AssertionError(f'invalid signum {signum}')
437db96d56Sopenharmony_ci    return 32768 - signum
447db96d56Sopenharmony_ci
457db96d56Sopenharmony_ci
467db96d56Sopenharmony_cidef close_pipe_transport(transport):
477db96d56Sopenharmony_ci    # Don't call transport.close() because the event loop and the selector
487db96d56Sopenharmony_ci    # are mocked
497db96d56Sopenharmony_ci    if transport._pipe is None:
507db96d56Sopenharmony_ci        return
517db96d56Sopenharmony_ci    transport._pipe.close()
527db96d56Sopenharmony_ci    transport._pipe = None
537db96d56Sopenharmony_ci
547db96d56Sopenharmony_ci
557db96d56Sopenharmony_ci@unittest.skipUnless(signal, 'Signals are not supported')
567db96d56Sopenharmony_ciclass SelectorEventLoopSignalTests(test_utils.TestCase):
577db96d56Sopenharmony_ci
587db96d56Sopenharmony_ci    def setUp(self):
597db96d56Sopenharmony_ci        super().setUp()
607db96d56Sopenharmony_ci        self.loop = asyncio.SelectorEventLoop()
617db96d56Sopenharmony_ci        self.set_event_loop(self.loop)
627db96d56Sopenharmony_ci
637db96d56Sopenharmony_ci    def test_check_signal(self):
647db96d56Sopenharmony_ci        self.assertRaises(
657db96d56Sopenharmony_ci            TypeError, self.loop._check_signal, '1')
667db96d56Sopenharmony_ci        self.assertRaises(
677db96d56Sopenharmony_ci            ValueError, self.loop._check_signal, signal.NSIG + 1)
687db96d56Sopenharmony_ci
697db96d56Sopenharmony_ci    def test_handle_signal_no_handler(self):
707db96d56Sopenharmony_ci        self.loop._handle_signal(signal.NSIG + 1)
717db96d56Sopenharmony_ci
727db96d56Sopenharmony_ci    def test_handle_signal_cancelled_handler(self):
737db96d56Sopenharmony_ci        h = asyncio.Handle(mock.Mock(), (),
747db96d56Sopenharmony_ci                           loop=mock.Mock())
757db96d56Sopenharmony_ci        h.cancel()
767db96d56Sopenharmony_ci        self.loop._signal_handlers[signal.NSIG + 1] = h
777db96d56Sopenharmony_ci        self.loop.remove_signal_handler = mock.Mock()
787db96d56Sopenharmony_ci        self.loop._handle_signal(signal.NSIG + 1)
797db96d56Sopenharmony_ci        self.loop.remove_signal_handler.assert_called_with(signal.NSIG + 1)
807db96d56Sopenharmony_ci
817db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.signal')
827db96d56Sopenharmony_ci    def test_add_signal_handler_setup_error(self, m_signal):
837db96d56Sopenharmony_ci        m_signal.NSIG = signal.NSIG
847db96d56Sopenharmony_ci        m_signal.valid_signals = signal.valid_signals
857db96d56Sopenharmony_ci        m_signal.set_wakeup_fd.side_effect = ValueError
867db96d56Sopenharmony_ci
877db96d56Sopenharmony_ci        self.assertRaises(
887db96d56Sopenharmony_ci            RuntimeError,
897db96d56Sopenharmony_ci            self.loop.add_signal_handler,
907db96d56Sopenharmony_ci            signal.SIGINT, lambda: True)
917db96d56Sopenharmony_ci
927db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.signal')
937db96d56Sopenharmony_ci    def test_add_signal_handler_coroutine_error(self, m_signal):
947db96d56Sopenharmony_ci        m_signal.NSIG = signal.NSIG
957db96d56Sopenharmony_ci
967db96d56Sopenharmony_ci        async def simple_coroutine():
977db96d56Sopenharmony_ci            pass
987db96d56Sopenharmony_ci
997db96d56Sopenharmony_ci        # callback must not be a coroutine function
1007db96d56Sopenharmony_ci        coro_func = simple_coroutine
1017db96d56Sopenharmony_ci        coro_obj = coro_func()
1027db96d56Sopenharmony_ci        self.addCleanup(coro_obj.close)
1037db96d56Sopenharmony_ci        for func in (coro_func, coro_obj):
1047db96d56Sopenharmony_ci            self.assertRaisesRegex(
1057db96d56Sopenharmony_ci                TypeError, 'coroutines cannot be used with add_signal_handler',
1067db96d56Sopenharmony_ci                self.loop.add_signal_handler,
1077db96d56Sopenharmony_ci                signal.SIGINT, func)
1087db96d56Sopenharmony_ci
1097db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.signal')
1107db96d56Sopenharmony_ci    def test_add_signal_handler(self, m_signal):
1117db96d56Sopenharmony_ci        m_signal.NSIG = signal.NSIG
1127db96d56Sopenharmony_ci        m_signal.valid_signals = signal.valid_signals
1137db96d56Sopenharmony_ci
1147db96d56Sopenharmony_ci        cb = lambda: True
1157db96d56Sopenharmony_ci        self.loop.add_signal_handler(signal.SIGHUP, cb)
1167db96d56Sopenharmony_ci        h = self.loop._signal_handlers.get(signal.SIGHUP)
1177db96d56Sopenharmony_ci        self.assertIsInstance(h, asyncio.Handle)
1187db96d56Sopenharmony_ci        self.assertEqual(h._callback, cb)
1197db96d56Sopenharmony_ci
1207db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.signal')
1217db96d56Sopenharmony_ci    def test_add_signal_handler_install_error(self, m_signal):
1227db96d56Sopenharmony_ci        m_signal.NSIG = signal.NSIG
1237db96d56Sopenharmony_ci        m_signal.valid_signals = signal.valid_signals
1247db96d56Sopenharmony_ci
1257db96d56Sopenharmony_ci        def set_wakeup_fd(fd):
1267db96d56Sopenharmony_ci            if fd == -1:
1277db96d56Sopenharmony_ci                raise ValueError()
1287db96d56Sopenharmony_ci        m_signal.set_wakeup_fd = set_wakeup_fd
1297db96d56Sopenharmony_ci
1307db96d56Sopenharmony_ci        class Err(OSError):
1317db96d56Sopenharmony_ci            errno = errno.EFAULT
1327db96d56Sopenharmony_ci        m_signal.signal.side_effect = Err
1337db96d56Sopenharmony_ci
1347db96d56Sopenharmony_ci        self.assertRaises(
1357db96d56Sopenharmony_ci            Err,
1367db96d56Sopenharmony_ci            self.loop.add_signal_handler,
1377db96d56Sopenharmony_ci            signal.SIGINT, lambda: True)
1387db96d56Sopenharmony_ci
1397db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.signal')
1407db96d56Sopenharmony_ci    @mock.patch('asyncio.base_events.logger')
1417db96d56Sopenharmony_ci    def test_add_signal_handler_install_error2(self, m_logging, m_signal):
1427db96d56Sopenharmony_ci        m_signal.NSIG = signal.NSIG
1437db96d56Sopenharmony_ci        m_signal.valid_signals = signal.valid_signals
1447db96d56Sopenharmony_ci
1457db96d56Sopenharmony_ci        class Err(OSError):
1467db96d56Sopenharmony_ci            errno = errno.EINVAL
1477db96d56Sopenharmony_ci        m_signal.signal.side_effect = Err
1487db96d56Sopenharmony_ci
1497db96d56Sopenharmony_ci        self.loop._signal_handlers[signal.SIGHUP] = lambda: True
1507db96d56Sopenharmony_ci        self.assertRaises(
1517db96d56Sopenharmony_ci            RuntimeError,
1527db96d56Sopenharmony_ci            self.loop.add_signal_handler,
1537db96d56Sopenharmony_ci            signal.SIGINT, lambda: True)
1547db96d56Sopenharmony_ci        self.assertFalse(m_logging.info.called)
1557db96d56Sopenharmony_ci        self.assertEqual(1, m_signal.set_wakeup_fd.call_count)
1567db96d56Sopenharmony_ci
1577db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.signal')
1587db96d56Sopenharmony_ci    @mock.patch('asyncio.base_events.logger')
1597db96d56Sopenharmony_ci    def test_add_signal_handler_install_error3(self, m_logging, m_signal):
1607db96d56Sopenharmony_ci        class Err(OSError):
1617db96d56Sopenharmony_ci            errno = errno.EINVAL
1627db96d56Sopenharmony_ci        m_signal.signal.side_effect = Err
1637db96d56Sopenharmony_ci        m_signal.NSIG = signal.NSIG
1647db96d56Sopenharmony_ci        m_signal.valid_signals = signal.valid_signals
1657db96d56Sopenharmony_ci
1667db96d56Sopenharmony_ci        self.assertRaises(
1677db96d56Sopenharmony_ci            RuntimeError,
1687db96d56Sopenharmony_ci            self.loop.add_signal_handler,
1697db96d56Sopenharmony_ci            signal.SIGINT, lambda: True)
1707db96d56Sopenharmony_ci        self.assertFalse(m_logging.info.called)
1717db96d56Sopenharmony_ci        self.assertEqual(2, m_signal.set_wakeup_fd.call_count)
1727db96d56Sopenharmony_ci
1737db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.signal')
1747db96d56Sopenharmony_ci    def test_remove_signal_handler(self, m_signal):
1757db96d56Sopenharmony_ci        m_signal.NSIG = signal.NSIG
1767db96d56Sopenharmony_ci        m_signal.valid_signals = signal.valid_signals
1777db96d56Sopenharmony_ci
1787db96d56Sopenharmony_ci        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
1797db96d56Sopenharmony_ci
1807db96d56Sopenharmony_ci        self.assertTrue(
1817db96d56Sopenharmony_ci            self.loop.remove_signal_handler(signal.SIGHUP))
1827db96d56Sopenharmony_ci        self.assertTrue(m_signal.set_wakeup_fd.called)
1837db96d56Sopenharmony_ci        self.assertTrue(m_signal.signal.called)
1847db96d56Sopenharmony_ci        self.assertEqual(
1857db96d56Sopenharmony_ci            (signal.SIGHUP, m_signal.SIG_DFL), m_signal.signal.call_args[0])
1867db96d56Sopenharmony_ci
1877db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.signal')
1887db96d56Sopenharmony_ci    def test_remove_signal_handler_2(self, m_signal):
1897db96d56Sopenharmony_ci        m_signal.NSIG = signal.NSIG
1907db96d56Sopenharmony_ci        m_signal.SIGINT = signal.SIGINT
1917db96d56Sopenharmony_ci        m_signal.valid_signals = signal.valid_signals
1927db96d56Sopenharmony_ci
1937db96d56Sopenharmony_ci        self.loop.add_signal_handler(signal.SIGINT, lambda: True)
1947db96d56Sopenharmony_ci        self.loop._signal_handlers[signal.SIGHUP] = object()
1957db96d56Sopenharmony_ci        m_signal.set_wakeup_fd.reset_mock()
1967db96d56Sopenharmony_ci
1977db96d56Sopenharmony_ci        self.assertTrue(
1987db96d56Sopenharmony_ci            self.loop.remove_signal_handler(signal.SIGINT))
1997db96d56Sopenharmony_ci        self.assertFalse(m_signal.set_wakeup_fd.called)
2007db96d56Sopenharmony_ci        self.assertTrue(m_signal.signal.called)
2017db96d56Sopenharmony_ci        self.assertEqual(
2027db96d56Sopenharmony_ci            (signal.SIGINT, m_signal.default_int_handler),
2037db96d56Sopenharmony_ci            m_signal.signal.call_args[0])
2047db96d56Sopenharmony_ci
2057db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.signal')
2067db96d56Sopenharmony_ci    @mock.patch('asyncio.base_events.logger')
2077db96d56Sopenharmony_ci    def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal):
2087db96d56Sopenharmony_ci        m_signal.NSIG = signal.NSIG
2097db96d56Sopenharmony_ci        m_signal.valid_signals = signal.valid_signals
2107db96d56Sopenharmony_ci        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
2117db96d56Sopenharmony_ci
2127db96d56Sopenharmony_ci        m_signal.set_wakeup_fd.side_effect = ValueError
2137db96d56Sopenharmony_ci
2147db96d56Sopenharmony_ci        self.loop.remove_signal_handler(signal.SIGHUP)
2157db96d56Sopenharmony_ci        self.assertTrue(m_logging.info)
2167db96d56Sopenharmony_ci
2177db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.signal')
2187db96d56Sopenharmony_ci    def test_remove_signal_handler_error(self, m_signal):
2197db96d56Sopenharmony_ci        m_signal.NSIG = signal.NSIG
2207db96d56Sopenharmony_ci        m_signal.valid_signals = signal.valid_signals
2217db96d56Sopenharmony_ci        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
2227db96d56Sopenharmony_ci
2237db96d56Sopenharmony_ci        m_signal.signal.side_effect = OSError
2247db96d56Sopenharmony_ci
2257db96d56Sopenharmony_ci        self.assertRaises(
2267db96d56Sopenharmony_ci            OSError, self.loop.remove_signal_handler, signal.SIGHUP)
2277db96d56Sopenharmony_ci
2287db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.signal')
2297db96d56Sopenharmony_ci    def test_remove_signal_handler_error2(self, m_signal):
2307db96d56Sopenharmony_ci        m_signal.NSIG = signal.NSIG
2317db96d56Sopenharmony_ci        m_signal.valid_signals = signal.valid_signals
2327db96d56Sopenharmony_ci        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
2337db96d56Sopenharmony_ci
2347db96d56Sopenharmony_ci        class Err(OSError):
2357db96d56Sopenharmony_ci            errno = errno.EINVAL
2367db96d56Sopenharmony_ci        m_signal.signal.side_effect = Err
2377db96d56Sopenharmony_ci
2387db96d56Sopenharmony_ci        self.assertRaises(
2397db96d56Sopenharmony_ci            RuntimeError, self.loop.remove_signal_handler, signal.SIGHUP)
2407db96d56Sopenharmony_ci
2417db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.signal')
2427db96d56Sopenharmony_ci    def test_close(self, m_signal):
2437db96d56Sopenharmony_ci        m_signal.NSIG = signal.NSIG
2447db96d56Sopenharmony_ci        m_signal.valid_signals = signal.valid_signals
2457db96d56Sopenharmony_ci
2467db96d56Sopenharmony_ci        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
2477db96d56Sopenharmony_ci        self.loop.add_signal_handler(signal.SIGCHLD, lambda: True)
2487db96d56Sopenharmony_ci
2497db96d56Sopenharmony_ci        self.assertEqual(len(self.loop._signal_handlers), 2)
2507db96d56Sopenharmony_ci
2517db96d56Sopenharmony_ci        m_signal.set_wakeup_fd.reset_mock()
2527db96d56Sopenharmony_ci
2537db96d56Sopenharmony_ci        self.loop.close()
2547db96d56Sopenharmony_ci
2557db96d56Sopenharmony_ci        self.assertEqual(len(self.loop._signal_handlers), 0)
2567db96d56Sopenharmony_ci        m_signal.set_wakeup_fd.assert_called_once_with(-1)
2577db96d56Sopenharmony_ci
2587db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.sys')
2597db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.signal')
2607db96d56Sopenharmony_ci    def test_close_on_finalizing(self, m_signal, m_sys):
2617db96d56Sopenharmony_ci        m_signal.NSIG = signal.NSIG
2627db96d56Sopenharmony_ci        m_signal.valid_signals = signal.valid_signals
2637db96d56Sopenharmony_ci        self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
2647db96d56Sopenharmony_ci
2657db96d56Sopenharmony_ci        self.assertEqual(len(self.loop._signal_handlers), 1)
2667db96d56Sopenharmony_ci        m_sys.is_finalizing.return_value = True
2677db96d56Sopenharmony_ci        m_signal.signal.reset_mock()
2687db96d56Sopenharmony_ci
2697db96d56Sopenharmony_ci        with self.assertWarnsRegex(ResourceWarning,
2707db96d56Sopenharmony_ci                                   "skipping signal handlers removal"):
2717db96d56Sopenharmony_ci            self.loop.close()
2727db96d56Sopenharmony_ci
2737db96d56Sopenharmony_ci        self.assertEqual(len(self.loop._signal_handlers), 0)
2747db96d56Sopenharmony_ci        self.assertFalse(m_signal.signal.called)
2757db96d56Sopenharmony_ci
2767db96d56Sopenharmony_ci
2777db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(socket, 'AF_UNIX'),
2787db96d56Sopenharmony_ci                     'UNIX Sockets are not supported')
2797db96d56Sopenharmony_ciclass SelectorEventLoopUnixSocketTests(test_utils.TestCase):
2807db96d56Sopenharmony_ci
2817db96d56Sopenharmony_ci    def setUp(self):
2827db96d56Sopenharmony_ci        super().setUp()
2837db96d56Sopenharmony_ci        self.loop = asyncio.SelectorEventLoop()
2847db96d56Sopenharmony_ci        self.set_event_loop(self.loop)
2857db96d56Sopenharmony_ci
2867db96d56Sopenharmony_ci    @socket_helper.skip_unless_bind_unix_socket
2877db96d56Sopenharmony_ci    def test_create_unix_server_existing_path_sock(self):
2887db96d56Sopenharmony_ci        with test_utils.unix_socket_path() as path:
2897db96d56Sopenharmony_ci            sock = socket.socket(socket.AF_UNIX)
2907db96d56Sopenharmony_ci            sock.bind(path)
2917db96d56Sopenharmony_ci            sock.listen(1)
2927db96d56Sopenharmony_ci            sock.close()
2937db96d56Sopenharmony_ci
2947db96d56Sopenharmony_ci            coro = self.loop.create_unix_server(lambda: None, path)
2957db96d56Sopenharmony_ci            srv = self.loop.run_until_complete(coro)
2967db96d56Sopenharmony_ci            srv.close()
2977db96d56Sopenharmony_ci            self.loop.run_until_complete(srv.wait_closed())
2987db96d56Sopenharmony_ci
2997db96d56Sopenharmony_ci    @socket_helper.skip_unless_bind_unix_socket
3007db96d56Sopenharmony_ci    def test_create_unix_server_pathlib(self):
3017db96d56Sopenharmony_ci        with test_utils.unix_socket_path() as path:
3027db96d56Sopenharmony_ci            path = pathlib.Path(path)
3037db96d56Sopenharmony_ci            srv_coro = self.loop.create_unix_server(lambda: None, path)
3047db96d56Sopenharmony_ci            srv = self.loop.run_until_complete(srv_coro)
3057db96d56Sopenharmony_ci            srv.close()
3067db96d56Sopenharmony_ci            self.loop.run_until_complete(srv.wait_closed())
3077db96d56Sopenharmony_ci
3087db96d56Sopenharmony_ci    def test_create_unix_connection_pathlib(self):
3097db96d56Sopenharmony_ci        with test_utils.unix_socket_path() as path:
3107db96d56Sopenharmony_ci            path = pathlib.Path(path)
3117db96d56Sopenharmony_ci            coro = self.loop.create_unix_connection(lambda: None, path)
3127db96d56Sopenharmony_ci            with self.assertRaises(FileNotFoundError):
3137db96d56Sopenharmony_ci                # If pathlib.Path wasn't supported, the exception would be
3147db96d56Sopenharmony_ci                # different.
3157db96d56Sopenharmony_ci                self.loop.run_until_complete(coro)
3167db96d56Sopenharmony_ci
3177db96d56Sopenharmony_ci    def test_create_unix_server_existing_path_nonsock(self):
3187db96d56Sopenharmony_ci        with tempfile.NamedTemporaryFile() as file:
3197db96d56Sopenharmony_ci            coro = self.loop.create_unix_server(lambda: None, file.name)
3207db96d56Sopenharmony_ci            with self.assertRaisesRegex(OSError,
3217db96d56Sopenharmony_ci                                        'Address.*is already in use'):
3227db96d56Sopenharmony_ci                self.loop.run_until_complete(coro)
3237db96d56Sopenharmony_ci
3247db96d56Sopenharmony_ci    def test_create_unix_server_ssl_bool(self):
3257db96d56Sopenharmony_ci        coro = self.loop.create_unix_server(lambda: None, path='spam',
3267db96d56Sopenharmony_ci                                            ssl=True)
3277db96d56Sopenharmony_ci        with self.assertRaisesRegex(TypeError,
3287db96d56Sopenharmony_ci                                    'ssl argument must be an SSLContext'):
3297db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
3307db96d56Sopenharmony_ci
3317db96d56Sopenharmony_ci    def test_create_unix_server_nopath_nosock(self):
3327db96d56Sopenharmony_ci        coro = self.loop.create_unix_server(lambda: None, path=None)
3337db96d56Sopenharmony_ci        with self.assertRaisesRegex(ValueError,
3347db96d56Sopenharmony_ci                                    'path was not specified, and no sock'):
3357db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
3367db96d56Sopenharmony_ci
3377db96d56Sopenharmony_ci    def test_create_unix_server_path_inetsock(self):
3387db96d56Sopenharmony_ci        sock = socket.socket()
3397db96d56Sopenharmony_ci        with sock:
3407db96d56Sopenharmony_ci            coro = self.loop.create_unix_server(lambda: None, path=None,
3417db96d56Sopenharmony_ci                                                sock=sock)
3427db96d56Sopenharmony_ci            with self.assertRaisesRegex(ValueError,
3437db96d56Sopenharmony_ci                                        'A UNIX Domain Stream.*was expected'):
3447db96d56Sopenharmony_ci                self.loop.run_until_complete(coro)
3457db96d56Sopenharmony_ci
3467db96d56Sopenharmony_ci    def test_create_unix_server_path_dgram(self):
3477db96d56Sopenharmony_ci        sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
3487db96d56Sopenharmony_ci        with sock:
3497db96d56Sopenharmony_ci            coro = self.loop.create_unix_server(lambda: None, path=None,
3507db96d56Sopenharmony_ci                                                sock=sock)
3517db96d56Sopenharmony_ci            with self.assertRaisesRegex(ValueError,
3527db96d56Sopenharmony_ci                                        'A UNIX Domain Stream.*was expected'):
3537db96d56Sopenharmony_ci                self.loop.run_until_complete(coro)
3547db96d56Sopenharmony_ci
3557db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
3567db96d56Sopenharmony_ci                         'no socket.SOCK_NONBLOCK (linux only)')
3577db96d56Sopenharmony_ci    @socket_helper.skip_unless_bind_unix_socket
3587db96d56Sopenharmony_ci    def test_create_unix_server_path_stream_bittype(self):
3597db96d56Sopenharmony_ci        sock = socket.socket(
3607db96d56Sopenharmony_ci            socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
3617db96d56Sopenharmony_ci        with tempfile.NamedTemporaryFile() as file:
3627db96d56Sopenharmony_ci            fn = file.name
3637db96d56Sopenharmony_ci        try:
3647db96d56Sopenharmony_ci            with sock:
3657db96d56Sopenharmony_ci                sock.bind(fn)
3667db96d56Sopenharmony_ci                coro = self.loop.create_unix_server(lambda: None, path=None,
3677db96d56Sopenharmony_ci                                                    sock=sock)
3687db96d56Sopenharmony_ci                srv = self.loop.run_until_complete(coro)
3697db96d56Sopenharmony_ci                srv.close()
3707db96d56Sopenharmony_ci                self.loop.run_until_complete(srv.wait_closed())
3717db96d56Sopenharmony_ci        finally:
3727db96d56Sopenharmony_ci            os.unlink(fn)
3737db96d56Sopenharmony_ci
3747db96d56Sopenharmony_ci    def test_create_unix_server_ssl_timeout_with_plain_sock(self):
3757db96d56Sopenharmony_ci        coro = self.loop.create_unix_server(lambda: None, path='spam',
3767db96d56Sopenharmony_ci                                            ssl_handshake_timeout=1)
3777db96d56Sopenharmony_ci        with self.assertRaisesRegex(
3787db96d56Sopenharmony_ci                ValueError,
3797db96d56Sopenharmony_ci                'ssl_handshake_timeout is only meaningful with ssl'):
3807db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
3817db96d56Sopenharmony_ci
3827db96d56Sopenharmony_ci    def test_create_unix_connection_path_inetsock(self):
3837db96d56Sopenharmony_ci        sock = socket.socket()
3847db96d56Sopenharmony_ci        with sock:
3857db96d56Sopenharmony_ci            coro = self.loop.create_unix_connection(lambda: None,
3867db96d56Sopenharmony_ci                                                    sock=sock)
3877db96d56Sopenharmony_ci            with self.assertRaisesRegex(ValueError,
3887db96d56Sopenharmony_ci                                        'A UNIX Domain Stream.*was expected'):
3897db96d56Sopenharmony_ci                self.loop.run_until_complete(coro)
3907db96d56Sopenharmony_ci
3917db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.socket')
3927db96d56Sopenharmony_ci    def test_create_unix_server_bind_error(self, m_socket):
3937db96d56Sopenharmony_ci        # Ensure that the socket is closed on any bind error
3947db96d56Sopenharmony_ci        sock = mock.Mock()
3957db96d56Sopenharmony_ci        m_socket.socket.return_value = sock
3967db96d56Sopenharmony_ci
3977db96d56Sopenharmony_ci        sock.bind.side_effect = OSError
3987db96d56Sopenharmony_ci        coro = self.loop.create_unix_server(lambda: None, path="/test")
3997db96d56Sopenharmony_ci        with self.assertRaises(OSError):
4007db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
4017db96d56Sopenharmony_ci        self.assertTrue(sock.close.called)
4027db96d56Sopenharmony_ci
4037db96d56Sopenharmony_ci        sock.bind.side_effect = MemoryError
4047db96d56Sopenharmony_ci        coro = self.loop.create_unix_server(lambda: None, path="/test")
4057db96d56Sopenharmony_ci        with self.assertRaises(MemoryError):
4067db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
4077db96d56Sopenharmony_ci        self.assertTrue(sock.close.called)
4087db96d56Sopenharmony_ci
4097db96d56Sopenharmony_ci    def test_create_unix_connection_path_sock(self):
4107db96d56Sopenharmony_ci        coro = self.loop.create_unix_connection(
4117db96d56Sopenharmony_ci            lambda: None, os.devnull, sock=object())
4127db96d56Sopenharmony_ci        with self.assertRaisesRegex(ValueError, 'path and sock can not be'):
4137db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
4147db96d56Sopenharmony_ci
4157db96d56Sopenharmony_ci    def test_create_unix_connection_nopath_nosock(self):
4167db96d56Sopenharmony_ci        coro = self.loop.create_unix_connection(
4177db96d56Sopenharmony_ci            lambda: None, None)
4187db96d56Sopenharmony_ci        with self.assertRaisesRegex(ValueError,
4197db96d56Sopenharmony_ci                                    'no path and sock were specified'):
4207db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
4217db96d56Sopenharmony_ci
4227db96d56Sopenharmony_ci    def test_create_unix_connection_nossl_serverhost(self):
4237db96d56Sopenharmony_ci        coro = self.loop.create_unix_connection(
4247db96d56Sopenharmony_ci            lambda: None, os.devnull, server_hostname='spam')
4257db96d56Sopenharmony_ci        with self.assertRaisesRegex(ValueError,
4267db96d56Sopenharmony_ci                                    'server_hostname is only meaningful'):
4277db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
4287db96d56Sopenharmony_ci
4297db96d56Sopenharmony_ci    def test_create_unix_connection_ssl_noserverhost(self):
4307db96d56Sopenharmony_ci        coro = self.loop.create_unix_connection(
4317db96d56Sopenharmony_ci            lambda: None, os.devnull, ssl=True)
4327db96d56Sopenharmony_ci
4337db96d56Sopenharmony_ci        with self.assertRaisesRegex(
4347db96d56Sopenharmony_ci            ValueError, 'you have to pass server_hostname when using ssl'):
4357db96d56Sopenharmony_ci
4367db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
4377db96d56Sopenharmony_ci
4387db96d56Sopenharmony_ci    def test_create_unix_connection_ssl_timeout_with_plain_sock(self):
4397db96d56Sopenharmony_ci        coro = self.loop.create_unix_connection(lambda: None, path='spam',
4407db96d56Sopenharmony_ci                                            ssl_handshake_timeout=1)
4417db96d56Sopenharmony_ci        with self.assertRaisesRegex(
4427db96d56Sopenharmony_ci                ValueError,
4437db96d56Sopenharmony_ci                'ssl_handshake_timeout is only meaningful with ssl'):
4447db96d56Sopenharmony_ci            self.loop.run_until_complete(coro)
4457db96d56Sopenharmony_ci
4467db96d56Sopenharmony_ci
4477db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(os, 'sendfile'),
4487db96d56Sopenharmony_ci                     'sendfile is not supported')
4497db96d56Sopenharmony_ciclass SelectorEventLoopUnixSockSendfileTests(test_utils.TestCase):
4507db96d56Sopenharmony_ci    DATA = b"12345abcde" * 16 * 1024  # 160 KiB
4517db96d56Sopenharmony_ci
4527db96d56Sopenharmony_ci    class MyProto(asyncio.Protocol):
4537db96d56Sopenharmony_ci
4547db96d56Sopenharmony_ci        def __init__(self, loop):
4557db96d56Sopenharmony_ci            self.started = False
4567db96d56Sopenharmony_ci            self.closed = False
4577db96d56Sopenharmony_ci            self.data = bytearray()
4587db96d56Sopenharmony_ci            self.fut = loop.create_future()
4597db96d56Sopenharmony_ci            self.transport = None
4607db96d56Sopenharmony_ci            self._ready = loop.create_future()
4617db96d56Sopenharmony_ci
4627db96d56Sopenharmony_ci        def connection_made(self, transport):
4637db96d56Sopenharmony_ci            self.started = True
4647db96d56Sopenharmony_ci            self.transport = transport
4657db96d56Sopenharmony_ci            self._ready.set_result(None)
4667db96d56Sopenharmony_ci
4677db96d56Sopenharmony_ci        def data_received(self, data):
4687db96d56Sopenharmony_ci            self.data.extend(data)
4697db96d56Sopenharmony_ci
4707db96d56Sopenharmony_ci        def connection_lost(self, exc):
4717db96d56Sopenharmony_ci            self.closed = True
4727db96d56Sopenharmony_ci            self.fut.set_result(None)
4737db96d56Sopenharmony_ci
4747db96d56Sopenharmony_ci        async def wait_closed(self):
4757db96d56Sopenharmony_ci            await self.fut
4767db96d56Sopenharmony_ci
4777db96d56Sopenharmony_ci    @classmethod
4787db96d56Sopenharmony_ci    def setUpClass(cls):
4797db96d56Sopenharmony_ci        with open(os_helper.TESTFN, 'wb') as fp:
4807db96d56Sopenharmony_ci            fp.write(cls.DATA)
4817db96d56Sopenharmony_ci        super().setUpClass()
4827db96d56Sopenharmony_ci
4837db96d56Sopenharmony_ci    @classmethod
4847db96d56Sopenharmony_ci    def tearDownClass(cls):
4857db96d56Sopenharmony_ci        os_helper.unlink(os_helper.TESTFN)
4867db96d56Sopenharmony_ci        super().tearDownClass()
4877db96d56Sopenharmony_ci
4887db96d56Sopenharmony_ci    def setUp(self):
4897db96d56Sopenharmony_ci        self.loop = asyncio.new_event_loop()
4907db96d56Sopenharmony_ci        self.set_event_loop(self.loop)
4917db96d56Sopenharmony_ci        self.file = open(os_helper.TESTFN, 'rb')
4927db96d56Sopenharmony_ci        self.addCleanup(self.file.close)
4937db96d56Sopenharmony_ci        super().setUp()
4947db96d56Sopenharmony_ci
4957db96d56Sopenharmony_ci    def make_socket(self, cleanup=True):
4967db96d56Sopenharmony_ci        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
4977db96d56Sopenharmony_ci        sock.setblocking(False)
4987db96d56Sopenharmony_ci        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
4997db96d56Sopenharmony_ci        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1024)
5007db96d56Sopenharmony_ci        if cleanup:
5017db96d56Sopenharmony_ci            self.addCleanup(sock.close)
5027db96d56Sopenharmony_ci        return sock
5037db96d56Sopenharmony_ci
5047db96d56Sopenharmony_ci    def run_loop(self, coro):
5057db96d56Sopenharmony_ci        return self.loop.run_until_complete(coro)
5067db96d56Sopenharmony_ci
5077db96d56Sopenharmony_ci    def prepare(self):
5087db96d56Sopenharmony_ci        sock = self.make_socket()
5097db96d56Sopenharmony_ci        proto = self.MyProto(self.loop)
5107db96d56Sopenharmony_ci        port = socket_helper.find_unused_port()
5117db96d56Sopenharmony_ci        srv_sock = self.make_socket(cleanup=False)
5127db96d56Sopenharmony_ci        srv_sock.bind((socket_helper.HOST, port))
5137db96d56Sopenharmony_ci        server = self.run_loop(self.loop.create_server(
5147db96d56Sopenharmony_ci            lambda: proto, sock=srv_sock))
5157db96d56Sopenharmony_ci        self.run_loop(self.loop.sock_connect(sock, (socket_helper.HOST, port)))
5167db96d56Sopenharmony_ci        self.run_loop(proto._ready)
5177db96d56Sopenharmony_ci
5187db96d56Sopenharmony_ci        def cleanup():
5197db96d56Sopenharmony_ci            proto.transport.close()
5207db96d56Sopenharmony_ci            self.run_loop(proto.wait_closed())
5217db96d56Sopenharmony_ci
5227db96d56Sopenharmony_ci            server.close()
5237db96d56Sopenharmony_ci            self.run_loop(server.wait_closed())
5247db96d56Sopenharmony_ci
5257db96d56Sopenharmony_ci        self.addCleanup(cleanup)
5267db96d56Sopenharmony_ci
5277db96d56Sopenharmony_ci        return sock, proto
5287db96d56Sopenharmony_ci
5297db96d56Sopenharmony_ci    def test_sock_sendfile_not_available(self):
5307db96d56Sopenharmony_ci        sock, proto = self.prepare()
5317db96d56Sopenharmony_ci        with mock.patch('asyncio.unix_events.os', spec=[]):
5327db96d56Sopenharmony_ci            with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
5337db96d56Sopenharmony_ci                                        "os[.]sendfile[(][)] is not available"):
5347db96d56Sopenharmony_ci                self.run_loop(self.loop._sock_sendfile_native(sock, self.file,
5357db96d56Sopenharmony_ci                                                              0, None))
5367db96d56Sopenharmony_ci        self.assertEqual(self.file.tell(), 0)
5377db96d56Sopenharmony_ci
5387db96d56Sopenharmony_ci    def test_sock_sendfile_not_a_file(self):
5397db96d56Sopenharmony_ci        sock, proto = self.prepare()
5407db96d56Sopenharmony_ci        f = object()
5417db96d56Sopenharmony_ci        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
5427db96d56Sopenharmony_ci                                    "not a regular file"):
5437db96d56Sopenharmony_ci            self.run_loop(self.loop._sock_sendfile_native(sock, f,
5447db96d56Sopenharmony_ci                                                          0, None))
5457db96d56Sopenharmony_ci        self.assertEqual(self.file.tell(), 0)
5467db96d56Sopenharmony_ci
5477db96d56Sopenharmony_ci    def test_sock_sendfile_iobuffer(self):
5487db96d56Sopenharmony_ci        sock, proto = self.prepare()
5497db96d56Sopenharmony_ci        f = io.BytesIO()
5507db96d56Sopenharmony_ci        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
5517db96d56Sopenharmony_ci                                    "not a regular file"):
5527db96d56Sopenharmony_ci            self.run_loop(self.loop._sock_sendfile_native(sock, f,
5537db96d56Sopenharmony_ci                                                          0, None))
5547db96d56Sopenharmony_ci        self.assertEqual(self.file.tell(), 0)
5557db96d56Sopenharmony_ci
5567db96d56Sopenharmony_ci    def test_sock_sendfile_not_regular_file(self):
5577db96d56Sopenharmony_ci        sock, proto = self.prepare()
5587db96d56Sopenharmony_ci        f = mock.Mock()
5597db96d56Sopenharmony_ci        f.fileno.return_value = -1
5607db96d56Sopenharmony_ci        with self.assertRaisesRegex(asyncio.SendfileNotAvailableError,
5617db96d56Sopenharmony_ci                                    "not a regular file"):
5627db96d56Sopenharmony_ci            self.run_loop(self.loop._sock_sendfile_native(sock, f,
5637db96d56Sopenharmony_ci                                                          0, None))
5647db96d56Sopenharmony_ci        self.assertEqual(self.file.tell(), 0)
5657db96d56Sopenharmony_ci
5667db96d56Sopenharmony_ci    def test_sock_sendfile_cancel1(self):
5677db96d56Sopenharmony_ci        sock, proto = self.prepare()
5687db96d56Sopenharmony_ci
5697db96d56Sopenharmony_ci        fut = self.loop.create_future()
5707db96d56Sopenharmony_ci        fileno = self.file.fileno()
5717db96d56Sopenharmony_ci        self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
5727db96d56Sopenharmony_ci                                             0, None, len(self.DATA), 0)
5737db96d56Sopenharmony_ci        fut.cancel()
5747db96d56Sopenharmony_ci        with contextlib.suppress(asyncio.CancelledError):
5757db96d56Sopenharmony_ci            self.run_loop(fut)
5767db96d56Sopenharmony_ci        with self.assertRaises(KeyError):
5777db96d56Sopenharmony_ci            self.loop._selector.get_key(sock)
5787db96d56Sopenharmony_ci
5797db96d56Sopenharmony_ci    def test_sock_sendfile_cancel2(self):
5807db96d56Sopenharmony_ci        sock, proto = self.prepare()
5817db96d56Sopenharmony_ci
5827db96d56Sopenharmony_ci        fut = self.loop.create_future()
5837db96d56Sopenharmony_ci        fileno = self.file.fileno()
5847db96d56Sopenharmony_ci        self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
5857db96d56Sopenharmony_ci                                             0, None, len(self.DATA), 0)
5867db96d56Sopenharmony_ci        fut.cancel()
5877db96d56Sopenharmony_ci        self.loop._sock_sendfile_native_impl(fut, sock.fileno(), sock, fileno,
5887db96d56Sopenharmony_ci                                             0, None, len(self.DATA), 0)
5897db96d56Sopenharmony_ci        with self.assertRaises(KeyError):
5907db96d56Sopenharmony_ci            self.loop._selector.get_key(sock)
5917db96d56Sopenharmony_ci
5927db96d56Sopenharmony_ci    def test_sock_sendfile_blocking_error(self):
5937db96d56Sopenharmony_ci        sock, proto = self.prepare()
5947db96d56Sopenharmony_ci
5957db96d56Sopenharmony_ci        fileno = self.file.fileno()
5967db96d56Sopenharmony_ci        fut = mock.Mock()
5977db96d56Sopenharmony_ci        fut.cancelled.return_value = False
5987db96d56Sopenharmony_ci        with mock.patch('os.sendfile', side_effect=BlockingIOError()):
5997db96d56Sopenharmony_ci            self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
6007db96d56Sopenharmony_ci                                                 0, None, len(self.DATA), 0)
6017db96d56Sopenharmony_ci        key = self.loop._selector.get_key(sock)
6027db96d56Sopenharmony_ci        self.assertIsNotNone(key)
6037db96d56Sopenharmony_ci        fut.add_done_callback.assert_called_once_with(mock.ANY)
6047db96d56Sopenharmony_ci
6057db96d56Sopenharmony_ci    def test_sock_sendfile_os_error_first_call(self):
6067db96d56Sopenharmony_ci        sock, proto = self.prepare()
6077db96d56Sopenharmony_ci
6087db96d56Sopenharmony_ci        fileno = self.file.fileno()
6097db96d56Sopenharmony_ci        fut = self.loop.create_future()
6107db96d56Sopenharmony_ci        with mock.patch('os.sendfile', side_effect=OSError()):
6117db96d56Sopenharmony_ci            self.loop._sock_sendfile_native_impl(fut, None, sock, fileno,
6127db96d56Sopenharmony_ci                                                 0, None, len(self.DATA), 0)
6137db96d56Sopenharmony_ci        with self.assertRaises(KeyError):
6147db96d56Sopenharmony_ci            self.loop._selector.get_key(sock)
6157db96d56Sopenharmony_ci        exc = fut.exception()
6167db96d56Sopenharmony_ci        self.assertIsInstance(exc, asyncio.SendfileNotAvailableError)
6177db96d56Sopenharmony_ci        self.assertEqual(0, self.file.tell())
6187db96d56Sopenharmony_ci
6197db96d56Sopenharmony_ci    def test_sock_sendfile_os_error_next_call(self):
6207db96d56Sopenharmony_ci        sock, proto = self.prepare()
6217db96d56Sopenharmony_ci
6227db96d56Sopenharmony_ci        fileno = self.file.fileno()
6237db96d56Sopenharmony_ci        fut = self.loop.create_future()
6247db96d56Sopenharmony_ci        err = OSError()
6257db96d56Sopenharmony_ci        with mock.patch('os.sendfile', side_effect=err):
6267db96d56Sopenharmony_ci            self.loop._sock_sendfile_native_impl(fut, sock.fileno(),
6277db96d56Sopenharmony_ci                                                 sock, fileno,
6287db96d56Sopenharmony_ci                                                 1000, None, len(self.DATA),
6297db96d56Sopenharmony_ci                                                 1000)
6307db96d56Sopenharmony_ci        with self.assertRaises(KeyError):
6317db96d56Sopenharmony_ci            self.loop._selector.get_key(sock)
6327db96d56Sopenharmony_ci        exc = fut.exception()
6337db96d56Sopenharmony_ci        self.assertIs(exc, err)
6347db96d56Sopenharmony_ci        self.assertEqual(1000, self.file.tell())
6357db96d56Sopenharmony_ci
6367db96d56Sopenharmony_ci    def test_sock_sendfile_exception(self):
6377db96d56Sopenharmony_ci        sock, proto = self.prepare()
6387db96d56Sopenharmony_ci
6397db96d56Sopenharmony_ci        fileno = self.file.fileno()
6407db96d56Sopenharmony_ci        fut = self.loop.create_future()
6417db96d56Sopenharmony_ci        err = asyncio.SendfileNotAvailableError()
6427db96d56Sopenharmony_ci        with mock.patch('os.sendfile', side_effect=err):
6437db96d56Sopenharmony_ci            self.loop._sock_sendfile_native_impl(fut, sock.fileno(),
6447db96d56Sopenharmony_ci                                                 sock, fileno,
6457db96d56Sopenharmony_ci                                                 1000, None, len(self.DATA),
6467db96d56Sopenharmony_ci                                                 1000)
6477db96d56Sopenharmony_ci        with self.assertRaises(KeyError):
6487db96d56Sopenharmony_ci            self.loop._selector.get_key(sock)
6497db96d56Sopenharmony_ci        exc = fut.exception()
6507db96d56Sopenharmony_ci        self.assertIs(exc, err)
6517db96d56Sopenharmony_ci        self.assertEqual(1000, self.file.tell())
6527db96d56Sopenharmony_ci
6537db96d56Sopenharmony_ci
6547db96d56Sopenharmony_ciclass UnixReadPipeTransportTests(test_utils.TestCase):
6557db96d56Sopenharmony_ci
6567db96d56Sopenharmony_ci    def setUp(self):
6577db96d56Sopenharmony_ci        super().setUp()
6587db96d56Sopenharmony_ci        self.loop = self.new_test_loop()
6597db96d56Sopenharmony_ci        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
6607db96d56Sopenharmony_ci        self.pipe = mock.Mock(spec_set=io.RawIOBase)
6617db96d56Sopenharmony_ci        self.pipe.fileno.return_value = 5
6627db96d56Sopenharmony_ci
6637db96d56Sopenharmony_ci        blocking_patcher = mock.patch('os.set_blocking')
6647db96d56Sopenharmony_ci        blocking_patcher.start()
6657db96d56Sopenharmony_ci        self.addCleanup(blocking_patcher.stop)
6667db96d56Sopenharmony_ci
6677db96d56Sopenharmony_ci        fstat_patcher = mock.patch('os.fstat')
6687db96d56Sopenharmony_ci        m_fstat = fstat_patcher.start()
6697db96d56Sopenharmony_ci        st = mock.Mock()
6707db96d56Sopenharmony_ci        st.st_mode = stat.S_IFIFO
6717db96d56Sopenharmony_ci        m_fstat.return_value = st
6727db96d56Sopenharmony_ci        self.addCleanup(fstat_patcher.stop)
6737db96d56Sopenharmony_ci
6747db96d56Sopenharmony_ci    def read_pipe_transport(self, waiter=None):
6757db96d56Sopenharmony_ci        transport = unix_events._UnixReadPipeTransport(self.loop, self.pipe,
6767db96d56Sopenharmony_ci                                                       self.protocol,
6777db96d56Sopenharmony_ci                                                       waiter=waiter)
6787db96d56Sopenharmony_ci        self.addCleanup(close_pipe_transport, transport)
6797db96d56Sopenharmony_ci        return transport
6807db96d56Sopenharmony_ci
6817db96d56Sopenharmony_ci    def test_ctor(self):
6827db96d56Sopenharmony_ci        waiter = self.loop.create_future()
6837db96d56Sopenharmony_ci        tr = self.read_pipe_transport(waiter=waiter)
6847db96d56Sopenharmony_ci        self.loop.run_until_complete(waiter)
6857db96d56Sopenharmony_ci
6867db96d56Sopenharmony_ci        self.protocol.connection_made.assert_called_with(tr)
6877db96d56Sopenharmony_ci        self.loop.assert_reader(5, tr._read_ready)
6887db96d56Sopenharmony_ci        self.assertIsNone(waiter.result())
6897db96d56Sopenharmony_ci
6907db96d56Sopenharmony_ci    @mock.patch('os.read')
6917db96d56Sopenharmony_ci    def test__read_ready(self, m_read):
6927db96d56Sopenharmony_ci        tr = self.read_pipe_transport()
6937db96d56Sopenharmony_ci        m_read.return_value = b'data'
6947db96d56Sopenharmony_ci        tr._read_ready()
6957db96d56Sopenharmony_ci
6967db96d56Sopenharmony_ci        m_read.assert_called_with(5, tr.max_size)
6977db96d56Sopenharmony_ci        self.protocol.data_received.assert_called_with(b'data')
6987db96d56Sopenharmony_ci
6997db96d56Sopenharmony_ci    @mock.patch('os.read')
7007db96d56Sopenharmony_ci    def test__read_ready_eof(self, m_read):
7017db96d56Sopenharmony_ci        tr = self.read_pipe_transport()
7027db96d56Sopenharmony_ci        m_read.return_value = b''
7037db96d56Sopenharmony_ci        tr._read_ready()
7047db96d56Sopenharmony_ci
7057db96d56Sopenharmony_ci        m_read.assert_called_with(5, tr.max_size)
7067db96d56Sopenharmony_ci        self.assertFalse(self.loop.readers)
7077db96d56Sopenharmony_ci        test_utils.run_briefly(self.loop)
7087db96d56Sopenharmony_ci        self.protocol.eof_received.assert_called_with()
7097db96d56Sopenharmony_ci        self.protocol.connection_lost.assert_called_with(None)
7107db96d56Sopenharmony_ci
7117db96d56Sopenharmony_ci    @mock.patch('os.read')
7127db96d56Sopenharmony_ci    def test__read_ready_blocked(self, m_read):
7137db96d56Sopenharmony_ci        tr = self.read_pipe_transport()
7147db96d56Sopenharmony_ci        m_read.side_effect = BlockingIOError
7157db96d56Sopenharmony_ci        tr._read_ready()
7167db96d56Sopenharmony_ci
7177db96d56Sopenharmony_ci        m_read.assert_called_with(5, tr.max_size)
7187db96d56Sopenharmony_ci        test_utils.run_briefly(self.loop)
7197db96d56Sopenharmony_ci        self.assertFalse(self.protocol.data_received.called)
7207db96d56Sopenharmony_ci
7217db96d56Sopenharmony_ci    @mock.patch('asyncio.log.logger.error')
7227db96d56Sopenharmony_ci    @mock.patch('os.read')
7237db96d56Sopenharmony_ci    def test__read_ready_error(self, m_read, m_logexc):
7247db96d56Sopenharmony_ci        tr = self.read_pipe_transport()
7257db96d56Sopenharmony_ci        err = OSError()
7267db96d56Sopenharmony_ci        m_read.side_effect = err
7277db96d56Sopenharmony_ci        tr._close = mock.Mock()
7287db96d56Sopenharmony_ci        tr._read_ready()
7297db96d56Sopenharmony_ci
7307db96d56Sopenharmony_ci        m_read.assert_called_with(5, tr.max_size)
7317db96d56Sopenharmony_ci        tr._close.assert_called_with(err)
7327db96d56Sopenharmony_ci        m_logexc.assert_called_with(
7337db96d56Sopenharmony_ci            test_utils.MockPattern(
7347db96d56Sopenharmony_ci                'Fatal read error on pipe transport'
7357db96d56Sopenharmony_ci                '\nprotocol:.*\ntransport:.*'),
7367db96d56Sopenharmony_ci            exc_info=(OSError, MOCK_ANY, MOCK_ANY))
7377db96d56Sopenharmony_ci
7387db96d56Sopenharmony_ci    @mock.patch('os.read')
7397db96d56Sopenharmony_ci    def test_pause_reading(self, m_read):
7407db96d56Sopenharmony_ci        tr = self.read_pipe_transport()
7417db96d56Sopenharmony_ci        m = mock.Mock()
7427db96d56Sopenharmony_ci        self.loop.add_reader(5, m)
7437db96d56Sopenharmony_ci        tr.pause_reading()
7447db96d56Sopenharmony_ci        self.assertFalse(self.loop.readers)
7457db96d56Sopenharmony_ci
7467db96d56Sopenharmony_ci    @mock.patch('os.read')
7477db96d56Sopenharmony_ci    def test_resume_reading(self, m_read):
7487db96d56Sopenharmony_ci        tr = self.read_pipe_transport()
7497db96d56Sopenharmony_ci        tr.pause_reading()
7507db96d56Sopenharmony_ci        tr.resume_reading()
7517db96d56Sopenharmony_ci        self.loop.assert_reader(5, tr._read_ready)
7527db96d56Sopenharmony_ci
7537db96d56Sopenharmony_ci    @mock.patch('os.read')
7547db96d56Sopenharmony_ci    def test_close(self, m_read):
7557db96d56Sopenharmony_ci        tr = self.read_pipe_transport()
7567db96d56Sopenharmony_ci        tr._close = mock.Mock()
7577db96d56Sopenharmony_ci        tr.close()
7587db96d56Sopenharmony_ci        tr._close.assert_called_with(None)
7597db96d56Sopenharmony_ci
7607db96d56Sopenharmony_ci    @mock.patch('os.read')
7617db96d56Sopenharmony_ci    def test_close_already_closing(self, m_read):
7627db96d56Sopenharmony_ci        tr = self.read_pipe_transport()
7637db96d56Sopenharmony_ci        tr._closing = True
7647db96d56Sopenharmony_ci        tr._close = mock.Mock()
7657db96d56Sopenharmony_ci        tr.close()
7667db96d56Sopenharmony_ci        self.assertFalse(tr._close.called)
7677db96d56Sopenharmony_ci
7687db96d56Sopenharmony_ci    @mock.patch('os.read')
7697db96d56Sopenharmony_ci    def test__close(self, m_read):
7707db96d56Sopenharmony_ci        tr = self.read_pipe_transport()
7717db96d56Sopenharmony_ci        err = object()
7727db96d56Sopenharmony_ci        tr._close(err)
7737db96d56Sopenharmony_ci        self.assertTrue(tr.is_closing())
7747db96d56Sopenharmony_ci        self.assertFalse(self.loop.readers)
7757db96d56Sopenharmony_ci        test_utils.run_briefly(self.loop)
7767db96d56Sopenharmony_ci        self.protocol.connection_lost.assert_called_with(err)
7777db96d56Sopenharmony_ci
7787db96d56Sopenharmony_ci    def test__call_connection_lost(self):
7797db96d56Sopenharmony_ci        tr = self.read_pipe_transport()
7807db96d56Sopenharmony_ci        self.assertIsNotNone(tr._protocol)
7817db96d56Sopenharmony_ci        self.assertIsNotNone(tr._loop)
7827db96d56Sopenharmony_ci
7837db96d56Sopenharmony_ci        err = None
7847db96d56Sopenharmony_ci        tr._call_connection_lost(err)
7857db96d56Sopenharmony_ci        self.protocol.connection_lost.assert_called_with(err)
7867db96d56Sopenharmony_ci        self.pipe.close.assert_called_with()
7877db96d56Sopenharmony_ci
7887db96d56Sopenharmony_ci        self.assertIsNone(tr._protocol)
7897db96d56Sopenharmony_ci        self.assertIsNone(tr._loop)
7907db96d56Sopenharmony_ci
7917db96d56Sopenharmony_ci    def test__call_connection_lost_with_err(self):
7927db96d56Sopenharmony_ci        tr = self.read_pipe_transport()
7937db96d56Sopenharmony_ci        self.assertIsNotNone(tr._protocol)
7947db96d56Sopenharmony_ci        self.assertIsNotNone(tr._loop)
7957db96d56Sopenharmony_ci
7967db96d56Sopenharmony_ci        err = OSError()
7977db96d56Sopenharmony_ci        tr._call_connection_lost(err)
7987db96d56Sopenharmony_ci        self.protocol.connection_lost.assert_called_with(err)
7997db96d56Sopenharmony_ci        self.pipe.close.assert_called_with()
8007db96d56Sopenharmony_ci
8017db96d56Sopenharmony_ci        self.assertIsNone(tr._protocol)
8027db96d56Sopenharmony_ci        self.assertIsNone(tr._loop)
8037db96d56Sopenharmony_ci
8047db96d56Sopenharmony_ci    def test_pause_reading_on_closed_pipe(self):
8057db96d56Sopenharmony_ci        tr = self.read_pipe_transport()
8067db96d56Sopenharmony_ci        tr.close()
8077db96d56Sopenharmony_ci        test_utils.run_briefly(self.loop)
8087db96d56Sopenharmony_ci        self.assertIsNone(tr._loop)
8097db96d56Sopenharmony_ci        tr.pause_reading()
8107db96d56Sopenharmony_ci
8117db96d56Sopenharmony_ci    def test_pause_reading_on_paused_pipe(self):
8127db96d56Sopenharmony_ci        tr = self.read_pipe_transport()
8137db96d56Sopenharmony_ci        tr.pause_reading()
8147db96d56Sopenharmony_ci        # the second call should do nothing
8157db96d56Sopenharmony_ci        tr.pause_reading()
8167db96d56Sopenharmony_ci
8177db96d56Sopenharmony_ci    def test_resume_reading_on_closed_pipe(self):
8187db96d56Sopenharmony_ci        tr = self.read_pipe_transport()
8197db96d56Sopenharmony_ci        tr.close()
8207db96d56Sopenharmony_ci        test_utils.run_briefly(self.loop)
8217db96d56Sopenharmony_ci        self.assertIsNone(tr._loop)
8227db96d56Sopenharmony_ci        tr.resume_reading()
8237db96d56Sopenharmony_ci
8247db96d56Sopenharmony_ci    def test_resume_reading_on_paused_pipe(self):
8257db96d56Sopenharmony_ci        tr = self.read_pipe_transport()
8267db96d56Sopenharmony_ci        # the pipe is not paused
8277db96d56Sopenharmony_ci        # resuming should do nothing
8287db96d56Sopenharmony_ci        tr.resume_reading()
8297db96d56Sopenharmony_ci
8307db96d56Sopenharmony_ci
8317db96d56Sopenharmony_ciclass UnixWritePipeTransportTests(test_utils.TestCase):
8327db96d56Sopenharmony_ci
8337db96d56Sopenharmony_ci    def setUp(self):
8347db96d56Sopenharmony_ci        super().setUp()
8357db96d56Sopenharmony_ci        self.loop = self.new_test_loop()
8367db96d56Sopenharmony_ci        self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
8377db96d56Sopenharmony_ci        self.pipe = mock.Mock(spec_set=io.RawIOBase)
8387db96d56Sopenharmony_ci        self.pipe.fileno.return_value = 5
8397db96d56Sopenharmony_ci
8407db96d56Sopenharmony_ci        blocking_patcher = mock.patch('os.set_blocking')
8417db96d56Sopenharmony_ci        blocking_patcher.start()
8427db96d56Sopenharmony_ci        self.addCleanup(blocking_patcher.stop)
8437db96d56Sopenharmony_ci
8447db96d56Sopenharmony_ci        fstat_patcher = mock.patch('os.fstat')
8457db96d56Sopenharmony_ci        m_fstat = fstat_patcher.start()
8467db96d56Sopenharmony_ci        st = mock.Mock()
8477db96d56Sopenharmony_ci        st.st_mode = stat.S_IFSOCK
8487db96d56Sopenharmony_ci        m_fstat.return_value = st
8497db96d56Sopenharmony_ci        self.addCleanup(fstat_patcher.stop)
8507db96d56Sopenharmony_ci
8517db96d56Sopenharmony_ci    def write_pipe_transport(self, waiter=None):
8527db96d56Sopenharmony_ci        transport = unix_events._UnixWritePipeTransport(self.loop, self.pipe,
8537db96d56Sopenharmony_ci                                                        self.protocol,
8547db96d56Sopenharmony_ci                                                        waiter=waiter)
8557db96d56Sopenharmony_ci        self.addCleanup(close_pipe_transport, transport)
8567db96d56Sopenharmony_ci        return transport
8577db96d56Sopenharmony_ci
8587db96d56Sopenharmony_ci    def test_ctor(self):
8597db96d56Sopenharmony_ci        waiter = self.loop.create_future()
8607db96d56Sopenharmony_ci        tr = self.write_pipe_transport(waiter=waiter)
8617db96d56Sopenharmony_ci        self.loop.run_until_complete(waiter)
8627db96d56Sopenharmony_ci
8637db96d56Sopenharmony_ci        self.protocol.connection_made.assert_called_with(tr)
8647db96d56Sopenharmony_ci        self.loop.assert_reader(5, tr._read_ready)
8657db96d56Sopenharmony_ci        self.assertEqual(None, waiter.result())
8667db96d56Sopenharmony_ci
8677db96d56Sopenharmony_ci    def test_can_write_eof(self):
8687db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
8697db96d56Sopenharmony_ci        self.assertTrue(tr.can_write_eof())
8707db96d56Sopenharmony_ci
8717db96d56Sopenharmony_ci    @mock.patch('os.write')
8727db96d56Sopenharmony_ci    def test_write(self, m_write):
8737db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
8747db96d56Sopenharmony_ci        m_write.return_value = 4
8757db96d56Sopenharmony_ci        tr.write(b'data')
8767db96d56Sopenharmony_ci        m_write.assert_called_with(5, b'data')
8777db96d56Sopenharmony_ci        self.assertFalse(self.loop.writers)
8787db96d56Sopenharmony_ci        self.assertEqual(bytearray(), tr._buffer)
8797db96d56Sopenharmony_ci
8807db96d56Sopenharmony_ci    @mock.patch('os.write')
8817db96d56Sopenharmony_ci    def test_write_no_data(self, m_write):
8827db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
8837db96d56Sopenharmony_ci        tr.write(b'')
8847db96d56Sopenharmony_ci        self.assertFalse(m_write.called)
8857db96d56Sopenharmony_ci        self.assertFalse(self.loop.writers)
8867db96d56Sopenharmony_ci        self.assertEqual(bytearray(b''), tr._buffer)
8877db96d56Sopenharmony_ci
8887db96d56Sopenharmony_ci    @mock.patch('os.write')
8897db96d56Sopenharmony_ci    def test_write_partial(self, m_write):
8907db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
8917db96d56Sopenharmony_ci        m_write.return_value = 2
8927db96d56Sopenharmony_ci        tr.write(b'data')
8937db96d56Sopenharmony_ci        self.loop.assert_writer(5, tr._write_ready)
8947db96d56Sopenharmony_ci        self.assertEqual(bytearray(b'ta'), tr._buffer)
8957db96d56Sopenharmony_ci
8967db96d56Sopenharmony_ci    @mock.patch('os.write')
8977db96d56Sopenharmony_ci    def test_write_buffer(self, m_write):
8987db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
8997db96d56Sopenharmony_ci        self.loop.add_writer(5, tr._write_ready)
9007db96d56Sopenharmony_ci        tr._buffer = bytearray(b'previous')
9017db96d56Sopenharmony_ci        tr.write(b'data')
9027db96d56Sopenharmony_ci        self.assertFalse(m_write.called)
9037db96d56Sopenharmony_ci        self.loop.assert_writer(5, tr._write_ready)
9047db96d56Sopenharmony_ci        self.assertEqual(bytearray(b'previousdata'), tr._buffer)
9057db96d56Sopenharmony_ci
9067db96d56Sopenharmony_ci    @mock.patch('os.write')
9077db96d56Sopenharmony_ci    def test_write_again(self, m_write):
9087db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
9097db96d56Sopenharmony_ci        m_write.side_effect = BlockingIOError()
9107db96d56Sopenharmony_ci        tr.write(b'data')
9117db96d56Sopenharmony_ci        m_write.assert_called_with(5, bytearray(b'data'))
9127db96d56Sopenharmony_ci        self.loop.assert_writer(5, tr._write_ready)
9137db96d56Sopenharmony_ci        self.assertEqual(bytearray(b'data'), tr._buffer)
9147db96d56Sopenharmony_ci
9157db96d56Sopenharmony_ci    @mock.patch('asyncio.unix_events.logger')
9167db96d56Sopenharmony_ci    @mock.patch('os.write')
9177db96d56Sopenharmony_ci    def test_write_err(self, m_write, m_log):
9187db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
9197db96d56Sopenharmony_ci        err = OSError()
9207db96d56Sopenharmony_ci        m_write.side_effect = err
9217db96d56Sopenharmony_ci        tr._fatal_error = mock.Mock()
9227db96d56Sopenharmony_ci        tr.write(b'data')
9237db96d56Sopenharmony_ci        m_write.assert_called_with(5, b'data')
9247db96d56Sopenharmony_ci        self.assertFalse(self.loop.writers)
9257db96d56Sopenharmony_ci        self.assertEqual(bytearray(), tr._buffer)
9267db96d56Sopenharmony_ci        tr._fatal_error.assert_called_with(
9277db96d56Sopenharmony_ci                            err,
9287db96d56Sopenharmony_ci                            'Fatal write error on pipe transport')
9297db96d56Sopenharmony_ci        self.assertEqual(1, tr._conn_lost)
9307db96d56Sopenharmony_ci
9317db96d56Sopenharmony_ci        tr.write(b'data')
9327db96d56Sopenharmony_ci        self.assertEqual(2, tr._conn_lost)
9337db96d56Sopenharmony_ci        tr.write(b'data')
9347db96d56Sopenharmony_ci        tr.write(b'data')
9357db96d56Sopenharmony_ci        tr.write(b'data')
9367db96d56Sopenharmony_ci        tr.write(b'data')
9377db96d56Sopenharmony_ci        # This is a bit overspecified. :-(
9387db96d56Sopenharmony_ci        m_log.warning.assert_called_with(
9397db96d56Sopenharmony_ci            'pipe closed by peer or os.write(pipe, data) raised exception.')
9407db96d56Sopenharmony_ci        tr.close()
9417db96d56Sopenharmony_ci
9427db96d56Sopenharmony_ci    @mock.patch('os.write')
9437db96d56Sopenharmony_ci    def test_write_close(self, m_write):
9447db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
9457db96d56Sopenharmony_ci        tr._read_ready()  # pipe was closed by peer
9467db96d56Sopenharmony_ci
9477db96d56Sopenharmony_ci        tr.write(b'data')
9487db96d56Sopenharmony_ci        self.assertEqual(tr._conn_lost, 1)
9497db96d56Sopenharmony_ci        tr.write(b'data')
9507db96d56Sopenharmony_ci        self.assertEqual(tr._conn_lost, 2)
9517db96d56Sopenharmony_ci
9527db96d56Sopenharmony_ci    def test__read_ready(self):
9537db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
9547db96d56Sopenharmony_ci        tr._read_ready()
9557db96d56Sopenharmony_ci        self.assertFalse(self.loop.readers)
9567db96d56Sopenharmony_ci        self.assertFalse(self.loop.writers)
9577db96d56Sopenharmony_ci        self.assertTrue(tr.is_closing())
9587db96d56Sopenharmony_ci        test_utils.run_briefly(self.loop)
9597db96d56Sopenharmony_ci        self.protocol.connection_lost.assert_called_with(None)
9607db96d56Sopenharmony_ci
9617db96d56Sopenharmony_ci    @mock.patch('os.write')
9627db96d56Sopenharmony_ci    def test__write_ready(self, m_write):
9637db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
9647db96d56Sopenharmony_ci        self.loop.add_writer(5, tr._write_ready)
9657db96d56Sopenharmony_ci        tr._buffer = bytearray(b'data')
9667db96d56Sopenharmony_ci        m_write.return_value = 4
9677db96d56Sopenharmony_ci        tr._write_ready()
9687db96d56Sopenharmony_ci        self.assertFalse(self.loop.writers)
9697db96d56Sopenharmony_ci        self.assertEqual(bytearray(), tr._buffer)
9707db96d56Sopenharmony_ci
9717db96d56Sopenharmony_ci    @mock.patch('os.write')
9727db96d56Sopenharmony_ci    def test__write_ready_partial(self, m_write):
9737db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
9747db96d56Sopenharmony_ci        self.loop.add_writer(5, tr._write_ready)
9757db96d56Sopenharmony_ci        tr._buffer = bytearray(b'data')
9767db96d56Sopenharmony_ci        m_write.return_value = 3
9777db96d56Sopenharmony_ci        tr._write_ready()
9787db96d56Sopenharmony_ci        self.loop.assert_writer(5, tr._write_ready)
9797db96d56Sopenharmony_ci        self.assertEqual(bytearray(b'a'), tr._buffer)
9807db96d56Sopenharmony_ci
9817db96d56Sopenharmony_ci    @mock.patch('os.write')
9827db96d56Sopenharmony_ci    def test__write_ready_again(self, m_write):
9837db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
9847db96d56Sopenharmony_ci        self.loop.add_writer(5, tr._write_ready)
9857db96d56Sopenharmony_ci        tr._buffer = bytearray(b'data')
9867db96d56Sopenharmony_ci        m_write.side_effect = BlockingIOError()
9877db96d56Sopenharmony_ci        tr._write_ready()
9887db96d56Sopenharmony_ci        m_write.assert_called_with(5, bytearray(b'data'))
9897db96d56Sopenharmony_ci        self.loop.assert_writer(5, tr._write_ready)
9907db96d56Sopenharmony_ci        self.assertEqual(bytearray(b'data'), tr._buffer)
9917db96d56Sopenharmony_ci
9927db96d56Sopenharmony_ci    @mock.patch('os.write')
9937db96d56Sopenharmony_ci    def test__write_ready_empty(self, m_write):
9947db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
9957db96d56Sopenharmony_ci        self.loop.add_writer(5, tr._write_ready)
9967db96d56Sopenharmony_ci        tr._buffer = bytearray(b'data')
9977db96d56Sopenharmony_ci        m_write.return_value = 0
9987db96d56Sopenharmony_ci        tr._write_ready()
9997db96d56Sopenharmony_ci        m_write.assert_called_with(5, bytearray(b'data'))
10007db96d56Sopenharmony_ci        self.loop.assert_writer(5, tr._write_ready)
10017db96d56Sopenharmony_ci        self.assertEqual(bytearray(b'data'), tr._buffer)
10027db96d56Sopenharmony_ci
10037db96d56Sopenharmony_ci    @mock.patch('asyncio.log.logger.error')
10047db96d56Sopenharmony_ci    @mock.patch('os.write')
10057db96d56Sopenharmony_ci    def test__write_ready_err(self, m_write, m_logexc):
10067db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
10077db96d56Sopenharmony_ci        self.loop.add_writer(5, tr._write_ready)
10087db96d56Sopenharmony_ci        tr._buffer = bytearray(b'data')
10097db96d56Sopenharmony_ci        m_write.side_effect = err = OSError()
10107db96d56Sopenharmony_ci        tr._write_ready()
10117db96d56Sopenharmony_ci        self.assertFalse(self.loop.writers)
10127db96d56Sopenharmony_ci        self.assertFalse(self.loop.readers)
10137db96d56Sopenharmony_ci        self.assertEqual(bytearray(), tr._buffer)
10147db96d56Sopenharmony_ci        self.assertTrue(tr.is_closing())
10157db96d56Sopenharmony_ci        m_logexc.assert_not_called()
10167db96d56Sopenharmony_ci        self.assertEqual(1, tr._conn_lost)
10177db96d56Sopenharmony_ci        test_utils.run_briefly(self.loop)
10187db96d56Sopenharmony_ci        self.protocol.connection_lost.assert_called_with(err)
10197db96d56Sopenharmony_ci
10207db96d56Sopenharmony_ci    @mock.patch('os.write')
10217db96d56Sopenharmony_ci    def test__write_ready_closing(self, m_write):
10227db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
10237db96d56Sopenharmony_ci        self.loop.add_writer(5, tr._write_ready)
10247db96d56Sopenharmony_ci        tr._closing = True
10257db96d56Sopenharmony_ci        tr._buffer = bytearray(b'data')
10267db96d56Sopenharmony_ci        m_write.return_value = 4
10277db96d56Sopenharmony_ci        tr._write_ready()
10287db96d56Sopenharmony_ci        self.assertFalse(self.loop.writers)
10297db96d56Sopenharmony_ci        self.assertFalse(self.loop.readers)
10307db96d56Sopenharmony_ci        self.assertEqual(bytearray(), tr._buffer)
10317db96d56Sopenharmony_ci        self.protocol.connection_lost.assert_called_with(None)
10327db96d56Sopenharmony_ci        self.pipe.close.assert_called_with()
10337db96d56Sopenharmony_ci
10347db96d56Sopenharmony_ci    @mock.patch('os.write')
10357db96d56Sopenharmony_ci    def test_abort(self, m_write):
10367db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
10377db96d56Sopenharmony_ci        self.loop.add_writer(5, tr._write_ready)
10387db96d56Sopenharmony_ci        self.loop.add_reader(5, tr._read_ready)
10397db96d56Sopenharmony_ci        tr._buffer = [b'da', b'ta']
10407db96d56Sopenharmony_ci        tr.abort()
10417db96d56Sopenharmony_ci        self.assertFalse(m_write.called)
10427db96d56Sopenharmony_ci        self.assertFalse(self.loop.readers)
10437db96d56Sopenharmony_ci        self.assertFalse(self.loop.writers)
10447db96d56Sopenharmony_ci        self.assertEqual([], tr._buffer)
10457db96d56Sopenharmony_ci        self.assertTrue(tr.is_closing())
10467db96d56Sopenharmony_ci        test_utils.run_briefly(self.loop)
10477db96d56Sopenharmony_ci        self.protocol.connection_lost.assert_called_with(None)
10487db96d56Sopenharmony_ci
10497db96d56Sopenharmony_ci    def test__call_connection_lost(self):
10507db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
10517db96d56Sopenharmony_ci        self.assertIsNotNone(tr._protocol)
10527db96d56Sopenharmony_ci        self.assertIsNotNone(tr._loop)
10537db96d56Sopenharmony_ci
10547db96d56Sopenharmony_ci        err = None
10557db96d56Sopenharmony_ci        tr._call_connection_lost(err)
10567db96d56Sopenharmony_ci        self.protocol.connection_lost.assert_called_with(err)
10577db96d56Sopenharmony_ci        self.pipe.close.assert_called_with()
10587db96d56Sopenharmony_ci
10597db96d56Sopenharmony_ci        self.assertIsNone(tr._protocol)
10607db96d56Sopenharmony_ci        self.assertIsNone(tr._loop)
10617db96d56Sopenharmony_ci
10627db96d56Sopenharmony_ci    def test__call_connection_lost_with_err(self):
10637db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
10647db96d56Sopenharmony_ci        self.assertIsNotNone(tr._protocol)
10657db96d56Sopenharmony_ci        self.assertIsNotNone(tr._loop)
10667db96d56Sopenharmony_ci
10677db96d56Sopenharmony_ci        err = OSError()
10687db96d56Sopenharmony_ci        tr._call_connection_lost(err)
10697db96d56Sopenharmony_ci        self.protocol.connection_lost.assert_called_with(err)
10707db96d56Sopenharmony_ci        self.pipe.close.assert_called_with()
10717db96d56Sopenharmony_ci
10727db96d56Sopenharmony_ci        self.assertIsNone(tr._protocol)
10737db96d56Sopenharmony_ci        self.assertIsNone(tr._loop)
10747db96d56Sopenharmony_ci
10757db96d56Sopenharmony_ci    def test_close(self):
10767db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
10777db96d56Sopenharmony_ci        tr.write_eof = mock.Mock()
10787db96d56Sopenharmony_ci        tr.close()
10797db96d56Sopenharmony_ci        tr.write_eof.assert_called_with()
10807db96d56Sopenharmony_ci
10817db96d56Sopenharmony_ci        # closing the transport twice must not fail
10827db96d56Sopenharmony_ci        tr.close()
10837db96d56Sopenharmony_ci
10847db96d56Sopenharmony_ci    def test_close_closing(self):
10857db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
10867db96d56Sopenharmony_ci        tr.write_eof = mock.Mock()
10877db96d56Sopenharmony_ci        tr._closing = True
10887db96d56Sopenharmony_ci        tr.close()
10897db96d56Sopenharmony_ci        self.assertFalse(tr.write_eof.called)
10907db96d56Sopenharmony_ci
10917db96d56Sopenharmony_ci    def test_write_eof(self):
10927db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
10937db96d56Sopenharmony_ci        tr.write_eof()
10947db96d56Sopenharmony_ci        self.assertTrue(tr.is_closing())
10957db96d56Sopenharmony_ci        self.assertFalse(self.loop.readers)
10967db96d56Sopenharmony_ci        test_utils.run_briefly(self.loop)
10977db96d56Sopenharmony_ci        self.protocol.connection_lost.assert_called_with(None)
10987db96d56Sopenharmony_ci
10997db96d56Sopenharmony_ci    def test_write_eof_pending(self):
11007db96d56Sopenharmony_ci        tr = self.write_pipe_transport()
11017db96d56Sopenharmony_ci        tr._buffer = [b'data']
11027db96d56Sopenharmony_ci        tr.write_eof()
11037db96d56Sopenharmony_ci        self.assertTrue(tr.is_closing())
11047db96d56Sopenharmony_ci        self.assertFalse(self.protocol.connection_lost.called)
11057db96d56Sopenharmony_ci
11067db96d56Sopenharmony_ci
11077db96d56Sopenharmony_ciclass AbstractChildWatcherTests(unittest.TestCase):
11087db96d56Sopenharmony_ci
11097db96d56Sopenharmony_ci    def test_not_implemented(self):
11107db96d56Sopenharmony_ci        f = mock.Mock()
11117db96d56Sopenharmony_ci        watcher = asyncio.AbstractChildWatcher()
11127db96d56Sopenharmony_ci        self.assertRaises(
11137db96d56Sopenharmony_ci            NotImplementedError, watcher.add_child_handler, f, f)
11147db96d56Sopenharmony_ci        self.assertRaises(
11157db96d56Sopenharmony_ci            NotImplementedError, watcher.remove_child_handler, f)
11167db96d56Sopenharmony_ci        self.assertRaises(
11177db96d56Sopenharmony_ci            NotImplementedError, watcher.attach_loop, f)
11187db96d56Sopenharmony_ci        self.assertRaises(
11197db96d56Sopenharmony_ci            NotImplementedError, watcher.close)
11207db96d56Sopenharmony_ci        self.assertRaises(
11217db96d56Sopenharmony_ci            NotImplementedError, watcher.is_active)
11227db96d56Sopenharmony_ci        self.assertRaises(
11237db96d56Sopenharmony_ci            NotImplementedError, watcher.__enter__)
11247db96d56Sopenharmony_ci        self.assertRaises(
11257db96d56Sopenharmony_ci            NotImplementedError, watcher.__exit__, f, f, f)
11267db96d56Sopenharmony_ci
11277db96d56Sopenharmony_ci
11287db96d56Sopenharmony_ciclass BaseChildWatcherTests(unittest.TestCase):
11297db96d56Sopenharmony_ci
11307db96d56Sopenharmony_ci    def test_not_implemented(self):
11317db96d56Sopenharmony_ci        f = mock.Mock()
11327db96d56Sopenharmony_ci        watcher = unix_events.BaseChildWatcher()
11337db96d56Sopenharmony_ci        self.assertRaises(
11347db96d56Sopenharmony_ci            NotImplementedError, watcher._do_waitpid, f)
11357db96d56Sopenharmony_ci
11367db96d56Sopenharmony_ci
11377db96d56Sopenharmony_ciclass ChildWatcherTestsMixin:
11387db96d56Sopenharmony_ci
11397db96d56Sopenharmony_ci    ignore_warnings = mock.patch.object(log.logger, "warning")
11407db96d56Sopenharmony_ci
11417db96d56Sopenharmony_ci    def setUp(self):
11427db96d56Sopenharmony_ci        super().setUp()
11437db96d56Sopenharmony_ci        self.loop = self.new_test_loop()
11447db96d56Sopenharmony_ci        self.running = False
11457db96d56Sopenharmony_ci        self.zombies = {}
11467db96d56Sopenharmony_ci
11477db96d56Sopenharmony_ci        with mock.patch.object(
11487db96d56Sopenharmony_ci                self.loop, "add_signal_handler") as self.m_add_signal_handler:
11497db96d56Sopenharmony_ci            self.watcher = self.create_watcher()
11507db96d56Sopenharmony_ci            self.watcher.attach_loop(self.loop)
11517db96d56Sopenharmony_ci
11527db96d56Sopenharmony_ci    def waitpid(self, pid, flags):
11537db96d56Sopenharmony_ci        if isinstance(self.watcher, asyncio.SafeChildWatcher) or pid != -1:
11547db96d56Sopenharmony_ci            self.assertGreater(pid, 0)
11557db96d56Sopenharmony_ci        try:
11567db96d56Sopenharmony_ci            if pid < 0:
11577db96d56Sopenharmony_ci                return self.zombies.popitem()
11587db96d56Sopenharmony_ci            else:
11597db96d56Sopenharmony_ci                return pid, self.zombies.pop(pid)
11607db96d56Sopenharmony_ci        except KeyError:
11617db96d56Sopenharmony_ci            pass
11627db96d56Sopenharmony_ci        if self.running:
11637db96d56Sopenharmony_ci            return 0, 0
11647db96d56Sopenharmony_ci        else:
11657db96d56Sopenharmony_ci            raise ChildProcessError()
11667db96d56Sopenharmony_ci
11677db96d56Sopenharmony_ci    def add_zombie(self, pid, status):
11687db96d56Sopenharmony_ci        self.zombies[pid] = status
11697db96d56Sopenharmony_ci
11707db96d56Sopenharmony_ci    def waitstatus_to_exitcode(self, status):
11717db96d56Sopenharmony_ci        if status > 32768:
11727db96d56Sopenharmony_ci            return status - 32768
11737db96d56Sopenharmony_ci        elif 32700 < status < 32768:
11747db96d56Sopenharmony_ci            return status - 32768
11757db96d56Sopenharmony_ci        else:
11767db96d56Sopenharmony_ci            return status
11777db96d56Sopenharmony_ci
11787db96d56Sopenharmony_ci    def test_create_watcher(self):
11797db96d56Sopenharmony_ci        self.m_add_signal_handler.assert_called_once_with(
11807db96d56Sopenharmony_ci            signal.SIGCHLD, self.watcher._sig_chld)
11817db96d56Sopenharmony_ci
11827db96d56Sopenharmony_ci    def waitpid_mocks(func):
11837db96d56Sopenharmony_ci        def wrapped_func(self):
11847db96d56Sopenharmony_ci            def patch(target, wrapper):
11857db96d56Sopenharmony_ci                return mock.patch(target, wraps=wrapper,
11867db96d56Sopenharmony_ci                                  new_callable=mock.Mock)
11877db96d56Sopenharmony_ci
11887db96d56Sopenharmony_ci            with patch('asyncio.unix_events.waitstatus_to_exitcode', self.waitstatus_to_exitcode), \
11897db96d56Sopenharmony_ci                 patch('os.waitpid', self.waitpid) as m_waitpid:
11907db96d56Sopenharmony_ci                func(self, m_waitpid)
11917db96d56Sopenharmony_ci        return wrapped_func
11927db96d56Sopenharmony_ci
11937db96d56Sopenharmony_ci    @waitpid_mocks
11947db96d56Sopenharmony_ci    def test_sigchld(self, m_waitpid):
11957db96d56Sopenharmony_ci        # register a child
11967db96d56Sopenharmony_ci        callback = mock.Mock()
11977db96d56Sopenharmony_ci
11987db96d56Sopenharmony_ci        with self.watcher:
11997db96d56Sopenharmony_ci            self.running = True
12007db96d56Sopenharmony_ci            self.watcher.add_child_handler(42, callback, 9, 10, 14)
12017db96d56Sopenharmony_ci
12027db96d56Sopenharmony_ci        self.assertFalse(callback.called)
12037db96d56Sopenharmony_ci
12047db96d56Sopenharmony_ci        # child is running
12057db96d56Sopenharmony_ci        self.watcher._sig_chld()
12067db96d56Sopenharmony_ci
12077db96d56Sopenharmony_ci        self.assertFalse(callback.called)
12087db96d56Sopenharmony_ci
12097db96d56Sopenharmony_ci        # child terminates (returncode 12)
12107db96d56Sopenharmony_ci        self.running = False
12117db96d56Sopenharmony_ci        self.add_zombie(42, EXITCODE(12))
12127db96d56Sopenharmony_ci        self.watcher._sig_chld()
12137db96d56Sopenharmony_ci
12147db96d56Sopenharmony_ci        callback.assert_called_once_with(42, 12, 9, 10, 14)
12157db96d56Sopenharmony_ci
12167db96d56Sopenharmony_ci        callback.reset_mock()
12177db96d56Sopenharmony_ci
12187db96d56Sopenharmony_ci        # ensure that the child is effectively reaped
12197db96d56Sopenharmony_ci        self.add_zombie(42, EXITCODE(13))
12207db96d56Sopenharmony_ci        with self.ignore_warnings:
12217db96d56Sopenharmony_ci            self.watcher._sig_chld()
12227db96d56Sopenharmony_ci
12237db96d56Sopenharmony_ci        self.assertFalse(callback.called)
12247db96d56Sopenharmony_ci
12257db96d56Sopenharmony_ci        # sigchld called again
12267db96d56Sopenharmony_ci        self.zombies.clear()
12277db96d56Sopenharmony_ci        self.watcher._sig_chld()
12287db96d56Sopenharmony_ci
12297db96d56Sopenharmony_ci        self.assertFalse(callback.called)
12307db96d56Sopenharmony_ci
12317db96d56Sopenharmony_ci    @waitpid_mocks
12327db96d56Sopenharmony_ci    def test_sigchld_two_children(self, m_waitpid):
12337db96d56Sopenharmony_ci        callback1 = mock.Mock()
12347db96d56Sopenharmony_ci        callback2 = mock.Mock()
12357db96d56Sopenharmony_ci
12367db96d56Sopenharmony_ci        # register child 1
12377db96d56Sopenharmony_ci        with self.watcher:
12387db96d56Sopenharmony_ci            self.running = True
12397db96d56Sopenharmony_ci            self.watcher.add_child_handler(43, callback1, 7, 8)
12407db96d56Sopenharmony_ci
12417db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
12427db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
12437db96d56Sopenharmony_ci
12447db96d56Sopenharmony_ci        # register child 2
12457db96d56Sopenharmony_ci        with self.watcher:
12467db96d56Sopenharmony_ci            self.watcher.add_child_handler(44, callback2, 147, 18)
12477db96d56Sopenharmony_ci
12487db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
12497db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
12507db96d56Sopenharmony_ci
12517db96d56Sopenharmony_ci        # children are running
12527db96d56Sopenharmony_ci        self.watcher._sig_chld()
12537db96d56Sopenharmony_ci
12547db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
12557db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
12567db96d56Sopenharmony_ci
12577db96d56Sopenharmony_ci        # child 1 terminates (signal 3)
12587db96d56Sopenharmony_ci        self.add_zombie(43, SIGNAL(3))
12597db96d56Sopenharmony_ci        self.watcher._sig_chld()
12607db96d56Sopenharmony_ci
12617db96d56Sopenharmony_ci        callback1.assert_called_once_with(43, -3, 7, 8)
12627db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
12637db96d56Sopenharmony_ci
12647db96d56Sopenharmony_ci        callback1.reset_mock()
12657db96d56Sopenharmony_ci
12667db96d56Sopenharmony_ci        # child 2 still running
12677db96d56Sopenharmony_ci        self.watcher._sig_chld()
12687db96d56Sopenharmony_ci
12697db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
12707db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
12717db96d56Sopenharmony_ci
12727db96d56Sopenharmony_ci        # child 2 terminates (code 108)
12737db96d56Sopenharmony_ci        self.add_zombie(44, EXITCODE(108))
12747db96d56Sopenharmony_ci        self.running = False
12757db96d56Sopenharmony_ci        self.watcher._sig_chld()
12767db96d56Sopenharmony_ci
12777db96d56Sopenharmony_ci        callback2.assert_called_once_with(44, 108, 147, 18)
12787db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
12797db96d56Sopenharmony_ci
12807db96d56Sopenharmony_ci        callback2.reset_mock()
12817db96d56Sopenharmony_ci
12827db96d56Sopenharmony_ci        # ensure that the children are effectively reaped
12837db96d56Sopenharmony_ci        self.add_zombie(43, EXITCODE(14))
12847db96d56Sopenharmony_ci        self.add_zombie(44, EXITCODE(15))
12857db96d56Sopenharmony_ci        with self.ignore_warnings:
12867db96d56Sopenharmony_ci            self.watcher._sig_chld()
12877db96d56Sopenharmony_ci
12887db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
12897db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
12907db96d56Sopenharmony_ci
12917db96d56Sopenharmony_ci        # sigchld called again
12927db96d56Sopenharmony_ci        self.zombies.clear()
12937db96d56Sopenharmony_ci        self.watcher._sig_chld()
12947db96d56Sopenharmony_ci
12957db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
12967db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
12977db96d56Sopenharmony_ci
12987db96d56Sopenharmony_ci    @waitpid_mocks
12997db96d56Sopenharmony_ci    def test_sigchld_two_children_terminating_together(self, m_waitpid):
13007db96d56Sopenharmony_ci        callback1 = mock.Mock()
13017db96d56Sopenharmony_ci        callback2 = mock.Mock()
13027db96d56Sopenharmony_ci
13037db96d56Sopenharmony_ci        # register child 1
13047db96d56Sopenharmony_ci        with self.watcher:
13057db96d56Sopenharmony_ci            self.running = True
13067db96d56Sopenharmony_ci            self.watcher.add_child_handler(45, callback1, 17, 8)
13077db96d56Sopenharmony_ci
13087db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
13097db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
13107db96d56Sopenharmony_ci
13117db96d56Sopenharmony_ci        # register child 2
13127db96d56Sopenharmony_ci        with self.watcher:
13137db96d56Sopenharmony_ci            self.watcher.add_child_handler(46, callback2, 1147, 18)
13147db96d56Sopenharmony_ci
13157db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
13167db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
13177db96d56Sopenharmony_ci
13187db96d56Sopenharmony_ci        # children are running
13197db96d56Sopenharmony_ci        self.watcher._sig_chld()
13207db96d56Sopenharmony_ci
13217db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
13227db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
13237db96d56Sopenharmony_ci
13247db96d56Sopenharmony_ci        # child 1 terminates (code 78)
13257db96d56Sopenharmony_ci        # child 2 terminates (signal 5)
13267db96d56Sopenharmony_ci        self.add_zombie(45, EXITCODE(78))
13277db96d56Sopenharmony_ci        self.add_zombie(46, SIGNAL(5))
13287db96d56Sopenharmony_ci        self.running = False
13297db96d56Sopenharmony_ci        self.watcher._sig_chld()
13307db96d56Sopenharmony_ci
13317db96d56Sopenharmony_ci        callback1.assert_called_once_with(45, 78, 17, 8)
13327db96d56Sopenharmony_ci        callback2.assert_called_once_with(46, -5, 1147, 18)
13337db96d56Sopenharmony_ci
13347db96d56Sopenharmony_ci        callback1.reset_mock()
13357db96d56Sopenharmony_ci        callback2.reset_mock()
13367db96d56Sopenharmony_ci
13377db96d56Sopenharmony_ci        # ensure that the children are effectively reaped
13387db96d56Sopenharmony_ci        self.add_zombie(45, EXITCODE(14))
13397db96d56Sopenharmony_ci        self.add_zombie(46, EXITCODE(15))
13407db96d56Sopenharmony_ci        with self.ignore_warnings:
13417db96d56Sopenharmony_ci            self.watcher._sig_chld()
13427db96d56Sopenharmony_ci
13437db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
13447db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
13457db96d56Sopenharmony_ci
13467db96d56Sopenharmony_ci    @waitpid_mocks
13477db96d56Sopenharmony_ci    def test_sigchld_race_condition(self, m_waitpid):
13487db96d56Sopenharmony_ci        # register a child
13497db96d56Sopenharmony_ci        callback = mock.Mock()
13507db96d56Sopenharmony_ci
13517db96d56Sopenharmony_ci        with self.watcher:
13527db96d56Sopenharmony_ci            # child terminates before being registered
13537db96d56Sopenharmony_ci            self.add_zombie(50, EXITCODE(4))
13547db96d56Sopenharmony_ci            self.watcher._sig_chld()
13557db96d56Sopenharmony_ci
13567db96d56Sopenharmony_ci            self.watcher.add_child_handler(50, callback, 1, 12)
13577db96d56Sopenharmony_ci
13587db96d56Sopenharmony_ci        callback.assert_called_once_with(50, 4, 1, 12)
13597db96d56Sopenharmony_ci        callback.reset_mock()
13607db96d56Sopenharmony_ci
13617db96d56Sopenharmony_ci        # ensure that the child is effectively reaped
13627db96d56Sopenharmony_ci        self.add_zombie(50, SIGNAL(1))
13637db96d56Sopenharmony_ci        with self.ignore_warnings:
13647db96d56Sopenharmony_ci            self.watcher._sig_chld()
13657db96d56Sopenharmony_ci
13667db96d56Sopenharmony_ci        self.assertFalse(callback.called)
13677db96d56Sopenharmony_ci
13687db96d56Sopenharmony_ci    @waitpid_mocks
13697db96d56Sopenharmony_ci    def test_sigchld_replace_handler(self, m_waitpid):
13707db96d56Sopenharmony_ci        callback1 = mock.Mock()
13717db96d56Sopenharmony_ci        callback2 = mock.Mock()
13727db96d56Sopenharmony_ci
13737db96d56Sopenharmony_ci        # register a child
13747db96d56Sopenharmony_ci        with self.watcher:
13757db96d56Sopenharmony_ci            self.running = True
13767db96d56Sopenharmony_ci            self.watcher.add_child_handler(51, callback1, 19)
13777db96d56Sopenharmony_ci
13787db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
13797db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
13807db96d56Sopenharmony_ci
13817db96d56Sopenharmony_ci        # register the same child again
13827db96d56Sopenharmony_ci        with self.watcher:
13837db96d56Sopenharmony_ci            self.watcher.add_child_handler(51, callback2, 21)
13847db96d56Sopenharmony_ci
13857db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
13867db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
13877db96d56Sopenharmony_ci
13887db96d56Sopenharmony_ci        # child terminates (signal 8)
13897db96d56Sopenharmony_ci        self.running = False
13907db96d56Sopenharmony_ci        self.add_zombie(51, SIGNAL(8))
13917db96d56Sopenharmony_ci        self.watcher._sig_chld()
13927db96d56Sopenharmony_ci
13937db96d56Sopenharmony_ci        callback2.assert_called_once_with(51, -8, 21)
13947db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
13957db96d56Sopenharmony_ci
13967db96d56Sopenharmony_ci        callback2.reset_mock()
13977db96d56Sopenharmony_ci
13987db96d56Sopenharmony_ci        # ensure that the child is effectively reaped
13997db96d56Sopenharmony_ci        self.add_zombie(51, EXITCODE(13))
14007db96d56Sopenharmony_ci        with self.ignore_warnings:
14017db96d56Sopenharmony_ci            self.watcher._sig_chld()
14027db96d56Sopenharmony_ci
14037db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
14047db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
14057db96d56Sopenharmony_ci
14067db96d56Sopenharmony_ci    @waitpid_mocks
14077db96d56Sopenharmony_ci    def test_sigchld_remove_handler(self, m_waitpid):
14087db96d56Sopenharmony_ci        callback = mock.Mock()
14097db96d56Sopenharmony_ci
14107db96d56Sopenharmony_ci        # register a child
14117db96d56Sopenharmony_ci        with self.watcher:
14127db96d56Sopenharmony_ci            self.running = True
14137db96d56Sopenharmony_ci            self.watcher.add_child_handler(52, callback, 1984)
14147db96d56Sopenharmony_ci
14157db96d56Sopenharmony_ci        self.assertFalse(callback.called)
14167db96d56Sopenharmony_ci
14177db96d56Sopenharmony_ci        # unregister the child
14187db96d56Sopenharmony_ci        self.watcher.remove_child_handler(52)
14197db96d56Sopenharmony_ci
14207db96d56Sopenharmony_ci        self.assertFalse(callback.called)
14217db96d56Sopenharmony_ci
14227db96d56Sopenharmony_ci        # child terminates (code 99)
14237db96d56Sopenharmony_ci        self.running = False
14247db96d56Sopenharmony_ci        self.add_zombie(52, EXITCODE(99))
14257db96d56Sopenharmony_ci        with self.ignore_warnings:
14267db96d56Sopenharmony_ci            self.watcher._sig_chld()
14277db96d56Sopenharmony_ci
14287db96d56Sopenharmony_ci        self.assertFalse(callback.called)
14297db96d56Sopenharmony_ci
14307db96d56Sopenharmony_ci    @waitpid_mocks
14317db96d56Sopenharmony_ci    def test_sigchld_unknown_status(self, m_waitpid):
14327db96d56Sopenharmony_ci        callback = mock.Mock()
14337db96d56Sopenharmony_ci
14347db96d56Sopenharmony_ci        # register a child
14357db96d56Sopenharmony_ci        with self.watcher:
14367db96d56Sopenharmony_ci            self.running = True
14377db96d56Sopenharmony_ci            self.watcher.add_child_handler(53, callback, -19)
14387db96d56Sopenharmony_ci
14397db96d56Sopenharmony_ci        self.assertFalse(callback.called)
14407db96d56Sopenharmony_ci
14417db96d56Sopenharmony_ci        # terminate with unknown status
14427db96d56Sopenharmony_ci        self.zombies[53] = 1178
14437db96d56Sopenharmony_ci        self.running = False
14447db96d56Sopenharmony_ci        self.watcher._sig_chld()
14457db96d56Sopenharmony_ci
14467db96d56Sopenharmony_ci        callback.assert_called_once_with(53, 1178, -19)
14477db96d56Sopenharmony_ci
14487db96d56Sopenharmony_ci        callback.reset_mock()
14497db96d56Sopenharmony_ci
14507db96d56Sopenharmony_ci        # ensure that the child is effectively reaped
14517db96d56Sopenharmony_ci        self.add_zombie(53, EXITCODE(101))
14527db96d56Sopenharmony_ci        with self.ignore_warnings:
14537db96d56Sopenharmony_ci            self.watcher._sig_chld()
14547db96d56Sopenharmony_ci
14557db96d56Sopenharmony_ci        self.assertFalse(callback.called)
14567db96d56Sopenharmony_ci
14577db96d56Sopenharmony_ci    @waitpid_mocks
14587db96d56Sopenharmony_ci    def test_remove_child_handler(self, m_waitpid):
14597db96d56Sopenharmony_ci        callback1 = mock.Mock()
14607db96d56Sopenharmony_ci        callback2 = mock.Mock()
14617db96d56Sopenharmony_ci        callback3 = mock.Mock()
14627db96d56Sopenharmony_ci
14637db96d56Sopenharmony_ci        # register children
14647db96d56Sopenharmony_ci        with self.watcher:
14657db96d56Sopenharmony_ci            self.running = True
14667db96d56Sopenharmony_ci            self.watcher.add_child_handler(54, callback1, 1)
14677db96d56Sopenharmony_ci            self.watcher.add_child_handler(55, callback2, 2)
14687db96d56Sopenharmony_ci            self.watcher.add_child_handler(56, callback3, 3)
14697db96d56Sopenharmony_ci
14707db96d56Sopenharmony_ci        # remove child handler 1
14717db96d56Sopenharmony_ci        self.assertTrue(self.watcher.remove_child_handler(54))
14727db96d56Sopenharmony_ci
14737db96d56Sopenharmony_ci        # remove child handler 2 multiple times
14747db96d56Sopenharmony_ci        self.assertTrue(self.watcher.remove_child_handler(55))
14757db96d56Sopenharmony_ci        self.assertFalse(self.watcher.remove_child_handler(55))
14767db96d56Sopenharmony_ci        self.assertFalse(self.watcher.remove_child_handler(55))
14777db96d56Sopenharmony_ci
14787db96d56Sopenharmony_ci        # all children terminate
14797db96d56Sopenharmony_ci        self.add_zombie(54, EXITCODE(0))
14807db96d56Sopenharmony_ci        self.add_zombie(55, EXITCODE(1))
14817db96d56Sopenharmony_ci        self.add_zombie(56, EXITCODE(2))
14827db96d56Sopenharmony_ci        self.running = False
14837db96d56Sopenharmony_ci        with self.ignore_warnings:
14847db96d56Sopenharmony_ci            self.watcher._sig_chld()
14857db96d56Sopenharmony_ci
14867db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
14877db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
14887db96d56Sopenharmony_ci        callback3.assert_called_once_with(56, 2, 3)
14897db96d56Sopenharmony_ci
14907db96d56Sopenharmony_ci    @waitpid_mocks
14917db96d56Sopenharmony_ci    def test_sigchld_unhandled_exception(self, m_waitpid):
14927db96d56Sopenharmony_ci        callback = mock.Mock()
14937db96d56Sopenharmony_ci
14947db96d56Sopenharmony_ci        # register a child
14957db96d56Sopenharmony_ci        with self.watcher:
14967db96d56Sopenharmony_ci            self.running = True
14977db96d56Sopenharmony_ci            self.watcher.add_child_handler(57, callback)
14987db96d56Sopenharmony_ci
14997db96d56Sopenharmony_ci        # raise an exception
15007db96d56Sopenharmony_ci        m_waitpid.side_effect = ValueError
15017db96d56Sopenharmony_ci
15027db96d56Sopenharmony_ci        with mock.patch.object(log.logger,
15037db96d56Sopenharmony_ci                               'error') as m_error:
15047db96d56Sopenharmony_ci
15057db96d56Sopenharmony_ci            self.assertEqual(self.watcher._sig_chld(), None)
15067db96d56Sopenharmony_ci            self.assertTrue(m_error.called)
15077db96d56Sopenharmony_ci
15087db96d56Sopenharmony_ci    @waitpid_mocks
15097db96d56Sopenharmony_ci    def test_sigchld_child_reaped_elsewhere(self, m_waitpid):
15107db96d56Sopenharmony_ci        # register a child
15117db96d56Sopenharmony_ci        callback = mock.Mock()
15127db96d56Sopenharmony_ci
15137db96d56Sopenharmony_ci        with self.watcher:
15147db96d56Sopenharmony_ci            self.running = True
15157db96d56Sopenharmony_ci            self.watcher.add_child_handler(58, callback)
15167db96d56Sopenharmony_ci
15177db96d56Sopenharmony_ci        self.assertFalse(callback.called)
15187db96d56Sopenharmony_ci
15197db96d56Sopenharmony_ci        # child terminates
15207db96d56Sopenharmony_ci        self.running = False
15217db96d56Sopenharmony_ci        self.add_zombie(58, EXITCODE(4))
15227db96d56Sopenharmony_ci
15237db96d56Sopenharmony_ci        # waitpid is called elsewhere
15247db96d56Sopenharmony_ci        os.waitpid(58, os.WNOHANG)
15257db96d56Sopenharmony_ci
15267db96d56Sopenharmony_ci        m_waitpid.reset_mock()
15277db96d56Sopenharmony_ci
15287db96d56Sopenharmony_ci        # sigchld
15297db96d56Sopenharmony_ci        with self.ignore_warnings:
15307db96d56Sopenharmony_ci            self.watcher._sig_chld()
15317db96d56Sopenharmony_ci
15327db96d56Sopenharmony_ci        if isinstance(self.watcher, asyncio.FastChildWatcher):
15337db96d56Sopenharmony_ci            # here the FastChildWatcher enters a deadlock
15347db96d56Sopenharmony_ci            # (there is no way to prevent it)
15357db96d56Sopenharmony_ci            self.assertFalse(callback.called)
15367db96d56Sopenharmony_ci        else:
15377db96d56Sopenharmony_ci            callback.assert_called_once_with(58, 255)
15387db96d56Sopenharmony_ci
15397db96d56Sopenharmony_ci    @waitpid_mocks
15407db96d56Sopenharmony_ci    def test_sigchld_unknown_pid_during_registration(self, m_waitpid):
15417db96d56Sopenharmony_ci        # register two children
15427db96d56Sopenharmony_ci        callback1 = mock.Mock()
15437db96d56Sopenharmony_ci        callback2 = mock.Mock()
15447db96d56Sopenharmony_ci
15457db96d56Sopenharmony_ci        with self.ignore_warnings, self.watcher:
15467db96d56Sopenharmony_ci            self.running = True
15477db96d56Sopenharmony_ci            # child 1 terminates
15487db96d56Sopenharmony_ci            self.add_zombie(591, EXITCODE(7))
15497db96d56Sopenharmony_ci            # an unknown child terminates
15507db96d56Sopenharmony_ci            self.add_zombie(593, EXITCODE(17))
15517db96d56Sopenharmony_ci
15527db96d56Sopenharmony_ci            self.watcher._sig_chld()
15537db96d56Sopenharmony_ci
15547db96d56Sopenharmony_ci            self.watcher.add_child_handler(591, callback1)
15557db96d56Sopenharmony_ci            self.watcher.add_child_handler(592, callback2)
15567db96d56Sopenharmony_ci
15577db96d56Sopenharmony_ci        callback1.assert_called_once_with(591, 7)
15587db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
15597db96d56Sopenharmony_ci
15607db96d56Sopenharmony_ci    @waitpid_mocks
15617db96d56Sopenharmony_ci    def test_set_loop(self, m_waitpid):
15627db96d56Sopenharmony_ci        # register a child
15637db96d56Sopenharmony_ci        callback = mock.Mock()
15647db96d56Sopenharmony_ci
15657db96d56Sopenharmony_ci        with self.watcher:
15667db96d56Sopenharmony_ci            self.running = True
15677db96d56Sopenharmony_ci            self.watcher.add_child_handler(60, callback)
15687db96d56Sopenharmony_ci
15697db96d56Sopenharmony_ci        # attach a new loop
15707db96d56Sopenharmony_ci        old_loop = self.loop
15717db96d56Sopenharmony_ci        self.loop = self.new_test_loop()
15727db96d56Sopenharmony_ci        patch = mock.patch.object
15737db96d56Sopenharmony_ci
15747db96d56Sopenharmony_ci        with patch(old_loop, "remove_signal_handler") as m_old_remove, \
15757db96d56Sopenharmony_ci             patch(self.loop, "add_signal_handler") as m_new_add:
15767db96d56Sopenharmony_ci
15777db96d56Sopenharmony_ci            self.watcher.attach_loop(self.loop)
15787db96d56Sopenharmony_ci
15797db96d56Sopenharmony_ci            m_old_remove.assert_called_once_with(
15807db96d56Sopenharmony_ci                signal.SIGCHLD)
15817db96d56Sopenharmony_ci            m_new_add.assert_called_once_with(
15827db96d56Sopenharmony_ci                signal.SIGCHLD, self.watcher._sig_chld)
15837db96d56Sopenharmony_ci
15847db96d56Sopenharmony_ci        # child terminates
15857db96d56Sopenharmony_ci        self.running = False
15867db96d56Sopenharmony_ci        self.add_zombie(60, EXITCODE(9))
15877db96d56Sopenharmony_ci        self.watcher._sig_chld()
15887db96d56Sopenharmony_ci
15897db96d56Sopenharmony_ci        callback.assert_called_once_with(60, 9)
15907db96d56Sopenharmony_ci
15917db96d56Sopenharmony_ci    @waitpid_mocks
15927db96d56Sopenharmony_ci    def test_set_loop_race_condition(self, m_waitpid):
15937db96d56Sopenharmony_ci        # register 3 children
15947db96d56Sopenharmony_ci        callback1 = mock.Mock()
15957db96d56Sopenharmony_ci        callback2 = mock.Mock()
15967db96d56Sopenharmony_ci        callback3 = mock.Mock()
15977db96d56Sopenharmony_ci
15987db96d56Sopenharmony_ci        with self.watcher:
15997db96d56Sopenharmony_ci            self.running = True
16007db96d56Sopenharmony_ci            self.watcher.add_child_handler(61, callback1)
16017db96d56Sopenharmony_ci            self.watcher.add_child_handler(62, callback2)
16027db96d56Sopenharmony_ci            self.watcher.add_child_handler(622, callback3)
16037db96d56Sopenharmony_ci
16047db96d56Sopenharmony_ci        # detach the loop
16057db96d56Sopenharmony_ci        old_loop = self.loop
16067db96d56Sopenharmony_ci        self.loop = None
16077db96d56Sopenharmony_ci
16087db96d56Sopenharmony_ci        with mock.patch.object(
16097db96d56Sopenharmony_ci                old_loop, "remove_signal_handler") as m_remove_signal_handler:
16107db96d56Sopenharmony_ci
16117db96d56Sopenharmony_ci            with self.assertWarnsRegex(
16127db96d56Sopenharmony_ci                    RuntimeWarning, 'A loop is being detached'):
16137db96d56Sopenharmony_ci                self.watcher.attach_loop(None)
16147db96d56Sopenharmony_ci
16157db96d56Sopenharmony_ci            m_remove_signal_handler.assert_called_once_with(
16167db96d56Sopenharmony_ci                signal.SIGCHLD)
16177db96d56Sopenharmony_ci
16187db96d56Sopenharmony_ci        # child 1 & 2 terminate
16197db96d56Sopenharmony_ci        self.add_zombie(61, EXITCODE(11))
16207db96d56Sopenharmony_ci        self.add_zombie(62, SIGNAL(5))
16217db96d56Sopenharmony_ci
16227db96d56Sopenharmony_ci        # SIGCHLD was not caught
16237db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
16247db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
16257db96d56Sopenharmony_ci        self.assertFalse(callback3.called)
16267db96d56Sopenharmony_ci
16277db96d56Sopenharmony_ci        # attach a new loop
16287db96d56Sopenharmony_ci        self.loop = self.new_test_loop()
16297db96d56Sopenharmony_ci
16307db96d56Sopenharmony_ci        with mock.patch.object(
16317db96d56Sopenharmony_ci                self.loop, "add_signal_handler") as m_add_signal_handler:
16327db96d56Sopenharmony_ci
16337db96d56Sopenharmony_ci            self.watcher.attach_loop(self.loop)
16347db96d56Sopenharmony_ci
16357db96d56Sopenharmony_ci            m_add_signal_handler.assert_called_once_with(
16367db96d56Sopenharmony_ci                signal.SIGCHLD, self.watcher._sig_chld)
16377db96d56Sopenharmony_ci            callback1.assert_called_once_with(61, 11)  # race condition!
16387db96d56Sopenharmony_ci            callback2.assert_called_once_with(62, -5)  # race condition!
16397db96d56Sopenharmony_ci            self.assertFalse(callback3.called)
16407db96d56Sopenharmony_ci
16417db96d56Sopenharmony_ci        callback1.reset_mock()
16427db96d56Sopenharmony_ci        callback2.reset_mock()
16437db96d56Sopenharmony_ci
16447db96d56Sopenharmony_ci        # child 3 terminates
16457db96d56Sopenharmony_ci        self.running = False
16467db96d56Sopenharmony_ci        self.add_zombie(622, EXITCODE(19))
16477db96d56Sopenharmony_ci        self.watcher._sig_chld()
16487db96d56Sopenharmony_ci
16497db96d56Sopenharmony_ci        self.assertFalse(callback1.called)
16507db96d56Sopenharmony_ci        self.assertFalse(callback2.called)
16517db96d56Sopenharmony_ci        callback3.assert_called_once_with(622, 19)
16527db96d56Sopenharmony_ci
16537db96d56Sopenharmony_ci    @waitpid_mocks
16547db96d56Sopenharmony_ci    def test_close(self, m_waitpid):
16557db96d56Sopenharmony_ci        # register two children
16567db96d56Sopenharmony_ci        callback1 = mock.Mock()
16577db96d56Sopenharmony_ci
16587db96d56Sopenharmony_ci        with self.watcher:
16597db96d56Sopenharmony_ci            self.running = True
16607db96d56Sopenharmony_ci            # child 1 terminates
16617db96d56Sopenharmony_ci            self.add_zombie(63, EXITCODE(9))
16627db96d56Sopenharmony_ci            # other child terminates
16637db96d56Sopenharmony_ci            self.add_zombie(65, EXITCODE(18))
16647db96d56Sopenharmony_ci            self.watcher._sig_chld()
16657db96d56Sopenharmony_ci
16667db96d56Sopenharmony_ci            self.watcher.add_child_handler(63, callback1)
16677db96d56Sopenharmony_ci            self.watcher.add_child_handler(64, callback1)
16687db96d56Sopenharmony_ci
16697db96d56Sopenharmony_ci            self.assertEqual(len(self.watcher._callbacks), 1)
16707db96d56Sopenharmony_ci            if isinstance(self.watcher, asyncio.FastChildWatcher):
16717db96d56Sopenharmony_ci                self.assertEqual(len(self.watcher._zombies), 1)
16727db96d56Sopenharmony_ci
16737db96d56Sopenharmony_ci            with mock.patch.object(
16747db96d56Sopenharmony_ci                    self.loop,
16757db96d56Sopenharmony_ci                    "remove_signal_handler") as m_remove_signal_handler:
16767db96d56Sopenharmony_ci
16777db96d56Sopenharmony_ci                self.watcher.close()
16787db96d56Sopenharmony_ci
16797db96d56Sopenharmony_ci                m_remove_signal_handler.assert_called_once_with(
16807db96d56Sopenharmony_ci                    signal.SIGCHLD)
16817db96d56Sopenharmony_ci                self.assertFalse(self.watcher._callbacks)
16827db96d56Sopenharmony_ci                if isinstance(self.watcher, asyncio.FastChildWatcher):
16837db96d56Sopenharmony_ci                    self.assertFalse(self.watcher._zombies)
16847db96d56Sopenharmony_ci
16857db96d56Sopenharmony_ci
16867db96d56Sopenharmony_ciclass SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
16877db96d56Sopenharmony_ci    def create_watcher(self):
16887db96d56Sopenharmony_ci        return asyncio.SafeChildWatcher()
16897db96d56Sopenharmony_ci
16907db96d56Sopenharmony_ci
16917db96d56Sopenharmony_ciclass FastChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
16927db96d56Sopenharmony_ci    def create_watcher(self):
16937db96d56Sopenharmony_ci        return asyncio.FastChildWatcher()
16947db96d56Sopenharmony_ci
16957db96d56Sopenharmony_ci
16967db96d56Sopenharmony_ciclass PolicyTests(unittest.TestCase):
16977db96d56Sopenharmony_ci
16987db96d56Sopenharmony_ci    def create_policy(self):
16997db96d56Sopenharmony_ci        return asyncio.DefaultEventLoopPolicy()
17007db96d56Sopenharmony_ci
17017db96d56Sopenharmony_ci    def test_get_default_child_watcher(self):
17027db96d56Sopenharmony_ci        policy = self.create_policy()
17037db96d56Sopenharmony_ci        self.assertIsNone(policy._watcher)
17047db96d56Sopenharmony_ci
17057db96d56Sopenharmony_ci        watcher = policy.get_child_watcher()
17067db96d56Sopenharmony_ci        self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher)
17077db96d56Sopenharmony_ci
17087db96d56Sopenharmony_ci        self.assertIs(policy._watcher, watcher)
17097db96d56Sopenharmony_ci
17107db96d56Sopenharmony_ci        self.assertIs(watcher, policy.get_child_watcher())
17117db96d56Sopenharmony_ci
17127db96d56Sopenharmony_ci    def test_get_child_watcher_after_set(self):
17137db96d56Sopenharmony_ci        policy = self.create_policy()
17147db96d56Sopenharmony_ci        watcher = asyncio.FastChildWatcher()
17157db96d56Sopenharmony_ci
17167db96d56Sopenharmony_ci        policy.set_child_watcher(watcher)
17177db96d56Sopenharmony_ci        self.assertIs(policy._watcher, watcher)
17187db96d56Sopenharmony_ci        self.assertIs(watcher, policy.get_child_watcher())
17197db96d56Sopenharmony_ci
17207db96d56Sopenharmony_ci    def test_get_child_watcher_thread(self):
17217db96d56Sopenharmony_ci
17227db96d56Sopenharmony_ci        def f():
17237db96d56Sopenharmony_ci            policy.set_event_loop(policy.new_event_loop())
17247db96d56Sopenharmony_ci
17257db96d56Sopenharmony_ci            self.assertIsInstance(policy.get_event_loop(),
17267db96d56Sopenharmony_ci                                  asyncio.AbstractEventLoop)
17277db96d56Sopenharmony_ci            watcher = policy.get_child_watcher()
17287db96d56Sopenharmony_ci
17297db96d56Sopenharmony_ci            self.assertIsInstance(watcher, asyncio.SafeChildWatcher)
17307db96d56Sopenharmony_ci            self.assertIsNone(watcher._loop)
17317db96d56Sopenharmony_ci
17327db96d56Sopenharmony_ci            policy.get_event_loop().close()
17337db96d56Sopenharmony_ci
17347db96d56Sopenharmony_ci        policy = self.create_policy()
17357db96d56Sopenharmony_ci        policy.set_child_watcher(asyncio.SafeChildWatcher())
17367db96d56Sopenharmony_ci
17377db96d56Sopenharmony_ci        th = threading.Thread(target=f)
17387db96d56Sopenharmony_ci        th.start()
17397db96d56Sopenharmony_ci        th.join()
17407db96d56Sopenharmony_ci
17417db96d56Sopenharmony_ci    def test_child_watcher_replace_mainloop_existing(self):
17427db96d56Sopenharmony_ci        policy = self.create_policy()
17437db96d56Sopenharmony_ci        loop = policy.new_event_loop()
17447db96d56Sopenharmony_ci        policy.set_event_loop(loop)
17457db96d56Sopenharmony_ci
17467db96d56Sopenharmony_ci        # Explicitly setup SafeChildWatcher,
17477db96d56Sopenharmony_ci        # default ThreadedChildWatcher has no _loop property
17487db96d56Sopenharmony_ci        watcher = asyncio.SafeChildWatcher()
17497db96d56Sopenharmony_ci        policy.set_child_watcher(watcher)
17507db96d56Sopenharmony_ci        watcher.attach_loop(loop)
17517db96d56Sopenharmony_ci
17527db96d56Sopenharmony_ci        self.assertIs(watcher._loop, loop)
17537db96d56Sopenharmony_ci
17547db96d56Sopenharmony_ci        new_loop = policy.new_event_loop()
17557db96d56Sopenharmony_ci        policy.set_event_loop(new_loop)
17567db96d56Sopenharmony_ci
17577db96d56Sopenharmony_ci        self.assertIs(watcher._loop, new_loop)
17587db96d56Sopenharmony_ci
17597db96d56Sopenharmony_ci        policy.set_event_loop(None)
17607db96d56Sopenharmony_ci
17617db96d56Sopenharmony_ci        self.assertIs(watcher._loop, None)
17627db96d56Sopenharmony_ci
17637db96d56Sopenharmony_ci        loop.close()
17647db96d56Sopenharmony_ci        new_loop.close()
17657db96d56Sopenharmony_ci
17667db96d56Sopenharmony_ci
17677db96d56Sopenharmony_ciclass TestFunctional(unittest.TestCase):
17687db96d56Sopenharmony_ci
17697db96d56Sopenharmony_ci    def setUp(self):
17707db96d56Sopenharmony_ci        self.loop = asyncio.new_event_loop()
17717db96d56Sopenharmony_ci        asyncio.set_event_loop(self.loop)
17727db96d56Sopenharmony_ci
17737db96d56Sopenharmony_ci    def tearDown(self):
17747db96d56Sopenharmony_ci        self.loop.close()
17757db96d56Sopenharmony_ci        asyncio.set_event_loop(None)
17767db96d56Sopenharmony_ci
17777db96d56Sopenharmony_ci    def test_add_reader_invalid_argument(self):
17787db96d56Sopenharmony_ci        def assert_raises():
17797db96d56Sopenharmony_ci            return self.assertRaisesRegex(ValueError, r'Invalid file object')
17807db96d56Sopenharmony_ci
17817db96d56Sopenharmony_ci        cb = lambda: None
17827db96d56Sopenharmony_ci
17837db96d56Sopenharmony_ci        with assert_raises():
17847db96d56Sopenharmony_ci            self.loop.add_reader(object(), cb)
17857db96d56Sopenharmony_ci        with assert_raises():
17867db96d56Sopenharmony_ci            self.loop.add_writer(object(), cb)
17877db96d56Sopenharmony_ci
17887db96d56Sopenharmony_ci        with assert_raises():
17897db96d56Sopenharmony_ci            self.loop.remove_reader(object())
17907db96d56Sopenharmony_ci        with assert_raises():
17917db96d56Sopenharmony_ci            self.loop.remove_writer(object())
17927db96d56Sopenharmony_ci
17937db96d56Sopenharmony_ci    def test_add_reader_or_writer_transport_fd(self):
17947db96d56Sopenharmony_ci        def assert_raises():
17957db96d56Sopenharmony_ci            return self.assertRaisesRegex(
17967db96d56Sopenharmony_ci                RuntimeError,
17977db96d56Sopenharmony_ci                r'File descriptor .* is used by transport')
17987db96d56Sopenharmony_ci
17997db96d56Sopenharmony_ci        async def runner():
18007db96d56Sopenharmony_ci            tr, pr = await self.loop.create_connection(
18017db96d56Sopenharmony_ci                lambda: asyncio.Protocol(), sock=rsock)
18027db96d56Sopenharmony_ci
18037db96d56Sopenharmony_ci            try:
18047db96d56Sopenharmony_ci                cb = lambda: None
18057db96d56Sopenharmony_ci
18067db96d56Sopenharmony_ci                with assert_raises():
18077db96d56Sopenharmony_ci                    self.loop.add_reader(rsock, cb)
18087db96d56Sopenharmony_ci                with assert_raises():
18097db96d56Sopenharmony_ci                    self.loop.add_reader(rsock.fileno(), cb)
18107db96d56Sopenharmony_ci
18117db96d56Sopenharmony_ci                with assert_raises():
18127db96d56Sopenharmony_ci                    self.loop.remove_reader(rsock)
18137db96d56Sopenharmony_ci                with assert_raises():
18147db96d56Sopenharmony_ci                    self.loop.remove_reader(rsock.fileno())
18157db96d56Sopenharmony_ci
18167db96d56Sopenharmony_ci                with assert_raises():
18177db96d56Sopenharmony_ci                    self.loop.add_writer(rsock, cb)
18187db96d56Sopenharmony_ci                with assert_raises():
18197db96d56Sopenharmony_ci                    self.loop.add_writer(rsock.fileno(), cb)
18207db96d56Sopenharmony_ci
18217db96d56Sopenharmony_ci                with assert_raises():
18227db96d56Sopenharmony_ci                    self.loop.remove_writer(rsock)
18237db96d56Sopenharmony_ci                with assert_raises():
18247db96d56Sopenharmony_ci                    self.loop.remove_writer(rsock.fileno())
18257db96d56Sopenharmony_ci
18267db96d56Sopenharmony_ci            finally:
18277db96d56Sopenharmony_ci                tr.close()
18287db96d56Sopenharmony_ci
18297db96d56Sopenharmony_ci        rsock, wsock = socket.socketpair()
18307db96d56Sopenharmony_ci        try:
18317db96d56Sopenharmony_ci            self.loop.run_until_complete(runner())
18327db96d56Sopenharmony_ci        finally:
18337db96d56Sopenharmony_ci            rsock.close()
18347db96d56Sopenharmony_ci            wsock.close()
18357db96d56Sopenharmony_ci
18367db96d56Sopenharmony_ci
18377db96d56Sopenharmony_ciif __name__ == '__main__':
18387db96d56Sopenharmony_ci    unittest.main()
1839