17db96d56Sopenharmony_ciimport atexit 27db96d56Sopenharmony_ciimport os 37db96d56Sopenharmony_ciimport sys 47db96d56Sopenharmony_ciimport textwrap 57db96d56Sopenharmony_ciimport unittest 67db96d56Sopenharmony_cifrom test import support 77db96d56Sopenharmony_cifrom test.support import script_helper 87db96d56Sopenharmony_ci 97db96d56Sopenharmony_ci 107db96d56Sopenharmony_ciclass GeneralTest(unittest.TestCase): 117db96d56Sopenharmony_ci def test_general(self): 127db96d56Sopenharmony_ci # Run _test_atexit.py in a subprocess since it calls atexit._clear() 137db96d56Sopenharmony_ci script = support.findfile("_test_atexit.py") 147db96d56Sopenharmony_ci script_helper.run_test_script(script) 157db96d56Sopenharmony_ci 167db96d56Sopenharmony_ciclass FunctionalTest(unittest.TestCase): 177db96d56Sopenharmony_ci def test_shutdown(self): 187db96d56Sopenharmony_ci # Actually test the shutdown mechanism in a subprocess 197db96d56Sopenharmony_ci code = textwrap.dedent(""" 207db96d56Sopenharmony_ci import atexit 217db96d56Sopenharmony_ci 227db96d56Sopenharmony_ci def f(msg): 237db96d56Sopenharmony_ci print(msg) 247db96d56Sopenharmony_ci 257db96d56Sopenharmony_ci atexit.register(f, "one") 267db96d56Sopenharmony_ci atexit.register(f, "two") 277db96d56Sopenharmony_ci """) 287db96d56Sopenharmony_ci res = script_helper.assert_python_ok("-c", code) 297db96d56Sopenharmony_ci self.assertEqual(res.out.decode().splitlines(), ["two", "one"]) 307db96d56Sopenharmony_ci self.assertFalse(res.err) 317db96d56Sopenharmony_ci 327db96d56Sopenharmony_ci def test_atexit_instances(self): 337db96d56Sopenharmony_ci # bpo-42639: It is safe to have more than one atexit instance. 347db96d56Sopenharmony_ci code = textwrap.dedent(""" 357db96d56Sopenharmony_ci import sys 367db96d56Sopenharmony_ci import atexit as atexit1 377db96d56Sopenharmony_ci del sys.modules['atexit'] 387db96d56Sopenharmony_ci import atexit as atexit2 397db96d56Sopenharmony_ci del sys.modules['atexit'] 407db96d56Sopenharmony_ci 417db96d56Sopenharmony_ci assert atexit2 is not atexit1 427db96d56Sopenharmony_ci 437db96d56Sopenharmony_ci atexit1.register(print, "atexit1") 447db96d56Sopenharmony_ci atexit2.register(print, "atexit2") 457db96d56Sopenharmony_ci """) 467db96d56Sopenharmony_ci res = script_helper.assert_python_ok("-c", code) 477db96d56Sopenharmony_ci self.assertEqual(res.out.decode().splitlines(), ["atexit2", "atexit1"]) 487db96d56Sopenharmony_ci self.assertFalse(res.err) 497db96d56Sopenharmony_ci 507db96d56Sopenharmony_ci 517db96d56Sopenharmony_ci@support.cpython_only 527db96d56Sopenharmony_ciclass SubinterpreterTest(unittest.TestCase): 537db96d56Sopenharmony_ci 547db96d56Sopenharmony_ci def test_callbacks_leak(self): 557db96d56Sopenharmony_ci # This test shows a leak in refleak mode if atexit doesn't 567db96d56Sopenharmony_ci # take care to free callbacks in its per-subinterpreter module 577db96d56Sopenharmony_ci # state. 587db96d56Sopenharmony_ci n = atexit._ncallbacks() 597db96d56Sopenharmony_ci code = textwrap.dedent(r""" 607db96d56Sopenharmony_ci import atexit 617db96d56Sopenharmony_ci def f(): 627db96d56Sopenharmony_ci pass 637db96d56Sopenharmony_ci atexit.register(f) 647db96d56Sopenharmony_ci del atexit 657db96d56Sopenharmony_ci """) 667db96d56Sopenharmony_ci ret = support.run_in_subinterp(code) 677db96d56Sopenharmony_ci self.assertEqual(ret, 0) 687db96d56Sopenharmony_ci self.assertEqual(atexit._ncallbacks(), n) 697db96d56Sopenharmony_ci 707db96d56Sopenharmony_ci def test_callbacks_leak_refcycle(self): 717db96d56Sopenharmony_ci # Similar to the above, but with a refcycle through the atexit 727db96d56Sopenharmony_ci # module. 737db96d56Sopenharmony_ci n = atexit._ncallbacks() 747db96d56Sopenharmony_ci code = textwrap.dedent(r""" 757db96d56Sopenharmony_ci import atexit 767db96d56Sopenharmony_ci def f(): 777db96d56Sopenharmony_ci pass 787db96d56Sopenharmony_ci atexit.register(f) 797db96d56Sopenharmony_ci atexit.__atexit = atexit 807db96d56Sopenharmony_ci """) 817db96d56Sopenharmony_ci ret = support.run_in_subinterp(code) 827db96d56Sopenharmony_ci self.assertEqual(ret, 0) 837db96d56Sopenharmony_ci self.assertEqual(atexit._ncallbacks(), n) 847db96d56Sopenharmony_ci 857db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()") 867db96d56Sopenharmony_ci def test_callback_on_subinterpreter_teardown(self): 877db96d56Sopenharmony_ci # This tests if a callback is called on 887db96d56Sopenharmony_ci # subinterpreter teardown. 897db96d56Sopenharmony_ci expected = b"The test has passed!" 907db96d56Sopenharmony_ci r, w = os.pipe() 917db96d56Sopenharmony_ci 927db96d56Sopenharmony_ci code = textwrap.dedent(r""" 937db96d56Sopenharmony_ci import os 947db96d56Sopenharmony_ci import atexit 957db96d56Sopenharmony_ci def callback(): 967db96d56Sopenharmony_ci os.write({:d}, b"The test has passed!") 977db96d56Sopenharmony_ci atexit.register(callback) 987db96d56Sopenharmony_ci """.format(w)) 997db96d56Sopenharmony_ci ret = support.run_in_subinterp(code) 1007db96d56Sopenharmony_ci os.close(w) 1017db96d56Sopenharmony_ci self.assertEqual(os.read(r, len(expected)), expected) 1027db96d56Sopenharmony_ci os.close(r) 1037db96d56Sopenharmony_ci 1047db96d56Sopenharmony_ci 1057db96d56Sopenharmony_ciif __name__ == "__main__": 1067db96d56Sopenharmony_ci unittest.main() 107