17db96d56Sopenharmony_ciimport enum
27db96d56Sopenharmony_ciimport errno
37db96d56Sopenharmony_ciimport inspect
47db96d56Sopenharmony_ciimport os
57db96d56Sopenharmony_ciimport random
67db96d56Sopenharmony_ciimport signal
77db96d56Sopenharmony_ciimport socket
87db96d56Sopenharmony_ciimport statistics
97db96d56Sopenharmony_ciimport subprocess
107db96d56Sopenharmony_ciimport sys
117db96d56Sopenharmony_ciimport threading
127db96d56Sopenharmony_ciimport time
137db96d56Sopenharmony_ciimport unittest
147db96d56Sopenharmony_cifrom test import support
157db96d56Sopenharmony_cifrom test.support import os_helper
167db96d56Sopenharmony_cifrom test.support.script_helper import assert_python_ok, spawn_python
177db96d56Sopenharmony_cifrom test.support import threading_helper
187db96d56Sopenharmony_citry:
197db96d56Sopenharmony_ci    import _testcapi
207db96d56Sopenharmony_ciexcept ImportError:
217db96d56Sopenharmony_ci    _testcapi = None
227db96d56Sopenharmony_ci
237db96d56Sopenharmony_ci
247db96d56Sopenharmony_ciclass GenericTests(unittest.TestCase):
257db96d56Sopenharmony_ci
267db96d56Sopenharmony_ci    def test_enums(self):
277db96d56Sopenharmony_ci        for name in dir(signal):
287db96d56Sopenharmony_ci            sig = getattr(signal, name)
297db96d56Sopenharmony_ci            if name in {'SIG_DFL', 'SIG_IGN'}:
307db96d56Sopenharmony_ci                self.assertIsInstance(sig, signal.Handlers)
317db96d56Sopenharmony_ci            elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}:
327db96d56Sopenharmony_ci                self.assertIsInstance(sig, signal.Sigmasks)
337db96d56Sopenharmony_ci            elif name.startswith('SIG') and not name.startswith('SIG_'):
347db96d56Sopenharmony_ci                self.assertIsInstance(sig, signal.Signals)
357db96d56Sopenharmony_ci            elif name.startswith('CTRL_'):
367db96d56Sopenharmony_ci                self.assertIsInstance(sig, signal.Signals)
377db96d56Sopenharmony_ci                self.assertEqual(sys.platform, "win32")
387db96d56Sopenharmony_ci
397db96d56Sopenharmony_ci        CheckedSignals = enum._old_convert_(
407db96d56Sopenharmony_ci                enum.IntEnum, 'Signals', 'signal',
417db96d56Sopenharmony_ci                lambda name:
427db96d56Sopenharmony_ci                    name.isupper()
437db96d56Sopenharmony_ci                    and (name.startswith('SIG') and not name.startswith('SIG_'))
447db96d56Sopenharmony_ci                    or name.startswith('CTRL_'),
457db96d56Sopenharmony_ci                source=signal,
467db96d56Sopenharmony_ci                )
477db96d56Sopenharmony_ci        enum._test_simple_enum(CheckedSignals, signal.Signals)
487db96d56Sopenharmony_ci
497db96d56Sopenharmony_ci        CheckedHandlers = enum._old_convert_(
507db96d56Sopenharmony_ci                enum.IntEnum, 'Handlers', 'signal',
517db96d56Sopenharmony_ci                lambda name: name in ('SIG_DFL', 'SIG_IGN'),
527db96d56Sopenharmony_ci                source=signal,
537db96d56Sopenharmony_ci                )
547db96d56Sopenharmony_ci        enum._test_simple_enum(CheckedHandlers, signal.Handlers)
557db96d56Sopenharmony_ci
567db96d56Sopenharmony_ci        Sigmasks = getattr(signal, 'Sigmasks', None)
577db96d56Sopenharmony_ci        if Sigmasks is not None:
587db96d56Sopenharmony_ci            CheckedSigmasks = enum._old_convert_(
597db96d56Sopenharmony_ci                    enum.IntEnum, 'Sigmasks', 'signal',
607db96d56Sopenharmony_ci                    lambda name: name in ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'),
617db96d56Sopenharmony_ci                    source=signal,
627db96d56Sopenharmony_ci                    )
637db96d56Sopenharmony_ci            enum._test_simple_enum(CheckedSigmasks, Sigmasks)
647db96d56Sopenharmony_ci
657db96d56Sopenharmony_ci    def test_functions_module_attr(self):
667db96d56Sopenharmony_ci        # Issue #27718: If __all__ is not defined all non-builtin functions
677db96d56Sopenharmony_ci        # should have correct __module__ to be displayed by pydoc.
687db96d56Sopenharmony_ci        for name in dir(signal):
697db96d56Sopenharmony_ci            value = getattr(signal, name)
707db96d56Sopenharmony_ci            if inspect.isroutine(value) and not inspect.isbuiltin(value):
717db96d56Sopenharmony_ci                self.assertEqual(value.__module__, 'signal')
727db96d56Sopenharmony_ci
737db96d56Sopenharmony_ci
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class PosixTests(unittest.TestCase):
    # Behaviour of the signal module on non-Windows platforms.

    def trivial_signal_handler(self, *args):
        # No-op handler for tests that only need a valid Python callable.
        pass

    def test_out_of_range_signal_number_raises_error(self):
        # 4242 is used as an invalid signal number: each entry point that
        # takes a signal number must reject it with ValueError.
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

        self.assertRaises(ValueError, signal.strsignal, 4242)

    def test_setting_signal_handler_to_none_raises_error(self):
        # None is not an acceptable handler argument for signal.signal().
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    def test_getsignal(self):
        # Install a handler, check getsignal() reports it, then restore the
        # previous disposition and check getsignal() reports that again.
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        # Restore the previous disposition even when an assertion below
        # fails, so the trivial handler cannot leak into later tests.
        # signal.signal() may return None (handler not installed from
        # Python), which cannot be passed back to signal.signal().
        if hup is not None:
            self.addCleanup(signal.signal, signal.SIGHUP, hup)
        self.assertIsInstance(hup, signal.Handlers)
        self.assertEqual(signal.getsignal(signal.SIGHUP),
                         self.trivial_signal_handler)
        signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)

    def test_strsignal(self):
        # The description only has to contain the expected word.
        self.assertIn("Interrupt", signal.strsignal(signal.SIGINT))
        self.assertIn("Terminated", signal.strsignal(signal.SIGTERM))
        self.assertIn("Hangup", signal.strsignal(signal.SIGHUP))

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    def test_interprocess_signal(self):
        # Runs signalinterproctester.py, which must sit next to this file,
        # through assert_python_ok().
        dirname = os.path.dirname(__file__)
        script = os.path.join(dirname, 'signalinterproctester.py')
        assert_python_ok(script)

    @unittest.skipUnless(
        hasattr(signal, "valid_signals"),
        "requires signal.valid_signals"
    )
    def test_valid_signals(self):
        # valid_signals() must return a set that contains real signals but
        # neither 0 nor NSIG.
        s = signal.valid_signals()
        self.assertIsInstance(s, set)
        self.assertIn(signal.Signals.SIGINT, s)
        self.assertIn(signal.Signals.SIGALRM, s)
        self.assertNotIn(0, s)
        self.assertNotIn(signal.NSIG, s)
        self.assertLess(len(s), signal.NSIG)

        # gh-91145: Make sure that all SIGxxx constants exposed by the Python
        # signal module have a number in the [0; signal.NSIG-1] range.
        for name in dir(signal):
            if not name.startswith("SIG"):
                continue
            if name in {"SIG_IGN", "SIG_DFL"}:
                # SIG_IGN and SIG_DFL are pointers
                continue
            with self.subTest(name=name):
                signum = getattr(signal, name)
                self.assertGreaterEqual(signum, 0)
                self.assertLess(signum, signal.NSIG)

    @unittest.skipUnless(sys.executable, "sys.executable required.")
    @support.requires_subprocess()
    def test_keyboard_interrupt_exit_code(self):
        """KeyboardInterrupt triggers exit via SIGINT."""
        # The child sends SIGINT to itself; the sleep loop gives the
        # resulting KeyboardInterrupt a place to be raised.
        process = subprocess.run(
                [sys.executable, "-c",
                 "import os, signal, time\n"
                 "os.kill(os.getpid(), signal.SIGINT)\n"
                 "for _ in range(999): time.sleep(0.01)"],
                stderr=subprocess.PIPE)
        self.assertIn(b"KeyboardInterrupt", process.stderr)
        self.assertEqual(process.returncode, -signal.SIGINT)
        # Caveat: The exit code is insufficient to guarantee we actually died
        # via a signal.  POSIX shells do more than look at the 8 bit value.
        # Writing an automation friendly test of an interactive shell
        # to confirm that our process died via a SIGINT proved too complex.
1527db96d56Sopenharmony_ci
1537db96d56Sopenharmony_ci
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
class WindowsSignalTests(unittest.TestCase):
    # Signal behaviour that is only exercised on Windows.

    def test_valid_signals(self):
        # The reported set must hold at least six signals, include SIGINT,
        # and exclude both 0 and NSIG.
        valid = signal.valid_signals()
        self.assertIsInstance(valid, set)
        count = len(valid)
        self.assertGreaterEqual(count, 6)
        self.assertIn(signal.Signals.SIGINT, valid)
        self.assertNotIn(0, valid)
        self.assertNotIn(signal.NSIG, valid)
        self.assertLess(count, signal.NSIG)

    def test_issue9324(self):
        # Updated for issue #10003, adding SIGBREAK
        def noop(signum, frame):
            return None

        candidates = (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                      signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                      signal.SIGTERM)
        exercised = set()
        for signum in candidates:
            # Issue #18396, only for signals without a C-level handler.
            if signal.getsignal(signum) is None:
                continue
            # Set and then reset a handler for signals that work on windows.
            previous = signal.signal(signum, noop)
            signal.signal(signum, previous)
            exercised.add(signum)
        # Issue #18396: Ensure the above loop at least tested *something*
        self.assertTrue(exercised)

        # Both of these signal numbers must be refused.
        for bad_signum in (-1, 7):
            with self.assertRaises(ValueError):
                signal.signal(bad_signum, noop)

    @unittest.skipUnless(sys.executable, "sys.executable required.")
    @support.requires_subprocess()
    def test_keyboard_interrupt_exit_code(self):
        """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT."""
        # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here
        # as that requires setting up a console control handler in a child
        # in its own process group.  Doable, but quite complicated.  (see
        # @eryksun on https://github.com/python/cpython/pull/11862)
        command = [sys.executable, "-c", "raise KeyboardInterrupt"]
        completed = subprocess.run(command, stderr=subprocess.PIPE)
        self.assertIn(b"KeyboardInterrupt", completed.stderr)
        STATUS_CONTROL_C_EXIT = 0xC000013A
        self.assertEqual(completed.returncode, STATUS_CONTROL_C_EXIT)
2017db96d56Sopenharmony_ci
2027db96d56Sopenharmony_ci
class WakeupFDTests(unittest.TestCase):
    # Argument validation and return values of signal.set_wakeup_fd().
    #
    # The wakeup fd is process-wide state.  Every test that installs one
    # registers a cleanup resetting it to -1, so a failing assertion cannot
    # leave it pointing at a descriptor that is about to be closed.

    def test_invalid_call(self):
        # First parameter is positional-only
        with self.assertRaises(TypeError):
            signal.set_wakeup_fd(signum=signal.SIGINT)

        # warn_on_full_buffer is a keyword-only parameter
        with self.assertRaises(TypeError):
            signal.set_wakeup_fd(signal.SIGINT, False)

    def test_invalid_fd(self):
        # A bad file descriptor must be rejected.
        fd = os_helper.make_bad_fd()
        self.assertRaises((ValueError, OSError),
                          signal.set_wakeup_fd, fd)

    @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
    def test_invalid_socket(self):
        # The descriptor of an already closed socket must be rejected.
        sock = socket.socket()
        fd = sock.fileno()
        sock.close()
        self.assertRaises((ValueError, OSError),
                          signal.set_wakeup_fd, fd)

    # Emscripten does not support fstat on pipes yet.
    # https://github.com/emscripten-core/emscripten/issues/16414
    @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_set_wakeup_fd_result(self):
        # set_wakeup_fd() returns the previously installed fd, or -1.
        r1, w1 = os.pipe()
        self.addCleanup(os.close, r1)
        self.addCleanup(os.close, w1)
        r2, w2 = os.pipe()
        self.addCleanup(os.close, r2)
        self.addCleanup(os.close, w2)
        # Registered last, so it runs first (cleanups are LIFO): the wakeup
        # fd is cleared before the pipes above are closed.
        self.addCleanup(signal.set_wakeup_fd, -1)

        if hasattr(os, 'set_blocking'):
            os.set_blocking(w1, False)
            os.set_blocking(w2, False)

        signal.set_wakeup_fd(w1)
        self.assertEqual(signal.set_wakeup_fd(w2), w1)
        self.assertEqual(signal.set_wakeup_fd(-1), w2)
        self.assertEqual(signal.set_wakeup_fd(-1), -1)

    @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
    @unittest.skipUnless(support.has_socket_support, "needs working sockets.")
    def test_set_wakeup_fd_socket_result(self):
        # Same return-value contract as above, using socket descriptors.
        sock1 = socket.socket()
        self.addCleanup(sock1.close)
        sock1.setblocking(False)
        fd1 = sock1.fileno()

        sock2 = socket.socket()
        self.addCleanup(sock2.close)
        sock2.setblocking(False)
        fd2 = sock2.fileno()
        # Runs before the sockets are closed (cleanups are LIFO).
        self.addCleanup(signal.set_wakeup_fd, -1)

        signal.set_wakeup_fd(fd1)
        self.assertEqual(signal.set_wakeup_fd(fd2), fd1)
        self.assertEqual(signal.set_wakeup_fd(-1), fd2)
        self.assertEqual(signal.set_wakeup_fd(-1), -1)

    # On Windows, files are always blocking and Windows does not provide a
    # function to test if a socket is in non-blocking mode.
    @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX")
    @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.")
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_set_wakeup_fd_blocking(self):
        rfd, wfd = os.pipe()
        self.addCleanup(os.close, rfd)
        self.addCleanup(os.close, wfd)
        # Covers both the expected success below and an unexpected success
        # of the call that is supposed to raise.
        self.addCleanup(signal.set_wakeup_fd, -1)

        # fd must be non-blocking
        os.set_blocking(wfd, True)
        with self.assertRaises(ValueError) as cm:
            signal.set_wakeup_fd(wfd)
        self.assertEqual(str(cm.exception),
                         "the fd %s must be in non-blocking mode" % wfd)

        # non-blocking is ok
        os.set_blocking(wfd, False)
        signal.set_wakeup_fd(wfd)
        signal.set_wakeup_fd(-1)
2877db96d56Sopenharmony_ci
2887db96d56Sopenharmony_ci
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
    # Tests for the bytes written to the wakeup fd when signals arrive.
    # Each scenario is run as a script in a child interpreter.

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def check_wakeup(self, test_body, *signals, ordered=True):
        # Helper shared by the tests below.
        #
        # test_body: source text that must define a function named test().
        # signals:   signal numbers expected to be read back from the pipe.
        # ordered:   when false, expected and received numbers are compared
        #            as sets, so arrival order is ignored.
        #
        # The generated script installs a no-op SIGALRM handler, makes the
        # non-blocking write end of a pipe the wakeup fd, calls test(), then
        # reads the pipe and raises if the bytes differ from `signals`.

        # use a subprocess to have only one thread
        code = """if 1:
        import _testcapi
        import os
        import signal
        import struct

        signals = {!r}

        def handler(signum, frame):
            pass

        def check_signum(signals):
            data = os.read(read, len(signals)+1)
            raised = struct.unpack('%uB' % len(data), data)
            if not {!r}:
                raised = set(raised)
                signals = set(signals)
            if raised != signals:
                raise Exception("%r != %r" % (raised, signals))

        {}

        signal.signal(signal.SIGALRM, handler)
        read, write = os.pipe()
        os.set_blocking(write, False)
        signal.set_wakeup_fd(write)

        test()
        check_signum(signals)

        os.close(read)
        os.close(write)
        """.format(tuple(map(int, signals)), ordered, test_body)

        assert_python_ok('-c', code)

    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
    def test_wakeup_write_error(self):
        # Issue #16105: write() errors in the C signal handler should not
        # pass silently.
        # Use a subprocess to have only one thread.
        code = """if 1:
        import _testcapi
        import errno
        import os
        import signal
        import sys
        from test.support import captured_stderr

        def handler(signum, frame):
            1/0

        signal.signal(signal.SIGALRM, handler)
        r, w = os.pipe()
        os.set_blocking(r, False)

        # Set wakeup_fd a read-only file descriptor to trigger the error
        signal.set_wakeup_fd(r)
        try:
            with captured_stderr() as err:
                signal.raise_signal(signal.SIGALRM)
        except ZeroDivisionError:
            # An ignored exception should have been printed out on stderr
            err = err.getvalue()
            if ('Exception ignored when trying to write to the signal wakeup fd'
                not in err):
                raise AssertionError(err)
            if ('OSError: [Errno %d]' % errno.EBADF) not in err:
                raise AssertionError(err)
        else:
            raise AssertionError("ZeroDivisionError not raised")

        os.close(r)
        os.close(w)
        """
        # Precondition checked in this process: writing to the read end of a
        # pipe has to fail with OSError, otherwise the scenario cannot be
        # triggered and the test is skipped.
        r, w = os.pipe()
        try:
            os.write(r, b'x')
        except OSError:
            pass
        else:
            self.skipTest("OS doesn't report write() error on the read end of a pipe")
        finally:
            os.close(r)
            os.close(w)

        assert_python_ok('-c', code)

    def test_wakeup_fd_early(self):
        # The alarm interrupts a first select() that waits on no fds; a
        # second select() on the pipe's read end must then return in less
        # than TIMEOUT_HALF seconds.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)

            # We attempt to get a signal during the sleep,
            # before select is called
            try:
                select.select([], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")

            before_time = time.monotonic()
            select.select([read], [], [], TIMEOUT_FULL)
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)

    def test_wakeup_fd_during(self):
        # The alarm fires while select() is waiting on the pipe's read end;
        # the call must be interrupted in less than TIMEOUT_HALF seconds.
        self.check_wakeup("""def test():
            import select
            import time

            TIMEOUT_FULL = 10
            TIMEOUT_HALF = 5

            class InterruptSelect(Exception):
                pass

            def handler(signum, frame):
                raise InterruptSelect
            signal.signal(signal.SIGALRM, handler)

            signal.alarm(1)
            before_time = time.monotonic()
            # We attempt to get a signal during the select call
            try:
                select.select([read], [], [], TIMEOUT_FULL)
            except InterruptSelect:
                pass
            else:
                raise Exception("select() was not interrupted")
            after_time = time.monotonic()
            dt = after_time - before_time
            if dt >= TIMEOUT_HALF:
                raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
        """, signal.SIGALRM)

    def test_signum(self):
        # Two signals raised back to back must be read from the pipe in the
        # order they were raised (ordered defaults to True).
        self.check_wakeup("""def test():
            signal.signal(signal.SIGUSR1, handler)
            signal.raise_signal(signal.SIGUSR1)
            signal.raise_signal(signal.SIGALRM)
        """, signal.SIGUSR1, signal.SIGALRM)

    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    def test_pending(self):
        # Both signals are raised while blocked and delivered on unblock;
        # ordered=False because only the set of numbers is checked.
        self.check_wakeup("""def test():
            signum1 = signal.SIGUSR1
            signum2 = signal.SIGUSR2

            signal.signal(signum1, handler)
            signal.signal(signum2, handler)

            signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
            signal.raise_signal(signum1)
            signal.raise_signal(signum2)
            # Unblocking the 2 signals calls the C signal handler twice
            signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
        """,  signal.SIGUSR1, signal.SIGUSR2, ordered=False)
4707db96d56Sopenharmony_ci
4717db96d56Sopenharmony_ci
4727db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair')
4737db96d56Sopenharmony_ciclass WakeupSocketSignalTests(unittest.TestCase):
4747db96d56Sopenharmony_ci
    @unittest.skipIf(_testcapi is None, 'need _testcapi')
    def test_socket(self):
        # A socket can serve as the wakeup fd: after SIGINT is raised, one
        # byte holding the signal number must be readable from the peer
        # socket of the pair.

        # use a subprocess to have only one thread
        code = """if 1:
        import signal
        import socket
        import struct
        import _testcapi

        signum = signal.SIGINT
        signals = (signum,)

        def handler(signum, frame):
            pass

        signal.signal(signum, handler)

        read, write = socket.socketpair()
        write.setblocking(False)
        signal.set_wakeup_fd(write.fileno())

        signal.raise_signal(signum)

        data = read.recv(1)
        if not data:
            raise Exception("no signum written")
        raised = struct.unpack('B', data)
        if raised != signals:
            raise Exception("%r != %r" % (raised, signals))

        read.close()
        write.close()
        """

        # The script is handed to assert_python_ok() to run in a child.
        assert_python_ok('-c', code)
5107db96d56Sopenharmony_ci
5117db96d56Sopenharmony_ci    @unittest.skipIf(_testcapi is None, 'need _testcapi')
5127db96d56Sopenharmony_ci    def test_send_error(self):
5137db96d56Sopenharmony_ci        # Use a subprocess to have only one thread.
5147db96d56Sopenharmony_ci        if os.name == 'nt':
5157db96d56Sopenharmony_ci            action = 'send'
5167db96d56Sopenharmony_ci        else:
5177db96d56Sopenharmony_ci            action = 'write'
5187db96d56Sopenharmony_ci        code = """if 1:
5197db96d56Sopenharmony_ci        import errno
5207db96d56Sopenharmony_ci        import signal
5217db96d56Sopenharmony_ci        import socket
5227db96d56Sopenharmony_ci        import sys
5237db96d56Sopenharmony_ci        import time
5247db96d56Sopenharmony_ci        import _testcapi
5257db96d56Sopenharmony_ci        from test.support import captured_stderr
5267db96d56Sopenharmony_ci
5277db96d56Sopenharmony_ci        signum = signal.SIGINT
5287db96d56Sopenharmony_ci
5297db96d56Sopenharmony_ci        def handler(signum, frame):
5307db96d56Sopenharmony_ci            pass
5317db96d56Sopenharmony_ci
5327db96d56Sopenharmony_ci        signal.signal(signum, handler)
5337db96d56Sopenharmony_ci
5347db96d56Sopenharmony_ci        read, write = socket.socketpair()
5357db96d56Sopenharmony_ci        read.setblocking(False)
5367db96d56Sopenharmony_ci        write.setblocking(False)
5377db96d56Sopenharmony_ci
5387db96d56Sopenharmony_ci        signal.set_wakeup_fd(write.fileno())
5397db96d56Sopenharmony_ci
5407db96d56Sopenharmony_ci        # Close sockets: send() will fail
5417db96d56Sopenharmony_ci        read.close()
5427db96d56Sopenharmony_ci        write.close()
5437db96d56Sopenharmony_ci
5447db96d56Sopenharmony_ci        with captured_stderr() as err:
5457db96d56Sopenharmony_ci            signal.raise_signal(signum)
5467db96d56Sopenharmony_ci
5477db96d56Sopenharmony_ci        err = err.getvalue()
5487db96d56Sopenharmony_ci        if ('Exception ignored when trying to {action} to the signal wakeup fd'
5497db96d56Sopenharmony_ci            not in err):
5507db96d56Sopenharmony_ci            raise AssertionError(err)
5517db96d56Sopenharmony_ci        """.format(action=action)
5527db96d56Sopenharmony_ci        assert_python_ok('-c', code)
5537db96d56Sopenharmony_ci
5547db96d56Sopenharmony_ci    @unittest.skipIf(_testcapi is None, 'need _testcapi')
5557db96d56Sopenharmony_ci    def test_warn_on_full_buffer(self):
5567db96d56Sopenharmony_ci        # Use a subprocess to have only one thread.
5577db96d56Sopenharmony_ci        if os.name == 'nt':
5587db96d56Sopenharmony_ci            action = 'send'
5597db96d56Sopenharmony_ci        else:
5607db96d56Sopenharmony_ci            action = 'write'
5617db96d56Sopenharmony_ci        code = """if 1:
5627db96d56Sopenharmony_ci        import errno
5637db96d56Sopenharmony_ci        import signal
5647db96d56Sopenharmony_ci        import socket
5657db96d56Sopenharmony_ci        import sys
5667db96d56Sopenharmony_ci        import time
5677db96d56Sopenharmony_ci        import _testcapi
5687db96d56Sopenharmony_ci        from test.support import captured_stderr
5697db96d56Sopenharmony_ci
5707db96d56Sopenharmony_ci        signum = signal.SIGINT
5717db96d56Sopenharmony_ci
5727db96d56Sopenharmony_ci        # This handler will be called, but we intentionally won't read from
5737db96d56Sopenharmony_ci        # the wakeup fd.
5747db96d56Sopenharmony_ci        def handler(signum, frame):
5757db96d56Sopenharmony_ci            pass
5767db96d56Sopenharmony_ci
5777db96d56Sopenharmony_ci        signal.signal(signum, handler)
5787db96d56Sopenharmony_ci
5797db96d56Sopenharmony_ci        read, write = socket.socketpair()
5807db96d56Sopenharmony_ci
5817db96d56Sopenharmony_ci        # Fill the socketpair buffer
5827db96d56Sopenharmony_ci        if sys.platform == 'win32':
5837db96d56Sopenharmony_ci            # bpo-34130: On Windows, sometimes non-blocking send fails to fill
5847db96d56Sopenharmony_ci            # the full socketpair buffer, so use a timeout of 50 ms instead.
5857db96d56Sopenharmony_ci            write.settimeout(0.050)
5867db96d56Sopenharmony_ci        else:
5877db96d56Sopenharmony_ci            write.setblocking(False)
5887db96d56Sopenharmony_ci
5897db96d56Sopenharmony_ci        written = 0
5907db96d56Sopenharmony_ci        if sys.platform == "vxworks":
5917db96d56Sopenharmony_ci            CHUNK_SIZES = (1,)
5927db96d56Sopenharmony_ci        else:
5937db96d56Sopenharmony_ci            # Start with large chunk size to reduce the
5947db96d56Sopenharmony_ci            # number of send needed to fill the buffer.
5957db96d56Sopenharmony_ci            CHUNK_SIZES = (2 ** 16, 2 ** 8, 1)
5967db96d56Sopenharmony_ci        for chunk_size in CHUNK_SIZES:
5977db96d56Sopenharmony_ci            chunk = b"x" * chunk_size
5987db96d56Sopenharmony_ci            try:
5997db96d56Sopenharmony_ci                while True:
6007db96d56Sopenharmony_ci                    write.send(chunk)
6017db96d56Sopenharmony_ci                    written += chunk_size
6027db96d56Sopenharmony_ci            except (BlockingIOError, TimeoutError):
6037db96d56Sopenharmony_ci                pass
6047db96d56Sopenharmony_ci
6057db96d56Sopenharmony_ci        print(f"%s bytes written into the socketpair" % written, flush=True)
6067db96d56Sopenharmony_ci
6077db96d56Sopenharmony_ci        write.setblocking(False)
6087db96d56Sopenharmony_ci        try:
6097db96d56Sopenharmony_ci            write.send(b"x")
6107db96d56Sopenharmony_ci        except BlockingIOError:
6117db96d56Sopenharmony_ci            # The socketpair buffer seems full
6127db96d56Sopenharmony_ci            pass
6137db96d56Sopenharmony_ci        else:
6147db96d56Sopenharmony_ci            raise AssertionError("%s bytes failed to fill the socketpair "
6157db96d56Sopenharmony_ci                                 "buffer" % written)
6167db96d56Sopenharmony_ci
6177db96d56Sopenharmony_ci        # By default, we get a warning when a signal arrives
6187db96d56Sopenharmony_ci        msg = ('Exception ignored when trying to {action} '
6197db96d56Sopenharmony_ci               'to the signal wakeup fd')
6207db96d56Sopenharmony_ci        signal.set_wakeup_fd(write.fileno())
6217db96d56Sopenharmony_ci
6227db96d56Sopenharmony_ci        with captured_stderr() as err:
6237db96d56Sopenharmony_ci            signal.raise_signal(signum)
6247db96d56Sopenharmony_ci
6257db96d56Sopenharmony_ci        err = err.getvalue()
6267db96d56Sopenharmony_ci        if msg not in err:
6277db96d56Sopenharmony_ci            raise AssertionError("first set_wakeup_fd() test failed, "
6287db96d56Sopenharmony_ci                                 "stderr: %r" % err)
6297db96d56Sopenharmony_ci
6307db96d56Sopenharmony_ci        # And also if warn_on_full_buffer=True
6317db96d56Sopenharmony_ci        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True)
6327db96d56Sopenharmony_ci
6337db96d56Sopenharmony_ci        with captured_stderr() as err:
6347db96d56Sopenharmony_ci            signal.raise_signal(signum)
6357db96d56Sopenharmony_ci
6367db96d56Sopenharmony_ci        err = err.getvalue()
6377db96d56Sopenharmony_ci        if msg not in err:
6387db96d56Sopenharmony_ci            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) "
6397db96d56Sopenharmony_ci                                 "test failed, stderr: %r" % err)
6407db96d56Sopenharmony_ci
6417db96d56Sopenharmony_ci        # But not if warn_on_full_buffer=False
6427db96d56Sopenharmony_ci        signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False)
6437db96d56Sopenharmony_ci
6447db96d56Sopenharmony_ci        with captured_stderr() as err:
6457db96d56Sopenharmony_ci            signal.raise_signal(signum)
6467db96d56Sopenharmony_ci
6477db96d56Sopenharmony_ci        err = err.getvalue()
6487db96d56Sopenharmony_ci        if err != "":
6497db96d56Sopenharmony_ci            raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) "
6507db96d56Sopenharmony_ci                                 "test failed, stderr: %r" % err)
6517db96d56Sopenharmony_ci
6527db96d56Sopenharmony_ci        # And then check the default again, to make sure warn_on_full_buffer
6537db96d56Sopenharmony_ci        # settings don't leak across calls.
6547db96d56Sopenharmony_ci        signal.set_wakeup_fd(write.fileno())
6557db96d56Sopenharmony_ci
6567db96d56Sopenharmony_ci        with captured_stderr() as err:
6577db96d56Sopenharmony_ci            signal.raise_signal(signum)
6587db96d56Sopenharmony_ci
6597db96d56Sopenharmony_ci        err = err.getvalue()
6607db96d56Sopenharmony_ci        if msg not in err:
6617db96d56Sopenharmony_ci            raise AssertionError("second set_wakeup_fd() test failed, "
6627db96d56Sopenharmony_ci                                 "stderr: %r" % err)
6637db96d56Sopenharmony_ci
6647db96d56Sopenharmony_ci        """.format(action=action)
6657db96d56Sopenharmony_ci        assert_python_ok('-c', code)
6667db96d56Sopenharmony_ci
6677db96d56Sopenharmony_ci
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()")
@support.requires_subprocess()
@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
class SiginterruptTest(unittest.TestCase):

    def readpipe_interrupted(self, interrupt):
        """Run a child that blocks in os.read() while a SIGALRM is scheduled.

        *interrupt* is passed to signal.siginterrupt() in the child, unless
        it is None, in which case siginterrupt() is not called at all.

        Return True when the child exits with status 3 (both reads were cut
        short by the handler's ZeroDivisionError).  Return False when it
        exits with status 2 (a read returned normally) or when it does not
        finish within support.SHORT_TIMEOUT.  Any other exit status raises
        an Exception carrying the child's output.
        """
        # The child keeps this process single-threaded, lets us put a
        # timeout on the blocking read, and leaves our own signal handling
        # untouched.
        child_source = """if 1:
            import errno
            import os
            import signal
            import sys

            interrupt = %r
            r, w = os.pipe()

            def handler(signum, frame):
                1 / 0

            signal.signal(signal.SIGALRM, handler)
            if interrupt is not None:
                signal.siginterrupt(signal.SIGALRM, interrupt)

            print("ready")
            sys.stdout.flush()

            # run the test twice
            try:
                for loop in range(2):
                    # send a SIGALRM in a second (during the read)
                    signal.alarm(1)
                    try:
                        # blocking call: read from a pipe without data
                        os.read(r, 1)
                    except ZeroDivisionError:
                        pass
                    else:
                        sys.exit(2)
                sys.exit(3)
            finally:
                os.close(r)
                os.close(w)
        """ % (interrupt,)
        with spawn_python('-c', child_source) as child:
            try:
                # The first line tells us the child is loaded and running.
                banner = child.stdout.readline()

                remainder, _ = child.communicate(timeout=support.SHORT_TIMEOUT)
            except subprocess.TimeoutExpired:
                # The read never came back: treat it as "not interrupted".
                child.kill()
                return False

            output = banner + remainder
            status = child.wait()
            if status == 3:
                return True
            if status == 2:
                return False
            raise Exception("Child error (exit code %s): %r"
                            % (status, output))

    def test_without_siginterrupt(self):
        # With a handler installed and siginterrupt() never called, an
        # arriving signal interrupts the syscall in progress.
        self.assertTrue(self.readpipe_interrupted(None))

    def test_siginterrupt_on(self):
        # With a handler installed and siginterrupt() called with a true
        # flag, an arriving signal interrupts the syscall in progress.
        self.assertTrue(self.readpipe_interrupted(True))

    def test_siginterrupt_off(self):
        # With a handler installed and siginterrupt() called with a false
        # flag, an arriving signal does not interrupt the syscall in
        # progress.
        self.assertFalse(self.readpipe_interrupted(False))
7547db96d56Sopenharmony_ci
7557db96d56Sopenharmony_ci
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
@unittest.skipUnless(hasattr(signal, 'getitimer') and hasattr(signal, 'setitimer'),
                         "needs signal.getitimer() and signal.setitimer()")
class ItimerTest(unittest.TestCase):
    """Tests for signal.setitimer() and signal.getitimer().

    Each test records the timer it arms in ``self.itimer`` so that
    tearDown() can disarm it.  Handlers installed for signals other than
    SIGALRM are restored through addCleanup().
    """

    def setUp(self):
        self.hndl_called = False
        self.hndl_count = 0
        self.itimer = None
        self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)

    def tearDown(self):
        # Disarm the timer first: once the previous SIGALRM handler is back
        # in place, a late expiry must not be delivered to it.
        if self.itimer is not None: # test_itimer_exc doesn't change this attr
            signal.setitimer(self.itimer, 0)
        signal.signal(signal.SIGALRM, self.old_alarm)

    def _install_handler(self, signum, handler):
        """Install *handler* for *signum* and restore the old one afterwards.

        Cleanups run after tearDown(), i.e. once the timer is disarmed.
        signal.signal() returns None when the previous handler was not
        installed from Python; such a handler cannot be passed back to
        signal.signal(), so nothing is registered in that case.
        """
        previous = signal.signal(signum, handler)
        if previous is not None:
            self.addCleanup(signal.signal, signum, previous)

    def sig_alrm(self, *args):
        self.hndl_called = True

    def sig_vtalrm(self, *args):
        self.hndl_called = True

        if self.hndl_count > 3:
            # it shouldn't be here, because it should have been disabled.
            raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
                "timer.")
        elif self.hndl_count == 3:
            # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
            signal.setitimer(signal.ITIMER_VIRTUAL, 0)

        self.hndl_count += 1

    def sig_prof(self, *args):
        self.hndl_called = True
        signal.setitimer(signal.ITIMER_PROF, 0)

    def test_itimer_exc(self):
        # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
        # defines it ?
        self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
        # Negative times are treated as zero on some platforms.
        if 0:
            self.assertRaises(signal.ItimerError,
                              signal.setitimer, signal.ITIMER_REAL, -1)

    def test_itimer_real(self):
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1.0)
        signal.pause()
        self.assertEqual(self.hndl_called, True)

    # Issue 3864, unknown if this affects earlier versions of freebsd also
    @unittest.skipIf(sys.platform in ('netbsd5',),
        'itimer not reliable (does not mix well with threading) on some BSDs.')
    def test_itimer_virtual(self):
        self.itimer = signal.ITIMER_VIRTUAL
        self._install_handler(signal.SIGVTALRM, self.sig_vtalrm)
        signal.setitimer(self.itimer, 0.3, 0.2)

        start_time = time.monotonic()
        while time.monotonic() - start_time < 60.0:
            # use up some virtual time by doing real work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_vtalrm handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # virtual itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)

    def test_itimer_prof(self):
        self.itimer = signal.ITIMER_PROF
        self._install_handler(signal.SIGPROF, self.sig_prof)
        signal.setitimer(self.itimer, 0.2, 0.2)

        start_time = time.monotonic()
        while time.monotonic() - start_time < 60.0:
            # do some work
            _ = pow(12345, 67890, 10000019)
            if signal.getitimer(self.itimer) == (0.0, 0.0):
                break # sig_prof handler stopped this itimer
        else: # Issue 8424
            self.skipTest("timeout: likely cause: machine too slow or load too "
                          "high")

        # profiling itimer should be (0.0, 0.0) now
        self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
        # and the handler should have been called
        self.assertEqual(self.hndl_called, True)

    def test_setitimer_tiny(self):
        # bpo-30807: C setitimer() takes a microsecond-resolution interval.
        # Check that float -> timeval conversion doesn't round
        # the interval down to zero, which would disable the timer.
        self.itimer = signal.ITIMER_REAL
        signal.setitimer(self.itimer, 1e-6)
        time.sleep(1)
        self.assertEqual(self.hndl_called, True)
8587db96d56Sopenharmony_ci
8597db96d56Sopenharmony_ci
8607db96d56Sopenharmony_ciclass PendingSignalsTests(unittest.TestCase):
8617db96d56Sopenharmony_ci    """
8627db96d56Sopenharmony_ci    Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
8637db96d56Sopenharmony_ci    functions.
8647db96d56Sopenharmony_ci    """
8657db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'sigpending'),
8667db96d56Sopenharmony_ci                         'need signal.sigpending()')
8677db96d56Sopenharmony_ci    def test_sigpending_empty(self):
8687db96d56Sopenharmony_ci        self.assertEqual(signal.sigpending(), set())
8697db96d56Sopenharmony_ci
8707db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
8717db96d56Sopenharmony_ci                         'need signal.pthread_sigmask()')
8727db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'sigpending'),
8737db96d56Sopenharmony_ci                         'need signal.sigpending()')
8747db96d56Sopenharmony_ci    def test_sigpending(self):
8757db96d56Sopenharmony_ci        code = """if 1:
8767db96d56Sopenharmony_ci            import os
8777db96d56Sopenharmony_ci            import signal
8787db96d56Sopenharmony_ci
8797db96d56Sopenharmony_ci            def handler(signum, frame):
8807db96d56Sopenharmony_ci                1/0
8817db96d56Sopenharmony_ci
8827db96d56Sopenharmony_ci            signum = signal.SIGUSR1
8837db96d56Sopenharmony_ci            signal.signal(signum, handler)
8847db96d56Sopenharmony_ci
8857db96d56Sopenharmony_ci            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
8867db96d56Sopenharmony_ci            os.kill(os.getpid(), signum)
8877db96d56Sopenharmony_ci            pending = signal.sigpending()
8887db96d56Sopenharmony_ci            for sig in pending:
8897db96d56Sopenharmony_ci                assert isinstance(sig, signal.Signals), repr(pending)
8907db96d56Sopenharmony_ci            if pending != {signum}:
8917db96d56Sopenharmony_ci                raise Exception('%s != {%s}' % (pending, signum))
8927db96d56Sopenharmony_ci            try:
8937db96d56Sopenharmony_ci                signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
8947db96d56Sopenharmony_ci            except ZeroDivisionError:
8957db96d56Sopenharmony_ci                pass
8967db96d56Sopenharmony_ci            else:
8977db96d56Sopenharmony_ci                raise Exception("ZeroDivisionError not raised")
8987db96d56Sopenharmony_ci        """
8997db96d56Sopenharmony_ci        assert_python_ok('-c', code)
9007db96d56Sopenharmony_ci
9017db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
9027db96d56Sopenharmony_ci                         'need signal.pthread_kill()')
9037db96d56Sopenharmony_ci    @threading_helper.requires_working_threading()
9047db96d56Sopenharmony_ci    def test_pthread_kill(self):
9057db96d56Sopenharmony_ci        code = """if 1:
9067db96d56Sopenharmony_ci            import signal
9077db96d56Sopenharmony_ci            import threading
9087db96d56Sopenharmony_ci            import sys
9097db96d56Sopenharmony_ci
9107db96d56Sopenharmony_ci            signum = signal.SIGUSR1
9117db96d56Sopenharmony_ci
9127db96d56Sopenharmony_ci            def handler(signum, frame):
9137db96d56Sopenharmony_ci                1/0
9147db96d56Sopenharmony_ci
9157db96d56Sopenharmony_ci            signal.signal(signum, handler)
9167db96d56Sopenharmony_ci
9177db96d56Sopenharmony_ci            tid = threading.get_ident()
9187db96d56Sopenharmony_ci            try:
9197db96d56Sopenharmony_ci                signal.pthread_kill(tid, signum)
9207db96d56Sopenharmony_ci            except ZeroDivisionError:
9217db96d56Sopenharmony_ci                pass
9227db96d56Sopenharmony_ci            else:
9237db96d56Sopenharmony_ci                raise Exception("ZeroDivisionError not raised")
9247db96d56Sopenharmony_ci        """
9257db96d56Sopenharmony_ci        assert_python_ok('-c', code)
9267db96d56Sopenharmony_ci
9277db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
9287db96d56Sopenharmony_ci                         'need signal.pthread_sigmask()')
9297db96d56Sopenharmony_ci    def wait_helper(self, blocked, test):
9307db96d56Sopenharmony_ci        """
9317db96d56Sopenharmony_ci        test: body of the "def test(signum):" function.
9327db96d56Sopenharmony_ci        blocked: number of the blocked signal
9337db96d56Sopenharmony_ci        """
9347db96d56Sopenharmony_ci        code = '''if 1:
9357db96d56Sopenharmony_ci        import signal
9367db96d56Sopenharmony_ci        import sys
9377db96d56Sopenharmony_ci        from signal import Signals
9387db96d56Sopenharmony_ci
9397db96d56Sopenharmony_ci        def handler(signum, frame):
9407db96d56Sopenharmony_ci            1/0
9417db96d56Sopenharmony_ci
9427db96d56Sopenharmony_ci        %s
9437db96d56Sopenharmony_ci
9447db96d56Sopenharmony_ci        blocked = %s
9457db96d56Sopenharmony_ci        signum = signal.SIGALRM
9467db96d56Sopenharmony_ci
9477db96d56Sopenharmony_ci        # child: block and wait the signal
9487db96d56Sopenharmony_ci        try:
9497db96d56Sopenharmony_ci            signal.signal(signum, handler)
9507db96d56Sopenharmony_ci            signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
9517db96d56Sopenharmony_ci
9527db96d56Sopenharmony_ci            # Do the tests
9537db96d56Sopenharmony_ci            test(signum)
9547db96d56Sopenharmony_ci
9557db96d56Sopenharmony_ci            # The handler must not be called on unblock
9567db96d56Sopenharmony_ci            try:
9577db96d56Sopenharmony_ci                signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
9587db96d56Sopenharmony_ci            except ZeroDivisionError:
9597db96d56Sopenharmony_ci                print("the signal handler has been called",
9607db96d56Sopenharmony_ci                      file=sys.stderr)
9617db96d56Sopenharmony_ci                sys.exit(1)
9627db96d56Sopenharmony_ci        except BaseException as err:
9637db96d56Sopenharmony_ci            print("error: {}".format(err), file=sys.stderr)
9647db96d56Sopenharmony_ci            sys.stderr.flush()
9657db96d56Sopenharmony_ci            sys.exit(1)
9667db96d56Sopenharmony_ci        ''' % (test.strip(), blocked)
9677db96d56Sopenharmony_ci
9687db96d56Sopenharmony_ci        # sig*wait* must be called with the signal blocked: since the current
9697db96d56Sopenharmony_ci        # process might have several threads running, use a subprocess to have
9707db96d56Sopenharmony_ci        # a single thread.
9717db96d56Sopenharmony_ci        assert_python_ok('-c', code)
9727db96d56Sopenharmony_ci
9737db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'sigwait'),
9747db96d56Sopenharmony_ci                         'need signal.sigwait()')
9757db96d56Sopenharmony_ci    def test_sigwait(self):
9767db96d56Sopenharmony_ci        self.wait_helper(signal.SIGALRM, '''
9777db96d56Sopenharmony_ci        def test(signum):
9787db96d56Sopenharmony_ci            signal.alarm(1)
9797db96d56Sopenharmony_ci            received = signal.sigwait([signum])
9807db96d56Sopenharmony_ci            assert isinstance(received, signal.Signals), received
9817db96d56Sopenharmony_ci            if received != signum:
9827db96d56Sopenharmony_ci                raise Exception('received %s, not %s' % (received, signum))
9837db96d56Sopenharmony_ci        ''')
9847db96d56Sopenharmony_ci
9857db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
9867db96d56Sopenharmony_ci                         'need signal.sigwaitinfo()')
9877db96d56Sopenharmony_ci    def test_sigwaitinfo(self):
9887db96d56Sopenharmony_ci        self.wait_helper(signal.SIGALRM, '''
9897db96d56Sopenharmony_ci        def test(signum):
9907db96d56Sopenharmony_ci            signal.alarm(1)
9917db96d56Sopenharmony_ci            info = signal.sigwaitinfo([signum])
9927db96d56Sopenharmony_ci            if info.si_signo != signum:
9937db96d56Sopenharmony_ci                raise Exception("info.si_signo != %s" % signum)
9947db96d56Sopenharmony_ci        ''')
9957db96d56Sopenharmony_ci
9967db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
9977db96d56Sopenharmony_ci                         'need signal.sigtimedwait()')
9987db96d56Sopenharmony_ci    def test_sigtimedwait(self):
9997db96d56Sopenharmony_ci        self.wait_helper(signal.SIGALRM, '''
10007db96d56Sopenharmony_ci        def test(signum):
10017db96d56Sopenharmony_ci            signal.alarm(1)
10027db96d56Sopenharmony_ci            info = signal.sigtimedwait([signum], 10.1000)
10037db96d56Sopenharmony_ci            if info.si_signo != signum:
10047db96d56Sopenharmony_ci                raise Exception('info.si_signo != %s' % signum)
10057db96d56Sopenharmony_ci        ''')
10067db96d56Sopenharmony_ci
10077db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
10087db96d56Sopenharmony_ci                         'need signal.sigtimedwait()')
10097db96d56Sopenharmony_ci    def test_sigtimedwait_poll(self):
10107db96d56Sopenharmony_ci        # check that polling with sigtimedwait works
10117db96d56Sopenharmony_ci        self.wait_helper(signal.SIGALRM, '''
10127db96d56Sopenharmony_ci        def test(signum):
10137db96d56Sopenharmony_ci            import os
10147db96d56Sopenharmony_ci            os.kill(os.getpid(), signum)
10157db96d56Sopenharmony_ci            info = signal.sigtimedwait([signum], 0)
10167db96d56Sopenharmony_ci            if info.si_signo != signum:
10177db96d56Sopenharmony_ci                raise Exception('info.si_signo != %s' % signum)
10187db96d56Sopenharmony_ci        ''')
10197db96d56Sopenharmony_ci
10207db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
10217db96d56Sopenharmony_ci                         'need signal.sigtimedwait()')
10227db96d56Sopenharmony_ci    def test_sigtimedwait_timeout(self):
10237db96d56Sopenharmony_ci        self.wait_helper(signal.SIGALRM, '''
10247db96d56Sopenharmony_ci        def test(signum):
10257db96d56Sopenharmony_ci            received = signal.sigtimedwait([signum], 1.0)
10267db96d56Sopenharmony_ci            if received is not None:
10277db96d56Sopenharmony_ci                raise Exception("received=%r" % (received,))
10287db96d56Sopenharmony_ci        ''')
10297db96d56Sopenharmony_ci
10307db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
10317db96d56Sopenharmony_ci                         'need signal.sigtimedwait()')
10327db96d56Sopenharmony_ci    def test_sigtimedwait_negative_timeout(self):
10337db96d56Sopenharmony_ci        signum = signal.SIGALRM
10347db96d56Sopenharmony_ci        self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0)
10357db96d56Sopenharmony_ci
    @unittest.skipUnless(hasattr(signal, 'sigwait'),
                         'need signal.sigwait()')
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    @threading_helper.requires_working_threading()
    def test_sigwait_thread(self):
        # Check that calling sigwait() from a thread doesn't suspend the whole
        # process. A new interpreter is spawned to avoid problems when mixing
        # threads and fork(): only async-safe functions are allowed between
        # fork() and exec().
        #
        # Outline of the child script: SIGUSR1 is blocked before the helper
        # thread is started, then the main thread calls sigwait() while the
        # helper thread sleeps for one second and sends SIGUSR1 to the
        # process.  The child exits with status 1 (and a message on stderr)
        # if sigwait() reports any other signal.
        assert_python_ok("-c", """if True:
            import os, threading, sys, time, signal

            # the default handler terminates the process
            signum = signal.SIGUSR1

            def kill_later():
                # wait until the main thread is waiting in sigwait()
                time.sleep(1)
                os.kill(os.getpid(), signum)

            # the signal must be blocked by all the threads
            signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
            killer = threading.Thread(target=kill_later)
            killer.start()
            received = signal.sigwait([signum])
            if received != signum:
                print("sigwait() received %s, not %s" % (received, signum),
                      file=sys.stderr)
                sys.exit(1)
            killer.join()
            # unblock the signal, which should have been cleared by sigwait()
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        """)
10707db96d56Sopenharmony_ci
10717db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
10727db96d56Sopenharmony_ci                         'need signal.pthread_sigmask()')
10737db96d56Sopenharmony_ci    def test_pthread_sigmask_arguments(self):
10747db96d56Sopenharmony_ci        self.assertRaises(TypeError, signal.pthread_sigmask)
10757db96d56Sopenharmony_ci        self.assertRaises(TypeError, signal.pthread_sigmask, 1)
10767db96d56Sopenharmony_ci        self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3)
10777db96d56Sopenharmony_ci        self.assertRaises(OSError, signal.pthread_sigmask, 1700, [])
10787db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
10797db96d56Sopenharmony_ci            signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG])
10807db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
10817db96d56Sopenharmony_ci            signal.pthread_sigmask(signal.SIG_BLOCK, [0])
10827db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
10837db96d56Sopenharmony_ci            signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000])
10847db96d56Sopenharmony_ci
10857db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
10867db96d56Sopenharmony_ci                         'need signal.pthread_sigmask()')
10877db96d56Sopenharmony_ci    def test_pthread_sigmask_valid_signals(self):
10887db96d56Sopenharmony_ci        s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals())
10897db96d56Sopenharmony_ci        self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s)
10907db96d56Sopenharmony_ci        # Get current blocked set
10917db96d56Sopenharmony_ci        s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals())
10927db96d56Sopenharmony_ci        self.assertLessEqual(s, signal.valid_signals())
10937db96d56Sopenharmony_ci
    @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
                         'need signal.pthread_sigmask()')
    @threading_helper.requires_working_threading()
    def test_pthread_sigmask(self):
        # The whole scenario runs in a child interpreter started with
        # assert_python_ok().  The script installs a SIGUSR1 handler that
        # raises ZeroDivisionError, so "the handler ran" can be observed as
        # an exception.  It then walks through three phases:
        #   1. unblock SIGUSR1 and send it: the handler must run;
        #   2. block SIGUSR1 and send it: the handler must not run and the
        #      signal has to show up in the mask read back;
        #   3. unblock SIGUSR1 again: the handler must run from within the
        #      pthread_sigmask() call, and once more for a fresh signal.
        # After phases 2 and 3 the mask is compared with the previous one.
        code = """if 1:
        import signal
        import os; import threading

        def handler(signum, frame):
            1/0

        def kill(signum):
            os.kill(os.getpid(), signum)

        def check_mask(mask):
            for sig in mask:
                assert isinstance(sig, signal.Signals), repr(sig)

        def read_sigmask():
            sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, [])
            check_mask(sigmask)
            return sigmask

        signum = signal.SIGUSR1

        # Install our signal handler
        old_handler = signal.signal(signum, handler)

        # Unblock SIGUSR1 (and copy the old mask) to test our signal handler
        old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        check_mask(old_mask)
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Block and then raise SIGUSR1. The signal is blocked: the signal
        # handler is not called, and the signal is now pending
        mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
        check_mask(mask)
        kill(signum)

        # Check the new mask
        blocked = read_sigmask()
        check_mask(blocked)
        if signum not in blocked:
            raise Exception("%s not in %s" % (signum, blocked))
        if old_mask ^ blocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))

        # Unblock SIGUSR1
        try:
            # unblock the pending signal calls immediately the signal handler
            signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")
        try:
            kill(signum)
        except ZeroDivisionError:
            pass
        else:
            raise Exception("ZeroDivisionError not raised")

        # Check the new mask
        unblocked = read_sigmask()
        if signum in unblocked:
            raise Exception("%s in %s" % (signum, unblocked))
        if blocked ^ unblocked != {signum}:
            raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
        if old_mask != unblocked:
            raise Exception("%s != %s" % (old_mask, unblocked))
        """
        # NOTE(review): presumably assert_python_ok() fails the test when the
        # child exits with a non-zero status - confirm in
        # test.support.script_helper.
        assert_python_ok('-c', code)
11717db96d56Sopenharmony_ci
11727db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
11737db96d56Sopenharmony_ci                         'need signal.pthread_kill()')
11747db96d56Sopenharmony_ci    @threading_helper.requires_working_threading()
11757db96d56Sopenharmony_ci    def test_pthread_kill_main_thread(self):
11767db96d56Sopenharmony_ci        # Test that a signal can be sent to the main thread with pthread_kill()
11777db96d56Sopenharmony_ci        # before any other thread has been created (see issue #12392).
11787db96d56Sopenharmony_ci        code = """if True:
11797db96d56Sopenharmony_ci            import threading
11807db96d56Sopenharmony_ci            import signal
11817db96d56Sopenharmony_ci            import sys
11827db96d56Sopenharmony_ci
11837db96d56Sopenharmony_ci            def handler(signum, frame):
11847db96d56Sopenharmony_ci                sys.exit(3)
11857db96d56Sopenharmony_ci
11867db96d56Sopenharmony_ci            signal.signal(signal.SIGUSR1, handler)
11877db96d56Sopenharmony_ci            signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
11887db96d56Sopenharmony_ci            sys.exit(2)
11897db96d56Sopenharmony_ci        """
11907db96d56Sopenharmony_ci
11917db96d56Sopenharmony_ci        with spawn_python('-c', code) as process:
11927db96d56Sopenharmony_ci            stdout, stderr = process.communicate()
11937db96d56Sopenharmony_ci            exitcode = process.wait()
11947db96d56Sopenharmony_ci            if exitcode != 3:
11957db96d56Sopenharmony_ci                raise Exception("Child error (exit code %s): %s" %
11967db96d56Sopenharmony_ci                                (exitcode, stdout))
11977db96d56Sopenharmony_ci
11987db96d56Sopenharmony_ci
class StressTest(unittest.TestCase):
    """
    Stress signal delivery, especially when a signal arrives in
    the middle of recomputing the signal state or executing
    previously tripped signal handlers.
    """

    def setsig(self, signum, handler):
        """Install *handler* for *signum*.

        The handler that was in place before is put back by a cleanup
        registered on the test case.
        """
        old_handler = signal.signal(signum, handler)
        self.addCleanup(signal.signal, signum, old_handler)

    def measure_itimer_resolution(self):
        """Return the median delay, in seconds, between chained SIGALRMs.

        The SIGALRM handler records a timestamp and re-arms ITIMER_REAL
        with a 1 µs delay until N timestamps have been collected.
        """
        N = 20
        times = []

        def handler(signum=None, frame=None):
            if len(times) < N:
                times.append(time.perf_counter())
                # 1 µs is the smallest possible timer interval,
                # we want to measure what the concrete duration
                # will be on this platform
                signal.setitimer(signal.ITIMER_REAL, 1e-6)

        # Disarm the timer at cleanup, whatever happens below.
        self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0)
        self.setsig(signal.SIGALRM, handler)
        # The first sample is taken by hand, which also arms the timer;
        # every following sample comes from the SIGALRM handler.
        handler()
        while len(times) < N:
            time.sleep(1e-3)

        durations = [later - earlier
                     for earlier, later in zip(times, times[1:])]
        med = statistics.median(durations)
        if support.verbose:
            print("detected median itimer() resolution: %.6f s." % (med,))
        return med

    def decide_itimer_count(self):
        """Return how many signals a stress test should exchange.

        Returns 10000 when the measured resolution is at most 0.1 ms and
        100 when it is at most 10 ms; the test is skipped otherwise.
        """
        # Some systems have poor setitimer() resolution (for example
        # measured around 20 ms. on FreeBSD 9), so decide on a reasonable
        # number of sequential timers based on that.
        reso = self.measure_itimer_resolution()
        if reso <= 1e-4:
            return 10000
        elif reso <= 1e-2:
            return 100
        else:
            self.skipTest("detected itimer resolution (%.3f s.) too high "
                          "(> 10 ms.) on this platform (or system too busy)"
                          % (reso,))

    @unittest.skipUnless(hasattr(signal, "setitimer"),
                         "test needs setitimer()")
    def test_stress_delivery_dependent(self):
        """
        This test uses dependent signal handlers.
        """
        N = self.decide_itimer_count()
        sigs = []

        def first_handler(signum, frame):
            # 1e-6 is the minimum non-zero value for `setitimer()`.
            # Choose a random delay so as to improve chances of
            # triggering a race condition.  Ideally the signal is received
            # when inside critical signal-handling routines such as
            # Py_MakePendingCalls().
            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)

        def second_handler(signum=None, frame=None):
            sigs.append(signum)

        # Here on Linux, SIGPROF > SIGALRM > SIGUSR1.  By using both
        # ascending and descending sequences (SIGUSR1 then SIGALRM,
        # SIGPROF then SIGALRM), we maximize chances of hitting a bug.
        self.setsig(signal.SIGPROF, first_handler)
        self.setsig(signal.SIGUSR1, first_handler)
        self.setsig(signal.SIGALRM, second_handler)  # for ITIMER_REAL

        expected_sigs = 0
        # Past this point the wait loops below stop waiting, so a lost
        # signal ends in a failed assertion instead of a hang.
        deadline = time.monotonic() + support.SHORT_TIMEOUT

        while expected_sigs < N:
            os.kill(os.getpid(), signal.SIGPROF)
            expected_sigs += 1
            # Wait for handlers to run to avoid signal coalescing
            while len(sigs) < expected_sigs and time.monotonic() < deadline:
                time.sleep(1e-5)

            os.kill(os.getpid(), signal.SIGUSR1)
            expected_sigs += 1
            while len(sigs) < expected_sigs and time.monotonic() < deadline:
                time.sleep(1e-5)

        # All ITIMER_REAL signals should have been delivered to the
        # Python handler
        self.assertEqual(len(sigs), N, "Some signals were lost")

    @unittest.skipUnless(hasattr(signal, "setitimer"),
                         "test needs setitimer()")
    def test_stress_delivery_simultaneous(self):
        """
        This test uses simultaneous signal handlers.
        """
        N = self.decide_itimer_count()
        sigs = []

        def handler(signum, frame):
            sigs.append(signum)

        self.setsig(signal.SIGUSR1, handler)
        self.setsig(signal.SIGALRM, handler)  # for ITIMER_REAL

        expected_sigs = 0
        # Same role as in the dependent test: bound the waiting time.
        deadline = time.monotonic() + support.SHORT_TIMEOUT

        while expected_sigs < N:
            # Hopefully the SIGALRM will be received somewhere during
            # initial processing of SIGUSR1.
            signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
            os.kill(os.getpid(), signal.SIGUSR1)

            # One SIGALRM and one SIGUSR1 are expected per iteration.
            expected_sigs += 2
            # Wait for handlers to run to avoid signal coalescing
            while len(sigs) < expected_sigs and time.monotonic() < deadline:
                time.sleep(1e-5)

        # All ITIMER_REAL signals should have been delivered to the
        # Python handler
        self.assertEqual(len(sigs), N, "Some signals were lost")

    @unittest.skipUnless(hasattr(signal, "SIGUSR1"),
                         "test needs SIGUSR1")
    @threading_helper.requires_working_threading()
    def test_stress_modifying_handlers(self):
        # bpo-43406: race condition between trip_signal() and signal.signal
        signum = signal.SIGUSR1
        num_sent_signals = 0
        num_received_signals = 0
        do_stop = False

        def custom_handler(signum, frame):
            nonlocal num_received_signals
            num_received_signals += 1

        def set_interrupts():
            # Runs in the helper thread: raise the signal in a tight loop
            # until the main thread asks for a stop.
            nonlocal num_sent_signals
            while not do_stop:
                signal.raise_signal(signum)
                num_sent_signals += 1

        def cycle_handlers():
            # Keep cycling until enough signals were sent *and* at least one
            # of them reached custom_handler.  A signal raised while SIG_IGN
            # is installed is dropped without any report, so stopping on the
            # sent count alone can end with nothing received and make the
            # sanity check below fail for no good reason.
            while num_sent_signals < 100 or num_received_signals < 1:
                for i in range(20000):
                    # Cycle between a Python-defined and a non-Python handler
                    for handler in [custom_handler, signal.SIG_IGN]:
                        signal.signal(signum, handler)

        old_handler = signal.signal(signum, custom_handler)
        self.addCleanup(signal.signal, signum, old_handler)

        t = threading.Thread(target=set_interrupts)
        try:
            ignored = False
            with support.catch_unraisable_exception() as cm:
                t.start()
                cycle_handlers()
                do_stop = True
                t.join()

                if cm.unraisable is not None:
                    # An unraisable exception may be printed out when
                    # a signal is ignored due to the aforementioned
                    # race condition, check it.
                    self.assertIsInstance(cm.unraisable.exc_value, OSError)
                    self.assertIn(
                        f"Signal {signum:d} ignored due to race condition",
                        str(cm.unraisable.exc_value))
                    ignored = True

            # bpo-43406: Even if it is unlikely, it's technically possible that
            # all signals were ignored because of race conditions.
            if not ignored:
                # Sanity check that some signals were received, but not all
                self.assertGreater(num_received_signals, 0)
            self.assertLess(num_received_signals, num_sent_signals)
        finally:
            # Stop the sender thread even when an assertion above failed.
            do_stop = True
            t.join()
13857db96d56Sopenharmony_ci
13867db96d56Sopenharmony_ci
13877db96d56Sopenharmony_ciclass RaiseSignalTest(unittest.TestCase):
13887db96d56Sopenharmony_ci
13897db96d56Sopenharmony_ci    def test_sigint(self):
13907db96d56Sopenharmony_ci        with self.assertRaises(KeyboardInterrupt):
13917db96d56Sopenharmony_ci            signal.raise_signal(signal.SIGINT)
13927db96d56Sopenharmony_ci
13937db96d56Sopenharmony_ci    @unittest.skipIf(sys.platform != "win32", "Windows specific test")
13947db96d56Sopenharmony_ci    def test_invalid_argument(self):
13957db96d56Sopenharmony_ci        try:
13967db96d56Sopenharmony_ci            SIGHUP = 1 # not supported on win32
13977db96d56Sopenharmony_ci            signal.raise_signal(SIGHUP)
13987db96d56Sopenharmony_ci            self.fail("OSError (Invalid argument) expected")
13997db96d56Sopenharmony_ci        except OSError as e:
14007db96d56Sopenharmony_ci            if e.errno == errno.EINVAL:
14017db96d56Sopenharmony_ci                pass
14027db96d56Sopenharmony_ci            else:
14037db96d56Sopenharmony_ci                raise
14047db96d56Sopenharmony_ci
14057db96d56Sopenharmony_ci    def test_handler(self):
14067db96d56Sopenharmony_ci        is_ok = False
14077db96d56Sopenharmony_ci        def handler(a, b):
14087db96d56Sopenharmony_ci            nonlocal is_ok
14097db96d56Sopenharmony_ci            is_ok = True
14107db96d56Sopenharmony_ci        old_signal = signal.signal(signal.SIGINT, handler)
14117db96d56Sopenharmony_ci        self.addCleanup(signal.signal, signal.SIGINT, old_signal)
14127db96d56Sopenharmony_ci
14137db96d56Sopenharmony_ci        signal.raise_signal(signal.SIGINT)
14147db96d56Sopenharmony_ci        self.assertTrue(is_ok)
14157db96d56Sopenharmony_ci
14167db96d56Sopenharmony_ci    def test__thread_interrupt_main(self):
14177db96d56Sopenharmony_ci        # See https://github.com/python/cpython/issues/102397
14187db96d56Sopenharmony_ci        code = """if 1:
14197db96d56Sopenharmony_ci        import _thread
14207db96d56Sopenharmony_ci        class Foo():
14217db96d56Sopenharmony_ci            def __del__(self):
14227db96d56Sopenharmony_ci                _thread.interrupt_main()
14237db96d56Sopenharmony_ci
14247db96d56Sopenharmony_ci        x = Foo()
14257db96d56Sopenharmony_ci        """
14267db96d56Sopenharmony_ci
14277db96d56Sopenharmony_ci        rc, out, err = assert_python_ok('-c', code)
14287db96d56Sopenharmony_ci        self.assertIn(b'OSError: Signal 2 ignored due to race condition', err)
14297db96d56Sopenharmony_ci
14307db96d56Sopenharmony_ci
14317db96d56Sopenharmony_ci
class PidfdSignalTest(unittest.TestCase):

    @unittest.skipUnless(
        hasattr(signal, "pidfd_send_signal"),
        "pidfd support not built in",
    )
    def test_pidfd_send_signal(self):
        # Descriptor 0 is passed on purpose: the call is expected to fail,
        # and the errno tells whether pidfds can be exercised at all here.
        with self.assertRaises(OSError) as cm:
            signal.pidfd_send_signal(0, signal.SIGINT)
        if cm.exception.errno == errno.ENOSYS:
            self.skipTest("kernel does not support pidfds")
        elif cm.exception.errno == errno.EPERM:
            self.skipTest("Not enough privileges to use pidfds")
        self.assertEqual(cm.exception.errno, errno.EBADF)
        # Open this process's own /proc directory and use that descriptor
        # as the target; it is closed again at cleanup.
        my_pidfd = os.open(f'/proc/{os.getpid()}', os.O_DIRECTORY)
        self.addCleanup(os.close, my_pidfd)
        # Any siginfo other than None is rejected with a fixed message.
        with self.assertRaisesRegex(TypeError, "^siginfo must be None$"):
            signal.pidfd_send_signal(my_pidfd, signal.SIGINT, object(), 0)
        # Sending SIGINT to ourselves must surface as KeyboardInterrupt.
        with self.assertRaises(KeyboardInterrupt):
            signal.pidfd_send_signal(my_pidfd, signal.SIGINT)
14527db96d56Sopenharmony_ci
def tearDownModule():
    # unittest module-level fixture: called once after the tests of this
    # module have run.
    # NOTE(review): presumably reap_children() collects child processes left
    # behind by the tests - confirm in test.support.
    support.reap_children()
14557db96d56Sopenharmony_ci
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1458