17db96d56Sopenharmony_ciimport unittest 27db96d56Sopenharmony_ciimport dbm 37db96d56Sopenharmony_ciimport shelve 47db96d56Sopenharmony_ciimport glob 57db96d56Sopenharmony_ciimport pickle 67db96d56Sopenharmony_ciimport os 77db96d56Sopenharmony_ci 87db96d56Sopenharmony_cifrom test import support 97db96d56Sopenharmony_cifrom test.support import os_helper 107db96d56Sopenharmony_cifrom collections.abc import MutableMapping 117db96d56Sopenharmony_cifrom test.test_dbm import dbm_iterator 127db96d56Sopenharmony_ci 137db96d56Sopenharmony_cidef L1(s): 147db96d56Sopenharmony_ci return s.decode("latin-1") 157db96d56Sopenharmony_ci 167db96d56Sopenharmony_ciclass byteskeydict(MutableMapping): 177db96d56Sopenharmony_ci "Mapping that supports bytes keys" 187db96d56Sopenharmony_ci 197db96d56Sopenharmony_ci def __init__(self): 207db96d56Sopenharmony_ci self.d = {} 217db96d56Sopenharmony_ci 227db96d56Sopenharmony_ci def __getitem__(self, key): 237db96d56Sopenharmony_ci return self.d[L1(key)] 247db96d56Sopenharmony_ci 257db96d56Sopenharmony_ci def __setitem__(self, key, value): 267db96d56Sopenharmony_ci self.d[L1(key)] = value 277db96d56Sopenharmony_ci 287db96d56Sopenharmony_ci def __delitem__(self, key): 297db96d56Sopenharmony_ci del self.d[L1(key)] 307db96d56Sopenharmony_ci 317db96d56Sopenharmony_ci def __len__(self): 327db96d56Sopenharmony_ci return len(self.d) 337db96d56Sopenharmony_ci 347db96d56Sopenharmony_ci def iterkeys(self): 357db96d56Sopenharmony_ci for k in self.d.keys(): 367db96d56Sopenharmony_ci yield k.encode("latin-1") 377db96d56Sopenharmony_ci 387db96d56Sopenharmony_ci __iter__ = iterkeys 397db96d56Sopenharmony_ci 407db96d56Sopenharmony_ci def keys(self): 417db96d56Sopenharmony_ci return list(self.iterkeys()) 427db96d56Sopenharmony_ci 437db96d56Sopenharmony_ci def copy(self): 447db96d56Sopenharmony_ci return byteskeydict(self.d) 457db96d56Sopenharmony_ci 467db96d56Sopenharmony_ci 477db96d56Sopenharmony_ciclass TestCase(unittest.TestCase): 487db96d56Sopenharmony_ci dirname = os_helper.TESTFN 497db96d56Sopenharmony_ci fn = os.path.join(os_helper.TESTFN, "shelftemp.db") 507db96d56Sopenharmony_ci 517db96d56Sopenharmony_ci def test_close(self): 527db96d56Sopenharmony_ci d1 = {} 537db96d56Sopenharmony_ci s = shelve.Shelf(d1, protocol=2, writeback=False) 547db96d56Sopenharmony_ci s['key1'] = [1,2,3,4] 557db96d56Sopenharmony_ci self.assertEqual(s['key1'], [1,2,3,4]) 567db96d56Sopenharmony_ci self.assertEqual(len(s), 1) 577db96d56Sopenharmony_ci s.close() 587db96d56Sopenharmony_ci self.assertRaises(ValueError, len, s) 597db96d56Sopenharmony_ci try: 607db96d56Sopenharmony_ci s['key1'] 617db96d56Sopenharmony_ci except ValueError: 627db96d56Sopenharmony_ci pass 637db96d56Sopenharmony_ci else: 647db96d56Sopenharmony_ci self.fail('Closed shelf should not find a key') 657db96d56Sopenharmony_ci 667db96d56Sopenharmony_ci def test_open_template(self, filename=None, protocol=None): 677db96d56Sopenharmony_ci os.mkdir(self.dirname) 687db96d56Sopenharmony_ci self.addCleanup(os_helper.rmtree, self.dirname) 697db96d56Sopenharmony_ci s = shelve.open(filename=filename if filename is not None else self.fn, 707db96d56Sopenharmony_ci protocol=protocol) 717db96d56Sopenharmony_ci try: 727db96d56Sopenharmony_ci s['key1'] = (1,2,3,4) 737db96d56Sopenharmony_ci self.assertEqual(s['key1'], (1,2,3,4)) 747db96d56Sopenharmony_ci finally: 757db96d56Sopenharmony_ci s.close() 767db96d56Sopenharmony_ci 777db96d56Sopenharmony_ci def test_ascii_file_shelf(self): 787db96d56Sopenharmony_ci self.test_open_template(protocol=0) 797db96d56Sopenharmony_ci 807db96d56Sopenharmony_ci def test_binary_file_shelf(self): 817db96d56Sopenharmony_ci self.test_open_template(protocol=1) 827db96d56Sopenharmony_ci 837db96d56Sopenharmony_ci def test_proto2_file_shelf(self): 847db96d56Sopenharmony_ci self.test_open_template(protocol=2) 857db96d56Sopenharmony_ci 867db96d56Sopenharmony_ci def test_pathlib_path_file_shelf(self): 877db96d56Sopenharmony_ci self.test_open_template(filename=os_helper.FakePath(self.fn)) 887db96d56Sopenharmony_ci 897db96d56Sopenharmony_ci def test_bytes_path_file_shelf(self): 907db96d56Sopenharmony_ci self.test_open_template(filename=os.fsencode(self.fn)) 917db96d56Sopenharmony_ci 927db96d56Sopenharmony_ci def test_pathlib_bytes_path_file_shelf(self): 937db96d56Sopenharmony_ci self.test_open_template(filename=os_helper.FakePath(os.fsencode(self.fn))) 947db96d56Sopenharmony_ci 957db96d56Sopenharmony_ci def test_in_memory_shelf(self): 967db96d56Sopenharmony_ci d1 = byteskeydict() 977db96d56Sopenharmony_ci with shelve.Shelf(d1, protocol=0) as s: 987db96d56Sopenharmony_ci s['key1'] = (1,2,3,4) 997db96d56Sopenharmony_ci self.assertEqual(s['key1'], (1,2,3,4)) 1007db96d56Sopenharmony_ci d2 = byteskeydict() 1017db96d56Sopenharmony_ci with shelve.Shelf(d2, protocol=1) as s: 1027db96d56Sopenharmony_ci s['key1'] = (1,2,3,4) 1037db96d56Sopenharmony_ci self.assertEqual(s['key1'], (1,2,3,4)) 1047db96d56Sopenharmony_ci 1057db96d56Sopenharmony_ci self.assertEqual(len(d1), 1) 1067db96d56Sopenharmony_ci self.assertEqual(len(d2), 1) 1077db96d56Sopenharmony_ci self.assertNotEqual(d1.items(), d2.items()) 1087db96d56Sopenharmony_ci 1097db96d56Sopenharmony_ci def test_mutable_entry(self): 1107db96d56Sopenharmony_ci d1 = byteskeydict() 1117db96d56Sopenharmony_ci with shelve.Shelf(d1, protocol=2, writeback=False) as s: 1127db96d56Sopenharmony_ci s['key1'] = [1,2,3,4] 1137db96d56Sopenharmony_ci self.assertEqual(s['key1'], [1,2,3,4]) 1147db96d56Sopenharmony_ci s['key1'].append(5) 1157db96d56Sopenharmony_ci self.assertEqual(s['key1'], [1,2,3,4]) 1167db96d56Sopenharmony_ci 1177db96d56Sopenharmony_ci d2 = byteskeydict() 1187db96d56Sopenharmony_ci with shelve.Shelf(d2, protocol=2, writeback=True) as s: 1197db96d56Sopenharmony_ci s['key1'] = [1,2,3,4] 1207db96d56Sopenharmony_ci self.assertEqual(s['key1'], [1,2,3,4]) 1217db96d56Sopenharmony_ci s['key1'].append(5) 1227db96d56Sopenharmony_ci self.assertEqual(s['key1'], [1,2,3,4,5]) 1237db96d56Sopenharmony_ci 1247db96d56Sopenharmony_ci self.assertEqual(len(d1), 1) 1257db96d56Sopenharmony_ci self.assertEqual(len(d2), 1) 1267db96d56Sopenharmony_ci 1277db96d56Sopenharmony_ci def test_keyencoding(self): 1287db96d56Sopenharmony_ci d = {} 1297db96d56Sopenharmony_ci key = 'Pöp' 1307db96d56Sopenharmony_ci # the default keyencoding is utf-8 1317db96d56Sopenharmony_ci shelve.Shelf(d)[key] = [1] 1327db96d56Sopenharmony_ci self.assertIn(key.encode('utf-8'), d) 1337db96d56Sopenharmony_ci # but a different one can be given 1347db96d56Sopenharmony_ci shelve.Shelf(d, keyencoding='latin-1')[key] = [1] 1357db96d56Sopenharmony_ci self.assertIn(key.encode('latin-1'), d) 1367db96d56Sopenharmony_ci # with all consequences 1377db96d56Sopenharmony_ci s = shelve.Shelf(d, keyencoding='ascii') 1387db96d56Sopenharmony_ci self.assertRaises(UnicodeEncodeError, s.__setitem__, key, [1]) 1397db96d56Sopenharmony_ci 1407db96d56Sopenharmony_ci def test_writeback_also_writes_immediately(self): 1417db96d56Sopenharmony_ci # Issue 5754 1427db96d56Sopenharmony_ci d = {} 1437db96d56Sopenharmony_ci key = 'key' 1447db96d56Sopenharmony_ci encodedkey = key.encode('utf-8') 1457db96d56Sopenharmony_ci with shelve.Shelf(d, writeback=True) as s: 1467db96d56Sopenharmony_ci s[key] = [1] 1477db96d56Sopenharmony_ci p1 = d[encodedkey] # Will give a KeyError if backing store not updated 1487db96d56Sopenharmony_ci s['key'].append(2) 1497db96d56Sopenharmony_ci p2 = d[encodedkey] 1507db96d56Sopenharmony_ci self.assertNotEqual(p1, p2) # Write creates new object in store 1517db96d56Sopenharmony_ci 1527db96d56Sopenharmony_ci def test_with(self): 1537db96d56Sopenharmony_ci d1 = {} 1547db96d56Sopenharmony_ci with shelve.Shelf(d1, protocol=2, writeback=False) as s: 1557db96d56Sopenharmony_ci s['key1'] = [1,2,3,4] 1567db96d56Sopenharmony_ci self.assertEqual(s['key1'], [1,2,3,4]) 1577db96d56Sopenharmony_ci self.assertEqual(len(s), 1) 1587db96d56Sopenharmony_ci self.assertRaises(ValueError, len, s) 1597db96d56Sopenharmony_ci try: 1607db96d56Sopenharmony_ci s['key1'] 1617db96d56Sopenharmony_ci except ValueError: 1627db96d56Sopenharmony_ci pass 1637db96d56Sopenharmony_ci else: 1647db96d56Sopenharmony_ci self.fail('Closed shelf should not find a key') 1657db96d56Sopenharmony_ci 1667db96d56Sopenharmony_ci def test_default_protocol(self): 1677db96d56Sopenharmony_ci with shelve.Shelf({}) as s: 1687db96d56Sopenharmony_ci self.assertEqual(s._protocol, pickle.DEFAULT_PROTOCOL) 1697db96d56Sopenharmony_ci 1707db96d56Sopenharmony_ci 1717db96d56Sopenharmony_ciclass TestShelveBase: 1727db96d56Sopenharmony_ci type2test = shelve.Shelf 1737db96d56Sopenharmony_ci 1747db96d56Sopenharmony_ci def _reference(self): 1757db96d56Sopenharmony_ci return {"key1":"value1", "key2":2, "key3":(1,2,3)} 1767db96d56Sopenharmony_ci 1777db96d56Sopenharmony_ci 1787db96d56Sopenharmony_ciclass TestShelveInMemBase(TestShelveBase): 1797db96d56Sopenharmony_ci def _empty_mapping(self): 1807db96d56Sopenharmony_ci return shelve.Shelf(byteskeydict(), **self._args) 1817db96d56Sopenharmony_ci 1827db96d56Sopenharmony_ci 1837db96d56Sopenharmony_ciclass TestShelveFileBase(TestShelveBase): 1847db96d56Sopenharmony_ci counter = 0 1857db96d56Sopenharmony_ci 1867db96d56Sopenharmony_ci def _empty_mapping(self): 1877db96d56Sopenharmony_ci self.counter += 1 1887db96d56Sopenharmony_ci x = shelve.open(self.base_path + str(self.counter), **self._args) 1897db96d56Sopenharmony_ci self.addCleanup(x.close) 1907db96d56Sopenharmony_ci return x 1917db96d56Sopenharmony_ci 1927db96d56Sopenharmony_ci def setUp(self): 1937db96d56Sopenharmony_ci dirname = os_helper.TESTFN 1947db96d56Sopenharmony_ci os.mkdir(dirname) 1957db96d56Sopenharmony_ci self.addCleanup(os_helper.rmtree, dirname) 1967db96d56Sopenharmony_ci self.base_path = os.path.join(dirname, "shelftemp.db") 1977db96d56Sopenharmony_ci self.addCleanup(setattr, dbm, '_defaultmod', dbm._defaultmod) 1987db96d56Sopenharmony_ci dbm._defaultmod = self.dbm_mod 1997db96d56Sopenharmony_ci 2007db96d56Sopenharmony_ci 2017db96d56Sopenharmony_cifrom test import mapping_tests 2027db96d56Sopenharmony_ci 2037db96d56Sopenharmony_cifor proto in range(pickle.HIGHEST_PROTOCOL + 1): 2047db96d56Sopenharmony_ci bases = (TestShelveInMemBase, mapping_tests.BasicTestMappingProtocol) 2057db96d56Sopenharmony_ci name = f'TestProto{proto}MemShelve' 2067db96d56Sopenharmony_ci globals()[name] = type(name, bases, 2077db96d56Sopenharmony_ci {'_args': {'protocol': proto}}) 2087db96d56Sopenharmony_ci bases = (TestShelveFileBase, mapping_tests.BasicTestMappingProtocol) 2097db96d56Sopenharmony_ci for dbm_mod in dbm_iterator(): 2107db96d56Sopenharmony_ci assert dbm_mod.__name__.startswith('dbm.') 2117db96d56Sopenharmony_ci suffix = dbm_mod.__name__[4:] 2127db96d56Sopenharmony_ci name = f'TestProto{proto}File_{suffix}Shelve' 2137db96d56Sopenharmony_ci globals()[name] = type(name, bases, 2147db96d56Sopenharmony_ci {'dbm_mod': dbm_mod, '_args': {'protocol': proto}}) 2157db96d56Sopenharmony_ci 2167db96d56Sopenharmony_ci 2177db96d56Sopenharmony_ciif __name__ == "__main__": 2187db96d56Sopenharmony_ci unittest.main() 219