17db96d56Sopenharmony_ciimport contextlib
27db96d56Sopenharmony_ciimport os
37db96d56Sopenharmony_ciimport threading
47db96d56Sopenharmony_cifrom textwrap import dedent
57db96d56Sopenharmony_ciimport unittest
67db96d56Sopenharmony_ciimport time
77db96d56Sopenharmony_ci
87db96d56Sopenharmony_cifrom test import support
97db96d56Sopenharmony_cifrom test.support import import_helper
107db96d56Sopenharmony_ci_interpreters = import_helper.import_module('_xxsubinterpreters')
117db96d56Sopenharmony_cifrom test.support import interpreters
127db96d56Sopenharmony_ci
137db96d56Sopenharmony_ci
147db96d56Sopenharmony_cidef _captured_script(script):
157db96d56Sopenharmony_ci    r, w = os.pipe()
167db96d56Sopenharmony_ci    indented = script.replace('\n', '\n                ')
177db96d56Sopenharmony_ci    wrapped = dedent(f"""
187db96d56Sopenharmony_ci        import contextlib
197db96d56Sopenharmony_ci        with open({w}, 'w', encoding='utf-8') as spipe:
207db96d56Sopenharmony_ci            with contextlib.redirect_stdout(spipe):
217db96d56Sopenharmony_ci                {indented}
227db96d56Sopenharmony_ci        """)
237db96d56Sopenharmony_ci    return wrapped, open(r, encoding='utf-8')
247db96d56Sopenharmony_ci
257db96d56Sopenharmony_ci
def clean_up_interpreters():
    """Close every interpreter returned by list_all() except id 0 (main).

    A RuntimeError raised by close() is ignored.
    """
    for interp in interpreters.list_all():
        if interp.id != 0:  # leave main alone
            # RuntimeError is taken to mean "already destroyed".
            with contextlib.suppress(RuntimeError):
                interp.close()
347db96d56Sopenharmony_ci
357db96d56Sopenharmony_ci
def _run_output(interp, request, channels=None):
    """Run *request* in *interp* and return the text it wrote to stdout.

    *channels* is passed through unchanged to ``interp.run()``.
    """
    wrapped, reader = _captured_script(request)
    with reader:
        interp.run(wrapped, channels=channels)
        captured = reader.read()
    return captured
417db96d56Sopenharmony_ci
427db96d56Sopenharmony_ci
437db96d56Sopenharmony_ci@contextlib.contextmanager
447db96d56Sopenharmony_cidef _running(interp):
457db96d56Sopenharmony_ci    r, w = os.pipe()
467db96d56Sopenharmony_ci    def run():
477db96d56Sopenharmony_ci        interp.run(dedent(f"""
487db96d56Sopenharmony_ci            # wait for "signal"
497db96d56Sopenharmony_ci            with open({r}) as rpipe:
507db96d56Sopenharmony_ci                rpipe.read()
517db96d56Sopenharmony_ci            """))
527db96d56Sopenharmony_ci
537db96d56Sopenharmony_ci    t = threading.Thread(target=run)
547db96d56Sopenharmony_ci    t.start()
557db96d56Sopenharmony_ci
567db96d56Sopenharmony_ci    yield
577db96d56Sopenharmony_ci
587db96d56Sopenharmony_ci    with open(w, 'w') as spipe:
597db96d56Sopenharmony_ci        spipe.write('done')
607db96d56Sopenharmony_ci    t.join()
617db96d56Sopenharmony_ci
627db96d56Sopenharmony_ci
class TestBase(unittest.TestCase):
    # Common base class for the test cases in this file: after every
    # test it closes the subinterpreters that are still around.

    def tearDown(self):
        # clean_up_interpreters() closes every interpreter except main.
        clean_up_interpreters()
677db96d56Sopenharmony_ci
687db96d56Sopenharmony_ci
class CreateTests(TestBase):
    # Tests for interpreters.create() called from different contexts.

    def test_in_main(self):
        created = interpreters.create()
        self.assertIsInstance(created, interpreters.Interpreter)
        self.assertIn(created, interpreters.list_all())

    def test_in_thread(self):
        gate = threading.Lock()
        created = None

        def worker():
            nonlocal created
            created = interpreters.create()
            # Take and immediately give back the lock held by the
            # main thread while it starts this worker.
            with gate:
                pass

        thread = threading.Thread(target=worker)
        with gate:
            thread.start()
        thread.join()
        self.assertIn(created, interpreters.list_all())

    def test_in_subinterpreter(self):
        main, = interpreters.list_all()
        outer = interpreters.create()
        script = dedent("""
            from test.support import interpreters
            interp = interpreters.create()
            print(interp.id)
            """)
        out = _run_output(outer, script)
        # The script prints the ID of the interpreter it created.
        inner = interpreters.Interpreter(int(out))
        self.assertEqual(interpreters.list_all(), [main, outer, inner])

    def test_after_destroy_all(self):
        before = set(interpreters.list_all())
        # Create 3 subinterpreters and then destroy all of them.
        doomed = [interpreters.create() for _ in range(3)]
        for victim in doomed:
            victim.close()
        # Finally, create another; it must be the only addition.
        fresh = interpreters.create()
        expected = before | {fresh}
        self.assertEqual(set(interpreters.list_all()), expected)

    def test_after_destroy_some(self):
        before = set(interpreters.list_all())
        # Create 3 subinterpreters and destroy the first two.
        first, second, kept = (interpreters.create() for _ in range(3))
        first.close()
        second.close()
        # Finally, create another.
        fresh = interpreters.create()
        expected = before | {kept, fresh}
        self.assertEqual(set(interpreters.list_all()), expected)
1277db96d56Sopenharmony_ci
1287db96d56Sopenharmony_ci
class GetCurrentTests(TestBase):
    # Tests for interpreters.get_current().

    def test_main(self):
        main = interpreters.get_main()
        current = interpreters.get_current()
        self.assertEqual(current, main)

    def test_subinterpreter(self):
        # Use the high-level get_main() (as test_main does) so that both
        # sides of the comparisons below are Interpreter objects.  With
        # the low-level _interpreters.get_main() the inequality check
        # compared a wrapper against a raw ID and could not fail in a
        # meaningful way.
        main = interpreters.get_main()
        interp = interpreters.create()
        out = _run_output(interp, dedent("""
            from test.support import interpreters
            cur = interpreters.get_current()
            print(cur.id)
            """))
        # The script prints the ID of the interpreter it is running in.
        current = interpreters.Interpreter(int(out))
        # It must be the interpreter the script ran in, and not main.
        self.assertEqual(current, interp)
        self.assertNotEqual(current, main)
1467db96d56Sopenharmony_ci
1477db96d56Sopenharmony_ci
class ListAllTests(TestBase):
    # Tests for interpreters.list_all().

    def test_initial(self):
        # Only one interpreter is expected before anything is created.
        self.assertEqual(1, len(interpreters.list_all()))

    def test_after_creating(self):
        main = interpreters.get_current()
        first = interpreters.create()
        second = interpreters.create()

        listed = [interp.id for interp in interpreters.list_all()]

        self.assertEqual(listed, [main.id, first.id, second.id])

    def test_after_destroying(self):
        main = interpreters.get_current()
        first = interpreters.create()
        second = interpreters.create()
        first.close()

        # The closed interpreter must no longer be listed.
        listed = [interp.id for interp in interpreters.list_all()]

        self.assertEqual(listed, [main.id, second.id])
1767db96d56Sopenharmony_ci
1777db96d56Sopenharmony_ci
class TestInterpreterAttrs(TestBase):
    # Tests for the ``id`` and ``isolated`` attributes of Interpreter
    # objects, and for Interpreter equality.

    def test_id_type(self):
        candidates = (
            interpreters.get_main(),
            interpreters.get_current(),
            interpreters.create(),
        )
        for candidate in candidates:
            self.assertIsInstance(candidate.id, _interpreters.InterpreterID)

    def test_main_id(self):
        self.assertEqual(interpreters.get_main().id, 0)

    def test_custom_id(self):
        wrapper = interpreters.Interpreter(1)
        self.assertEqual(wrapper.id, 1)

        # A string is not accepted as an ID.
        with self.assertRaises(TypeError):
            interpreters.Interpreter('1')

    def test_id_readonly(self):
        wrapper = interpreters.Interpreter(1)
        with self.assertRaises(AttributeError):
            wrapper.id = 2

    @unittest.skip('not ready yet (see bpo-32604)')
    def test_main_isolated(self):
        self.assertFalse(interpreters.get_main().isolated)

    @unittest.skip('not ready yet (see bpo-32604)')
    def test_subinterpreter_isolated_default(self):
        self.assertFalse(interpreters.create().isolated)

    def test_subinterpreter_isolated_explicit(self):
        isolated = interpreters.create(isolated=True)
        shared = interpreters.create(isolated=False)
        self.assertTrue(isolated.isolated)
        self.assertFalse(shared.isolated)

    @unittest.skip('not ready yet (see bpo-32604)')
    def test_custom_isolated_default(self):
        self.assertFalse(interpreters.Interpreter(1).isolated)

    def test_custom_isolated_explicit(self):
        isolated = interpreters.Interpreter(1, isolated=True)
        shared = interpreters.Interpreter(1, isolated=False)
        self.assertTrue(isolated.isolated)
        self.assertFalse(shared.isolated)

    def test_isolated_readonly(self):
        wrapper = interpreters.Interpreter(1)
        with self.assertRaises(AttributeError):
            wrapper.isolated = True

    def test_equality(self):
        left = interpreters.create()
        right = interpreters.create()
        self.assertEqual(left, left)
        self.assertNotEqual(left, right)
2417db96d56Sopenharmony_ci
2427db96d56Sopenharmony_ci
class TestInterpreterIsRunning(TestBase):
    # Tests for Interpreter.is_running().

    def test_main(self):
        self.assertTrue(interpreters.get_main().is_running())

    @unittest.skip('Fails on FreeBSD')
    def test_subinterpreter(self):
        sub = interpreters.create()
        self.assertFalse(sub.is_running())

        # Only while a script is executing is it reported as running.
        with _running(sub):
            self.assertTrue(sub.is_running())
        self.assertFalse(sub.is_running())

    def test_from_subinterpreter(self):
        sub = interpreters.create()
        # The script asks, from inside the subinterpreter, whether that
        # same subinterpreter is running.
        script = dedent(f"""
            import _xxsubinterpreters as _interpreters
            if _interpreters.is_running({sub.id}):
                print(True)
            else:
                print(False)
            """)
        answer = _run_output(sub, script).strip()
        self.assertEqual(answer, 'True')

    def test_already_destroyed(self):
        sub = interpreters.create()
        sub.close()
        with self.assertRaises(RuntimeError):
            sub.is_running()

    def test_does_not_exist(self):
        missing = interpreters.Interpreter(1_000_000)
        with self.assertRaises(RuntimeError):
            missing.is_running()

    def test_bad_id(self):
        invalid = interpreters.Interpreter(-1)
        with self.assertRaises(ValueError):
            invalid.is_running()
2847db96d56Sopenharmony_ci
2857db96d56Sopenharmony_ci
class TestInterpreterClose(TestBase):
    # Tests for Interpreter.close().

    def test_basic(self):
        main = interpreters.get_main()
        first, second, third = (interpreters.create() for _ in range(3))
        self.assertEqual(set(interpreters.list_all()),
                         {main, first, second, third})
        second.close()
        self.assertEqual(set(interpreters.list_all()),
                         {main, first, third})

    def test_all(self):
        before = set(interpreters.list_all())
        created = {interpreters.create() for _ in range(3)}
        self.assertEqual(set(interpreters.list_all()), before | created)
        for interp in created:
            interp.close()
        self.assertEqual(set(interpreters.list_all()), before)

    def test_main(self):
        main, = interpreters.list_all()
        with self.assertRaises(RuntimeError):
            main.close()

        # Closing main must fail from another thread as well.
        def close_from_thread():
            with self.assertRaises(RuntimeError):
                main.close()

        thread = threading.Thread(target=close_from_thread)
        thread.start()
        thread.join()

    def test_already_destroyed(self):
        sub = interpreters.create()
        sub.close()
        with self.assertRaises(RuntimeError):
            sub.close()

    def test_does_not_exist(self):
        missing = interpreters.Interpreter(1_000_000)
        with self.assertRaises(RuntimeError):
            missing.close()

    def test_bad_id(self):
        invalid = interpreters.Interpreter(-1)
        with self.assertRaises(ValueError):
            invalid.close()

    def test_from_current(self):
        main, = interpreters.list_all()
        sub = interpreters.create()
        # The script tries to close the interpreter it is running in.
        script = dedent(f"""
            from test.support import interpreters
            interp = interpreters.Interpreter({int(sub.id)})
            try:
                interp.close()
            except RuntimeError:
                print('failed')
            """)
        printed = _run_output(sub, script).strip()
        self.assertEqual(printed, 'failed')
        self.assertEqual(set(interpreters.list_all()), {main, sub})

    def test_from_sibling(self):
        main, = interpreters.list_all()
        first = interpreters.create()
        second = interpreters.create()
        self.assertEqual(set(interpreters.list_all()),
                         {main, first, second})
        # From inside ``first``: close ``second``, then create and close
        # one more interpreter.
        script = dedent(f"""
            from test.support import interpreters
            interp2 = interpreters.Interpreter(int({second.id}))
            interp2.close()
            interp3 = interpreters.create()
            interp3.close()
            """)
        first.run(script)
        self.assertEqual(set(interpreters.list_all()), {main, first})

    def test_from_other_thread(self):
        sub = interpreters.create()

        def close_from_thread():
            sub.close()

        thread = threading.Thread(target=close_from_thread)
        thread.start()
        thread.join()

    @unittest.skip('Fails on FreeBSD')
    def test_still_running(self):
        main, = interpreters.list_all()
        sub = interpreters.create()
        with _running(sub):
            # A running interpreter refuses to be closed...
            with self.assertRaises(RuntimeError):
                sub.close()
            # ...and keeps running after the failed attempt.
            self.assertTrue(sub.is_running())
3857db96d56Sopenharmony_ci
3867db96d56Sopenharmony_ci
class TestInterpreterRun(TestBase):
    # Tests for Interpreter.run().

    def test_success(self):
        sub = interpreters.create()
        script, reader = _captured_script('print("it worked!", end="")')
        with reader:
            sub.run(script)
            printed = reader.read()

        self.assertEqual(printed, 'it worked!')

    def test_in_thread(self):
        sub = interpreters.create()
        script, reader = _captured_script('print("it worked!", end="")')
        with reader:
            # Run the script from a helper thread and wait for it.
            thread = threading.Thread(target=sub.run, args=(script,))
            thread.start()
            thread.join()
            printed = reader.read()

        self.assertEqual(printed, 'it worked!')

    @support.requires_fork()
    def test_fork(self):
        sub = interpreters.create()
        import tempfile
        with tempfile.NamedTemporaryFile('w+', encoding='utf-8') as tmp:
            tmp.write('')
            tmp.flush()

            expected = 'spam spam spam spam spam'
            # The script writes the marker only if os.fork() raises
            # RuntimeError inside the subinterpreter.
            script = dedent(f"""
                import os
                try:
                    os.fork()
                except RuntimeError:
                    with open('{tmp.name}', 'w', encoding='utf-8') as out:
                        out.write('{expected}')
                """)
            sub.run(script)

            tmp.seek(0)
            self.assertEqual(tmp.read(), expected)

    @unittest.skip('Fails on FreeBSD')
    def test_already_running(self):
        sub = interpreters.create()
        with _running(sub):
            with self.assertRaises(RuntimeError):
                sub.run('print("spam")')

    def test_does_not_exist(self):
        missing = interpreters.Interpreter(1_000_000)
        with self.assertRaises(RuntimeError):
            missing.run('print("spam")')

    def test_bad_id(self):
        invalid = interpreters.Interpreter(-1)
        with self.assertRaises(ValueError):
            invalid.run('print("spam")')

    def test_bad_script(self):
        sub = interpreters.create()
        with self.assertRaises(TypeError):
            sub.run(10)

    def test_bytes_for_script(self):
        sub = interpreters.create()
        with self.assertRaises(TypeError):
            sub.run(b'print("spam")')

    # test_xxsubinterpreters covers the remaining Interpreter.run() behavior.
4637db96d56Sopenharmony_ci
4647db96d56Sopenharmony_ci
class TestIsShareable(TestBase):
    # Tests for interpreters.is_shareable().

    def test_default_shareables(self):
        expected_shareable = [
                # singletons
                None,
                # builtin objects
                b'spam',
                'spam',
                10,
                -10,
                ]
        for candidate in expected_shareable:
            with self.subTest(candidate):
                self.assertTrue(interpreters.is_shareable(candidate))

    def test_not_shareable(self):
        class Cheese:
            def __init__(self, name):
                self.name = name
            def __str__(self):
                return self.name

        class SubBytes(bytes):
            """A subclass of a shareable type."""

        expected_unshareable = [
                # singletons
                True,
                False,
                NotImplemented,
                ...,
                # builtin types and objects
                type,
                object,
                object(),
                Exception(),
                100.0,
                # user-defined types and objects
                Cheese,
                Cheese('Wensleydale'),
                SubBytes(b'spam'),
                ]
        for candidate in expected_unshareable:
            with self.subTest(repr(candidate)):
                shareable = interpreters.is_shareable(candidate)
                self.assertFalse(shareable)
5137db96d56Sopenharmony_ci
5147db96d56Sopenharmony_ci
class TestChannels(TestBase):
    # Tests for interpreters.create_channel() and list_all_channels().

    def test_create(self):
        recv_end, send_end = interpreters.create_channel()
        self.assertIsInstance(recv_end, interpreters.RecvChannel)
        self.assertIsInstance(send_end, interpreters.SendChannel)

    def test_list_all(self):
        self.assertEqual(interpreters.list_all_channels(), [])
        # Each create_channel() result is kept as-is and compared with
        # the entries of list_all_channels().
        created = {interpreters.create_channel() for _ in range(3)}
        listed = set(interpreters.list_all_channels())
        self.assertEqual(listed, created)
5307db96d56Sopenharmony_ci
5317db96d56Sopenharmony_ci
class TestRecvChannelAttrs(TestBase):
    # Tests for the ``id`` attribute and equality of RecvChannel objects.

    def test_id_type(self):
        recv_end, _ = interpreters.create_channel()
        self.assertIsInstance(recv_end.id, _interpreters.ChannelID)

    def test_custom_id(self):
        recv_end = interpreters.RecvChannel(1)
        self.assertEqual(recv_end.id, 1)

        # A string is not accepted as an ID.
        with self.assertRaises(TypeError):
            interpreters.RecvChannel('1')

    def test_id_readonly(self):
        recv_end = interpreters.RecvChannel(1)
        with self.assertRaises(AttributeError):
            recv_end.id = 2

    def test_equality(self):
        left, _ = interpreters.create_channel()
        right, _ = interpreters.create_channel()
        self.assertEqual(left, left)
        self.assertNotEqual(left, right)
5557db96d56Sopenharmony_ci
5567db96d56Sopenharmony_ci
class TestSendChannelAttrs(TestBase):
    # Tests for the ``id`` attribute and equality of SendChannel objects.

    def test_id_type(self):
        _, send_end = interpreters.create_channel()
        self.assertIsInstance(send_end.id, _interpreters.ChannelID)

    def test_custom_id(self):
        send_end = interpreters.SendChannel(1)
        self.assertEqual(send_end.id, 1)

        # A string is not accepted as an ID.
        with self.assertRaises(TypeError):
            interpreters.SendChannel('1')

    def test_id_readonly(self):
        send_end = interpreters.SendChannel(1)
        with self.assertRaises(AttributeError):
            send_end.id = 2

    def test_equality(self):
        _, left = interpreters.create_channel()
        _, right = interpreters.create_channel()
        self.assertEqual(left, left)
        self.assertNotEqual(left, right)
5807db96d56Sopenharmony_ci
5817db96d56Sopenharmony_ci
5827db96d56Sopenharmony_ciclass TestSendRecv(TestBase):
5837db96d56Sopenharmony_ci
5847db96d56Sopenharmony_ci    def test_send_recv_main(self):
5857db96d56Sopenharmony_ci        r, s = interpreters.create_channel()
5867db96d56Sopenharmony_ci        orig = b'spam'
5877db96d56Sopenharmony_ci        s.send_nowait(orig)
5887db96d56Sopenharmony_ci        obj = r.recv()
5897db96d56Sopenharmony_ci
5907db96d56Sopenharmony_ci        self.assertEqual(obj, orig)
5917db96d56Sopenharmony_ci        self.assertIsNot(obj, orig)
5927db96d56Sopenharmony_ci
5937db96d56Sopenharmony_ci    def test_send_recv_same_interpreter(self):
5947db96d56Sopenharmony_ci        interp = interpreters.create()
5957db96d56Sopenharmony_ci        interp.run(dedent("""
5967db96d56Sopenharmony_ci            from test.support import interpreters
5977db96d56Sopenharmony_ci            r, s = interpreters.create_channel()
5987db96d56Sopenharmony_ci            orig = b'spam'
5997db96d56Sopenharmony_ci            s.send_nowait(orig)
6007db96d56Sopenharmony_ci            obj = r.recv()
6017db96d56Sopenharmony_ci            assert obj == orig, 'expected: obj == orig'
6027db96d56Sopenharmony_ci            assert obj is not orig, 'expected: obj is not orig'
6037db96d56Sopenharmony_ci            """))
6047db96d56Sopenharmony_ci
6057db96d56Sopenharmony_ci    @unittest.skip('broken (see BPO-...)')
6067db96d56Sopenharmony_ci    def test_send_recv_different_interpreters(self):
6077db96d56Sopenharmony_ci        r1, s1 = interpreters.create_channel()
6087db96d56Sopenharmony_ci        r2, s2 = interpreters.create_channel()
6097db96d56Sopenharmony_ci        orig1 = b'spam'
6107db96d56Sopenharmony_ci        s1.send_nowait(orig1)
6117db96d56Sopenharmony_ci        out = _run_output(
6127db96d56Sopenharmony_ci            interpreters.create(),
6137db96d56Sopenharmony_ci            dedent(f"""
6147db96d56Sopenharmony_ci                obj1 = r.recv()
6157db96d56Sopenharmony_ci                assert obj1 == b'spam', 'expected: obj1 == orig1'
6167db96d56Sopenharmony_ci                # When going to another interpreter we get a copy.
6177db96d56Sopenharmony_ci                assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
6187db96d56Sopenharmony_ci                orig2 = b'eggs'
6197db96d56Sopenharmony_ci                print(id(orig2))
6207db96d56Sopenharmony_ci                s.send_nowait(orig2)
6217db96d56Sopenharmony_ci                """),
6227db96d56Sopenharmony_ci            channels=dict(r=r1, s=s2),
6237db96d56Sopenharmony_ci            )
6247db96d56Sopenharmony_ci        obj2 = r2.recv()
6257db96d56Sopenharmony_ci
6267db96d56Sopenharmony_ci        self.assertEqual(obj2, b'eggs')
6277db96d56Sopenharmony_ci        self.assertNotEqual(id(obj2), int(out))
6287db96d56Sopenharmony_ci
6297db96d56Sopenharmony_ci    def test_send_recv_different_threads(self):
6307db96d56Sopenharmony_ci        r, s = interpreters.create_channel()
6317db96d56Sopenharmony_ci
6327db96d56Sopenharmony_ci        def f():
6337db96d56Sopenharmony_ci            while True:
6347db96d56Sopenharmony_ci                try:
6357db96d56Sopenharmony_ci                    obj = r.recv()
6367db96d56Sopenharmony_ci                    break
6377db96d56Sopenharmony_ci                except interpreters.ChannelEmptyError:
6387db96d56Sopenharmony_ci                    time.sleep(0.1)
6397db96d56Sopenharmony_ci            s.send(obj)
6407db96d56Sopenharmony_ci        t = threading.Thread(target=f)
6417db96d56Sopenharmony_ci        t.start()
6427db96d56Sopenharmony_ci
6437db96d56Sopenharmony_ci        orig = b'spam'
6447db96d56Sopenharmony_ci        s.send(orig)
6457db96d56Sopenharmony_ci        t.join()
6467db96d56Sopenharmony_ci        obj = r.recv()
6477db96d56Sopenharmony_ci
6487db96d56Sopenharmony_ci        self.assertEqual(obj, orig)
6497db96d56Sopenharmony_ci        self.assertIsNot(obj, orig)
6507db96d56Sopenharmony_ci
6517db96d56Sopenharmony_ci    def test_send_recv_nowait_main(self):
6527db96d56Sopenharmony_ci        r, s = interpreters.create_channel()
6537db96d56Sopenharmony_ci        orig = b'spam'
6547db96d56Sopenharmony_ci        s.send_nowait(orig)
6557db96d56Sopenharmony_ci        obj = r.recv_nowait()
6567db96d56Sopenharmony_ci
6577db96d56Sopenharmony_ci        self.assertEqual(obj, orig)
6587db96d56Sopenharmony_ci        self.assertIsNot(obj, orig)
6597db96d56Sopenharmony_ci
6607db96d56Sopenharmony_ci    def test_send_recv_nowait_main_with_default(self):
6617db96d56Sopenharmony_ci        r, _ = interpreters.create_channel()
6627db96d56Sopenharmony_ci        obj = r.recv_nowait(None)
6637db96d56Sopenharmony_ci
6647db96d56Sopenharmony_ci        self.assertIsNone(obj)
6657db96d56Sopenharmony_ci
6667db96d56Sopenharmony_ci    def test_send_recv_nowait_same_interpreter(self):
6677db96d56Sopenharmony_ci        interp = interpreters.create()
6687db96d56Sopenharmony_ci        interp.run(dedent("""
6697db96d56Sopenharmony_ci            from test.support import interpreters
6707db96d56Sopenharmony_ci            r, s = interpreters.create_channel()
6717db96d56Sopenharmony_ci            orig = b'spam'
6727db96d56Sopenharmony_ci            s.send_nowait(orig)
6737db96d56Sopenharmony_ci            obj = r.recv_nowait()
6747db96d56Sopenharmony_ci            assert obj == orig, 'expected: obj == orig'
6757db96d56Sopenharmony_ci            # When going back to the same interpreter we get the same object.
6767db96d56Sopenharmony_ci            assert obj is not orig, 'expected: obj is not orig'
6777db96d56Sopenharmony_ci            """))
6787db96d56Sopenharmony_ci
6797db96d56Sopenharmony_ci    @unittest.skip('broken (see BPO-...)')
6807db96d56Sopenharmony_ci    def test_send_recv_nowait_different_interpreters(self):
6817db96d56Sopenharmony_ci        r1, s1 = interpreters.create_channel()
6827db96d56Sopenharmony_ci        r2, s2 = interpreters.create_channel()
6837db96d56Sopenharmony_ci        orig1 = b'spam'
6847db96d56Sopenharmony_ci        s1.send_nowait(orig1)
6857db96d56Sopenharmony_ci        out = _run_output(
6867db96d56Sopenharmony_ci            interpreters.create(),
6877db96d56Sopenharmony_ci            dedent(f"""
6887db96d56Sopenharmony_ci                obj1 = r.recv_nowait()
6897db96d56Sopenharmony_ci                assert obj1 == b'spam', 'expected: obj1 == orig1'
6907db96d56Sopenharmony_ci                # When going to another interpreter we get a copy.
6917db96d56Sopenharmony_ci                assert id(obj1) != {id(orig1)}, 'expected: obj1 is not orig1'
6927db96d56Sopenharmony_ci                orig2 = b'eggs'
6937db96d56Sopenharmony_ci                print(id(orig2))
6947db96d56Sopenharmony_ci                s.send_nowait(orig2)
6957db96d56Sopenharmony_ci                """),
6967db96d56Sopenharmony_ci            channels=dict(r=r1, s=s2),
6977db96d56Sopenharmony_ci            )
6987db96d56Sopenharmony_ci        obj2 = r2.recv_nowait()
6997db96d56Sopenharmony_ci
7007db96d56Sopenharmony_ci        self.assertEqual(obj2, b'eggs')
7017db96d56Sopenharmony_ci        self.assertNotEqual(id(obj2), int(out))
7027db96d56Sopenharmony_ci
7037db96d56Sopenharmony_ci    def test_recv_channel_does_not_exist(self):
7047db96d56Sopenharmony_ci        ch = interpreters.RecvChannel(1_000_000)
7057db96d56Sopenharmony_ci        with self.assertRaises(interpreters.ChannelNotFoundError):
7067db96d56Sopenharmony_ci            ch.recv()
7077db96d56Sopenharmony_ci
7087db96d56Sopenharmony_ci    def test_send_channel_does_not_exist(self):
7097db96d56Sopenharmony_ci        ch = interpreters.SendChannel(1_000_000)
7107db96d56Sopenharmony_ci        with self.assertRaises(interpreters.ChannelNotFoundError):
7117db96d56Sopenharmony_ci            ch.send(b'spam')
7127db96d56Sopenharmony_ci
7137db96d56Sopenharmony_ci    def test_recv_nowait_channel_does_not_exist(self):
7147db96d56Sopenharmony_ci        ch = interpreters.RecvChannel(1_000_000)
7157db96d56Sopenharmony_ci        with self.assertRaises(interpreters.ChannelNotFoundError):
7167db96d56Sopenharmony_ci            ch.recv_nowait()
7177db96d56Sopenharmony_ci
7187db96d56Sopenharmony_ci    def test_send_nowait_channel_does_not_exist(self):
7197db96d56Sopenharmony_ci        ch = interpreters.SendChannel(1_000_000)
7207db96d56Sopenharmony_ci        with self.assertRaises(interpreters.ChannelNotFoundError):
7217db96d56Sopenharmony_ci            ch.send_nowait(b'spam')
7227db96d56Sopenharmony_ci
7237db96d56Sopenharmony_ci    def test_recv_nowait_empty(self):
7247db96d56Sopenharmony_ci        ch, _ = interpreters.create_channel()
7257db96d56Sopenharmony_ci        with self.assertRaises(interpreters.ChannelEmptyError):
7267db96d56Sopenharmony_ci            ch.recv_nowait()
7277db96d56Sopenharmony_ci
7287db96d56Sopenharmony_ci    def test_recv_nowait_default(self):
7297db96d56Sopenharmony_ci        default = object()
7307db96d56Sopenharmony_ci        rch, sch = interpreters.create_channel()
7317db96d56Sopenharmony_ci        obj1 = rch.recv_nowait(default)
7327db96d56Sopenharmony_ci        sch.send_nowait(None)
7337db96d56Sopenharmony_ci        sch.send_nowait(1)
7347db96d56Sopenharmony_ci        sch.send_nowait(b'spam')
7357db96d56Sopenharmony_ci        sch.send_nowait(b'eggs')
7367db96d56Sopenharmony_ci        obj2 = rch.recv_nowait(default)
7377db96d56Sopenharmony_ci        obj3 = rch.recv_nowait(default)
7387db96d56Sopenharmony_ci        obj4 = rch.recv_nowait()
7397db96d56Sopenharmony_ci        obj5 = rch.recv_nowait(default)
7407db96d56Sopenharmony_ci        obj6 = rch.recv_nowait(default)
7417db96d56Sopenharmony_ci
7427db96d56Sopenharmony_ci        self.assertIs(obj1, default)
7437db96d56Sopenharmony_ci        self.assertIs(obj2, None)
7447db96d56Sopenharmony_ci        self.assertEqual(obj3, 1)
7457db96d56Sopenharmony_ci        self.assertEqual(obj4, b'spam')
7467db96d56Sopenharmony_ci        self.assertEqual(obj5, b'eggs')
7477db96d56Sopenharmony_ci        self.assertIs(obj6, default)
748