17db96d56Sopenharmony_cifrom _compat_pickle import (IMPORT_MAPPING, REVERSE_IMPORT_MAPPING, 27db96d56Sopenharmony_ci NAME_MAPPING, REVERSE_NAME_MAPPING) 37db96d56Sopenharmony_ciimport builtins 47db96d56Sopenharmony_ciimport pickle 57db96d56Sopenharmony_ciimport io 67db96d56Sopenharmony_ciimport collections 77db96d56Sopenharmony_ciimport struct 87db96d56Sopenharmony_ciimport sys 97db96d56Sopenharmony_ciimport warnings 107db96d56Sopenharmony_ciimport weakref 117db96d56Sopenharmony_ci 127db96d56Sopenharmony_ciimport doctest 137db96d56Sopenharmony_ciimport unittest 147db96d56Sopenharmony_cifrom test import support 157db96d56Sopenharmony_cifrom test.support import import_helper 167db96d56Sopenharmony_ci 177db96d56Sopenharmony_cifrom test.pickletester import AbstractHookTests 187db96d56Sopenharmony_cifrom test.pickletester import AbstractUnpickleTests 197db96d56Sopenharmony_cifrom test.pickletester import AbstractPickleTests 207db96d56Sopenharmony_cifrom test.pickletester import AbstractPickleModuleTests 217db96d56Sopenharmony_cifrom test.pickletester import AbstractPersistentPicklerTests 227db96d56Sopenharmony_cifrom test.pickletester import AbstractIdentityPersistentPicklerTests 237db96d56Sopenharmony_cifrom test.pickletester import AbstractPicklerUnpicklerObjectTests 247db96d56Sopenharmony_cifrom test.pickletester import AbstractDispatchTableTests 257db96d56Sopenharmony_cifrom test.pickletester import AbstractCustomPicklerClass 267db96d56Sopenharmony_cifrom test.pickletester import BigmemPickleTests 277db96d56Sopenharmony_ci 287db96d56Sopenharmony_citry: 297db96d56Sopenharmony_ci import _pickle 307db96d56Sopenharmony_ci has_c_implementation = True 317db96d56Sopenharmony_ciexcept ImportError: 327db96d56Sopenharmony_ci has_c_implementation = False 337db96d56Sopenharmony_ci 347db96d56Sopenharmony_ci 357db96d56Sopenharmony_ciclass PyPickleTests(AbstractPickleModuleTests, unittest.TestCase): 367db96d56Sopenharmony_ci dump = staticmethod(pickle._dump) 377db96d56Sopenharmony_ci dumps = staticmethod(pickle._dumps) 387db96d56Sopenharmony_ci load = staticmethod(pickle._load) 397db96d56Sopenharmony_ci loads = staticmethod(pickle._loads) 407db96d56Sopenharmony_ci Pickler = pickle._Pickler 417db96d56Sopenharmony_ci Unpickler = pickle._Unpickler 427db96d56Sopenharmony_ci 437db96d56Sopenharmony_ci 447db96d56Sopenharmony_ciclass PyUnpicklerTests(AbstractUnpickleTests, unittest.TestCase): 457db96d56Sopenharmony_ci 467db96d56Sopenharmony_ci unpickler = pickle._Unpickler 477db96d56Sopenharmony_ci bad_stack_errors = (IndexError,) 487db96d56Sopenharmony_ci truncated_errors = (pickle.UnpicklingError, EOFError, 497db96d56Sopenharmony_ci AttributeError, ValueError, 507db96d56Sopenharmony_ci struct.error, IndexError, ImportError) 517db96d56Sopenharmony_ci 527db96d56Sopenharmony_ci def loads(self, buf, **kwds): 537db96d56Sopenharmony_ci f = io.BytesIO(buf) 547db96d56Sopenharmony_ci u = self.unpickler(f, **kwds) 557db96d56Sopenharmony_ci return u.load() 567db96d56Sopenharmony_ci 577db96d56Sopenharmony_ci 587db96d56Sopenharmony_ciclass PyPicklerTests(AbstractPickleTests, unittest.TestCase): 597db96d56Sopenharmony_ci 607db96d56Sopenharmony_ci pickler = pickle._Pickler 617db96d56Sopenharmony_ci unpickler = pickle._Unpickler 627db96d56Sopenharmony_ci 637db96d56Sopenharmony_ci def dumps(self, arg, proto=None, **kwargs): 647db96d56Sopenharmony_ci f = io.BytesIO() 657db96d56Sopenharmony_ci p = self.pickler(f, proto, **kwargs) 667db96d56Sopenharmony_ci p.dump(arg) 677db96d56Sopenharmony_ci f.seek(0) 687db96d56Sopenharmony_ci return bytes(f.read()) 697db96d56Sopenharmony_ci 707db96d56Sopenharmony_ci def loads(self, buf, **kwds): 717db96d56Sopenharmony_ci f = io.BytesIO(buf) 727db96d56Sopenharmony_ci u = self.unpickler(f, **kwds) 737db96d56Sopenharmony_ci return u.load() 747db96d56Sopenharmony_ci 757db96d56Sopenharmony_ci 767db96d56Sopenharmony_ciclass InMemoryPickleTests(AbstractPickleTests, AbstractUnpickleTests, 777db96d56Sopenharmony_ci BigmemPickleTests, unittest.TestCase): 787db96d56Sopenharmony_ci 797db96d56Sopenharmony_ci bad_stack_errors = (pickle.UnpicklingError, IndexError) 807db96d56Sopenharmony_ci truncated_errors = (pickle.UnpicklingError, EOFError, 817db96d56Sopenharmony_ci AttributeError, ValueError, 827db96d56Sopenharmony_ci struct.error, IndexError, ImportError) 837db96d56Sopenharmony_ci 847db96d56Sopenharmony_ci def dumps(self, arg, protocol=None, **kwargs): 857db96d56Sopenharmony_ci return pickle.dumps(arg, protocol, **kwargs) 867db96d56Sopenharmony_ci 877db96d56Sopenharmony_ci def loads(self, buf, **kwds): 887db96d56Sopenharmony_ci return pickle.loads(buf, **kwds) 897db96d56Sopenharmony_ci 907db96d56Sopenharmony_ci test_framed_write_sizes_with_delayed_writer = None 917db96d56Sopenharmony_ci 927db96d56Sopenharmony_ci 937db96d56Sopenharmony_ciclass PersistentPicklerUnpicklerMixin(object): 947db96d56Sopenharmony_ci 957db96d56Sopenharmony_ci def dumps(self, arg, proto=None): 967db96d56Sopenharmony_ci class PersPickler(self.pickler): 977db96d56Sopenharmony_ci def persistent_id(subself, obj): 987db96d56Sopenharmony_ci return self.persistent_id(obj) 997db96d56Sopenharmony_ci f = io.BytesIO() 1007db96d56Sopenharmony_ci p = PersPickler(f, proto) 1017db96d56Sopenharmony_ci p.dump(arg) 1027db96d56Sopenharmony_ci return f.getvalue() 1037db96d56Sopenharmony_ci 1047db96d56Sopenharmony_ci def loads(self, buf, **kwds): 1057db96d56Sopenharmony_ci class PersUnpickler(self.unpickler): 1067db96d56Sopenharmony_ci def persistent_load(subself, obj): 1077db96d56Sopenharmony_ci return self.persistent_load(obj) 1087db96d56Sopenharmony_ci f = io.BytesIO(buf) 1097db96d56Sopenharmony_ci u = PersUnpickler(f, **kwds) 1107db96d56Sopenharmony_ci return u.load() 1117db96d56Sopenharmony_ci 1127db96d56Sopenharmony_ci 1137db96d56Sopenharmony_ciclass PyPersPicklerTests(AbstractPersistentPicklerTests, 1147db96d56Sopenharmony_ci PersistentPicklerUnpicklerMixin, unittest.TestCase): 1157db96d56Sopenharmony_ci 1167db96d56Sopenharmony_ci pickler = pickle._Pickler 1177db96d56Sopenharmony_ci unpickler = pickle._Unpickler 1187db96d56Sopenharmony_ci 1197db96d56Sopenharmony_ci 1207db96d56Sopenharmony_ciclass PyIdPersPicklerTests(AbstractIdentityPersistentPicklerTests, 1217db96d56Sopenharmony_ci PersistentPicklerUnpicklerMixin, unittest.TestCase): 1227db96d56Sopenharmony_ci 1237db96d56Sopenharmony_ci pickler = pickle._Pickler 1247db96d56Sopenharmony_ci unpickler = pickle._Unpickler 1257db96d56Sopenharmony_ci 1267db96d56Sopenharmony_ci @support.cpython_only 1277db96d56Sopenharmony_ci def test_pickler_reference_cycle(self): 1287db96d56Sopenharmony_ci def check(Pickler): 1297db96d56Sopenharmony_ci for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1307db96d56Sopenharmony_ci f = io.BytesIO() 1317db96d56Sopenharmony_ci pickler = Pickler(f, proto) 1327db96d56Sopenharmony_ci pickler.dump('abc') 1337db96d56Sopenharmony_ci self.assertEqual(self.loads(f.getvalue()), 'abc') 1347db96d56Sopenharmony_ci pickler = Pickler(io.BytesIO()) 1357db96d56Sopenharmony_ci self.assertEqual(pickler.persistent_id('def'), 'def') 1367db96d56Sopenharmony_ci r = weakref.ref(pickler) 1377db96d56Sopenharmony_ci del pickler 1387db96d56Sopenharmony_ci self.assertIsNone(r()) 1397db96d56Sopenharmony_ci 1407db96d56Sopenharmony_ci class PersPickler(self.pickler): 1417db96d56Sopenharmony_ci def persistent_id(subself, obj): 1427db96d56Sopenharmony_ci return obj 1437db96d56Sopenharmony_ci check(PersPickler) 1447db96d56Sopenharmony_ci 1457db96d56Sopenharmony_ci class PersPickler(self.pickler): 1467db96d56Sopenharmony_ci @classmethod 1477db96d56Sopenharmony_ci def persistent_id(cls, obj): 1487db96d56Sopenharmony_ci return obj 1497db96d56Sopenharmony_ci check(PersPickler) 1507db96d56Sopenharmony_ci 1517db96d56Sopenharmony_ci class PersPickler(self.pickler): 1527db96d56Sopenharmony_ci @staticmethod 1537db96d56Sopenharmony_ci def persistent_id(obj): 1547db96d56Sopenharmony_ci return obj 1557db96d56Sopenharmony_ci check(PersPickler) 1567db96d56Sopenharmony_ci 1577db96d56Sopenharmony_ci @support.cpython_only 1587db96d56Sopenharmony_ci def test_custom_pickler_dispatch_table_memleak(self): 1597db96d56Sopenharmony_ci # See https://github.com/python/cpython/issues/89988 1607db96d56Sopenharmony_ci 1617db96d56Sopenharmony_ci class Pickler(self.pickler): 1627db96d56Sopenharmony_ci def __init__(self, *args, **kwargs): 1637db96d56Sopenharmony_ci self.dispatch_table = table 1647db96d56Sopenharmony_ci super().__init__(*args, **kwargs) 1657db96d56Sopenharmony_ci 1667db96d56Sopenharmony_ci class DispatchTable: 1677db96d56Sopenharmony_ci pass 1687db96d56Sopenharmony_ci 1697db96d56Sopenharmony_ci table = DispatchTable() 1707db96d56Sopenharmony_ci pickler = Pickler(io.BytesIO()) 1717db96d56Sopenharmony_ci self.assertIs(pickler.dispatch_table, table) 1727db96d56Sopenharmony_ci table_ref = weakref.ref(table) 1737db96d56Sopenharmony_ci self.assertIsNotNone(table_ref()) 1747db96d56Sopenharmony_ci del pickler 1757db96d56Sopenharmony_ci del table 1767db96d56Sopenharmony_ci support.gc_collect() 1777db96d56Sopenharmony_ci self.assertIsNone(table_ref()) 1787db96d56Sopenharmony_ci 1797db96d56Sopenharmony_ci 1807db96d56Sopenharmony_ci @support.cpython_only 1817db96d56Sopenharmony_ci def test_unpickler_reference_cycle(self): 1827db96d56Sopenharmony_ci def check(Unpickler): 1837db96d56Sopenharmony_ci for proto in range(pickle.HIGHEST_PROTOCOL + 1): 1847db96d56Sopenharmony_ci unpickler = Unpickler(io.BytesIO(self.dumps('abc', proto))) 1857db96d56Sopenharmony_ci self.assertEqual(unpickler.load(), 'abc') 1867db96d56Sopenharmony_ci unpickler = Unpickler(io.BytesIO()) 1877db96d56Sopenharmony_ci self.assertEqual(unpickler.persistent_load('def'), 'def') 1887db96d56Sopenharmony_ci r = weakref.ref(unpickler) 1897db96d56Sopenharmony_ci del unpickler 1907db96d56Sopenharmony_ci self.assertIsNone(r()) 1917db96d56Sopenharmony_ci 1927db96d56Sopenharmony_ci class PersUnpickler(self.unpickler): 1937db96d56Sopenharmony_ci def persistent_load(subself, pid): 1947db96d56Sopenharmony_ci return pid 1957db96d56Sopenharmony_ci check(PersUnpickler) 1967db96d56Sopenharmony_ci 1977db96d56Sopenharmony_ci class PersUnpickler(self.unpickler): 1987db96d56Sopenharmony_ci @classmethod 1997db96d56Sopenharmony_ci def persistent_load(cls, pid): 2007db96d56Sopenharmony_ci return pid 2017db96d56Sopenharmony_ci check(PersUnpickler) 2027db96d56Sopenharmony_ci 2037db96d56Sopenharmony_ci class PersUnpickler(self.unpickler): 2047db96d56Sopenharmony_ci @staticmethod 2057db96d56Sopenharmony_ci def persistent_load(pid): 2067db96d56Sopenharmony_ci return pid 2077db96d56Sopenharmony_ci check(PersUnpickler) 2087db96d56Sopenharmony_ci 2097db96d56Sopenharmony_ci 2107db96d56Sopenharmony_ciclass PyPicklerUnpicklerObjectTests(AbstractPicklerUnpicklerObjectTests, unittest.TestCase): 2117db96d56Sopenharmony_ci 2127db96d56Sopenharmony_ci pickler_class = pickle._Pickler 2137db96d56Sopenharmony_ci unpickler_class = pickle._Unpickler 2147db96d56Sopenharmony_ci 2157db96d56Sopenharmony_ci 2167db96d56Sopenharmony_ciclass PyDispatchTableTests(AbstractDispatchTableTests, unittest.TestCase): 2177db96d56Sopenharmony_ci 2187db96d56Sopenharmony_ci pickler_class = pickle._Pickler 2197db96d56Sopenharmony_ci 2207db96d56Sopenharmony_ci def get_dispatch_table(self): 2217db96d56Sopenharmony_ci return pickle.dispatch_table.copy() 2227db96d56Sopenharmony_ci 2237db96d56Sopenharmony_ci 2247db96d56Sopenharmony_ciclass PyChainDispatchTableTests(AbstractDispatchTableTests, unittest.TestCase): 2257db96d56Sopenharmony_ci 2267db96d56Sopenharmony_ci pickler_class = pickle._Pickler 2277db96d56Sopenharmony_ci 2287db96d56Sopenharmony_ci def get_dispatch_table(self): 2297db96d56Sopenharmony_ci return collections.ChainMap({}, pickle.dispatch_table) 2307db96d56Sopenharmony_ci 2317db96d56Sopenharmony_ci 2327db96d56Sopenharmony_ciclass PyPicklerHookTests(AbstractHookTests, unittest.TestCase): 2337db96d56Sopenharmony_ci class CustomPyPicklerClass(pickle._Pickler, 2347db96d56Sopenharmony_ci AbstractCustomPicklerClass): 2357db96d56Sopenharmony_ci pass 2367db96d56Sopenharmony_ci pickler_class = CustomPyPicklerClass 2377db96d56Sopenharmony_ci 2387db96d56Sopenharmony_ci 2397db96d56Sopenharmony_ciif has_c_implementation: 2407db96d56Sopenharmony_ci class CPickleTests(AbstractPickleModuleTests, unittest.TestCase): 2417db96d56Sopenharmony_ci from _pickle import dump, dumps, load, loads, Pickler, Unpickler 2427db96d56Sopenharmony_ci 2437db96d56Sopenharmony_ci class CUnpicklerTests(PyUnpicklerTests): 2447db96d56Sopenharmony_ci unpickler = _pickle.Unpickler 2457db96d56Sopenharmony_ci bad_stack_errors = (pickle.UnpicklingError,) 2467db96d56Sopenharmony_ci truncated_errors = (pickle.UnpicklingError,) 2477db96d56Sopenharmony_ci 2487db96d56Sopenharmony_ci class CPicklerTests(PyPicklerTests): 2497db96d56Sopenharmony_ci pickler = _pickle.Pickler 2507db96d56Sopenharmony_ci unpickler = _pickle.Unpickler 2517db96d56Sopenharmony_ci 2527db96d56Sopenharmony_ci class CPersPicklerTests(PyPersPicklerTests): 2537db96d56Sopenharmony_ci pickler = _pickle.Pickler 2547db96d56Sopenharmony_ci unpickler = _pickle.Unpickler 2557db96d56Sopenharmony_ci 2567db96d56Sopenharmony_ci class CIdPersPicklerTests(PyIdPersPicklerTests): 2577db96d56Sopenharmony_ci pickler = _pickle.Pickler 2587db96d56Sopenharmony_ci unpickler = _pickle.Unpickler 2597db96d56Sopenharmony_ci 2607db96d56Sopenharmony_ci class CDumpPickle_LoadPickle(PyPicklerTests): 2617db96d56Sopenharmony_ci pickler = _pickle.Pickler 2627db96d56Sopenharmony_ci unpickler = pickle._Unpickler 2637db96d56Sopenharmony_ci 2647db96d56Sopenharmony_ci class DumpPickle_CLoadPickle(PyPicklerTests): 2657db96d56Sopenharmony_ci pickler = pickle._Pickler 2667db96d56Sopenharmony_ci unpickler = _pickle.Unpickler 2677db96d56Sopenharmony_ci 2687db96d56Sopenharmony_ci class CPicklerUnpicklerObjectTests(AbstractPicklerUnpicklerObjectTests, unittest.TestCase): 2697db96d56Sopenharmony_ci pickler_class = _pickle.Pickler 2707db96d56Sopenharmony_ci unpickler_class = _pickle.Unpickler 2717db96d56Sopenharmony_ci 2727db96d56Sopenharmony_ci def test_issue18339(self): 2737db96d56Sopenharmony_ci unpickler = self.unpickler_class(io.BytesIO()) 2747db96d56Sopenharmony_ci with self.assertRaises(TypeError): 2757db96d56Sopenharmony_ci unpickler.memo = object 2767db96d56Sopenharmony_ci # used to cause a segfault 2777db96d56Sopenharmony_ci with self.assertRaises(ValueError): 2787db96d56Sopenharmony_ci unpickler.memo = {-1: None} 2797db96d56Sopenharmony_ci unpickler.memo = {1: None} 2807db96d56Sopenharmony_ci 2817db96d56Sopenharmony_ci class CDispatchTableTests(AbstractDispatchTableTests, unittest.TestCase): 2827db96d56Sopenharmony_ci pickler_class = pickle.Pickler 2837db96d56Sopenharmony_ci def get_dispatch_table(self): 2847db96d56Sopenharmony_ci return pickle.dispatch_table.copy() 2857db96d56Sopenharmony_ci 2867db96d56Sopenharmony_ci class CChainDispatchTableTests(AbstractDispatchTableTests, unittest.TestCase): 2877db96d56Sopenharmony_ci pickler_class = pickle.Pickler 2887db96d56Sopenharmony_ci def get_dispatch_table(self): 2897db96d56Sopenharmony_ci return collections.ChainMap({}, pickle.dispatch_table) 2907db96d56Sopenharmony_ci 2917db96d56Sopenharmony_ci class CPicklerHookTests(AbstractHookTests, unittest.TestCase): 2927db96d56Sopenharmony_ci class CustomCPicklerClass(_pickle.Pickler, AbstractCustomPicklerClass): 2937db96d56Sopenharmony_ci pass 2947db96d56Sopenharmony_ci pickler_class = CustomCPicklerClass 2957db96d56Sopenharmony_ci 2967db96d56Sopenharmony_ci @support.cpython_only 2977db96d56Sopenharmony_ci class SizeofTests(unittest.TestCase): 2987db96d56Sopenharmony_ci check_sizeof = support.check_sizeof 2997db96d56Sopenharmony_ci 3007db96d56Sopenharmony_ci def test_pickler(self): 3017db96d56Sopenharmony_ci basesize = support.calcobjsize('7P2n3i2n3i2P') 3027db96d56Sopenharmony_ci p = _pickle.Pickler(io.BytesIO()) 3037db96d56Sopenharmony_ci self.assertEqual(object.__sizeof__(p), basesize) 3047db96d56Sopenharmony_ci MT_size = struct.calcsize('3nP0n') 3057db96d56Sopenharmony_ci ME_size = struct.calcsize('Pn0P') 3067db96d56Sopenharmony_ci check = self.check_sizeof 3077db96d56Sopenharmony_ci check(p, basesize + 3087db96d56Sopenharmony_ci MT_size + 8 * ME_size + # Minimal memo table size. 3097db96d56Sopenharmony_ci sys.getsizeof(b'x'*4096)) # Minimal write buffer size. 3107db96d56Sopenharmony_ci for i in range(6): 3117db96d56Sopenharmony_ci p.dump(chr(i)) 3127db96d56Sopenharmony_ci check(p, basesize + 3137db96d56Sopenharmony_ci MT_size + 32 * ME_size + # Size of memo table required to 3147db96d56Sopenharmony_ci # save references to 6 objects. 3157db96d56Sopenharmony_ci 0) # Write buffer is cleared after every dump(). 3167db96d56Sopenharmony_ci 3177db96d56Sopenharmony_ci def test_unpickler(self): 3187db96d56Sopenharmony_ci basesize = support.calcobjsize('2P2n2P 2P2n2i5P 2P3n8P2n2i') 3197db96d56Sopenharmony_ci unpickler = _pickle.Unpickler 3207db96d56Sopenharmony_ci P = struct.calcsize('P') # Size of memo table entry. 3217db96d56Sopenharmony_ci n = struct.calcsize('n') # Size of mark table entry. 3227db96d56Sopenharmony_ci check = self.check_sizeof 3237db96d56Sopenharmony_ci for encoding in 'ASCII', 'UTF-16', 'latin-1': 3247db96d56Sopenharmony_ci for errors in 'strict', 'replace': 3257db96d56Sopenharmony_ci u = unpickler(io.BytesIO(), 3267db96d56Sopenharmony_ci encoding=encoding, errors=errors) 3277db96d56Sopenharmony_ci self.assertEqual(object.__sizeof__(u), basesize) 3287db96d56Sopenharmony_ci check(u, basesize + 3297db96d56Sopenharmony_ci 32 * P + # Minimal memo table size. 3307db96d56Sopenharmony_ci len(encoding) + 1 + len(errors) + 1) 3317db96d56Sopenharmony_ci 3327db96d56Sopenharmony_ci stdsize = basesize + len('ASCII') + 1 + len('strict') + 1 3337db96d56Sopenharmony_ci def check_unpickler(data, memo_size, marks_size): 3347db96d56Sopenharmony_ci dump = pickle.dumps(data) 3357db96d56Sopenharmony_ci u = unpickler(io.BytesIO(dump), 3367db96d56Sopenharmony_ci encoding='ASCII', errors='strict') 3377db96d56Sopenharmony_ci u.load() 3387db96d56Sopenharmony_ci check(u, stdsize + memo_size * P + marks_size * n) 3397db96d56Sopenharmony_ci 3407db96d56Sopenharmony_ci check_unpickler(0, 32, 0) 3417db96d56Sopenharmony_ci # 20 is minimal non-empty mark stack size. 3427db96d56Sopenharmony_ci check_unpickler([0] * 100, 32, 20) 3437db96d56Sopenharmony_ci # 128 is memo table size required to save references to 100 objects. 3447db96d56Sopenharmony_ci check_unpickler([chr(i) for i in range(100)], 128, 20) 3457db96d56Sopenharmony_ci def recurse(deep): 3467db96d56Sopenharmony_ci data = 0 3477db96d56Sopenharmony_ci for i in range(deep): 3487db96d56Sopenharmony_ci data = [data, data] 3497db96d56Sopenharmony_ci return data 3507db96d56Sopenharmony_ci check_unpickler(recurse(0), 32, 0) 3517db96d56Sopenharmony_ci check_unpickler(recurse(1), 32, 20) 3527db96d56Sopenharmony_ci check_unpickler(recurse(20), 32, 20) 3537db96d56Sopenharmony_ci check_unpickler(recurse(50), 64, 60) 3547db96d56Sopenharmony_ci check_unpickler(recurse(100), 128, 140) 3557db96d56Sopenharmony_ci 3567db96d56Sopenharmony_ci u = unpickler(io.BytesIO(pickle.dumps('a', 0)), 3577db96d56Sopenharmony_ci encoding='ASCII', errors='strict') 3587db96d56Sopenharmony_ci u.load() 3597db96d56Sopenharmony_ci check(u, stdsize + 32 * P + 2 + 1) 3607db96d56Sopenharmony_ci 3617db96d56Sopenharmony_ci 3627db96d56Sopenharmony_ciALT_IMPORT_MAPPING = { 3637db96d56Sopenharmony_ci ('_elementtree', 'xml.etree.ElementTree'), 3647db96d56Sopenharmony_ci ('cPickle', 'pickle'), 3657db96d56Sopenharmony_ci ('StringIO', 'io'), 3667db96d56Sopenharmony_ci ('cStringIO', 'io'), 3677db96d56Sopenharmony_ci} 3687db96d56Sopenharmony_ci 3697db96d56Sopenharmony_ciALT_NAME_MAPPING = { 3707db96d56Sopenharmony_ci ('__builtin__', 'basestring', 'builtins', 'str'), 3717db96d56Sopenharmony_ci ('exceptions', 'StandardError', 'builtins', 'Exception'), 3727db96d56Sopenharmony_ci ('UserDict', 'UserDict', 'collections', 'UserDict'), 3737db96d56Sopenharmony_ci ('socket', '_socketobject', 'socket', 'SocketType'), 3747db96d56Sopenharmony_ci} 3757db96d56Sopenharmony_ci 3767db96d56Sopenharmony_cidef mapping(module, name): 3777db96d56Sopenharmony_ci if (module, name) in NAME_MAPPING: 3787db96d56Sopenharmony_ci module, name = NAME_MAPPING[(module, name)] 3797db96d56Sopenharmony_ci elif module in IMPORT_MAPPING: 3807db96d56Sopenharmony_ci module = IMPORT_MAPPING[module] 3817db96d56Sopenharmony_ci return module, name 3827db96d56Sopenharmony_ci 3837db96d56Sopenharmony_cidef reverse_mapping(module, name): 3847db96d56Sopenharmony_ci if (module, name) in REVERSE_NAME_MAPPING: 3857db96d56Sopenharmony_ci module, name = REVERSE_NAME_MAPPING[(module, name)] 3867db96d56Sopenharmony_ci elif module in REVERSE_IMPORT_MAPPING: 3877db96d56Sopenharmony_ci module = REVERSE_IMPORT_MAPPING[module] 3887db96d56Sopenharmony_ci return module, name 3897db96d56Sopenharmony_ci 3907db96d56Sopenharmony_cidef getmodule(module): 3917db96d56Sopenharmony_ci try: 3927db96d56Sopenharmony_ci return sys.modules[module] 3937db96d56Sopenharmony_ci except KeyError: 3947db96d56Sopenharmony_ci try: 3957db96d56Sopenharmony_ci with warnings.catch_warnings(): 3967db96d56Sopenharmony_ci action = 'always' if support.verbose else 'ignore' 3977db96d56Sopenharmony_ci warnings.simplefilter(action, DeprecationWarning) 3987db96d56Sopenharmony_ci __import__(module) 3997db96d56Sopenharmony_ci except AttributeError as exc: 4007db96d56Sopenharmony_ci if support.verbose: 4017db96d56Sopenharmony_ci print("Can't import module %r: %s" % (module, exc)) 4027db96d56Sopenharmony_ci raise ImportError 4037db96d56Sopenharmony_ci except ImportError as exc: 4047db96d56Sopenharmony_ci if support.verbose: 4057db96d56Sopenharmony_ci print(exc) 4067db96d56Sopenharmony_ci raise 4077db96d56Sopenharmony_ci return sys.modules[module] 4087db96d56Sopenharmony_ci 4097db96d56Sopenharmony_cidef getattribute(module, name): 4107db96d56Sopenharmony_ci obj = getmodule(module) 4117db96d56Sopenharmony_ci for n in name.split('.'): 4127db96d56Sopenharmony_ci obj = getattr(obj, n) 4137db96d56Sopenharmony_ci return obj 4147db96d56Sopenharmony_ci 4157db96d56Sopenharmony_cidef get_exceptions(mod): 4167db96d56Sopenharmony_ci for name in dir(mod): 4177db96d56Sopenharmony_ci attr = getattr(mod, name) 4187db96d56Sopenharmony_ci if isinstance(attr, type) and issubclass(attr, BaseException): 4197db96d56Sopenharmony_ci yield name, attr 4207db96d56Sopenharmony_ci 4217db96d56Sopenharmony_ciclass CompatPickleTests(unittest.TestCase): 4227db96d56Sopenharmony_ci def test_import(self): 4237db96d56Sopenharmony_ci modules = set(IMPORT_MAPPING.values()) 4247db96d56Sopenharmony_ci modules |= set(REVERSE_IMPORT_MAPPING) 4257db96d56Sopenharmony_ci modules |= {module for module, name in REVERSE_NAME_MAPPING} 4267db96d56Sopenharmony_ci modules |= {module for module, name in NAME_MAPPING.values()} 4277db96d56Sopenharmony_ci for module in modules: 4287db96d56Sopenharmony_ci try: 4297db96d56Sopenharmony_ci getmodule(module) 4307db96d56Sopenharmony_ci except ImportError: 4317db96d56Sopenharmony_ci pass 4327db96d56Sopenharmony_ci 4337db96d56Sopenharmony_ci def test_import_mapping(self): 4347db96d56Sopenharmony_ci for module3, module2 in REVERSE_IMPORT_MAPPING.items(): 4357db96d56Sopenharmony_ci with self.subTest((module3, module2)): 4367db96d56Sopenharmony_ci try: 4377db96d56Sopenharmony_ci getmodule(module3) 4387db96d56Sopenharmony_ci except ImportError: 4397db96d56Sopenharmony_ci pass 4407db96d56Sopenharmony_ci if module3[:1] != '_': 4417db96d56Sopenharmony_ci self.assertIn(module2, IMPORT_MAPPING) 4427db96d56Sopenharmony_ci self.assertEqual(IMPORT_MAPPING[module2], module3) 4437db96d56Sopenharmony_ci 4447db96d56Sopenharmony_ci def test_name_mapping(self): 4457db96d56Sopenharmony_ci for (module3, name3), (module2, name2) in REVERSE_NAME_MAPPING.items(): 4467db96d56Sopenharmony_ci with self.subTest(((module3, name3), (module2, name2))): 4477db96d56Sopenharmony_ci if (module2, name2) == ('exceptions', 'OSError'): 4487db96d56Sopenharmony_ci attr = getattribute(module3, name3) 4497db96d56Sopenharmony_ci self.assertTrue(issubclass(attr, OSError)) 4507db96d56Sopenharmony_ci elif (module2, name2) == ('exceptions', 'ImportError'): 4517db96d56Sopenharmony_ci attr = getattribute(module3, name3) 4527db96d56Sopenharmony_ci self.assertTrue(issubclass(attr, ImportError)) 4537db96d56Sopenharmony_ci else: 4547db96d56Sopenharmony_ci module, name = mapping(module2, name2) 4557db96d56Sopenharmony_ci if module3[:1] != '_': 4567db96d56Sopenharmony_ci self.assertEqual((module, name), (module3, name3)) 4577db96d56Sopenharmony_ci try: 4587db96d56Sopenharmony_ci attr = getattribute(module3, name3) 4597db96d56Sopenharmony_ci except ImportError: 4607db96d56Sopenharmony_ci pass 4617db96d56Sopenharmony_ci else: 4627db96d56Sopenharmony_ci self.assertEqual(getattribute(module, name), attr) 4637db96d56Sopenharmony_ci 4647db96d56Sopenharmony_ci def test_reverse_import_mapping(self): 4657db96d56Sopenharmony_ci for module2, module3 in IMPORT_MAPPING.items(): 4667db96d56Sopenharmony_ci with self.subTest((module2, module3)): 4677db96d56Sopenharmony_ci try: 4687db96d56Sopenharmony_ci getmodule(module3) 4697db96d56Sopenharmony_ci except ImportError as exc: 4707db96d56Sopenharmony_ci if support.verbose: 4717db96d56Sopenharmony_ci print(exc) 4727db96d56Sopenharmony_ci if ((module2, module3) not in ALT_IMPORT_MAPPING and 4737db96d56Sopenharmony_ci REVERSE_IMPORT_MAPPING.get(module3, None) != module2): 4747db96d56Sopenharmony_ci for (m3, n3), (m2, n2) in REVERSE_NAME_MAPPING.items(): 4757db96d56Sopenharmony_ci if (module3, module2) == (m3, m2): 4767db96d56Sopenharmony_ci break 4777db96d56Sopenharmony_ci else: 4787db96d56Sopenharmony_ci self.fail('No reverse mapping from %r to %r' % 4797db96d56Sopenharmony_ci (module3, module2)) 4807db96d56Sopenharmony_ci module = REVERSE_IMPORT_MAPPING.get(module3, module3) 4817db96d56Sopenharmony_ci module = IMPORT_MAPPING.get(module, module) 4827db96d56Sopenharmony_ci self.assertEqual(module, module3) 4837db96d56Sopenharmony_ci 4847db96d56Sopenharmony_ci def test_reverse_name_mapping(self): 4857db96d56Sopenharmony_ci for (module2, name2), (module3, name3) in NAME_MAPPING.items(): 4867db96d56Sopenharmony_ci with self.subTest(((module2, name2), (module3, name3))): 4877db96d56Sopenharmony_ci try: 4887db96d56Sopenharmony_ci attr = getattribute(module3, name3) 4897db96d56Sopenharmony_ci except ImportError: 4907db96d56Sopenharmony_ci pass 4917db96d56Sopenharmony_ci module, name = reverse_mapping(module3, name3) 4927db96d56Sopenharmony_ci if (module2, name2, module3, name3) not in ALT_NAME_MAPPING: 4937db96d56Sopenharmony_ci self.assertEqual((module, name), (module2, name2)) 4947db96d56Sopenharmony_ci module, name = mapping(module, name) 4957db96d56Sopenharmony_ci self.assertEqual((module, name), (module3, name3)) 4967db96d56Sopenharmony_ci 4977db96d56Sopenharmony_ci def test_exceptions(self): 4987db96d56Sopenharmony_ci self.assertEqual(mapping('exceptions', 'StandardError'), 4997db96d56Sopenharmony_ci ('builtins', 'Exception')) 5007db96d56Sopenharmony_ci self.assertEqual(mapping('exceptions', 'Exception'), 5017db96d56Sopenharmony_ci ('builtins', 'Exception')) 5027db96d56Sopenharmony_ci self.assertEqual(reverse_mapping('builtins', 'Exception'), 5037db96d56Sopenharmony_ci ('exceptions', 'Exception')) 5047db96d56Sopenharmony_ci self.assertEqual(mapping('exceptions', 'OSError'), 5057db96d56Sopenharmony_ci ('builtins', 'OSError')) 5067db96d56Sopenharmony_ci self.assertEqual(reverse_mapping('builtins', 'OSError'), 5077db96d56Sopenharmony_ci ('exceptions', 'OSError')) 5087db96d56Sopenharmony_ci 5097db96d56Sopenharmony_ci for name, exc in get_exceptions(builtins): 5107db96d56Sopenharmony_ci with self.subTest(name): 5117db96d56Sopenharmony_ci if exc in (BlockingIOError, 5127db96d56Sopenharmony_ci ResourceWarning, 5137db96d56Sopenharmony_ci StopAsyncIteration, 5147db96d56Sopenharmony_ci RecursionError, 5157db96d56Sopenharmony_ci EncodingWarning, 5167db96d56Sopenharmony_ci BaseExceptionGroup, 5177db96d56Sopenharmony_ci ExceptionGroup): 5187db96d56Sopenharmony_ci continue 5197db96d56Sopenharmony_ci if exc is not OSError and issubclass(exc, OSError): 5207db96d56Sopenharmony_ci self.assertEqual(reverse_mapping('builtins', name), 5217db96d56Sopenharmony_ci ('exceptions', 'OSError')) 5227db96d56Sopenharmony_ci elif exc is not ImportError and issubclass(exc, ImportError): 5237db96d56Sopenharmony_ci self.assertEqual(reverse_mapping('builtins', name), 5247db96d56Sopenharmony_ci ('exceptions', 'ImportError')) 5257db96d56Sopenharmony_ci self.assertEqual(mapping('exceptions', name), 5267db96d56Sopenharmony_ci ('exceptions', name)) 5277db96d56Sopenharmony_ci else: 5287db96d56Sopenharmony_ci self.assertEqual(reverse_mapping('builtins', name), 5297db96d56Sopenharmony_ci ('exceptions', name)) 5307db96d56Sopenharmony_ci self.assertEqual(mapping('exceptions', name), 5317db96d56Sopenharmony_ci ('builtins', name)) 5327db96d56Sopenharmony_ci 5337db96d56Sopenharmony_ci def test_multiprocessing_exceptions(self): 5347db96d56Sopenharmony_ci module = import_helper.import_module('multiprocessing.context') 5357db96d56Sopenharmony_ci for name, exc in get_exceptions(module): 5367db96d56Sopenharmony_ci with self.subTest(name): 5377db96d56Sopenharmony_ci self.assertEqual(reverse_mapping('multiprocessing.context', name), 5387db96d56Sopenharmony_ci ('multiprocessing', name)) 5397db96d56Sopenharmony_ci self.assertEqual(mapping('multiprocessing', name), 5407db96d56Sopenharmony_ci ('multiprocessing.context', name)) 5417db96d56Sopenharmony_ci 5427db96d56Sopenharmony_ci 5437db96d56Sopenharmony_cidef load_tests(loader, tests, pattern): 5447db96d56Sopenharmony_ci tests.addTest(doctest.DocTestSuite()) 5457db96d56Sopenharmony_ci return tests 5467db96d56Sopenharmony_ci 5477db96d56Sopenharmony_ci 5487db96d56Sopenharmony_ciif __name__ == "__main__": 5497db96d56Sopenharmony_ci unittest.main() 550