17db96d56Sopenharmony_ciimport enum 27db96d56Sopenharmony_ciimport errno 37db96d56Sopenharmony_ciimport inspect 47db96d56Sopenharmony_ciimport os 57db96d56Sopenharmony_ciimport random 67db96d56Sopenharmony_ciimport signal 77db96d56Sopenharmony_ciimport socket 87db96d56Sopenharmony_ciimport statistics 97db96d56Sopenharmony_ciimport subprocess 107db96d56Sopenharmony_ciimport sys 117db96d56Sopenharmony_ciimport threading 127db96d56Sopenharmony_ciimport time 137db96d56Sopenharmony_ciimport unittest 147db96d56Sopenharmony_cifrom test import support 157db96d56Sopenharmony_cifrom test.support import os_helper 167db96d56Sopenharmony_cifrom test.support.script_helper import assert_python_ok, spawn_python 177db96d56Sopenharmony_cifrom test.support import threading_helper 187db96d56Sopenharmony_citry: 197db96d56Sopenharmony_ci import _testcapi 207db96d56Sopenharmony_ciexcept ImportError: 217db96d56Sopenharmony_ci _testcapi = None 227db96d56Sopenharmony_ci 237db96d56Sopenharmony_ci 247db96d56Sopenharmony_ciclass GenericTests(unittest.TestCase): 257db96d56Sopenharmony_ci 267db96d56Sopenharmony_ci def test_enums(self): 277db96d56Sopenharmony_ci for name in dir(signal): 287db96d56Sopenharmony_ci sig = getattr(signal, name) 297db96d56Sopenharmony_ci if name in {'SIG_DFL', 'SIG_IGN'}: 307db96d56Sopenharmony_ci self.assertIsInstance(sig, signal.Handlers) 317db96d56Sopenharmony_ci elif name in {'SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'}: 327db96d56Sopenharmony_ci self.assertIsInstance(sig, signal.Sigmasks) 337db96d56Sopenharmony_ci elif name.startswith('SIG') and not name.startswith('SIG_'): 347db96d56Sopenharmony_ci self.assertIsInstance(sig, signal.Signals) 357db96d56Sopenharmony_ci elif name.startswith('CTRL_'): 367db96d56Sopenharmony_ci self.assertIsInstance(sig, signal.Signals) 377db96d56Sopenharmony_ci self.assertEqual(sys.platform, "win32") 387db96d56Sopenharmony_ci 397db96d56Sopenharmony_ci CheckedSignals = enum._old_convert_( 407db96d56Sopenharmony_ci enum.IntEnum, 'Signals', 'signal', 417db96d56Sopenharmony_ci lambda name: 427db96d56Sopenharmony_ci name.isupper() 437db96d56Sopenharmony_ci and (name.startswith('SIG') and not name.startswith('SIG_')) 447db96d56Sopenharmony_ci or name.startswith('CTRL_'), 457db96d56Sopenharmony_ci source=signal, 467db96d56Sopenharmony_ci ) 477db96d56Sopenharmony_ci enum._test_simple_enum(CheckedSignals, signal.Signals) 487db96d56Sopenharmony_ci 497db96d56Sopenharmony_ci CheckedHandlers = enum._old_convert_( 507db96d56Sopenharmony_ci enum.IntEnum, 'Handlers', 'signal', 517db96d56Sopenharmony_ci lambda name: name in ('SIG_DFL', 'SIG_IGN'), 527db96d56Sopenharmony_ci source=signal, 537db96d56Sopenharmony_ci ) 547db96d56Sopenharmony_ci enum._test_simple_enum(CheckedHandlers, signal.Handlers) 557db96d56Sopenharmony_ci 567db96d56Sopenharmony_ci Sigmasks = getattr(signal, 'Sigmasks', None) 577db96d56Sopenharmony_ci if Sigmasks is not None: 587db96d56Sopenharmony_ci CheckedSigmasks = enum._old_convert_( 597db96d56Sopenharmony_ci enum.IntEnum, 'Sigmasks', 'signal', 607db96d56Sopenharmony_ci lambda name: name in ('SIG_BLOCK', 'SIG_UNBLOCK', 'SIG_SETMASK'), 617db96d56Sopenharmony_ci source=signal, 627db96d56Sopenharmony_ci ) 637db96d56Sopenharmony_ci enum._test_simple_enum(CheckedSigmasks, Sigmasks) 647db96d56Sopenharmony_ci 657db96d56Sopenharmony_ci def test_functions_module_attr(self): 667db96d56Sopenharmony_ci # Issue #27718: If __all__ is not defined all non-builtin functions 677db96d56Sopenharmony_ci # should have correct __module__ to be displayed by pydoc. 687db96d56Sopenharmony_ci for name in dir(signal): 697db96d56Sopenharmony_ci value = getattr(signal, name) 707db96d56Sopenharmony_ci if inspect.isroutine(value) and not inspect.isbuiltin(value): 717db96d56Sopenharmony_ci self.assertEqual(value.__module__, 'signal') 727db96d56Sopenharmony_ci 737db96d56Sopenharmony_ci 747db96d56Sopenharmony_ci@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 757db96d56Sopenharmony_ciclass PosixTests(unittest.TestCase): 767db96d56Sopenharmony_ci def trivial_signal_handler(self, *args): 777db96d56Sopenharmony_ci pass 787db96d56Sopenharmony_ci 797db96d56Sopenharmony_ci def test_out_of_range_signal_number_raises_error(self): 807db96d56Sopenharmony_ci self.assertRaises(ValueError, signal.getsignal, 4242) 817db96d56Sopenharmony_ci 827db96d56Sopenharmony_ci self.assertRaises(ValueError, signal.signal, 4242, 837db96d56Sopenharmony_ci self.trivial_signal_handler) 847db96d56Sopenharmony_ci 857db96d56Sopenharmony_ci self.assertRaises(ValueError, signal.strsignal, 4242) 867db96d56Sopenharmony_ci 877db96d56Sopenharmony_ci def test_setting_signal_handler_to_none_raises_error(self): 887db96d56Sopenharmony_ci self.assertRaises(TypeError, signal.signal, 897db96d56Sopenharmony_ci signal.SIGUSR1, None) 907db96d56Sopenharmony_ci 917db96d56Sopenharmony_ci def test_getsignal(self): 927db96d56Sopenharmony_ci hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler) 937db96d56Sopenharmony_ci self.assertIsInstance(hup, signal.Handlers) 947db96d56Sopenharmony_ci self.assertEqual(signal.getsignal(signal.SIGHUP), 957db96d56Sopenharmony_ci self.trivial_signal_handler) 967db96d56Sopenharmony_ci signal.signal(signal.SIGHUP, hup) 977db96d56Sopenharmony_ci self.assertEqual(signal.getsignal(signal.SIGHUP), hup) 987db96d56Sopenharmony_ci 997db96d56Sopenharmony_ci def test_strsignal(self): 1007db96d56Sopenharmony_ci self.assertIn("Interrupt", signal.strsignal(signal.SIGINT)) 1017db96d56Sopenharmony_ci self.assertIn("Terminated", signal.strsignal(signal.SIGTERM)) 1027db96d56Sopenharmony_ci self.assertIn("Hangup", signal.strsignal(signal.SIGHUP)) 1037db96d56Sopenharmony_ci 1047db96d56Sopenharmony_ci # Issue 3864, unknown if this affects earlier versions of freebsd also 1057db96d56Sopenharmony_ci def test_interprocess_signal(self): 1067db96d56Sopenharmony_ci dirname = os.path.dirname(__file__) 1077db96d56Sopenharmony_ci script = os.path.join(dirname, 'signalinterproctester.py') 1087db96d56Sopenharmony_ci assert_python_ok(script) 1097db96d56Sopenharmony_ci 1107db96d56Sopenharmony_ci @unittest.skipUnless( 1117db96d56Sopenharmony_ci hasattr(signal, "valid_signals"), 1127db96d56Sopenharmony_ci "requires signal.valid_signals" 1137db96d56Sopenharmony_ci ) 1147db96d56Sopenharmony_ci def test_valid_signals(self): 1157db96d56Sopenharmony_ci s = signal.valid_signals() 1167db96d56Sopenharmony_ci self.assertIsInstance(s, set) 1177db96d56Sopenharmony_ci self.assertIn(signal.Signals.SIGINT, s) 1187db96d56Sopenharmony_ci self.assertIn(signal.Signals.SIGALRM, s) 1197db96d56Sopenharmony_ci self.assertNotIn(0, s) 1207db96d56Sopenharmony_ci self.assertNotIn(signal.NSIG, s) 1217db96d56Sopenharmony_ci self.assertLess(len(s), signal.NSIG) 1227db96d56Sopenharmony_ci 1237db96d56Sopenharmony_ci # gh-91145: Make sure that all SIGxxx constants exposed by the Python 1247db96d56Sopenharmony_ci # signal module have a number in the [0; signal.NSIG-1] range. 1257db96d56Sopenharmony_ci for name in dir(signal): 1267db96d56Sopenharmony_ci if not name.startswith("SIG"): 1277db96d56Sopenharmony_ci continue 1287db96d56Sopenharmony_ci if name in {"SIG_IGN", "SIG_DFL"}: 1297db96d56Sopenharmony_ci # SIG_IGN and SIG_DFL are pointers 1307db96d56Sopenharmony_ci continue 1317db96d56Sopenharmony_ci with self.subTest(name=name): 1327db96d56Sopenharmony_ci signum = getattr(signal, name) 1337db96d56Sopenharmony_ci self.assertGreaterEqual(signum, 0) 1347db96d56Sopenharmony_ci self.assertLess(signum, signal.NSIG) 1357db96d56Sopenharmony_ci 1367db96d56Sopenharmony_ci @unittest.skipUnless(sys.executable, "sys.executable required.") 1377db96d56Sopenharmony_ci @support.requires_subprocess() 1387db96d56Sopenharmony_ci def test_keyboard_interrupt_exit_code(self): 1397db96d56Sopenharmony_ci """KeyboardInterrupt triggers exit via SIGINT.""" 1407db96d56Sopenharmony_ci process = subprocess.run( 1417db96d56Sopenharmony_ci [sys.executable, "-c", 1427db96d56Sopenharmony_ci "import os, signal, time\n" 1437db96d56Sopenharmony_ci "os.kill(os.getpid(), signal.SIGINT)\n" 1447db96d56Sopenharmony_ci "for _ in range(999): time.sleep(0.01)"], 1457db96d56Sopenharmony_ci stderr=subprocess.PIPE) 1467db96d56Sopenharmony_ci self.assertIn(b"KeyboardInterrupt", process.stderr) 1477db96d56Sopenharmony_ci self.assertEqual(process.returncode, -signal.SIGINT) 1487db96d56Sopenharmony_ci # Caveat: The exit code is insufficient to guarantee we actually died 1497db96d56Sopenharmony_ci # via a signal. POSIX shells do more than look at the 8 bit value. 1507db96d56Sopenharmony_ci # Writing an automation friendly test of an interactive shell 1517db96d56Sopenharmony_ci # to confirm that our process died via a SIGINT proved too complex. 1527db96d56Sopenharmony_ci 1537db96d56Sopenharmony_ci 1547db96d56Sopenharmony_ci@unittest.skipUnless(sys.platform == "win32", "Windows specific") 1557db96d56Sopenharmony_ciclass WindowsSignalTests(unittest.TestCase): 1567db96d56Sopenharmony_ci 1577db96d56Sopenharmony_ci def test_valid_signals(self): 1587db96d56Sopenharmony_ci s = signal.valid_signals() 1597db96d56Sopenharmony_ci self.assertIsInstance(s, set) 1607db96d56Sopenharmony_ci self.assertGreaterEqual(len(s), 6) 1617db96d56Sopenharmony_ci self.assertIn(signal.Signals.SIGINT, s) 1627db96d56Sopenharmony_ci self.assertNotIn(0, s) 1637db96d56Sopenharmony_ci self.assertNotIn(signal.NSIG, s) 1647db96d56Sopenharmony_ci self.assertLess(len(s), signal.NSIG) 1657db96d56Sopenharmony_ci 1667db96d56Sopenharmony_ci def test_issue9324(self): 1677db96d56Sopenharmony_ci # Updated for issue #10003, adding SIGBREAK 1687db96d56Sopenharmony_ci handler = lambda x, y: None 1697db96d56Sopenharmony_ci checked = set() 1707db96d56Sopenharmony_ci for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE, 1717db96d56Sopenharmony_ci signal.SIGILL, signal.SIGINT, signal.SIGSEGV, 1727db96d56Sopenharmony_ci signal.SIGTERM): 1737db96d56Sopenharmony_ci # Set and then reset a handler for signals that work on windows. 1747db96d56Sopenharmony_ci # Issue #18396, only for signals without a C-level handler. 1757db96d56Sopenharmony_ci if signal.getsignal(sig) is not None: 1767db96d56Sopenharmony_ci signal.signal(sig, signal.signal(sig, handler)) 1777db96d56Sopenharmony_ci checked.add(sig) 1787db96d56Sopenharmony_ci # Issue #18396: Ensure the above loop at least tested *something* 1797db96d56Sopenharmony_ci self.assertTrue(checked) 1807db96d56Sopenharmony_ci 1817db96d56Sopenharmony_ci with self.assertRaises(ValueError): 1827db96d56Sopenharmony_ci signal.signal(-1, handler) 1837db96d56Sopenharmony_ci 1847db96d56Sopenharmony_ci with self.assertRaises(ValueError): 1857db96d56Sopenharmony_ci signal.signal(7, handler) 1867db96d56Sopenharmony_ci 1877db96d56Sopenharmony_ci @unittest.skipUnless(sys.executable, "sys.executable required.") 1887db96d56Sopenharmony_ci @support.requires_subprocess() 1897db96d56Sopenharmony_ci def test_keyboard_interrupt_exit_code(self): 1907db96d56Sopenharmony_ci """KeyboardInterrupt triggers an exit using STATUS_CONTROL_C_EXIT.""" 1917db96d56Sopenharmony_ci # We don't test via os.kill(os.getpid(), signal.CTRL_C_EVENT) here 1927db96d56Sopenharmony_ci # as that requires setting up a console control handler in a child 1937db96d56Sopenharmony_ci # in its own process group. Doable, but quite complicated. (see 1947db96d56Sopenharmony_ci # @eryksun on https://github.com/python/cpython/pull/11862) 1957db96d56Sopenharmony_ci process = subprocess.run( 1967db96d56Sopenharmony_ci [sys.executable, "-c", "raise KeyboardInterrupt"], 1977db96d56Sopenharmony_ci stderr=subprocess.PIPE) 1987db96d56Sopenharmony_ci self.assertIn(b"KeyboardInterrupt", process.stderr) 1997db96d56Sopenharmony_ci STATUS_CONTROL_C_EXIT = 0xC000013A 2007db96d56Sopenharmony_ci self.assertEqual(process.returncode, STATUS_CONTROL_C_EXIT) 2017db96d56Sopenharmony_ci 2027db96d56Sopenharmony_ci 2037db96d56Sopenharmony_ciclass WakeupFDTests(unittest.TestCase): 2047db96d56Sopenharmony_ci 2057db96d56Sopenharmony_ci def test_invalid_call(self): 2067db96d56Sopenharmony_ci # First parameter is positional-only 2077db96d56Sopenharmony_ci with self.assertRaises(TypeError): 2087db96d56Sopenharmony_ci signal.set_wakeup_fd(signum=signal.SIGINT) 2097db96d56Sopenharmony_ci 2107db96d56Sopenharmony_ci # warn_on_full_buffer is a keyword-only parameter 2117db96d56Sopenharmony_ci with self.assertRaises(TypeError): 2127db96d56Sopenharmony_ci signal.set_wakeup_fd(signal.SIGINT, False) 2137db96d56Sopenharmony_ci 2147db96d56Sopenharmony_ci def test_invalid_fd(self): 2157db96d56Sopenharmony_ci fd = os_helper.make_bad_fd() 2167db96d56Sopenharmony_ci self.assertRaises((ValueError, OSError), 2177db96d56Sopenharmony_ci signal.set_wakeup_fd, fd) 2187db96d56Sopenharmony_ci 2197db96d56Sopenharmony_ci @unittest.skipUnless(support.has_socket_support, "needs working sockets.") 2207db96d56Sopenharmony_ci def test_invalid_socket(self): 2217db96d56Sopenharmony_ci sock = socket.socket() 2227db96d56Sopenharmony_ci fd = sock.fileno() 2237db96d56Sopenharmony_ci sock.close() 2247db96d56Sopenharmony_ci self.assertRaises((ValueError, OSError), 2257db96d56Sopenharmony_ci signal.set_wakeup_fd, fd) 2267db96d56Sopenharmony_ci 2277db96d56Sopenharmony_ci # Emscripten does not support fstat on pipes yet. 2287db96d56Sopenharmony_ci # https://github.com/emscripten-core/emscripten/issues/16414 2297db96d56Sopenharmony_ci @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.") 2307db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 2317db96d56Sopenharmony_ci def test_set_wakeup_fd_result(self): 2327db96d56Sopenharmony_ci r1, w1 = os.pipe() 2337db96d56Sopenharmony_ci self.addCleanup(os.close, r1) 2347db96d56Sopenharmony_ci self.addCleanup(os.close, w1) 2357db96d56Sopenharmony_ci r2, w2 = os.pipe() 2367db96d56Sopenharmony_ci self.addCleanup(os.close, r2) 2377db96d56Sopenharmony_ci self.addCleanup(os.close, w2) 2387db96d56Sopenharmony_ci 2397db96d56Sopenharmony_ci if hasattr(os, 'set_blocking'): 2407db96d56Sopenharmony_ci os.set_blocking(w1, False) 2417db96d56Sopenharmony_ci os.set_blocking(w2, False) 2427db96d56Sopenharmony_ci 2437db96d56Sopenharmony_ci signal.set_wakeup_fd(w1) 2447db96d56Sopenharmony_ci self.assertEqual(signal.set_wakeup_fd(w2), w1) 2457db96d56Sopenharmony_ci self.assertEqual(signal.set_wakeup_fd(-1), w2) 2467db96d56Sopenharmony_ci self.assertEqual(signal.set_wakeup_fd(-1), -1) 2477db96d56Sopenharmony_ci 2487db96d56Sopenharmony_ci @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.") 2497db96d56Sopenharmony_ci @unittest.skipUnless(support.has_socket_support, "needs working sockets.") 2507db96d56Sopenharmony_ci def test_set_wakeup_fd_socket_result(self): 2517db96d56Sopenharmony_ci sock1 = socket.socket() 2527db96d56Sopenharmony_ci self.addCleanup(sock1.close) 2537db96d56Sopenharmony_ci sock1.setblocking(False) 2547db96d56Sopenharmony_ci fd1 = sock1.fileno() 2557db96d56Sopenharmony_ci 2567db96d56Sopenharmony_ci sock2 = socket.socket() 2577db96d56Sopenharmony_ci self.addCleanup(sock2.close) 2587db96d56Sopenharmony_ci sock2.setblocking(False) 2597db96d56Sopenharmony_ci fd2 = sock2.fileno() 2607db96d56Sopenharmony_ci 2617db96d56Sopenharmony_ci signal.set_wakeup_fd(fd1) 2627db96d56Sopenharmony_ci self.assertEqual(signal.set_wakeup_fd(fd2), fd1) 2637db96d56Sopenharmony_ci self.assertEqual(signal.set_wakeup_fd(-1), fd2) 2647db96d56Sopenharmony_ci self.assertEqual(signal.set_wakeup_fd(-1), -1) 2657db96d56Sopenharmony_ci 2667db96d56Sopenharmony_ci # On Windows, files are always blocking and Windows does not provide a 2677db96d56Sopenharmony_ci # function to test if a socket is in non-blocking mode. 2687db96d56Sopenharmony_ci @unittest.skipIf(sys.platform == "win32", "tests specific to POSIX") 2697db96d56Sopenharmony_ci @unittest.skipIf(support.is_emscripten, "Emscripten cannot fstat pipes.") 2707db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 2717db96d56Sopenharmony_ci def test_set_wakeup_fd_blocking(self): 2727db96d56Sopenharmony_ci rfd, wfd = os.pipe() 2737db96d56Sopenharmony_ci self.addCleanup(os.close, rfd) 2747db96d56Sopenharmony_ci self.addCleanup(os.close, wfd) 2757db96d56Sopenharmony_ci 2767db96d56Sopenharmony_ci # fd must be non-blocking 2777db96d56Sopenharmony_ci os.set_blocking(wfd, True) 2787db96d56Sopenharmony_ci with self.assertRaises(ValueError) as cm: 2797db96d56Sopenharmony_ci signal.set_wakeup_fd(wfd) 2807db96d56Sopenharmony_ci self.assertEqual(str(cm.exception), 2817db96d56Sopenharmony_ci "the fd %s must be in non-blocking mode" % wfd) 2827db96d56Sopenharmony_ci 2837db96d56Sopenharmony_ci # non-blocking is ok 2847db96d56Sopenharmony_ci os.set_blocking(wfd, False) 2857db96d56Sopenharmony_ci signal.set_wakeup_fd(wfd) 2867db96d56Sopenharmony_ci signal.set_wakeup_fd(-1) 2877db96d56Sopenharmony_ci 2887db96d56Sopenharmony_ci 2897db96d56Sopenharmony_ci@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 2907db96d56Sopenharmony_ciclass WakeupSignalTests(unittest.TestCase): 2917db96d56Sopenharmony_ci @unittest.skipIf(_testcapi is None, 'need _testcapi') 2927db96d56Sopenharmony_ci def check_wakeup(self, test_body, *signals, ordered=True): 2937db96d56Sopenharmony_ci # use a subprocess to have only one thread 2947db96d56Sopenharmony_ci code = """if 1: 2957db96d56Sopenharmony_ci import _testcapi 2967db96d56Sopenharmony_ci import os 2977db96d56Sopenharmony_ci import signal 2987db96d56Sopenharmony_ci import struct 2997db96d56Sopenharmony_ci 3007db96d56Sopenharmony_ci signals = {!r} 3017db96d56Sopenharmony_ci 3027db96d56Sopenharmony_ci def handler(signum, frame): 3037db96d56Sopenharmony_ci pass 3047db96d56Sopenharmony_ci 3057db96d56Sopenharmony_ci def check_signum(signals): 3067db96d56Sopenharmony_ci data = os.read(read, len(signals)+1) 3077db96d56Sopenharmony_ci raised = struct.unpack('%uB' % len(data), data) 3087db96d56Sopenharmony_ci if not {!r}: 3097db96d56Sopenharmony_ci raised = set(raised) 3107db96d56Sopenharmony_ci signals = set(signals) 3117db96d56Sopenharmony_ci if raised != signals: 3127db96d56Sopenharmony_ci raise Exception("%r != %r" % (raised, signals)) 3137db96d56Sopenharmony_ci 3147db96d56Sopenharmony_ci {} 3157db96d56Sopenharmony_ci 3167db96d56Sopenharmony_ci signal.signal(signal.SIGALRM, handler) 3177db96d56Sopenharmony_ci read, write = os.pipe() 3187db96d56Sopenharmony_ci os.set_blocking(write, False) 3197db96d56Sopenharmony_ci signal.set_wakeup_fd(write) 3207db96d56Sopenharmony_ci 3217db96d56Sopenharmony_ci test() 3227db96d56Sopenharmony_ci check_signum(signals) 3237db96d56Sopenharmony_ci 3247db96d56Sopenharmony_ci os.close(read) 3257db96d56Sopenharmony_ci os.close(write) 3267db96d56Sopenharmony_ci """.format(tuple(map(int, signals)), ordered, test_body) 3277db96d56Sopenharmony_ci 3287db96d56Sopenharmony_ci assert_python_ok('-c', code) 3297db96d56Sopenharmony_ci 3307db96d56Sopenharmony_ci @unittest.skipIf(_testcapi is None, 'need _testcapi') 3317db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 3327db96d56Sopenharmony_ci def test_wakeup_write_error(self): 3337db96d56Sopenharmony_ci # Issue #16105: write() errors in the C signal handler should not 3347db96d56Sopenharmony_ci # pass silently. 3357db96d56Sopenharmony_ci # Use a subprocess to have only one thread. 3367db96d56Sopenharmony_ci code = """if 1: 3377db96d56Sopenharmony_ci import _testcapi 3387db96d56Sopenharmony_ci import errno 3397db96d56Sopenharmony_ci import os 3407db96d56Sopenharmony_ci import signal 3417db96d56Sopenharmony_ci import sys 3427db96d56Sopenharmony_ci from test.support import captured_stderr 3437db96d56Sopenharmony_ci 3447db96d56Sopenharmony_ci def handler(signum, frame): 3457db96d56Sopenharmony_ci 1/0 3467db96d56Sopenharmony_ci 3477db96d56Sopenharmony_ci signal.signal(signal.SIGALRM, handler) 3487db96d56Sopenharmony_ci r, w = os.pipe() 3497db96d56Sopenharmony_ci os.set_blocking(r, False) 3507db96d56Sopenharmony_ci 3517db96d56Sopenharmony_ci # Set wakeup_fd a read-only file descriptor to trigger the error 3527db96d56Sopenharmony_ci signal.set_wakeup_fd(r) 3537db96d56Sopenharmony_ci try: 3547db96d56Sopenharmony_ci with captured_stderr() as err: 3557db96d56Sopenharmony_ci signal.raise_signal(signal.SIGALRM) 3567db96d56Sopenharmony_ci except ZeroDivisionError: 3577db96d56Sopenharmony_ci # An ignored exception should have been printed out on stderr 3587db96d56Sopenharmony_ci err = err.getvalue() 3597db96d56Sopenharmony_ci if ('Exception ignored when trying to write to the signal wakeup fd' 3607db96d56Sopenharmony_ci not in err): 3617db96d56Sopenharmony_ci raise AssertionError(err) 3627db96d56Sopenharmony_ci if ('OSError: [Errno %d]' % errno.EBADF) not in err: 3637db96d56Sopenharmony_ci raise AssertionError(err) 3647db96d56Sopenharmony_ci else: 3657db96d56Sopenharmony_ci raise AssertionError("ZeroDivisionError not raised") 3667db96d56Sopenharmony_ci 3677db96d56Sopenharmony_ci os.close(r) 3687db96d56Sopenharmony_ci os.close(w) 3697db96d56Sopenharmony_ci """ 3707db96d56Sopenharmony_ci r, w = os.pipe() 3717db96d56Sopenharmony_ci try: 3727db96d56Sopenharmony_ci os.write(r, b'x') 3737db96d56Sopenharmony_ci except OSError: 3747db96d56Sopenharmony_ci pass 3757db96d56Sopenharmony_ci else: 3767db96d56Sopenharmony_ci self.skipTest("OS doesn't report write() error on the read end of a pipe") 3777db96d56Sopenharmony_ci finally: 3787db96d56Sopenharmony_ci os.close(r) 3797db96d56Sopenharmony_ci os.close(w) 3807db96d56Sopenharmony_ci 3817db96d56Sopenharmony_ci assert_python_ok('-c', code) 3827db96d56Sopenharmony_ci 3837db96d56Sopenharmony_ci def test_wakeup_fd_early(self): 3847db96d56Sopenharmony_ci self.check_wakeup("""def test(): 3857db96d56Sopenharmony_ci import select 3867db96d56Sopenharmony_ci import time 3877db96d56Sopenharmony_ci 3887db96d56Sopenharmony_ci TIMEOUT_FULL = 10 3897db96d56Sopenharmony_ci TIMEOUT_HALF = 5 3907db96d56Sopenharmony_ci 3917db96d56Sopenharmony_ci class InterruptSelect(Exception): 3927db96d56Sopenharmony_ci pass 3937db96d56Sopenharmony_ci 3947db96d56Sopenharmony_ci def handler(signum, frame): 3957db96d56Sopenharmony_ci raise InterruptSelect 3967db96d56Sopenharmony_ci signal.signal(signal.SIGALRM, handler) 3977db96d56Sopenharmony_ci 3987db96d56Sopenharmony_ci signal.alarm(1) 3997db96d56Sopenharmony_ci 4007db96d56Sopenharmony_ci # We attempt to get a signal during the sleep, 4017db96d56Sopenharmony_ci # before select is called 4027db96d56Sopenharmony_ci try: 4037db96d56Sopenharmony_ci select.select([], [], [], TIMEOUT_FULL) 4047db96d56Sopenharmony_ci except InterruptSelect: 4057db96d56Sopenharmony_ci pass 4067db96d56Sopenharmony_ci else: 4077db96d56Sopenharmony_ci raise Exception("select() was not interrupted") 4087db96d56Sopenharmony_ci 4097db96d56Sopenharmony_ci before_time = time.monotonic() 4107db96d56Sopenharmony_ci select.select([read], [], [], TIMEOUT_FULL) 4117db96d56Sopenharmony_ci after_time = time.monotonic() 4127db96d56Sopenharmony_ci dt = after_time - before_time 4137db96d56Sopenharmony_ci if dt >= TIMEOUT_HALF: 4147db96d56Sopenharmony_ci raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) 4157db96d56Sopenharmony_ci """, signal.SIGALRM) 4167db96d56Sopenharmony_ci 4177db96d56Sopenharmony_ci def test_wakeup_fd_during(self): 4187db96d56Sopenharmony_ci self.check_wakeup("""def test(): 4197db96d56Sopenharmony_ci import select 4207db96d56Sopenharmony_ci import time 4217db96d56Sopenharmony_ci 4227db96d56Sopenharmony_ci TIMEOUT_FULL = 10 4237db96d56Sopenharmony_ci TIMEOUT_HALF = 5 4247db96d56Sopenharmony_ci 4257db96d56Sopenharmony_ci class InterruptSelect(Exception): 4267db96d56Sopenharmony_ci pass 4277db96d56Sopenharmony_ci 4287db96d56Sopenharmony_ci def handler(signum, frame): 4297db96d56Sopenharmony_ci raise InterruptSelect 4307db96d56Sopenharmony_ci signal.signal(signal.SIGALRM, handler) 4317db96d56Sopenharmony_ci 4327db96d56Sopenharmony_ci signal.alarm(1) 4337db96d56Sopenharmony_ci before_time = time.monotonic() 4347db96d56Sopenharmony_ci # We attempt to get a signal during the select call 4357db96d56Sopenharmony_ci try: 4367db96d56Sopenharmony_ci select.select([read], [], [], TIMEOUT_FULL) 4377db96d56Sopenharmony_ci except InterruptSelect: 4387db96d56Sopenharmony_ci pass 4397db96d56Sopenharmony_ci else: 4407db96d56Sopenharmony_ci raise Exception("select() was not interrupted") 4417db96d56Sopenharmony_ci after_time = time.monotonic() 4427db96d56Sopenharmony_ci dt = after_time - before_time 4437db96d56Sopenharmony_ci if dt >= TIMEOUT_HALF: 4447db96d56Sopenharmony_ci raise Exception("%s >= %s" % (dt, TIMEOUT_HALF)) 4457db96d56Sopenharmony_ci """, signal.SIGALRM) 4467db96d56Sopenharmony_ci 4477db96d56Sopenharmony_ci def test_signum(self): 4487db96d56Sopenharmony_ci self.check_wakeup("""def test(): 4497db96d56Sopenharmony_ci signal.signal(signal.SIGUSR1, handler) 4507db96d56Sopenharmony_ci signal.raise_signal(signal.SIGUSR1) 4517db96d56Sopenharmony_ci signal.raise_signal(signal.SIGALRM) 4527db96d56Sopenharmony_ci """, signal.SIGUSR1, signal.SIGALRM) 4537db96d56Sopenharmony_ci 4547db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 4557db96d56Sopenharmony_ci 'need signal.pthread_sigmask()') 4567db96d56Sopenharmony_ci def test_pending(self): 4577db96d56Sopenharmony_ci self.check_wakeup("""def test(): 4587db96d56Sopenharmony_ci signum1 = signal.SIGUSR1 4597db96d56Sopenharmony_ci signum2 = signal.SIGUSR2 4607db96d56Sopenharmony_ci 4617db96d56Sopenharmony_ci signal.signal(signum1, handler) 4627db96d56Sopenharmony_ci signal.signal(signum2, handler) 4637db96d56Sopenharmony_ci 4647db96d56Sopenharmony_ci signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2)) 4657db96d56Sopenharmony_ci signal.raise_signal(signum1) 4667db96d56Sopenharmony_ci signal.raise_signal(signum2) 4677db96d56Sopenharmony_ci # Unblocking the 2 signals calls the C signal handler twice 4687db96d56Sopenharmony_ci signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2)) 4697db96d56Sopenharmony_ci """, signal.SIGUSR1, signal.SIGUSR2, ordered=False) 4707db96d56Sopenharmony_ci 4717db96d56Sopenharmony_ci 4727db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(socket, 'socketpair'), 'need socket.socketpair') 4737db96d56Sopenharmony_ciclass WakeupSocketSignalTests(unittest.TestCase): 4747db96d56Sopenharmony_ci 4757db96d56Sopenharmony_ci @unittest.skipIf(_testcapi is None, 'need _testcapi') 4767db96d56Sopenharmony_ci def test_socket(self): 4777db96d56Sopenharmony_ci # use a subprocess to have only one thread 4787db96d56Sopenharmony_ci code = """if 1: 4797db96d56Sopenharmony_ci import signal 4807db96d56Sopenharmony_ci import socket 4817db96d56Sopenharmony_ci import struct 4827db96d56Sopenharmony_ci import _testcapi 4837db96d56Sopenharmony_ci 4847db96d56Sopenharmony_ci signum = signal.SIGINT 4857db96d56Sopenharmony_ci signals = (signum,) 4867db96d56Sopenharmony_ci 4877db96d56Sopenharmony_ci def handler(signum, frame): 4887db96d56Sopenharmony_ci pass 4897db96d56Sopenharmony_ci 4907db96d56Sopenharmony_ci signal.signal(signum, handler) 4917db96d56Sopenharmony_ci 4927db96d56Sopenharmony_ci read, write = socket.socketpair() 4937db96d56Sopenharmony_ci write.setblocking(False) 4947db96d56Sopenharmony_ci signal.set_wakeup_fd(write.fileno()) 4957db96d56Sopenharmony_ci 4967db96d56Sopenharmony_ci signal.raise_signal(signum) 4977db96d56Sopenharmony_ci 4987db96d56Sopenharmony_ci data = read.recv(1) 4997db96d56Sopenharmony_ci if not data: 5007db96d56Sopenharmony_ci raise Exception("no signum written") 5017db96d56Sopenharmony_ci raised = struct.unpack('B', data) 5027db96d56Sopenharmony_ci if raised != signals: 5037db96d56Sopenharmony_ci raise Exception("%r != %r" % (raised, signals)) 5047db96d56Sopenharmony_ci 5057db96d56Sopenharmony_ci read.close() 5067db96d56Sopenharmony_ci write.close() 5077db96d56Sopenharmony_ci """ 5087db96d56Sopenharmony_ci 5097db96d56Sopenharmony_ci assert_python_ok('-c', code) 5107db96d56Sopenharmony_ci 5117db96d56Sopenharmony_ci @unittest.skipIf(_testcapi is None, 'need _testcapi') 5127db96d56Sopenharmony_ci def test_send_error(self): 5137db96d56Sopenharmony_ci # Use a subprocess to have only one thread. 5147db96d56Sopenharmony_ci if os.name == 'nt': 5157db96d56Sopenharmony_ci action = 'send' 5167db96d56Sopenharmony_ci else: 5177db96d56Sopenharmony_ci action = 'write' 5187db96d56Sopenharmony_ci code = """if 1: 5197db96d56Sopenharmony_ci import errno 5207db96d56Sopenharmony_ci import signal 5217db96d56Sopenharmony_ci import socket 5227db96d56Sopenharmony_ci import sys 5237db96d56Sopenharmony_ci import time 5247db96d56Sopenharmony_ci import _testcapi 5257db96d56Sopenharmony_ci from test.support import captured_stderr 5267db96d56Sopenharmony_ci 5277db96d56Sopenharmony_ci signum = signal.SIGINT 5287db96d56Sopenharmony_ci 5297db96d56Sopenharmony_ci def handler(signum, frame): 5307db96d56Sopenharmony_ci pass 5317db96d56Sopenharmony_ci 5327db96d56Sopenharmony_ci signal.signal(signum, handler) 5337db96d56Sopenharmony_ci 5347db96d56Sopenharmony_ci read, write = socket.socketpair() 5357db96d56Sopenharmony_ci read.setblocking(False) 5367db96d56Sopenharmony_ci write.setblocking(False) 5377db96d56Sopenharmony_ci 5387db96d56Sopenharmony_ci signal.set_wakeup_fd(write.fileno()) 5397db96d56Sopenharmony_ci 5407db96d56Sopenharmony_ci # Close sockets: send() will fail 5417db96d56Sopenharmony_ci read.close() 5427db96d56Sopenharmony_ci write.close() 5437db96d56Sopenharmony_ci 5447db96d56Sopenharmony_ci with captured_stderr() as err: 5457db96d56Sopenharmony_ci signal.raise_signal(signum) 5467db96d56Sopenharmony_ci 5477db96d56Sopenharmony_ci err = err.getvalue() 5487db96d56Sopenharmony_ci if ('Exception ignored when trying to {action} to the signal wakeup fd' 5497db96d56Sopenharmony_ci not in err): 5507db96d56Sopenharmony_ci raise AssertionError(err) 5517db96d56Sopenharmony_ci """.format(action=action) 5527db96d56Sopenharmony_ci assert_python_ok('-c', code) 5537db96d56Sopenharmony_ci 5547db96d56Sopenharmony_ci @unittest.skipIf(_testcapi is None, 'need _testcapi') 5557db96d56Sopenharmony_ci def test_warn_on_full_buffer(self): 5567db96d56Sopenharmony_ci # Use a subprocess to have only one thread. 5577db96d56Sopenharmony_ci if os.name == 'nt': 5587db96d56Sopenharmony_ci action = 'send' 5597db96d56Sopenharmony_ci else: 5607db96d56Sopenharmony_ci action = 'write' 5617db96d56Sopenharmony_ci code = """if 1: 5627db96d56Sopenharmony_ci import errno 5637db96d56Sopenharmony_ci import signal 5647db96d56Sopenharmony_ci import socket 5657db96d56Sopenharmony_ci import sys 5667db96d56Sopenharmony_ci import time 5677db96d56Sopenharmony_ci import _testcapi 5687db96d56Sopenharmony_ci from test.support import captured_stderr 5697db96d56Sopenharmony_ci 5707db96d56Sopenharmony_ci signum = signal.SIGINT 5717db96d56Sopenharmony_ci 5727db96d56Sopenharmony_ci # This handler will be called, but we intentionally won't read from 5737db96d56Sopenharmony_ci # the wakeup fd. 5747db96d56Sopenharmony_ci def handler(signum, frame): 5757db96d56Sopenharmony_ci pass 5767db96d56Sopenharmony_ci 5777db96d56Sopenharmony_ci signal.signal(signum, handler) 5787db96d56Sopenharmony_ci 5797db96d56Sopenharmony_ci read, write = socket.socketpair() 5807db96d56Sopenharmony_ci 5817db96d56Sopenharmony_ci # Fill the socketpair buffer 5827db96d56Sopenharmony_ci if sys.platform == 'win32': 5837db96d56Sopenharmony_ci # bpo-34130: On Windows, sometimes non-blocking send fails to fill 5847db96d56Sopenharmony_ci # the full socketpair buffer, so use a timeout of 50 ms instead. 5857db96d56Sopenharmony_ci write.settimeout(0.050) 5867db96d56Sopenharmony_ci else: 5877db96d56Sopenharmony_ci write.setblocking(False) 5887db96d56Sopenharmony_ci 5897db96d56Sopenharmony_ci written = 0 5907db96d56Sopenharmony_ci if sys.platform == "vxworks": 5917db96d56Sopenharmony_ci CHUNK_SIZES = (1,) 5927db96d56Sopenharmony_ci else: 5937db96d56Sopenharmony_ci # Start with large chunk size to reduce the 5947db96d56Sopenharmony_ci # number of send needed to fill the buffer. 5957db96d56Sopenharmony_ci CHUNK_SIZES = (2 ** 16, 2 ** 8, 1) 5967db96d56Sopenharmony_ci for chunk_size in CHUNK_SIZES: 5977db96d56Sopenharmony_ci chunk = b"x" * chunk_size 5987db96d56Sopenharmony_ci try: 5997db96d56Sopenharmony_ci while True: 6007db96d56Sopenharmony_ci write.send(chunk) 6017db96d56Sopenharmony_ci written += chunk_size 6027db96d56Sopenharmony_ci except (BlockingIOError, TimeoutError): 6037db96d56Sopenharmony_ci pass 6047db96d56Sopenharmony_ci 6057db96d56Sopenharmony_ci print(f"%s bytes written into the socketpair" % written, flush=True) 6067db96d56Sopenharmony_ci 6077db96d56Sopenharmony_ci write.setblocking(False) 6087db96d56Sopenharmony_ci try: 6097db96d56Sopenharmony_ci write.send(b"x") 6107db96d56Sopenharmony_ci except BlockingIOError: 6117db96d56Sopenharmony_ci # The socketpair buffer seems full 6127db96d56Sopenharmony_ci pass 6137db96d56Sopenharmony_ci else: 6147db96d56Sopenharmony_ci raise AssertionError("%s bytes failed to fill the socketpair " 6157db96d56Sopenharmony_ci "buffer" % written) 6167db96d56Sopenharmony_ci 6177db96d56Sopenharmony_ci # By default, we get a warning when a signal arrives 6187db96d56Sopenharmony_ci msg = ('Exception ignored when trying to {action} ' 6197db96d56Sopenharmony_ci 'to the signal wakeup fd') 6207db96d56Sopenharmony_ci signal.set_wakeup_fd(write.fileno()) 6217db96d56Sopenharmony_ci 6227db96d56Sopenharmony_ci with captured_stderr() as err: 6237db96d56Sopenharmony_ci signal.raise_signal(signum) 6247db96d56Sopenharmony_ci 6257db96d56Sopenharmony_ci err = err.getvalue() 6267db96d56Sopenharmony_ci if msg not in err: 6277db96d56Sopenharmony_ci raise AssertionError("first set_wakeup_fd() test failed, " 6287db96d56Sopenharmony_ci "stderr: %r" % err) 6297db96d56Sopenharmony_ci 6307db96d56Sopenharmony_ci # And also if warn_on_full_buffer=True 6317db96d56Sopenharmony_ci signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=True) 6327db96d56Sopenharmony_ci 6337db96d56Sopenharmony_ci with captured_stderr() as err: 6347db96d56Sopenharmony_ci signal.raise_signal(signum) 6357db96d56Sopenharmony_ci 6367db96d56Sopenharmony_ci err = err.getvalue() 6377db96d56Sopenharmony_ci if msg not in err: 6387db96d56Sopenharmony_ci raise AssertionError("set_wakeup_fd(warn_on_full_buffer=True) " 6397db96d56Sopenharmony_ci "test failed, stderr: %r" % err) 6407db96d56Sopenharmony_ci 6417db96d56Sopenharmony_ci # But not if warn_on_full_buffer=False 6427db96d56Sopenharmony_ci signal.set_wakeup_fd(write.fileno(), warn_on_full_buffer=False) 6437db96d56Sopenharmony_ci 6447db96d56Sopenharmony_ci with captured_stderr() as err: 6457db96d56Sopenharmony_ci signal.raise_signal(signum) 6467db96d56Sopenharmony_ci 6477db96d56Sopenharmony_ci err = err.getvalue() 6487db96d56Sopenharmony_ci if err != "": 6497db96d56Sopenharmony_ci raise AssertionError("set_wakeup_fd(warn_on_full_buffer=False) " 6507db96d56Sopenharmony_ci "test failed, stderr: %r" % err) 6517db96d56Sopenharmony_ci 6527db96d56Sopenharmony_ci # And then check the default again, to make sure warn_on_full_buffer 6537db96d56Sopenharmony_ci # settings don't leak across calls. 6547db96d56Sopenharmony_ci signal.set_wakeup_fd(write.fileno()) 6557db96d56Sopenharmony_ci 6567db96d56Sopenharmony_ci with captured_stderr() as err: 6577db96d56Sopenharmony_ci signal.raise_signal(signum) 6587db96d56Sopenharmony_ci 6597db96d56Sopenharmony_ci err = err.getvalue() 6607db96d56Sopenharmony_ci if msg not in err: 6617db96d56Sopenharmony_ci raise AssertionError("second set_wakeup_fd() test failed, " 6627db96d56Sopenharmony_ci "stderr: %r" % err) 6637db96d56Sopenharmony_ci 6647db96d56Sopenharmony_ci """.format(action=action) 6657db96d56Sopenharmony_ci assert_python_ok('-c', code) 6667db96d56Sopenharmony_ci 6677db96d56Sopenharmony_ci 6687db96d56Sopenharmony_ci@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 6697db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(signal, 'siginterrupt'), "needs signal.siginterrupt()") 6707db96d56Sopenharmony_ci@support.requires_subprocess() 6717db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 6727db96d56Sopenharmony_ciclass SiginterruptTest(unittest.TestCase): 6737db96d56Sopenharmony_ci 6747db96d56Sopenharmony_ci def readpipe_interrupted(self, interrupt): 6757db96d56Sopenharmony_ci """Perform a read during which a signal will arrive. Return True if the 6767db96d56Sopenharmony_ci read is interrupted by the signal and raises an exception. Return False 6777db96d56Sopenharmony_ci if it returns normally. 6787db96d56Sopenharmony_ci """ 6797db96d56Sopenharmony_ci # use a subprocess to have only one thread, to have a timeout on the 6807db96d56Sopenharmony_ci # blocking read and to not touch signal handling in this process 6817db96d56Sopenharmony_ci code = """if 1: 6827db96d56Sopenharmony_ci import errno 6837db96d56Sopenharmony_ci import os 6847db96d56Sopenharmony_ci import signal 6857db96d56Sopenharmony_ci import sys 6867db96d56Sopenharmony_ci 6877db96d56Sopenharmony_ci interrupt = %r 6887db96d56Sopenharmony_ci r, w = os.pipe() 6897db96d56Sopenharmony_ci 6907db96d56Sopenharmony_ci def handler(signum, frame): 6917db96d56Sopenharmony_ci 1 / 0 6927db96d56Sopenharmony_ci 6937db96d56Sopenharmony_ci signal.signal(signal.SIGALRM, handler) 6947db96d56Sopenharmony_ci if interrupt is not None: 6957db96d56Sopenharmony_ci signal.siginterrupt(signal.SIGALRM, interrupt) 6967db96d56Sopenharmony_ci 6977db96d56Sopenharmony_ci print("ready") 6987db96d56Sopenharmony_ci sys.stdout.flush() 6997db96d56Sopenharmony_ci 7007db96d56Sopenharmony_ci # run the test twice 7017db96d56Sopenharmony_ci try: 7027db96d56Sopenharmony_ci for loop in range(2): 7037db96d56Sopenharmony_ci # send a SIGALRM in a second (during the read) 7047db96d56Sopenharmony_ci signal.alarm(1) 7057db96d56Sopenharmony_ci try: 7067db96d56Sopenharmony_ci # blocking call: read from a pipe without data 7077db96d56Sopenharmony_ci os.read(r, 1) 7087db96d56Sopenharmony_ci except ZeroDivisionError: 7097db96d56Sopenharmony_ci pass 7107db96d56Sopenharmony_ci else: 7117db96d56Sopenharmony_ci sys.exit(2) 7127db96d56Sopenharmony_ci sys.exit(3) 7137db96d56Sopenharmony_ci finally: 7147db96d56Sopenharmony_ci os.close(r) 7157db96d56Sopenharmony_ci os.close(w) 7167db96d56Sopenharmony_ci """ % (interrupt,) 7177db96d56Sopenharmony_ci with spawn_python('-c', code) as process: 7187db96d56Sopenharmony_ci try: 7197db96d56Sopenharmony_ci # wait until the child process is loaded and has started 7207db96d56Sopenharmony_ci first_line = process.stdout.readline() 7217db96d56Sopenharmony_ci 7227db96d56Sopenharmony_ci stdout, stderr = process.communicate(timeout=support.SHORT_TIMEOUT) 7237db96d56Sopenharmony_ci except subprocess.TimeoutExpired: 7247db96d56Sopenharmony_ci process.kill() 7257db96d56Sopenharmony_ci return False 7267db96d56Sopenharmony_ci else: 7277db96d56Sopenharmony_ci stdout = first_line + stdout 7287db96d56Sopenharmony_ci exitcode = process.wait() 7297db96d56Sopenharmony_ci if exitcode not in (2, 3): 7307db96d56Sopenharmony_ci raise Exception("Child error (exit code %s): %r" 7317db96d56Sopenharmony_ci % (exitcode, stdout)) 7327db96d56Sopenharmony_ci return (exitcode == 3) 7337db96d56Sopenharmony_ci 7347db96d56Sopenharmony_ci def test_without_siginterrupt(self): 7357db96d56Sopenharmony_ci # If a signal handler is installed and siginterrupt is not called 7367db96d56Sopenharmony_ci # at all, when that signal arrives, it interrupts a syscall that's in 7377db96d56Sopenharmony_ci # progress. 7387db96d56Sopenharmony_ci interrupted = self.readpipe_interrupted(None) 7397db96d56Sopenharmony_ci self.assertTrue(interrupted) 7407db96d56Sopenharmony_ci 7417db96d56Sopenharmony_ci def test_siginterrupt_on(self): 7427db96d56Sopenharmony_ci # If a signal handler is installed and siginterrupt is called with 7437db96d56Sopenharmony_ci # a true value for the second argument, when that signal arrives, it 7447db96d56Sopenharmony_ci # interrupts a syscall that's in progress. 7457db96d56Sopenharmony_ci interrupted = self.readpipe_interrupted(True) 7467db96d56Sopenharmony_ci self.assertTrue(interrupted) 7477db96d56Sopenharmony_ci 7487db96d56Sopenharmony_ci def test_siginterrupt_off(self): 7497db96d56Sopenharmony_ci # If a signal handler is installed and siginterrupt is called with 7507db96d56Sopenharmony_ci # a false value for the second argument, when that signal arrives, it 7517db96d56Sopenharmony_ci # does not interrupt a syscall that's in progress. 7527db96d56Sopenharmony_ci interrupted = self.readpipe_interrupted(False) 7537db96d56Sopenharmony_ci self.assertFalse(interrupted) 7547db96d56Sopenharmony_ci 7557db96d56Sopenharmony_ci 7567db96d56Sopenharmony_ci@unittest.skipIf(sys.platform == "win32", "Not valid on Windows") 7577db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(signal, 'getitimer') and hasattr(signal, 'setitimer'), 7587db96d56Sopenharmony_ci "needs signal.getitimer() and signal.setitimer()") 7597db96d56Sopenharmony_ciclass ItimerTest(unittest.TestCase): 7607db96d56Sopenharmony_ci def setUp(self): 7617db96d56Sopenharmony_ci self.hndl_called = False 7627db96d56Sopenharmony_ci self.hndl_count = 0 7637db96d56Sopenharmony_ci self.itimer = None 7647db96d56Sopenharmony_ci self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm) 7657db96d56Sopenharmony_ci 7667db96d56Sopenharmony_ci def tearDown(self): 7677db96d56Sopenharmony_ci signal.signal(signal.SIGALRM, self.old_alarm) 7687db96d56Sopenharmony_ci if self.itimer is not None: # test_itimer_exc doesn't change this attr 7697db96d56Sopenharmony_ci # just ensure that itimer is stopped 7707db96d56Sopenharmony_ci signal.setitimer(self.itimer, 0) 7717db96d56Sopenharmony_ci 7727db96d56Sopenharmony_ci def sig_alrm(self, *args): 7737db96d56Sopenharmony_ci self.hndl_called = True 7747db96d56Sopenharmony_ci 7757db96d56Sopenharmony_ci def sig_vtalrm(self, *args): 7767db96d56Sopenharmony_ci self.hndl_called = True 7777db96d56Sopenharmony_ci 7787db96d56Sopenharmony_ci if self.hndl_count > 3: 7797db96d56Sopenharmony_ci # it shouldn't be here, because it should have been disabled. 7807db96d56Sopenharmony_ci raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL " 7817db96d56Sopenharmony_ci "timer.") 7827db96d56Sopenharmony_ci elif self.hndl_count == 3: 7837db96d56Sopenharmony_ci # disable ITIMER_VIRTUAL, this function shouldn't be called anymore 7847db96d56Sopenharmony_ci signal.setitimer(signal.ITIMER_VIRTUAL, 0) 7857db96d56Sopenharmony_ci 7867db96d56Sopenharmony_ci self.hndl_count += 1 7877db96d56Sopenharmony_ci 7887db96d56Sopenharmony_ci def sig_prof(self, *args): 7897db96d56Sopenharmony_ci self.hndl_called = True 7907db96d56Sopenharmony_ci signal.setitimer(signal.ITIMER_PROF, 0) 7917db96d56Sopenharmony_ci 7927db96d56Sopenharmony_ci def test_itimer_exc(self): 7937db96d56Sopenharmony_ci # XXX I'm assuming -1 is an invalid itimer, but maybe some platform 7947db96d56Sopenharmony_ci # defines it ? 7957db96d56Sopenharmony_ci self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0) 7967db96d56Sopenharmony_ci # Negative times are treated as zero on some platforms. 7977db96d56Sopenharmony_ci if 0: 7987db96d56Sopenharmony_ci self.assertRaises(signal.ItimerError, 7997db96d56Sopenharmony_ci signal.setitimer, signal.ITIMER_REAL, -1) 8007db96d56Sopenharmony_ci 8017db96d56Sopenharmony_ci def test_itimer_real(self): 8027db96d56Sopenharmony_ci self.itimer = signal.ITIMER_REAL 8037db96d56Sopenharmony_ci signal.setitimer(self.itimer, 1.0) 8047db96d56Sopenharmony_ci signal.pause() 8057db96d56Sopenharmony_ci self.assertEqual(self.hndl_called, True) 8067db96d56Sopenharmony_ci 8077db96d56Sopenharmony_ci # Issue 3864, unknown if this affects earlier versions of freebsd also 8087db96d56Sopenharmony_ci @unittest.skipIf(sys.platform in ('netbsd5',), 8097db96d56Sopenharmony_ci 'itimer not reliable (does not mix well with threading) on some BSDs.') 8107db96d56Sopenharmony_ci def test_itimer_virtual(self): 8117db96d56Sopenharmony_ci self.itimer = signal.ITIMER_VIRTUAL 8127db96d56Sopenharmony_ci signal.signal(signal.SIGVTALRM, self.sig_vtalrm) 8137db96d56Sopenharmony_ci signal.setitimer(self.itimer, 0.3, 0.2) 8147db96d56Sopenharmony_ci 8157db96d56Sopenharmony_ci start_time = time.monotonic() 8167db96d56Sopenharmony_ci while time.monotonic() - start_time < 60.0: 8177db96d56Sopenharmony_ci # use up some virtual time by doing real work 8187db96d56Sopenharmony_ci _ = pow(12345, 67890, 10000019) 8197db96d56Sopenharmony_ci if signal.getitimer(self.itimer) == (0.0, 0.0): 8207db96d56Sopenharmony_ci break # sig_vtalrm handler stopped this itimer 8217db96d56Sopenharmony_ci else: # Issue 8424 8227db96d56Sopenharmony_ci self.skipTest("timeout: likely cause: machine too slow or load too " 8237db96d56Sopenharmony_ci "high") 8247db96d56Sopenharmony_ci 8257db96d56Sopenharmony_ci # virtual itimer should be (0.0, 0.0) now 8267db96d56Sopenharmony_ci self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 8277db96d56Sopenharmony_ci # and the handler should have been called 8287db96d56Sopenharmony_ci self.assertEqual(self.hndl_called, True) 8297db96d56Sopenharmony_ci 8307db96d56Sopenharmony_ci def test_itimer_prof(self): 8317db96d56Sopenharmony_ci self.itimer = signal.ITIMER_PROF 8327db96d56Sopenharmony_ci signal.signal(signal.SIGPROF, self.sig_prof) 8337db96d56Sopenharmony_ci signal.setitimer(self.itimer, 0.2, 0.2) 8347db96d56Sopenharmony_ci 8357db96d56Sopenharmony_ci start_time = time.monotonic() 8367db96d56Sopenharmony_ci while time.monotonic() - start_time < 60.0: 8377db96d56Sopenharmony_ci # do some work 8387db96d56Sopenharmony_ci _ = pow(12345, 67890, 10000019) 8397db96d56Sopenharmony_ci if signal.getitimer(self.itimer) == (0.0, 0.0): 8407db96d56Sopenharmony_ci break # sig_prof handler stopped this itimer 8417db96d56Sopenharmony_ci else: # Issue 8424 8427db96d56Sopenharmony_ci self.skipTest("timeout: likely cause: machine too slow or load too " 8437db96d56Sopenharmony_ci "high") 8447db96d56Sopenharmony_ci 8457db96d56Sopenharmony_ci # profiling itimer should be (0.0, 0.0) now 8467db96d56Sopenharmony_ci self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0)) 8477db96d56Sopenharmony_ci # and the handler should have been called 8487db96d56Sopenharmony_ci self.assertEqual(self.hndl_called, True) 8497db96d56Sopenharmony_ci 8507db96d56Sopenharmony_ci def test_setitimer_tiny(self): 8517db96d56Sopenharmony_ci # bpo-30807: C setitimer() takes a microsecond-resolution interval. 8527db96d56Sopenharmony_ci # Check that float -> timeval conversion doesn't round 8537db96d56Sopenharmony_ci # the interval down to zero, which would disable the timer. 8547db96d56Sopenharmony_ci self.itimer = signal.ITIMER_REAL 8557db96d56Sopenharmony_ci signal.setitimer(self.itimer, 1e-6) 8567db96d56Sopenharmony_ci time.sleep(1) 8577db96d56Sopenharmony_ci self.assertEqual(self.hndl_called, True) 8587db96d56Sopenharmony_ci 8597db96d56Sopenharmony_ci 8607db96d56Sopenharmony_ciclass PendingSignalsTests(unittest.TestCase): 8617db96d56Sopenharmony_ci """ 8627db96d56Sopenharmony_ci Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait() 8637db96d56Sopenharmony_ci functions. 8647db96d56Sopenharmony_ci """ 8657db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'sigpending'), 8667db96d56Sopenharmony_ci 'need signal.sigpending()') 8677db96d56Sopenharmony_ci def test_sigpending_empty(self): 8687db96d56Sopenharmony_ci self.assertEqual(signal.sigpending(), set()) 8697db96d56Sopenharmony_ci 8707db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 8717db96d56Sopenharmony_ci 'need signal.pthread_sigmask()') 8727db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'sigpending'), 8737db96d56Sopenharmony_ci 'need signal.sigpending()') 8747db96d56Sopenharmony_ci def test_sigpending(self): 8757db96d56Sopenharmony_ci code = """if 1: 8767db96d56Sopenharmony_ci import os 8777db96d56Sopenharmony_ci import signal 8787db96d56Sopenharmony_ci 8797db96d56Sopenharmony_ci def handler(signum, frame): 8807db96d56Sopenharmony_ci 1/0 8817db96d56Sopenharmony_ci 8827db96d56Sopenharmony_ci signum = signal.SIGUSR1 8837db96d56Sopenharmony_ci signal.signal(signum, handler) 8847db96d56Sopenharmony_ci 8857db96d56Sopenharmony_ci signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 8867db96d56Sopenharmony_ci os.kill(os.getpid(), signum) 8877db96d56Sopenharmony_ci pending = signal.sigpending() 8887db96d56Sopenharmony_ci for sig in pending: 8897db96d56Sopenharmony_ci assert isinstance(sig, signal.Signals), repr(pending) 8907db96d56Sopenharmony_ci if pending != {signum}: 8917db96d56Sopenharmony_ci raise Exception('%s != {%s}' % (pending, signum)) 8927db96d56Sopenharmony_ci try: 8937db96d56Sopenharmony_ci signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 8947db96d56Sopenharmony_ci except ZeroDivisionError: 8957db96d56Sopenharmony_ci pass 8967db96d56Sopenharmony_ci else: 8977db96d56Sopenharmony_ci raise Exception("ZeroDivisionError not raised") 8987db96d56Sopenharmony_ci """ 8997db96d56Sopenharmony_ci assert_python_ok('-c', code) 9007db96d56Sopenharmony_ci 9017db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'pthread_kill'), 9027db96d56Sopenharmony_ci 'need signal.pthread_kill()') 9037db96d56Sopenharmony_ci @threading_helper.requires_working_threading() 9047db96d56Sopenharmony_ci def test_pthread_kill(self): 9057db96d56Sopenharmony_ci code = """if 1: 9067db96d56Sopenharmony_ci import signal 9077db96d56Sopenharmony_ci import threading 9087db96d56Sopenharmony_ci import sys 9097db96d56Sopenharmony_ci 9107db96d56Sopenharmony_ci signum = signal.SIGUSR1 9117db96d56Sopenharmony_ci 9127db96d56Sopenharmony_ci def handler(signum, frame): 9137db96d56Sopenharmony_ci 1/0 9147db96d56Sopenharmony_ci 9157db96d56Sopenharmony_ci signal.signal(signum, handler) 9167db96d56Sopenharmony_ci 9177db96d56Sopenharmony_ci tid = threading.get_ident() 9187db96d56Sopenharmony_ci try: 9197db96d56Sopenharmony_ci signal.pthread_kill(tid, signum) 9207db96d56Sopenharmony_ci except ZeroDivisionError: 9217db96d56Sopenharmony_ci pass 9227db96d56Sopenharmony_ci else: 9237db96d56Sopenharmony_ci raise Exception("ZeroDivisionError not raised") 9247db96d56Sopenharmony_ci """ 9257db96d56Sopenharmony_ci assert_python_ok('-c', code) 9267db96d56Sopenharmony_ci 9277db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 9287db96d56Sopenharmony_ci 'need signal.pthread_sigmask()') 9297db96d56Sopenharmony_ci def wait_helper(self, blocked, test): 9307db96d56Sopenharmony_ci """ 9317db96d56Sopenharmony_ci test: body of the "def test(signum):" function. 9327db96d56Sopenharmony_ci blocked: number of the blocked signal 9337db96d56Sopenharmony_ci """ 9347db96d56Sopenharmony_ci code = '''if 1: 9357db96d56Sopenharmony_ci import signal 9367db96d56Sopenharmony_ci import sys 9377db96d56Sopenharmony_ci from signal import Signals 9387db96d56Sopenharmony_ci 9397db96d56Sopenharmony_ci def handler(signum, frame): 9407db96d56Sopenharmony_ci 1/0 9417db96d56Sopenharmony_ci 9427db96d56Sopenharmony_ci %s 9437db96d56Sopenharmony_ci 9447db96d56Sopenharmony_ci blocked = %s 9457db96d56Sopenharmony_ci signum = signal.SIGALRM 9467db96d56Sopenharmony_ci 9477db96d56Sopenharmony_ci # child: block and wait the signal 9487db96d56Sopenharmony_ci try: 9497db96d56Sopenharmony_ci signal.signal(signum, handler) 9507db96d56Sopenharmony_ci signal.pthread_sigmask(signal.SIG_BLOCK, [blocked]) 9517db96d56Sopenharmony_ci 9527db96d56Sopenharmony_ci # Do the tests 9537db96d56Sopenharmony_ci test(signum) 9547db96d56Sopenharmony_ci 9557db96d56Sopenharmony_ci # The handler must not be called on unblock 9567db96d56Sopenharmony_ci try: 9577db96d56Sopenharmony_ci signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked]) 9587db96d56Sopenharmony_ci except ZeroDivisionError: 9597db96d56Sopenharmony_ci print("the signal handler has been called", 9607db96d56Sopenharmony_ci file=sys.stderr) 9617db96d56Sopenharmony_ci sys.exit(1) 9627db96d56Sopenharmony_ci except BaseException as err: 9637db96d56Sopenharmony_ci print("error: {}".format(err), file=sys.stderr) 9647db96d56Sopenharmony_ci sys.stderr.flush() 9657db96d56Sopenharmony_ci sys.exit(1) 9667db96d56Sopenharmony_ci ''' % (test.strip(), blocked) 9677db96d56Sopenharmony_ci 9687db96d56Sopenharmony_ci # sig*wait* must be called with the signal blocked: since the current 9697db96d56Sopenharmony_ci # process might have several threads running, use a subprocess to have 9707db96d56Sopenharmony_ci # a single thread. 9717db96d56Sopenharmony_ci assert_python_ok('-c', code) 9727db96d56Sopenharmony_ci 9737db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'sigwait'), 9747db96d56Sopenharmony_ci 'need signal.sigwait()') 9757db96d56Sopenharmony_ci def test_sigwait(self): 9767db96d56Sopenharmony_ci self.wait_helper(signal.SIGALRM, ''' 9777db96d56Sopenharmony_ci def test(signum): 9787db96d56Sopenharmony_ci signal.alarm(1) 9797db96d56Sopenharmony_ci received = signal.sigwait([signum]) 9807db96d56Sopenharmony_ci assert isinstance(received, signal.Signals), received 9817db96d56Sopenharmony_ci if received != signum: 9827db96d56Sopenharmony_ci raise Exception('received %s, not %s' % (received, signum)) 9837db96d56Sopenharmony_ci ''') 9847db96d56Sopenharmony_ci 9857db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'), 9867db96d56Sopenharmony_ci 'need signal.sigwaitinfo()') 9877db96d56Sopenharmony_ci def test_sigwaitinfo(self): 9887db96d56Sopenharmony_ci self.wait_helper(signal.SIGALRM, ''' 9897db96d56Sopenharmony_ci def test(signum): 9907db96d56Sopenharmony_ci signal.alarm(1) 9917db96d56Sopenharmony_ci info = signal.sigwaitinfo([signum]) 9927db96d56Sopenharmony_ci if info.si_signo != signum: 9937db96d56Sopenharmony_ci raise Exception("info.si_signo != %s" % signum) 9947db96d56Sopenharmony_ci ''') 9957db96d56Sopenharmony_ci 9967db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 9977db96d56Sopenharmony_ci 'need signal.sigtimedwait()') 9987db96d56Sopenharmony_ci def test_sigtimedwait(self): 9997db96d56Sopenharmony_ci self.wait_helper(signal.SIGALRM, ''' 10007db96d56Sopenharmony_ci def test(signum): 10017db96d56Sopenharmony_ci signal.alarm(1) 10027db96d56Sopenharmony_ci info = signal.sigtimedwait([signum], 10.1000) 10037db96d56Sopenharmony_ci if info.si_signo != signum: 10047db96d56Sopenharmony_ci raise Exception('info.si_signo != %s' % signum) 10057db96d56Sopenharmony_ci ''') 10067db96d56Sopenharmony_ci 10077db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 10087db96d56Sopenharmony_ci 'need signal.sigtimedwait()') 10097db96d56Sopenharmony_ci def test_sigtimedwait_poll(self): 10107db96d56Sopenharmony_ci # check that polling with sigtimedwait works 10117db96d56Sopenharmony_ci self.wait_helper(signal.SIGALRM, ''' 10127db96d56Sopenharmony_ci def test(signum): 10137db96d56Sopenharmony_ci import os 10147db96d56Sopenharmony_ci os.kill(os.getpid(), signum) 10157db96d56Sopenharmony_ci info = signal.sigtimedwait([signum], 0) 10167db96d56Sopenharmony_ci if info.si_signo != signum: 10177db96d56Sopenharmony_ci raise Exception('info.si_signo != %s' % signum) 10187db96d56Sopenharmony_ci ''') 10197db96d56Sopenharmony_ci 10207db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 10217db96d56Sopenharmony_ci 'need signal.sigtimedwait()') 10227db96d56Sopenharmony_ci def test_sigtimedwait_timeout(self): 10237db96d56Sopenharmony_ci self.wait_helper(signal.SIGALRM, ''' 10247db96d56Sopenharmony_ci def test(signum): 10257db96d56Sopenharmony_ci received = signal.sigtimedwait([signum], 1.0) 10267db96d56Sopenharmony_ci if received is not None: 10277db96d56Sopenharmony_ci raise Exception("received=%r" % (received,)) 10287db96d56Sopenharmony_ci ''') 10297db96d56Sopenharmony_ci 10307db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'sigtimedwait'), 10317db96d56Sopenharmony_ci 'need signal.sigtimedwait()') 10327db96d56Sopenharmony_ci def test_sigtimedwait_negative_timeout(self): 10337db96d56Sopenharmony_ci signum = signal.SIGALRM 10347db96d56Sopenharmony_ci self.assertRaises(ValueError, signal.sigtimedwait, [signum], -1.0) 10357db96d56Sopenharmony_ci 10367db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'sigwait'), 10377db96d56Sopenharmony_ci 'need signal.sigwait()') 10387db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 10397db96d56Sopenharmony_ci 'need signal.pthread_sigmask()') 10407db96d56Sopenharmony_ci @threading_helper.requires_working_threading() 10417db96d56Sopenharmony_ci def test_sigwait_thread(self): 10427db96d56Sopenharmony_ci # Check that calling sigwait() from a thread doesn't suspend the whole 10437db96d56Sopenharmony_ci # process. A new interpreter is spawned to avoid problems when mixing 10447db96d56Sopenharmony_ci # threads and fork(): only async-safe functions are allowed between 10457db96d56Sopenharmony_ci # fork() and exec(). 10467db96d56Sopenharmony_ci assert_python_ok("-c", """if True: 10477db96d56Sopenharmony_ci import os, threading, sys, time, signal 10487db96d56Sopenharmony_ci 10497db96d56Sopenharmony_ci # the default handler terminates the process 10507db96d56Sopenharmony_ci signum = signal.SIGUSR1 10517db96d56Sopenharmony_ci 10527db96d56Sopenharmony_ci def kill_later(): 10537db96d56Sopenharmony_ci # wait until the main thread is waiting in sigwait() 10547db96d56Sopenharmony_ci time.sleep(1) 10557db96d56Sopenharmony_ci os.kill(os.getpid(), signum) 10567db96d56Sopenharmony_ci 10577db96d56Sopenharmony_ci # the signal must be blocked by all the threads 10587db96d56Sopenharmony_ci signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 10597db96d56Sopenharmony_ci killer = threading.Thread(target=kill_later) 10607db96d56Sopenharmony_ci killer.start() 10617db96d56Sopenharmony_ci received = signal.sigwait([signum]) 10627db96d56Sopenharmony_ci if received != signum: 10637db96d56Sopenharmony_ci print("sigwait() received %s, not %s" % (received, signum), 10647db96d56Sopenharmony_ci file=sys.stderr) 10657db96d56Sopenharmony_ci sys.exit(1) 10667db96d56Sopenharmony_ci killer.join() 10677db96d56Sopenharmony_ci # unblock the signal, which should have been cleared by sigwait() 10687db96d56Sopenharmony_ci signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 10697db96d56Sopenharmony_ci """) 10707db96d56Sopenharmony_ci 10717db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 10727db96d56Sopenharmony_ci 'need signal.pthread_sigmask()') 10737db96d56Sopenharmony_ci def test_pthread_sigmask_arguments(self): 10747db96d56Sopenharmony_ci self.assertRaises(TypeError, signal.pthread_sigmask) 10757db96d56Sopenharmony_ci self.assertRaises(TypeError, signal.pthread_sigmask, 1) 10767db96d56Sopenharmony_ci self.assertRaises(TypeError, signal.pthread_sigmask, 1, 2, 3) 10777db96d56Sopenharmony_ci self.assertRaises(OSError, signal.pthread_sigmask, 1700, []) 10787db96d56Sopenharmony_ci with self.assertRaises(ValueError): 10797db96d56Sopenharmony_ci signal.pthread_sigmask(signal.SIG_BLOCK, [signal.NSIG]) 10807db96d56Sopenharmony_ci with self.assertRaises(ValueError): 10817db96d56Sopenharmony_ci signal.pthread_sigmask(signal.SIG_BLOCK, [0]) 10827db96d56Sopenharmony_ci with self.assertRaises(ValueError): 10837db96d56Sopenharmony_ci signal.pthread_sigmask(signal.SIG_BLOCK, [1<<1000]) 10847db96d56Sopenharmony_ci 10857db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 10867db96d56Sopenharmony_ci 'need signal.pthread_sigmask()') 10877db96d56Sopenharmony_ci def test_pthread_sigmask_valid_signals(self): 10887db96d56Sopenharmony_ci s = signal.pthread_sigmask(signal.SIG_BLOCK, signal.valid_signals()) 10897db96d56Sopenharmony_ci self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, s) 10907db96d56Sopenharmony_ci # Get current blocked set 10917db96d56Sopenharmony_ci s = signal.pthread_sigmask(signal.SIG_UNBLOCK, signal.valid_signals()) 10927db96d56Sopenharmony_ci self.assertLessEqual(s, signal.valid_signals()) 10937db96d56Sopenharmony_ci 10947db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'), 10957db96d56Sopenharmony_ci 'need signal.pthread_sigmask()') 10967db96d56Sopenharmony_ci @threading_helper.requires_working_threading() 10977db96d56Sopenharmony_ci def test_pthread_sigmask(self): 10987db96d56Sopenharmony_ci code = """if 1: 10997db96d56Sopenharmony_ci import signal 11007db96d56Sopenharmony_ci import os; import threading 11017db96d56Sopenharmony_ci 11027db96d56Sopenharmony_ci def handler(signum, frame): 11037db96d56Sopenharmony_ci 1/0 11047db96d56Sopenharmony_ci 11057db96d56Sopenharmony_ci def kill(signum): 11067db96d56Sopenharmony_ci os.kill(os.getpid(), signum) 11077db96d56Sopenharmony_ci 11087db96d56Sopenharmony_ci def check_mask(mask): 11097db96d56Sopenharmony_ci for sig in mask: 11107db96d56Sopenharmony_ci assert isinstance(sig, signal.Signals), repr(sig) 11117db96d56Sopenharmony_ci 11127db96d56Sopenharmony_ci def read_sigmask(): 11137db96d56Sopenharmony_ci sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, []) 11147db96d56Sopenharmony_ci check_mask(sigmask) 11157db96d56Sopenharmony_ci return sigmask 11167db96d56Sopenharmony_ci 11177db96d56Sopenharmony_ci signum = signal.SIGUSR1 11187db96d56Sopenharmony_ci 11197db96d56Sopenharmony_ci # Install our signal handler 11207db96d56Sopenharmony_ci old_handler = signal.signal(signum, handler) 11217db96d56Sopenharmony_ci 11227db96d56Sopenharmony_ci # Unblock SIGUSR1 (and copy the old mask) to test our signal handler 11237db96d56Sopenharmony_ci old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 11247db96d56Sopenharmony_ci check_mask(old_mask) 11257db96d56Sopenharmony_ci try: 11267db96d56Sopenharmony_ci kill(signum) 11277db96d56Sopenharmony_ci except ZeroDivisionError: 11287db96d56Sopenharmony_ci pass 11297db96d56Sopenharmony_ci else: 11307db96d56Sopenharmony_ci raise Exception("ZeroDivisionError not raised") 11317db96d56Sopenharmony_ci 11327db96d56Sopenharmony_ci # Block and then raise SIGUSR1. The signal is blocked: the signal 11337db96d56Sopenharmony_ci # handler is not called, and the signal is now pending 11347db96d56Sopenharmony_ci mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum]) 11357db96d56Sopenharmony_ci check_mask(mask) 11367db96d56Sopenharmony_ci kill(signum) 11377db96d56Sopenharmony_ci 11387db96d56Sopenharmony_ci # Check the new mask 11397db96d56Sopenharmony_ci blocked = read_sigmask() 11407db96d56Sopenharmony_ci check_mask(blocked) 11417db96d56Sopenharmony_ci if signum not in blocked: 11427db96d56Sopenharmony_ci raise Exception("%s not in %s" % (signum, blocked)) 11437db96d56Sopenharmony_ci if old_mask ^ blocked != {signum}: 11447db96d56Sopenharmony_ci raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum)) 11457db96d56Sopenharmony_ci 11467db96d56Sopenharmony_ci # Unblock SIGUSR1 11477db96d56Sopenharmony_ci try: 11487db96d56Sopenharmony_ci # unblock the pending signal calls immediately the signal handler 11497db96d56Sopenharmony_ci signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum]) 11507db96d56Sopenharmony_ci except ZeroDivisionError: 11517db96d56Sopenharmony_ci pass 11527db96d56Sopenharmony_ci else: 11537db96d56Sopenharmony_ci raise Exception("ZeroDivisionError not raised") 11547db96d56Sopenharmony_ci try: 11557db96d56Sopenharmony_ci kill(signum) 11567db96d56Sopenharmony_ci except ZeroDivisionError: 11577db96d56Sopenharmony_ci pass 11587db96d56Sopenharmony_ci else: 11597db96d56Sopenharmony_ci raise Exception("ZeroDivisionError not raised") 11607db96d56Sopenharmony_ci 11617db96d56Sopenharmony_ci # Check the new mask 11627db96d56Sopenharmony_ci unblocked = read_sigmask() 11637db96d56Sopenharmony_ci if signum in unblocked: 11647db96d56Sopenharmony_ci raise Exception("%s in %s" % (signum, unblocked)) 11657db96d56Sopenharmony_ci if blocked ^ unblocked != {signum}: 11667db96d56Sopenharmony_ci raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum)) 11677db96d56Sopenharmony_ci if old_mask != unblocked: 11687db96d56Sopenharmony_ci raise Exception("%s != %s" % (old_mask, unblocked)) 11697db96d56Sopenharmony_ci """ 11707db96d56Sopenharmony_ci assert_python_ok('-c', code) 11717db96d56Sopenharmony_ci 11727db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, 'pthread_kill'), 11737db96d56Sopenharmony_ci 'need signal.pthread_kill()') 11747db96d56Sopenharmony_ci @threading_helper.requires_working_threading() 11757db96d56Sopenharmony_ci def test_pthread_kill_main_thread(self): 11767db96d56Sopenharmony_ci # Test that a signal can be sent to the main thread with pthread_kill() 11777db96d56Sopenharmony_ci # before any other thread has been created (see issue #12392). 11787db96d56Sopenharmony_ci code = """if True: 11797db96d56Sopenharmony_ci import threading 11807db96d56Sopenharmony_ci import signal 11817db96d56Sopenharmony_ci import sys 11827db96d56Sopenharmony_ci 11837db96d56Sopenharmony_ci def handler(signum, frame): 11847db96d56Sopenharmony_ci sys.exit(3) 11857db96d56Sopenharmony_ci 11867db96d56Sopenharmony_ci signal.signal(signal.SIGUSR1, handler) 11877db96d56Sopenharmony_ci signal.pthread_kill(threading.get_ident(), signal.SIGUSR1) 11887db96d56Sopenharmony_ci sys.exit(2) 11897db96d56Sopenharmony_ci """ 11907db96d56Sopenharmony_ci 11917db96d56Sopenharmony_ci with spawn_python('-c', code) as process: 11927db96d56Sopenharmony_ci stdout, stderr = process.communicate() 11937db96d56Sopenharmony_ci exitcode = process.wait() 11947db96d56Sopenharmony_ci if exitcode != 3: 11957db96d56Sopenharmony_ci raise Exception("Child error (exit code %s): %s" % 11967db96d56Sopenharmony_ci (exitcode, stdout)) 11977db96d56Sopenharmony_ci 11987db96d56Sopenharmony_ci 11997db96d56Sopenharmony_ciclass StressTest(unittest.TestCase): 12007db96d56Sopenharmony_ci """ 12017db96d56Sopenharmony_ci Stress signal delivery, especially when a signal arrives in 12027db96d56Sopenharmony_ci the middle of recomputing the signal state or executing 12037db96d56Sopenharmony_ci previously tripped signal handlers. 12047db96d56Sopenharmony_ci """ 12057db96d56Sopenharmony_ci 12067db96d56Sopenharmony_ci def setsig(self, signum, handler): 12077db96d56Sopenharmony_ci old_handler = signal.signal(signum, handler) 12087db96d56Sopenharmony_ci self.addCleanup(signal.signal, signum, old_handler) 12097db96d56Sopenharmony_ci 12107db96d56Sopenharmony_ci def measure_itimer_resolution(self): 12117db96d56Sopenharmony_ci N = 20 12127db96d56Sopenharmony_ci times = [] 12137db96d56Sopenharmony_ci 12147db96d56Sopenharmony_ci def handler(signum=None, frame=None): 12157db96d56Sopenharmony_ci if len(times) < N: 12167db96d56Sopenharmony_ci times.append(time.perf_counter()) 12177db96d56Sopenharmony_ci # 1 µs is the smallest possible timer interval, 12187db96d56Sopenharmony_ci # we want to measure what the concrete duration 12197db96d56Sopenharmony_ci # will be on this platform 12207db96d56Sopenharmony_ci signal.setitimer(signal.ITIMER_REAL, 1e-6) 12217db96d56Sopenharmony_ci 12227db96d56Sopenharmony_ci self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0) 12237db96d56Sopenharmony_ci self.setsig(signal.SIGALRM, handler) 12247db96d56Sopenharmony_ci handler() 12257db96d56Sopenharmony_ci while len(times) < N: 12267db96d56Sopenharmony_ci time.sleep(1e-3) 12277db96d56Sopenharmony_ci 12287db96d56Sopenharmony_ci durations = [times[i+1] - times[i] for i in range(len(times) - 1)] 12297db96d56Sopenharmony_ci med = statistics.median(durations) 12307db96d56Sopenharmony_ci if support.verbose: 12317db96d56Sopenharmony_ci print("detected median itimer() resolution: %.6f s." % (med,)) 12327db96d56Sopenharmony_ci return med 12337db96d56Sopenharmony_ci 12347db96d56Sopenharmony_ci def decide_itimer_count(self): 12357db96d56Sopenharmony_ci # Some systems have poor setitimer() resolution (for example 12367db96d56Sopenharmony_ci # measured around 20 ms. on FreeBSD 9), so decide on a reasonable 12377db96d56Sopenharmony_ci # number of sequential timers based on that. 12387db96d56Sopenharmony_ci reso = self.measure_itimer_resolution() 12397db96d56Sopenharmony_ci if reso <= 1e-4: 12407db96d56Sopenharmony_ci return 10000 12417db96d56Sopenharmony_ci elif reso <= 1e-2: 12427db96d56Sopenharmony_ci return 100 12437db96d56Sopenharmony_ci else: 12447db96d56Sopenharmony_ci self.skipTest("detected itimer resolution (%.3f s.) too high " 12457db96d56Sopenharmony_ci "(> 10 ms.) on this platform (or system too busy)" 12467db96d56Sopenharmony_ci % (reso,)) 12477db96d56Sopenharmony_ci 12487db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, "setitimer"), 12497db96d56Sopenharmony_ci "test needs setitimer()") 12507db96d56Sopenharmony_ci def test_stress_delivery_dependent(self): 12517db96d56Sopenharmony_ci """ 12527db96d56Sopenharmony_ci This test uses dependent signal handlers. 12537db96d56Sopenharmony_ci """ 12547db96d56Sopenharmony_ci N = self.decide_itimer_count() 12557db96d56Sopenharmony_ci sigs = [] 12567db96d56Sopenharmony_ci 12577db96d56Sopenharmony_ci def first_handler(signum, frame): 12587db96d56Sopenharmony_ci # 1e-6 is the minimum non-zero value for `setitimer()`. 12597db96d56Sopenharmony_ci # Choose a random delay so as to improve chances of 12607db96d56Sopenharmony_ci # triggering a race condition. Ideally the signal is received 12617db96d56Sopenharmony_ci # when inside critical signal-handling routines such as 12627db96d56Sopenharmony_ci # Py_MakePendingCalls(). 12637db96d56Sopenharmony_ci signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5) 12647db96d56Sopenharmony_ci 12657db96d56Sopenharmony_ci def second_handler(signum=None, frame=None): 12667db96d56Sopenharmony_ci sigs.append(signum) 12677db96d56Sopenharmony_ci 12687db96d56Sopenharmony_ci # Here on Linux, SIGPROF > SIGALRM > SIGUSR1. By using both 12697db96d56Sopenharmony_ci # ascending and descending sequences (SIGUSR1 then SIGALRM, 12707db96d56Sopenharmony_ci # SIGPROF then SIGALRM), we maximize chances of hitting a bug. 12717db96d56Sopenharmony_ci self.setsig(signal.SIGPROF, first_handler) 12727db96d56Sopenharmony_ci self.setsig(signal.SIGUSR1, first_handler) 12737db96d56Sopenharmony_ci self.setsig(signal.SIGALRM, second_handler) # for ITIMER_REAL 12747db96d56Sopenharmony_ci 12757db96d56Sopenharmony_ci expected_sigs = 0 12767db96d56Sopenharmony_ci deadline = time.monotonic() + support.SHORT_TIMEOUT 12777db96d56Sopenharmony_ci 12787db96d56Sopenharmony_ci while expected_sigs < N: 12797db96d56Sopenharmony_ci os.kill(os.getpid(), signal.SIGPROF) 12807db96d56Sopenharmony_ci expected_sigs += 1 12817db96d56Sopenharmony_ci # Wait for handlers to run to avoid signal coalescing 12827db96d56Sopenharmony_ci while len(sigs) < expected_sigs and time.monotonic() < deadline: 12837db96d56Sopenharmony_ci time.sleep(1e-5) 12847db96d56Sopenharmony_ci 12857db96d56Sopenharmony_ci os.kill(os.getpid(), signal.SIGUSR1) 12867db96d56Sopenharmony_ci expected_sigs += 1 12877db96d56Sopenharmony_ci while len(sigs) < expected_sigs and time.monotonic() < deadline: 12887db96d56Sopenharmony_ci time.sleep(1e-5) 12897db96d56Sopenharmony_ci 12907db96d56Sopenharmony_ci # All ITIMER_REAL signals should have been delivered to the 12917db96d56Sopenharmony_ci # Python handler 12927db96d56Sopenharmony_ci self.assertEqual(len(sigs), N, "Some signals were lost") 12937db96d56Sopenharmony_ci 12947db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, "setitimer"), 12957db96d56Sopenharmony_ci "test needs setitimer()") 12967db96d56Sopenharmony_ci def test_stress_delivery_simultaneous(self): 12977db96d56Sopenharmony_ci """ 12987db96d56Sopenharmony_ci This test uses simultaneous signal handlers. 12997db96d56Sopenharmony_ci """ 13007db96d56Sopenharmony_ci N = self.decide_itimer_count() 13017db96d56Sopenharmony_ci sigs = [] 13027db96d56Sopenharmony_ci 13037db96d56Sopenharmony_ci def handler(signum, frame): 13047db96d56Sopenharmony_ci sigs.append(signum) 13057db96d56Sopenharmony_ci 13067db96d56Sopenharmony_ci self.setsig(signal.SIGUSR1, handler) 13077db96d56Sopenharmony_ci self.setsig(signal.SIGALRM, handler) # for ITIMER_REAL 13087db96d56Sopenharmony_ci 13097db96d56Sopenharmony_ci expected_sigs = 0 13107db96d56Sopenharmony_ci deadline = time.monotonic() + support.SHORT_TIMEOUT 13117db96d56Sopenharmony_ci 13127db96d56Sopenharmony_ci while expected_sigs < N: 13137db96d56Sopenharmony_ci # Hopefully the SIGALRM will be received somewhere during 13147db96d56Sopenharmony_ci # initial processing of SIGUSR1. 13157db96d56Sopenharmony_ci signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5) 13167db96d56Sopenharmony_ci os.kill(os.getpid(), signal.SIGUSR1) 13177db96d56Sopenharmony_ci 13187db96d56Sopenharmony_ci expected_sigs += 2 13197db96d56Sopenharmony_ci # Wait for handlers to run to avoid signal coalescing 13207db96d56Sopenharmony_ci while len(sigs) < expected_sigs and time.monotonic() < deadline: 13217db96d56Sopenharmony_ci time.sleep(1e-5) 13227db96d56Sopenharmony_ci 13237db96d56Sopenharmony_ci # All ITIMER_REAL signals should have been delivered to the 13247db96d56Sopenharmony_ci # Python handler 13257db96d56Sopenharmony_ci self.assertEqual(len(sigs), N, "Some signals were lost") 13267db96d56Sopenharmony_ci 13277db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(signal, "SIGUSR1"), 13287db96d56Sopenharmony_ci "test needs SIGUSR1") 13297db96d56Sopenharmony_ci @threading_helper.requires_working_threading() 13307db96d56Sopenharmony_ci def test_stress_modifying_handlers(self): 13317db96d56Sopenharmony_ci # bpo-43406: race condition between trip_signal() and signal.signal 13327db96d56Sopenharmony_ci signum = signal.SIGUSR1 13337db96d56Sopenharmony_ci num_sent_signals = 0 13347db96d56Sopenharmony_ci num_received_signals = 0 13357db96d56Sopenharmony_ci do_stop = False 13367db96d56Sopenharmony_ci 13377db96d56Sopenharmony_ci def custom_handler(signum, frame): 13387db96d56Sopenharmony_ci nonlocal num_received_signals 13397db96d56Sopenharmony_ci num_received_signals += 1 13407db96d56Sopenharmony_ci 13417db96d56Sopenharmony_ci def set_interrupts(): 13427db96d56Sopenharmony_ci nonlocal num_sent_signals 13437db96d56Sopenharmony_ci while not do_stop: 13447db96d56Sopenharmony_ci signal.raise_signal(signum) 13457db96d56Sopenharmony_ci num_sent_signals += 1 13467db96d56Sopenharmony_ci 13477db96d56Sopenharmony_ci def cycle_handlers(): 13487db96d56Sopenharmony_ci while num_sent_signals < 100: 13497db96d56Sopenharmony_ci for i in range(20000): 13507db96d56Sopenharmony_ci # Cycle between a Python-defined and a non-Python handler 13517db96d56Sopenharmony_ci for handler in [custom_handler, signal.SIG_IGN]: 13527db96d56Sopenharmony_ci signal.signal(signum, handler) 13537db96d56Sopenharmony_ci 13547db96d56Sopenharmony_ci old_handler = signal.signal(signum, custom_handler) 13557db96d56Sopenharmony_ci self.addCleanup(signal.signal, signum, old_handler) 13567db96d56Sopenharmony_ci 13577db96d56Sopenharmony_ci t = threading.Thread(target=set_interrupts) 13587db96d56Sopenharmony_ci try: 13597db96d56Sopenharmony_ci ignored = False 13607db96d56Sopenharmony_ci with support.catch_unraisable_exception() as cm: 13617db96d56Sopenharmony_ci t.start() 13627db96d56Sopenharmony_ci cycle_handlers() 13637db96d56Sopenharmony_ci do_stop = True 13647db96d56Sopenharmony_ci t.join() 13657db96d56Sopenharmony_ci 13667db96d56Sopenharmony_ci if cm.unraisable is not None: 13677db96d56Sopenharmony_ci # An unraisable exception may be printed out when 13687db96d56Sopenharmony_ci # a signal is ignored due to the aforementioned 13697db96d56Sopenharmony_ci # race condition, check it. 13707db96d56Sopenharmony_ci self.assertIsInstance(cm.unraisable.exc_value, OSError) 13717db96d56Sopenharmony_ci self.assertIn( 13727db96d56Sopenharmony_ci f"Signal {signum:d} ignored due to race condition", 13737db96d56Sopenharmony_ci str(cm.unraisable.exc_value)) 13747db96d56Sopenharmony_ci ignored = True 13757db96d56Sopenharmony_ci 13767db96d56Sopenharmony_ci # bpo-43406: Even if it is unlikely, it's technically possible that 13777db96d56Sopenharmony_ci # all signals were ignored because of race conditions. 13787db96d56Sopenharmony_ci if not ignored: 13797db96d56Sopenharmony_ci # Sanity check that some signals were received, but not all 13807db96d56Sopenharmony_ci self.assertGreater(num_received_signals, 0) 13817db96d56Sopenharmony_ci self.assertLess(num_received_signals, num_sent_signals) 13827db96d56Sopenharmony_ci finally: 13837db96d56Sopenharmony_ci do_stop = True 13847db96d56Sopenharmony_ci t.join() 13857db96d56Sopenharmony_ci 13867db96d56Sopenharmony_ci 13877db96d56Sopenharmony_ciclass RaiseSignalTest(unittest.TestCase): 13887db96d56Sopenharmony_ci 13897db96d56Sopenharmony_ci def test_sigint(self): 13907db96d56Sopenharmony_ci with self.assertRaises(KeyboardInterrupt): 13917db96d56Sopenharmony_ci signal.raise_signal(signal.SIGINT) 13927db96d56Sopenharmony_ci 13937db96d56Sopenharmony_ci @unittest.skipIf(sys.platform != "win32", "Windows specific test") 13947db96d56Sopenharmony_ci def test_invalid_argument(self): 13957db96d56Sopenharmony_ci try: 13967db96d56Sopenharmony_ci SIGHUP = 1 # not supported on win32 13977db96d56Sopenharmony_ci signal.raise_signal(SIGHUP) 13987db96d56Sopenharmony_ci self.fail("OSError (Invalid argument) expected") 13997db96d56Sopenharmony_ci except OSError as e: 14007db96d56Sopenharmony_ci if e.errno == errno.EINVAL: 14017db96d56Sopenharmony_ci pass 14027db96d56Sopenharmony_ci else: 14037db96d56Sopenharmony_ci raise 14047db96d56Sopenharmony_ci 14057db96d56Sopenharmony_ci def test_handler(self): 14067db96d56Sopenharmony_ci is_ok = False 14077db96d56Sopenharmony_ci def handler(a, b): 14087db96d56Sopenharmony_ci nonlocal is_ok 14097db96d56Sopenharmony_ci is_ok = True 14107db96d56Sopenharmony_ci old_signal = signal.signal(signal.SIGINT, handler) 14117db96d56Sopenharmony_ci self.addCleanup(signal.signal, signal.SIGINT, old_signal) 14127db96d56Sopenharmony_ci 14137db96d56Sopenharmony_ci signal.raise_signal(signal.SIGINT) 14147db96d56Sopenharmony_ci self.assertTrue(is_ok) 14157db96d56Sopenharmony_ci 14167db96d56Sopenharmony_ci def test__thread_interrupt_main(self): 14177db96d56Sopenharmony_ci # See https://github.com/python/cpython/issues/102397 14187db96d56Sopenharmony_ci code = """if 1: 14197db96d56Sopenharmony_ci import _thread 14207db96d56Sopenharmony_ci class Foo(): 14217db96d56Sopenharmony_ci def __del__(self): 14227db96d56Sopenharmony_ci _thread.interrupt_main() 14237db96d56Sopenharmony_ci 14247db96d56Sopenharmony_ci x = Foo() 14257db96d56Sopenharmony_ci """ 14267db96d56Sopenharmony_ci 14277db96d56Sopenharmony_ci rc, out, err = assert_python_ok('-c', code) 14287db96d56Sopenharmony_ci self.assertIn(b'OSError: Signal 2 ignored due to race condition', err) 14297db96d56Sopenharmony_ci 14307db96d56Sopenharmony_ci 14317db96d56Sopenharmony_ci 14327db96d56Sopenharmony_ciclass PidfdSignalTest(unittest.TestCase): 14337db96d56Sopenharmony_ci 14347db96d56Sopenharmony_ci @unittest.skipUnless( 14357db96d56Sopenharmony_ci hasattr(signal, "pidfd_send_signal"), 14367db96d56Sopenharmony_ci "pidfd support not built in", 14377db96d56Sopenharmony_ci ) 14387db96d56Sopenharmony_ci def test_pidfd_send_signal(self): 14397db96d56Sopenharmony_ci with self.assertRaises(OSError) as cm: 14407db96d56Sopenharmony_ci signal.pidfd_send_signal(0, signal.SIGINT) 14417db96d56Sopenharmony_ci if cm.exception.errno == errno.ENOSYS: 14427db96d56Sopenharmony_ci self.skipTest("kernel does not support pidfds") 14437db96d56Sopenharmony_ci elif cm.exception.errno == errno.EPERM: 14447db96d56Sopenharmony_ci self.skipTest("Not enough privileges to use pidfs") 14457db96d56Sopenharmony_ci self.assertEqual(cm.exception.errno, errno.EBADF) 14467db96d56Sopenharmony_ci my_pidfd = os.open(f'/proc/{os.getpid()}', os.O_DIRECTORY) 14477db96d56Sopenharmony_ci self.addCleanup(os.close, my_pidfd) 14487db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, "^siginfo must be None$"): 14497db96d56Sopenharmony_ci signal.pidfd_send_signal(my_pidfd, signal.SIGINT, object(), 0) 14507db96d56Sopenharmony_ci with self.assertRaises(KeyboardInterrupt): 14517db96d56Sopenharmony_ci signal.pidfd_send_signal(my_pidfd, signal.SIGINT) 14527db96d56Sopenharmony_ci 14537db96d56Sopenharmony_cidef tearDownModule(): 14547db96d56Sopenharmony_ci support.reap_children() 14557db96d56Sopenharmony_ci 14567db96d56Sopenharmony_ciif __name__ == "__main__": 14577db96d56Sopenharmony_ci unittest.main() 1458