"""Tests for the high-level ``test.support.interpreters`` API."""

import contextlib
import os
import threading
from textwrap import dedent
import unittest
import time

from test import support
from test.support import import_helper
# Skip the whole module when the low-level extension is unavailable.
_interpreters = import_helper.import_module('_xxsubinterpreters')
from test.support import interpreters


def _captured_script(script):
    """Wrap *script* so that its stdout is redirected into a pipe.

    Returns a ``(wrapped, rpipe)`` pair: *wrapped* is the source text to
    run, and *rpipe* is a text file object opened on the read end of the
    pipe.  The wrapped script opens (and therefore closes) the write end
    itself, so the reader sees EOF once the script has finished.
    """
    r, w = os.pipe()
    # Continuation lines of *script* must line up with the "{indented}"
    # placeholder below, which sits 16 columns deep before dedent().
    indented = script.replace('\n', '\n' + ' ' * 16)
    wrapped = dedent(f"""
        import contextlib
        with open({w}, 'w', encoding='utf-8') as spipe:
            with contextlib.redirect_stdout(spipe):
                {indented}
        """)
    return wrapped, open(r, encoding='utf-8')


def clean_up_interpreters():
    """Close every interpreter except the main one (best effort)."""
    for interp in interpreters.list_all():
        if interp.id == 0:  # main
            continue
        try:
            interp.close()
        except RuntimeError:
            pass  # already destroyed


def _run_output(interp, request, channels=None):
    """Run *request* in *interp* and return what it printed to stdout."""
    script, rpipe = _captured_script(request)
    with rpipe:
        interp.run(script, channels=channels)
        return rpipe.read()


@contextlib.contextmanager
def _running(interp):
    """Keep *interp* busy in a background thread during the with-block.

    The interpreter runs a script that blocks reading from a pipe.  On
    exit from the with-block the write end is written and closed, which
    lets the script finish, and the thread is joined.
    """
    r, w = os.pipe()
    def run():
        interp.run(dedent(f"""
            # wait for "signal"
            with open({r}) as rpipe:
                rpipe.read()
            """))

    t = threading.Thread(target=run)
    t.start()

    try:
        yield
    finally:
        # Always release the blocked script, even when the with-block
        # raised; otherwise the thread (and the test run) would hang.
        with open(w, 'w') as spipe:
            spipe.write('done')
        t.join()


class TestBase(unittest.TestCase):
    """Base class that destroys leftover subinterpreters after each test."""

    def tearDown(self):
        clean_up_interpreters()


class CreateTests(TestBase):
    """Tests for interpreters.create()."""

    def test_in_main(self):
        interp = interpreters.create()
        self.assertIsInstance(interp, interpreters.Interpreter)
        self.assertIn(interp, interpreters.list_all())

    def test_in_thread(self):
        lock = threading.Lock()
        interp = None
        def f():
            nonlocal interp
            interp = interpreters.create()
            # Block until the main thread has left its "with lock" body.
            lock.acquire()
            lock.release()
        t = threading.Thread(target=f)
        with lock:
            t.start()
        t.join()
        self.assertIn(interp, interpreters.list_all())

    def test_in_subinterpreter(self):
        # The single-element unpacking also checks that only the main
        # interpreter exists at this point.
        main, = interpreters.list_all()
        interp = interpreters.create()
        out = _run_output(interp, dedent("""
            from test.support import interpreters
            interp = interpreters.create()
            print(interp.id)
            """))
        interp2 = interpreters.Interpreter(int(out))
        self.assertEqual(interpreters.list_all(), [main, interp, interp2])

    def test_after_destroy_all(self):
        before = set(interpreters.list_all())
        # Create 3 subinterpreters.
        created = [interpreters.create() for _ in range(3)]
        # Now destroy them.
        for interp in created:
            interp.close()
        # Finally, create another.
        interp = interpreters.create()
        self.assertEqual(set(interpreters.list_all()), before | {interp})

    def test_after_destroy_some(self):
        before = set(interpreters.list_all())
        # Create 3 subinterpreters.
        interp1 = interpreters.create()
        interp2 = interpreters.create()
        interp3 = interpreters.create()
        # Now destroy 2 of them.
        interp1.close()
        interp2.close()
        # Finally, create another.
        interp = interpreters.create()
        self.assertEqual(set(interpreters.list_all()), before | {interp3, interp})


class GetCurrentTests(TestBase):
    """Tests for interpreters.get_current()."""

    def test_main(self):
        main = interpreters.get_main()
        current = interpreters.get_current()
        self.assertEqual(current, main)

    def test_subinterpreter(self):
        # Use the high-level get_main() (as test_main does) so that both
        # operands of the comparisons below are Interpreter objects.
        main = interpreters.get_main()
        interp = interpreters.create()
        out = _run_output(interp, dedent("""
            from test.support import interpreters
            cur = interpreters.get_current()
            print(cur.id)
            """))
        current = interpreters.Interpreter(int(out))
        self.assertEqual(current, interp)
        self.assertNotEqual(current, main)


class ListAllTests(TestBase):
    """Tests for interpreters.list_all()."""

    def test_initial(self):
        interps = interpreters.list_all()
        self.assertEqual(1, len(interps))

    def test_after_creating(self):
        main = interpreters.get_current()
        first = interpreters.create()
        second = interpreters.create()

        ids = [interp.id for interp in interpreters.list_all()]

        self.assertEqual(ids, [main.id, first.id, second.id])
1647db96d56Sopenharmony_ci 1657db96d56Sopenharmony_ci def test_after_destroying(self): 1667db96d56Sopenharmony_ci main = interpreters.get_current() 1677db96d56Sopenharmony_ci first = interpreters.create() 1687db96d56Sopenharmony_ci second = interpreters.create() 1697db96d56Sopenharmony_ci first.close() 1707db96d56Sopenharmony_ci 1717db96d56Sopenharmony_ci ids = [] 1727db96d56Sopenharmony_ci for interp in interpreters.list_all(): 1737db96d56Sopenharmony_ci ids.append(interp.id) 1747db96d56Sopenharmony_ci 1757db96d56Sopenharmony_ci self.assertEqual(ids, [main.id, second.id]) 1767db96d56Sopenharmony_ci 1777db96d56Sopenharmony_ci 1787db96d56Sopenharmony_ciclass TestInterpreterAttrs(TestBase): 1797db96d56Sopenharmony_ci 1807db96d56Sopenharmony_ci def test_id_type(self): 1817db96d56Sopenharmony_ci main = interpreters.get_main() 1827db96d56Sopenharmony_ci current = interpreters.get_current() 1837db96d56Sopenharmony_ci interp = interpreters.create() 1847db96d56Sopenharmony_ci self.assertIsInstance(main.id, _interpreters.InterpreterID) 1857db96d56Sopenharmony_ci self.assertIsInstance(current.id, _interpreters.InterpreterID) 1867db96d56Sopenharmony_ci self.assertIsInstance(interp.id, _interpreters.InterpreterID) 1877db96d56Sopenharmony_ci 1887db96d56Sopenharmony_ci def test_main_id(self): 1897db96d56Sopenharmony_ci main = interpreters.get_main() 1907db96d56Sopenharmony_ci self.assertEqual(main.id, 0) 1917db96d56Sopenharmony_ci 1927db96d56Sopenharmony_ci def test_custom_id(self): 1937db96d56Sopenharmony_ci interp = interpreters.Interpreter(1) 1947db96d56Sopenharmony_ci self.assertEqual(interp.id, 1) 1957db96d56Sopenharmony_ci 1967db96d56Sopenharmony_ci with self.assertRaises(TypeError): 1977db96d56Sopenharmony_ci interpreters.Interpreter('1') 1987db96d56Sopenharmony_ci 1997db96d56Sopenharmony_ci def test_id_readonly(self): 2007db96d56Sopenharmony_ci interp = interpreters.Interpreter(1) 2017db96d56Sopenharmony_ci with self.assertRaises(AttributeError): 
2027db96d56Sopenharmony_ci interp.id = 2 2037db96d56Sopenharmony_ci 2047db96d56Sopenharmony_ci @unittest.skip('not ready yet (see bpo-32604)') 2057db96d56Sopenharmony_ci def test_main_isolated(self): 2067db96d56Sopenharmony_ci main = interpreters.get_main() 2077db96d56Sopenharmony_ci self.assertFalse(main.isolated) 2087db96d56Sopenharmony_ci 2097db96d56Sopenharmony_ci @unittest.skip('not ready yet (see bpo-32604)') 2107db96d56Sopenharmony_ci def test_subinterpreter_isolated_default(self): 2117db96d56Sopenharmony_ci interp = interpreters.create() 2127db96d56Sopenharmony_ci self.assertFalse(interp.isolated) 2137db96d56Sopenharmony_ci 2147db96d56Sopenharmony_ci def test_subinterpreter_isolated_explicit(self): 2157db96d56Sopenharmony_ci interp1 = interpreters.create(isolated=True) 2167db96d56Sopenharmony_ci interp2 = interpreters.create(isolated=False) 2177db96d56Sopenharmony_ci self.assertTrue(interp1.isolated) 2187db96d56Sopenharmony_ci self.assertFalse(interp2.isolated) 2197db96d56Sopenharmony_ci 2207db96d56Sopenharmony_ci @unittest.skip('not ready yet (see bpo-32604)') 2217db96d56Sopenharmony_ci def test_custom_isolated_default(self): 2227db96d56Sopenharmony_ci interp = interpreters.Interpreter(1) 2237db96d56Sopenharmony_ci self.assertFalse(interp.isolated) 2247db96d56Sopenharmony_ci 2257db96d56Sopenharmony_ci def test_custom_isolated_explicit(self): 2267db96d56Sopenharmony_ci interp1 = interpreters.Interpreter(1, isolated=True) 2277db96d56Sopenharmony_ci interp2 = interpreters.Interpreter(1, isolated=False) 2287db96d56Sopenharmony_ci self.assertTrue(interp1.isolated) 2297db96d56Sopenharmony_ci self.assertFalse(interp2.isolated) 2307db96d56Sopenharmony_ci 2317db96d56Sopenharmony_ci def test_isolated_readonly(self): 2327db96d56Sopenharmony_ci interp = interpreters.Interpreter(1) 2337db96d56Sopenharmony_ci with self.assertRaises(AttributeError): 2347db96d56Sopenharmony_ci interp.isolated = True 2357db96d56Sopenharmony_ci 2367db96d56Sopenharmony_ci def 
test_equality(self): 2377db96d56Sopenharmony_ci interp1 = interpreters.create() 2387db96d56Sopenharmony_ci interp2 = interpreters.create() 2397db96d56Sopenharmony_ci self.assertEqual(interp1, interp1) 2407db96d56Sopenharmony_ci self.assertNotEqual(interp1, interp2) 2417db96d56Sopenharmony_ci 2427db96d56Sopenharmony_ci 2437db96d56Sopenharmony_ciclass TestInterpreterIsRunning(TestBase): 2447db96d56Sopenharmony_ci 2457db96d56Sopenharmony_ci def test_main(self): 2467db96d56Sopenharmony_ci main = interpreters.get_main() 2477db96d56Sopenharmony_ci self.assertTrue(main.is_running()) 2487db96d56Sopenharmony_ci 2497db96d56Sopenharmony_ci @unittest.skip('Fails on FreeBSD') 2507db96d56Sopenharmony_ci def test_subinterpreter(self): 2517db96d56Sopenharmony_ci interp = interpreters.create() 2527db96d56Sopenharmony_ci self.assertFalse(interp.is_running()) 2537db96d56Sopenharmony_ci 2547db96d56Sopenharmony_ci with _running(interp): 2557db96d56Sopenharmony_ci self.assertTrue(interp.is_running()) 2567db96d56Sopenharmony_ci self.assertFalse(interp.is_running()) 2577db96d56Sopenharmony_ci 2587db96d56Sopenharmony_ci def test_from_subinterpreter(self): 2597db96d56Sopenharmony_ci interp = interpreters.create() 2607db96d56Sopenharmony_ci out = _run_output(interp, dedent(f""" 2617db96d56Sopenharmony_ci import _xxsubinterpreters as _interpreters 2627db96d56Sopenharmony_ci if _interpreters.is_running({interp.id}): 2637db96d56Sopenharmony_ci print(True) 2647db96d56Sopenharmony_ci else: 2657db96d56Sopenharmony_ci print(False) 2667db96d56Sopenharmony_ci """)) 2677db96d56Sopenharmony_ci self.assertEqual(out.strip(), 'True') 2687db96d56Sopenharmony_ci 2697db96d56Sopenharmony_ci def test_already_destroyed(self): 2707db96d56Sopenharmony_ci interp = interpreters.create() 2717db96d56Sopenharmony_ci interp.close() 2727db96d56Sopenharmony_ci with self.assertRaises(RuntimeError): 2737db96d56Sopenharmony_ci interp.is_running() 2747db96d56Sopenharmony_ci 2757db96d56Sopenharmony_ci def 
test_does_not_exist(self): 2767db96d56Sopenharmony_ci interp = interpreters.Interpreter(1_000_000) 2777db96d56Sopenharmony_ci with self.assertRaises(RuntimeError): 2787db96d56Sopenharmony_ci interp.is_running() 2797db96d56Sopenharmony_ci 2807db96d56Sopenharmony_ci def test_bad_id(self): 2817db96d56Sopenharmony_ci interp = interpreters.Interpreter(-1) 2827db96d56Sopenharmony_ci with self.assertRaises(ValueError): 2837db96d56Sopenharmony_ci interp.is_running() 2847db96d56Sopenharmony_ci 2857db96d56Sopenharmony_ci 2867db96d56Sopenharmony_ciclass TestInterpreterClose(TestBase): 2877db96d56Sopenharmony_ci 2887db96d56Sopenharmony_ci def test_basic(self): 2897db96d56Sopenharmony_ci main = interpreters.get_main() 2907db96d56Sopenharmony_ci interp1 = interpreters.create() 2917db96d56Sopenharmony_ci interp2 = interpreters.create() 2927db96d56Sopenharmony_ci interp3 = interpreters.create() 2937db96d56Sopenharmony_ci self.assertEqual(set(interpreters.list_all()), 2947db96d56Sopenharmony_ci {main, interp1, interp2, interp3}) 2957db96d56Sopenharmony_ci interp2.close() 2967db96d56Sopenharmony_ci self.assertEqual(set(interpreters.list_all()), 2977db96d56Sopenharmony_ci {main, interp1, interp3}) 2987db96d56Sopenharmony_ci 2997db96d56Sopenharmony_ci def test_all(self): 3007db96d56Sopenharmony_ci before = set(interpreters.list_all()) 3017db96d56Sopenharmony_ci interps = set() 3027db96d56Sopenharmony_ci for _ in range(3): 3037db96d56Sopenharmony_ci interp = interpreters.create() 3047db96d56Sopenharmony_ci interps.add(interp) 3057db96d56Sopenharmony_ci self.assertEqual(set(interpreters.list_all()), before | interps) 3067db96d56Sopenharmony_ci for interp in interps: 3077db96d56Sopenharmony_ci interp.close() 3087db96d56Sopenharmony_ci self.assertEqual(set(interpreters.list_all()), before) 3097db96d56Sopenharmony_ci 3107db96d56Sopenharmony_ci def test_main(self): 3117db96d56Sopenharmony_ci main, = interpreters.list_all() 3127db96d56Sopenharmony_ci with self.assertRaises(RuntimeError): 
3137db96d56Sopenharmony_ci main.close() 3147db96d56Sopenharmony_ci 3157db96d56Sopenharmony_ci def f(): 3167db96d56Sopenharmony_ci with self.assertRaises(RuntimeError): 3177db96d56Sopenharmony_ci main.close() 3187db96d56Sopenharmony_ci 3197db96d56Sopenharmony_ci t = threading.Thread(target=f) 3207db96d56Sopenharmony_ci t.start() 3217db96d56Sopenharmony_ci t.join() 3227db96d56Sopenharmony_ci 3237db96d56Sopenharmony_ci def test_already_destroyed(self): 3247db96d56Sopenharmony_ci interp = interpreters.create() 3257db96d56Sopenharmony_ci interp.close() 3267db96d56Sopenharmony_ci with self.assertRaises(RuntimeError): 3277db96d56Sopenharmony_ci interp.close() 3287db96d56Sopenharmony_ci 3297db96d56Sopenharmony_ci def test_does_not_exist(self): 3307db96d56Sopenharmony_ci interp = interpreters.Interpreter(1_000_000) 3317db96d56Sopenharmony_ci with self.assertRaises(RuntimeError): 3327db96d56Sopenharmony_ci interp.close() 3337db96d56Sopenharmony_ci 3347db96d56Sopenharmony_ci def test_bad_id(self): 3357db96d56Sopenharmony_ci interp = interpreters.Interpreter(-1) 3367db96d56Sopenharmony_ci with self.assertRaises(ValueError): 3377db96d56Sopenharmony_ci interp.close() 3387db96d56Sopenharmony_ci 3397db96d56Sopenharmony_ci def test_from_current(self): 3407db96d56Sopenharmony_ci main, = interpreters.list_all() 3417db96d56Sopenharmony_ci interp = interpreters.create() 3427db96d56Sopenharmony_ci out = _run_output(interp, dedent(f""" 3437db96d56Sopenharmony_ci from test.support import interpreters 3447db96d56Sopenharmony_ci interp = interpreters.Interpreter({int(interp.id)}) 3457db96d56Sopenharmony_ci try: 3467db96d56Sopenharmony_ci interp.close() 3477db96d56Sopenharmony_ci except RuntimeError: 3487db96d56Sopenharmony_ci print('failed') 3497db96d56Sopenharmony_ci """)) 3507db96d56Sopenharmony_ci self.assertEqual(out.strip(), 'failed') 3517db96d56Sopenharmony_ci self.assertEqual(set(interpreters.list_all()), {main, interp}) 3527db96d56Sopenharmony_ci 3537db96d56Sopenharmony_ci def 
test_from_sibling(self): 3547db96d56Sopenharmony_ci main, = interpreters.list_all() 3557db96d56Sopenharmony_ci interp1 = interpreters.create() 3567db96d56Sopenharmony_ci interp2 = interpreters.create() 3577db96d56Sopenharmony_ci self.assertEqual(set(interpreters.list_all()), 3587db96d56Sopenharmony_ci {main, interp1, interp2}) 3597db96d56Sopenharmony_ci interp1.run(dedent(f""" 3607db96d56Sopenharmony_ci from test.support import interpreters 3617db96d56Sopenharmony_ci interp2 = interpreters.Interpreter(int({interp2.id})) 3627db96d56Sopenharmony_ci interp2.close() 3637db96d56Sopenharmony_ci interp3 = interpreters.create() 3647db96d56Sopenharmony_ci interp3.close() 3657db96d56Sopenharmony_ci """)) 3667db96d56Sopenharmony_ci self.assertEqual(set(interpreters.list_all()), {main, interp1}) 3677db96d56Sopenharmony_ci 3687db96d56Sopenharmony_ci def test_from_other_thread(self): 3697db96d56Sopenharmony_ci interp = interpreters.create() 3707db96d56Sopenharmony_ci def f(): 3717db96d56Sopenharmony_ci interp.close() 3727db96d56Sopenharmony_ci 3737db96d56Sopenharmony_ci t = threading.Thread(target=f) 3747db96d56Sopenharmony_ci t.start() 3757db96d56Sopenharmony_ci t.join() 3767db96d56Sopenharmony_ci 3777db96d56Sopenharmony_ci @unittest.skip('Fails on FreeBSD') 3787db96d56Sopenharmony_ci def test_still_running(self): 3797db96d56Sopenharmony_ci main, = interpreters.list_all() 3807db96d56Sopenharmony_ci interp = interpreters.create() 3817db96d56Sopenharmony_ci with _running(interp): 3827db96d56Sopenharmony_ci with self.assertRaises(RuntimeError): 3837db96d56Sopenharmony_ci interp.close() 3847db96d56Sopenharmony_ci self.assertTrue(interp.is_running()) 3857db96d56Sopenharmony_ci 3867db96d56Sopenharmony_ci 3877db96d56Sopenharmony_ciclass TestInterpreterRun(TestBase): 3887db96d56Sopenharmony_ci 3897db96d56Sopenharmony_ci def test_success(self): 3907db96d56Sopenharmony_ci interp = interpreters.create() 3917db96d56Sopenharmony_ci script, file = _captured_script('print("it worked!", 
end="")') 3927db96d56Sopenharmony_ci with file: 3937db96d56Sopenharmony_ci interp.run(script) 3947db96d56Sopenharmony_ci out = file.read() 3957db96d56Sopenharmony_ci 3967db96d56Sopenharmony_ci self.assertEqual(out, 'it worked!') 3977db96d56Sopenharmony_ci 3987db96d56Sopenharmony_ci def test_in_thread(self): 3997db96d56Sopenharmony_ci interp = interpreters.create() 4007db96d56Sopenharmony_ci script, file = _captured_script('print("it worked!", end="")') 4017db96d56Sopenharmony_ci with file: 4027db96d56Sopenharmony_ci def f(): 4037db96d56Sopenharmony_ci interp.run(script) 4047db96d56Sopenharmony_ci 4057db96d56Sopenharmony_ci t = threading.Thread(target=f) 4067db96d56Sopenharmony_ci t.start() 4077db96d56Sopenharmony_ci t.join() 4087db96d56Sopenharmony_ci out = file.read() 4097db96d56Sopenharmony_ci 4107db96d56Sopenharmony_ci self.assertEqual(out, 'it worked!') 4117db96d56Sopenharmony_ci 4127db96d56Sopenharmony_ci @support.requires_fork() 4137db96d56Sopenharmony_ci def test_fork(self): 4147db96d56Sopenharmony_ci interp = interpreters.create() 4157db96d56Sopenharmony_ci import tempfile 4167db96d56Sopenharmony_ci with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as file: 4177db96d56Sopenharmony_ci file.write('') 4187db96d56Sopenharmony_ci file.flush() 4197db96d56Sopenharmony_ci 4207db96d56Sopenharmony_ci expected = 'spam spam spam spam spam' 4217db96d56Sopenharmony_ci script = dedent(f""" 4227db96d56Sopenharmony_ci import os 4237db96d56Sopenharmony_ci try: 4247db96d56Sopenharmony_ci os.fork() 4257db96d56Sopenharmony_ci except RuntimeError: 4267db96d56Sopenharmony_ci with open('{file.name}', 'w', encoding='utf-8') as out: 4277db96d56Sopenharmony_ci out.write('{expected}') 4287db96d56Sopenharmony_ci """) 4297db96d56Sopenharmony_ci interp.run(script) 4307db96d56Sopenharmony_ci 4317db96d56Sopenharmony_ci file.seek(0) 4327db96d56Sopenharmony_ci content = file.read() 4337db96d56Sopenharmony_ci self.assertEqual(content, expected) 4347db96d56Sopenharmony_ci 
4357db96d56Sopenharmony_ci @unittest.skip('Fails on FreeBSD') 4367db96d56Sopenharmony_ci def test_already_running(self): 4377db96d56Sopenharmony_ci interp = interpreters.create() 4387db96d56Sopenharmony_ci with _running(interp): 4397db96d56Sopenharmony_ci with self.assertRaises(RuntimeError): 4407db96d56Sopenharmony_ci interp.run('print("spam")') 4417db96d56Sopenharmony_ci 4427db96d56Sopenharmony_ci def test_does_not_exist(self): 4437db96d56Sopenharmony_ci interp = interpreters.Interpreter(1_000_000) 4447db96d56Sopenharmony_ci with self.assertRaises(RuntimeError): 4457db96d56Sopenharmony_ci interp.run('print("spam")') 4467db96d56Sopenharmony_ci 4477db96d56Sopenharmony_ci def test_bad_id(self): 4487db96d56Sopenharmony_ci interp = interpreters.Interpreter(-1) 4497db96d56Sopenharmony_ci with self.assertRaises(ValueError): 4507db96d56Sopenharmony_ci interp.run('print("spam")') 4517db96d56Sopenharmony_ci 4527db96d56Sopenharmony_ci def test_bad_script(self): 4537db96d56Sopenharmony_ci interp = interpreters.create() 4547db96d56Sopenharmony_ci with self.assertRaises(TypeError): 4557db96d56Sopenharmony_ci interp.run(10) 4567db96d56Sopenharmony_ci 4577db96d56Sopenharmony_ci def test_bytes_for_script(self): 4587db96d56Sopenharmony_ci interp = interpreters.create() 4597db96d56Sopenharmony_ci with self.assertRaises(TypeError): 4607db96d56Sopenharmony_ci interp.run(b'print("spam")') 4617db96d56Sopenharmony_ci 4627db96d56Sopenharmony_ci # test_xxsubinterpreters covers the remaining Interpreter.run() behavior. 
4637db96d56Sopenharmony_ci 4647db96d56Sopenharmony_ci 4657db96d56Sopenharmony_ciclass TestIsShareable(TestBase): 4667db96d56Sopenharmony_ci 4677db96d56Sopenharmony_ci def test_default_shareables(self): 4687db96d56Sopenharmony_ci shareables = [ 4697db96d56Sopenharmony_ci # singletons 4707db96d56Sopenharmony_ci None, 4717db96d56Sopenharmony_ci # builtin objects 4727db96d56Sopenharmony_ci b'spam', 4737db96d56Sopenharmony_ci 'spam', 4747db96d56Sopenharmony_ci 10, 4757db96d56Sopenharmony_ci -10, 4767db96d56Sopenharmony_ci ] 4777db96d56Sopenharmony_ci for obj in shareables: 4787db96d56Sopenharmony_ci with self.subTest(obj): 4797db96d56Sopenharmony_ci shareable = interpreters.is_shareable(obj) 4807db96d56Sopenharmony_ci self.assertTrue(shareable) 4817db96d56Sopenharmony_ci 4827db96d56Sopenharmony_ci def test_not_shareable(self): 4837db96d56Sopenharmony_ci class Cheese: 4847db96d56Sopenharmony_ci def __init__(self, name): 4857db96d56Sopenharmony_ci self.name = name 4867db96d56Sopenharmony_ci def __str__(self): 4877db96d56Sopenharmony_ci return self.name 4887db96d56Sopenharmony_ci 4897db96d56Sopenharmony_ci class SubBytes(bytes): 4907db96d56Sopenharmony_ci """A subclass of a shareable type.""" 4917db96d56Sopenharmony_ci 4927db96d56Sopenharmony_ci not_shareables = [ 4937db96d56Sopenharmony_ci # singletons 4947db96d56Sopenharmony_ci True, 4957db96d56Sopenharmony_ci False, 4967db96d56Sopenharmony_ci NotImplemented, 4977db96d56Sopenharmony_ci ..., 4987db96d56Sopenharmony_ci # builtin types and objects 4997db96d56Sopenharmony_ci type, 5007db96d56Sopenharmony_ci object, 5017db96d56Sopenharmony_ci object(), 5027db96d56Sopenharmony_ci Exception(), 5037db96d56Sopenharmony_ci 100.0, 5047db96d56Sopenharmony_ci # user-defined types and objects 5057db96d56Sopenharmony_ci Cheese, 5067db96d56Sopenharmony_ci Cheese('Wensleydale'), 5077db96d56Sopenharmony_ci SubBytes(b'spam'), 5087db96d56Sopenharmony_ci ] 5097db96d56Sopenharmony_ci for obj in not_shareables: 5107db96d56Sopenharmony_ci with 
self.subTest(repr(obj)): 5117db96d56Sopenharmony_ci self.assertFalse( 5127db96d56Sopenharmony_ci interpreters.is_shareable(obj)) 5137db96d56Sopenharmony_ci 5147db96d56Sopenharmony_ci 5157db96d56Sopenharmony_ciclass TestChannels(TestBase): 5167db96d56Sopenharmony_ci 5177db96d56Sopenharmony_ci def test_create(self): 5187db96d56Sopenharmony_ci r, s = interpreters.create_channel() 5197db96d56Sopenharmony_ci self.assertIsInstance(r, interpreters.RecvChannel) 5207db96d56Sopenharmony_ci self.assertIsInstance(s, interpreters.SendChannel) 5217db96d56Sopenharmony_ci 5227db96d56Sopenharmony_ci def test_list_all(self): 5237db96d56Sopenharmony_ci self.assertEqual(interpreters.list_all_channels(), []) 5247db96d56Sopenharmony_ci created = set() 5257db96d56Sopenharmony_ci for _ in range(3): 5267db96d56Sopenharmony_ci ch = interpreters.create_channel() 5277db96d56Sopenharmony_ci created.add(ch) 5287db96d56Sopenharmony_ci after = set(interpreters.list_all_channels()) 5297db96d56Sopenharmony_ci self.assertEqual(after, created) 5307db96d56Sopenharmony_ci 5317db96d56Sopenharmony_ci 5327db96d56Sopenharmony_ciclass TestRecvChannelAttrs(TestBase): 5337db96d56Sopenharmony_ci 5347db96d56Sopenharmony_ci def test_id_type(self): 5357db96d56Sopenharmony_ci rch, _ = interpreters.create_channel() 5367db96d56Sopenharmony_ci self.assertIsInstance(rch.id, _interpreters.ChannelID) 5377db96d56Sopenharmony_ci 5387db96d56Sopenharmony_ci def test_custom_id(self): 5397db96d56Sopenharmony_ci rch = interpreters.RecvChannel(1) 5407db96d56Sopenharmony_ci self.assertEqual(rch.id, 1) 5417db96d56Sopenharmony_ci 5427db96d56Sopenharmony_ci with self.assertRaises(TypeError): 5437db96d56Sopenharmony_ci interpreters.RecvChannel('1') 5447db96d56Sopenharmony_ci 5457db96d56Sopenharmony_ci def test_id_readonly(self): 5467db96d56Sopenharmony_ci rch = interpreters.RecvChannel(1) 5477db96d56Sopenharmony_ci with self.assertRaises(AttributeError): 5487db96d56Sopenharmony_ci rch.id = 2 5497db96d56Sopenharmony_ci 
5507db96d56Sopenharmony_ci def test_equality(self): 5517db96d56Sopenharmony_ci ch1, _ = interpreters.create_channel() 5527db96d56Sopenharmony_ci ch2, _ = interpreters.create_channel() 5537db96d56Sopenharmony_ci self.assertEqual(ch1, ch1) 5547db96d56Sopenharmony_ci self.assertNotEqual(ch1, ch2) 5557db96d56Sopenharmony_ci 5567db96d56Sopenharmony_ci 5577db96d56Sopenharmony_ciclass TestSendChannelAttrs(TestBase): 5587db96d56Sopenharmony_ci 5597db96d56Sopenharmony_ci def test_id_type(self): 5607db96d56Sopenharmony_ci _, sch = interpreters.create_channel() 5617db96d56Sopenharmony_ci self.assertIsInstance(sch.id, _interpreters.ChannelID) 5627db96d56Sopenharmony_ci 5637db96d56Sopenharmony_ci def test_custom_id(self): 5647db96d56Sopenharmony_ci sch = interpreters.SendChannel(1) 5657db96d56Sopenharmony_ci self.assertEqual(sch.id, 1) 5667db96d56Sopenharmony_ci 5677db96d56Sopenharmony_ci with self.assertRaises(TypeError): 5687db96d56Sopenharmony_ci interpreters.SendChannel('1') 5697db96d56Sopenharmony_ci 5707db96d56Sopenharmony_ci def test_id_readonly(self): 5717db96d56Sopenharmony_ci sch = interpreters.SendChannel(1) 5727db96d56Sopenharmony_ci with self.assertRaises(AttributeError): 5737db96d56Sopenharmony_ci sch.id = 2 5747db96d56Sopenharmony_ci 5757db96d56Sopenharmony_ci def test_equality(self): 5767db96d56Sopenharmony_ci _, ch1 = interpreters.create_channel() 5777db96d56Sopenharmony_ci _, ch2 = interpreters.create_channel() 5787db96d56Sopenharmony_ci self.assertEqual(ch1, ch1) 5797db96d56Sopenharmony_ci self.assertNotEqual(ch1, ch2) 5807db96d56Sopenharmony_ci 5817db96d56Sopenharmony_ci 5827db96d56Sopenharmony_ciclass TestSendRecv(TestBase): 5837db96d56Sopenharmony_ci 5847db96d56Sopenharmony_ci def test_send_recv_main(self): 5857db96d56Sopenharmony_ci r, s = interpreters.create_channel() 5867db96d56Sopenharmony_ci orig = b'spam' 5877db96d56Sopenharmony_ci s.send_nowait(orig) 5887db96d56Sopenharmony_ci obj = r.recv() 5897db96d56Sopenharmony_ci 5907db96d56Sopenharmony_ci 
self.assertEqual(obj, orig) 5917db96d56Sopenharmony_ci self.assertIsNot(obj, orig) 5927db96d56Sopenharmony_ci 5937db96d56Sopenharmony_ci def test_send_recv_same_interpreter(self): 5947db96d56Sopenharmony_ci interp = interpreters.create() 5957db96d56Sopenharmony_ci interp.run(dedent(""" 5967db96d56Sopenharmony_ci from test.support import interpreters 5977db96d56Sopenharmony_ci r, s = interpreters.create_channel() 5987db96d56Sopenharmony_ci orig = b'spam' 5997db96d56Sopenharmony_ci s.send_nowait(orig) 6007db96d56Sopenharmony_ci obj = r.recv() 6017db96d56Sopenharmony_ci assert obj == orig, 'expected: obj == orig' 6027db96d56Sopenharmony_ci assert obj is not orig, 'expected: obj is not orig' 6037db96d56Sopenharmony_ci """)) 6047db96d56Sopenharmony_ci 6057db96d56Sopenharmony_ci @unittest.skip('broken (see BPO-...)') 6067db96d56Sopenharmony_ci def test_send_recv_different_interpreters(self): 6077db96d56Sopenharmony_ci r1, s1 = interpreters.create_channel() 6087db96d56Sopenharmony_ci r2, s2 = interpreters.create_channel() 6097db96d56Sopenharmony_ci orig1 = b'spam' 6107db96d56Sopenharmony_ci s1.send_nowait(orig1) 6117db96d56Sopenharmony_ci out = _run_output( 6127db96d56Sopenharmony_ci interpreters.create(), 6137db96d56Sopenharmony_ci dedent(f""" 6147db96d56Sopenharmony_ci obj1 = r.recv() 6157db96d56Sopenharmony_ci assert obj1 == b'spam', 'expected: obj1 == orig1' 6167db96d56Sopenharmony_ci # When going to another interpreter we get a copy. 
6177db96d56Sopenharmony_ci assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1' 6187db96d56Sopenharmony_ci orig2 = b'eggs' 6197db96d56Sopenharmony_ci print(id(orig2)) 6207db96d56Sopenharmony_ci s.send_nowait(orig2) 6217db96d56Sopenharmony_ci """), 6227db96d56Sopenharmony_ci channels=dict(r=r1, s=s2), 6237db96d56Sopenharmony_ci ) 6247db96d56Sopenharmony_ci obj2 = r2.recv() 6257db96d56Sopenharmony_ci 6267db96d56Sopenharmony_ci self.assertEqual(obj2, b'eggs') 6277db96d56Sopenharmony_ci self.assertNotEqual(id(obj2), int(out)) 6287db96d56Sopenharmony_ci 6297db96d56Sopenharmony_ci def test_send_recv_different_threads(self): 6307db96d56Sopenharmony_ci r, s = interpreters.create_channel() 6317db96d56Sopenharmony_ci 6327db96d56Sopenharmony_ci def f(): 6337db96d56Sopenharmony_ci while True: 6347db96d56Sopenharmony_ci try: 6357db96d56Sopenharmony_ci obj = r.recv() 6367db96d56Sopenharmony_ci break 6377db96d56Sopenharmony_ci except interpreters.ChannelEmptyError: 6387db96d56Sopenharmony_ci time.sleep(0.1) 6397db96d56Sopenharmony_ci s.send(obj) 6407db96d56Sopenharmony_ci t = threading.Thread(target=f) 6417db96d56Sopenharmony_ci t.start() 6427db96d56Sopenharmony_ci 6437db96d56Sopenharmony_ci orig = b'spam' 6447db96d56Sopenharmony_ci s.send(orig) 6457db96d56Sopenharmony_ci t.join() 6467db96d56Sopenharmony_ci obj = r.recv() 6477db96d56Sopenharmony_ci 6487db96d56Sopenharmony_ci self.assertEqual(obj, orig) 6497db96d56Sopenharmony_ci self.assertIsNot(obj, orig) 6507db96d56Sopenharmony_ci 6517db96d56Sopenharmony_ci def test_send_recv_nowait_main(self): 6527db96d56Sopenharmony_ci r, s = interpreters.create_channel() 6537db96d56Sopenharmony_ci orig = b'spam' 6547db96d56Sopenharmony_ci s.send_nowait(orig) 6557db96d56Sopenharmony_ci obj = r.recv_nowait() 6567db96d56Sopenharmony_ci 6577db96d56Sopenharmony_ci self.assertEqual(obj, orig) 6587db96d56Sopenharmony_ci self.assertIsNot(obj, orig) 6597db96d56Sopenharmony_ci 6607db96d56Sopenharmony_ci def 
test_send_recv_nowait_main_with_default(self): 6617db96d56Sopenharmony_ci r, _ = interpreters.create_channel() 6627db96d56Sopenharmony_ci obj = r.recv_nowait(None) 6637db96d56Sopenharmony_ci 6647db96d56Sopenharmony_ci self.assertIsNone(obj) 6657db96d56Sopenharmony_ci 6667db96d56Sopenharmony_ci def test_send_recv_nowait_same_interpreter(self): 6677db96d56Sopenharmony_ci interp = interpreters.create() 6687db96d56Sopenharmony_ci interp.run(dedent(""" 6697db96d56Sopenharmony_ci from test.support import interpreters 6707db96d56Sopenharmony_ci r, s = interpreters.create_channel() 6717db96d56Sopenharmony_ci orig = b'spam' 6727db96d56Sopenharmony_ci s.send_nowait(orig) 6737db96d56Sopenharmony_ci obj = r.recv_nowait() 6747db96d56Sopenharmony_ci assert obj == orig, 'expected: obj == orig' 6757db96d56Sopenharmony_ci # When going back to the same interpreter we get the same object. 6767db96d56Sopenharmony_ci assert obj is not orig, 'expected: obj is not orig' 6777db96d56Sopenharmony_ci """)) 6787db96d56Sopenharmony_ci 6797db96d56Sopenharmony_ci @unittest.skip('broken (see BPO-...)') 6807db96d56Sopenharmony_ci def test_send_recv_nowait_different_interpreters(self): 6817db96d56Sopenharmony_ci r1, s1 = interpreters.create_channel() 6827db96d56Sopenharmony_ci r2, s2 = interpreters.create_channel() 6837db96d56Sopenharmony_ci orig1 = b'spam' 6847db96d56Sopenharmony_ci s1.send_nowait(orig1) 6857db96d56Sopenharmony_ci out = _run_output( 6867db96d56Sopenharmony_ci interpreters.create(), 6877db96d56Sopenharmony_ci dedent(f""" 6887db96d56Sopenharmony_ci obj1 = r.recv_nowait() 6897db96d56Sopenharmony_ci assert obj1 == b'spam', 'expected: obj1 == orig1' 6907db96d56Sopenharmony_ci # When going to another interpreter we get a copy. 
6917db96d56Sopenharmony_ci assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1' 6927db96d56Sopenharmony_ci orig2 = b'eggs' 6937db96d56Sopenharmony_ci print(id(orig2)) 6947db96d56Sopenharmony_ci s.send_nowait(orig2) 6957db96d56Sopenharmony_ci """), 6967db96d56Sopenharmony_ci channels=dict(r=r1, s=s2), 6977db96d56Sopenharmony_ci ) 6987db96d56Sopenharmony_ci obj2 = r2.recv_nowait() 6997db96d56Sopenharmony_ci 7007db96d56Sopenharmony_ci self.assertEqual(obj2, b'eggs') 7017db96d56Sopenharmony_ci self.assertNotEqual(id(obj2), int(out)) 7027db96d56Sopenharmony_ci 7037db96d56Sopenharmony_ci def test_recv_channel_does_not_exist(self): 7047db96d56Sopenharmony_ci ch = interpreters.RecvChannel(1_000_000) 7057db96d56Sopenharmony_ci with self.assertRaises(interpreters.ChannelNotFoundError): 7067db96d56Sopenharmony_ci ch.recv() 7077db96d56Sopenharmony_ci 7087db96d56Sopenharmony_ci def test_send_channel_does_not_exist(self): 7097db96d56Sopenharmony_ci ch = interpreters.SendChannel(1_000_000) 7107db96d56Sopenharmony_ci with self.assertRaises(interpreters.ChannelNotFoundError): 7117db96d56Sopenharmony_ci ch.send(b'spam') 7127db96d56Sopenharmony_ci 7137db96d56Sopenharmony_ci def test_recv_nowait_channel_does_not_exist(self): 7147db96d56Sopenharmony_ci ch = interpreters.RecvChannel(1_000_000) 7157db96d56Sopenharmony_ci with self.assertRaises(interpreters.ChannelNotFoundError): 7167db96d56Sopenharmony_ci ch.recv_nowait() 7177db96d56Sopenharmony_ci 7187db96d56Sopenharmony_ci def test_send_nowait_channel_does_not_exist(self): 7197db96d56Sopenharmony_ci ch = interpreters.SendChannel(1_000_000) 7207db96d56Sopenharmony_ci with self.assertRaises(interpreters.ChannelNotFoundError): 7217db96d56Sopenharmony_ci ch.send_nowait(b'spam') 7227db96d56Sopenharmony_ci 7237db96d56Sopenharmony_ci def test_recv_nowait_empty(self): 7247db96d56Sopenharmony_ci ch, _ = interpreters.create_channel() 7257db96d56Sopenharmony_ci with self.assertRaises(interpreters.ChannelEmptyError): 
7267db96d56Sopenharmony_ci ch.recv_nowait() 7277db96d56Sopenharmony_ci 7287db96d56Sopenharmony_ci def test_recv_nowait_default(self): 7297db96d56Sopenharmony_ci default = object() 7307db96d56Sopenharmony_ci rch, sch = interpreters.create_channel() 7317db96d56Sopenharmony_ci obj1 = rch.recv_nowait(default) 7327db96d56Sopenharmony_ci sch.send_nowait(None) 7337db96d56Sopenharmony_ci sch.send_nowait(1) 7347db96d56Sopenharmony_ci sch.send_nowait(b'spam') 7357db96d56Sopenharmony_ci sch.send_nowait(b'eggs') 7367db96d56Sopenharmony_ci obj2 = rch.recv_nowait(default) 7377db96d56Sopenharmony_ci obj3 = rch.recv_nowait(default) 7387db96d56Sopenharmony_ci obj4 = rch.recv_nowait() 7397db96d56Sopenharmony_ci obj5 = rch.recv_nowait(default) 7407db96d56Sopenharmony_ci obj6 = rch.recv_nowait(default) 7417db96d56Sopenharmony_ci 7427db96d56Sopenharmony_ci self.assertIs(obj1, default) 7437db96d56Sopenharmony_ci self.assertIs(obj2, None) 7447db96d56Sopenharmony_ci self.assertEqual(obj3, 1) 7457db96d56Sopenharmony_ci self.assertEqual(obj4, b'spam') 7467db96d56Sopenharmony_ci self.assertEqual(obj5, b'eggs') 7477db96d56Sopenharmony_ci self.assertIs(obj6, default) 748