17db96d56Sopenharmony_ci# pysqlite2/test/dbapi.py: tests for DB-API compliance 27db96d56Sopenharmony_ci# 37db96d56Sopenharmony_ci# Copyright (C) 2004-2010 Gerhard Häring <gh@ghaering.de> 47db96d56Sopenharmony_ci# 57db96d56Sopenharmony_ci# This file is part of pysqlite. 67db96d56Sopenharmony_ci# 77db96d56Sopenharmony_ci# This software is provided 'as-is', without any express or implied 87db96d56Sopenharmony_ci# warranty. In no event will the authors be held liable for any damages 97db96d56Sopenharmony_ci# arising from the use of this software. 107db96d56Sopenharmony_ci# 117db96d56Sopenharmony_ci# Permission is granted to anyone to use this software for any purpose, 127db96d56Sopenharmony_ci# including commercial applications, and to alter it and redistribute it 137db96d56Sopenharmony_ci# freely, subject to the following restrictions: 147db96d56Sopenharmony_ci# 157db96d56Sopenharmony_ci# 1. The origin of this software must not be misrepresented; you must not 167db96d56Sopenharmony_ci# claim that you wrote the original software. If you use this software 177db96d56Sopenharmony_ci# in a product, an acknowledgment in the product documentation would be 187db96d56Sopenharmony_ci# appreciated but is not required. 197db96d56Sopenharmony_ci# 2. Altered source versions must be plainly marked as such, and must not be 207db96d56Sopenharmony_ci# misrepresented as being the original software. 217db96d56Sopenharmony_ci# 3. This notice may not be removed or altered from any source distribution. 227db96d56Sopenharmony_ci 237db96d56Sopenharmony_ciimport contextlib 247db96d56Sopenharmony_ciimport os 257db96d56Sopenharmony_ciimport sqlite3 as sqlite 267db96d56Sopenharmony_ciimport subprocess 277db96d56Sopenharmony_ciimport sys 287db96d56Sopenharmony_ciimport threading 297db96d56Sopenharmony_ciimport unittest 307db96d56Sopenharmony_ciimport urllib.parse 317db96d56Sopenharmony_ci 327db96d56Sopenharmony_cifrom test.support import ( 337db96d56Sopenharmony_ci SHORT_TIMEOUT, bigmemtest, check_disallow_instantiation, requires_subprocess, 347db96d56Sopenharmony_ci is_emscripten, is_wasi 357db96d56Sopenharmony_ci) 367db96d56Sopenharmony_cifrom test.support import threading_helper 377db96d56Sopenharmony_cifrom _testcapi import INT_MAX, ULLONG_MAX 387db96d56Sopenharmony_cifrom os import SEEK_SET, SEEK_CUR, SEEK_END 397db96d56Sopenharmony_cifrom test.support.os_helper import TESTFN, TESTFN_UNDECODABLE, unlink, temp_dir, FakePath 407db96d56Sopenharmony_ci 417db96d56Sopenharmony_ci 427db96d56Sopenharmony_ci# Helper for temporary memory databases 437db96d56Sopenharmony_cidef memory_database(*args, **kwargs): 447db96d56Sopenharmony_ci cx = sqlite.connect(":memory:", *args, **kwargs) 457db96d56Sopenharmony_ci return contextlib.closing(cx) 467db96d56Sopenharmony_ci 477db96d56Sopenharmony_ci 487db96d56Sopenharmony_ci# Temporarily limit a database connection parameter 497db96d56Sopenharmony_ci@contextlib.contextmanager 507db96d56Sopenharmony_cidef cx_limit(cx, category=sqlite.SQLITE_LIMIT_SQL_LENGTH, limit=128): 517db96d56Sopenharmony_ci try: 527db96d56Sopenharmony_ci _prev = cx.setlimit(category, limit) 537db96d56Sopenharmony_ci yield limit 547db96d56Sopenharmony_ci finally: 557db96d56Sopenharmony_ci cx.setlimit(category, _prev) 567db96d56Sopenharmony_ci 577db96d56Sopenharmony_ci 587db96d56Sopenharmony_ciclass ModuleTests(unittest.TestCase): 597db96d56Sopenharmony_ci def test_api_level(self): 607db96d56Sopenharmony_ci self.assertEqual(sqlite.apilevel, "2.0", 617db96d56Sopenharmony_ci "apilevel is %s, should be 2.0" % sqlite.apilevel) 627db96d56Sopenharmony_ci 637db96d56Sopenharmony_ci def test_thread_safety(self): 647db96d56Sopenharmony_ci self.assertIn(sqlite.threadsafety, {0, 1, 3}, 657db96d56Sopenharmony_ci "threadsafety is %d, should be 0, 1 or 3" % 667db96d56Sopenharmony_ci sqlite.threadsafety) 677db96d56Sopenharmony_ci 687db96d56Sopenharmony_ci def test_param_style(self): 697db96d56Sopenharmony_ci self.assertEqual(sqlite.paramstyle, "qmark", 707db96d56Sopenharmony_ci "paramstyle is '%s', should be 'qmark'" % 717db96d56Sopenharmony_ci sqlite.paramstyle) 727db96d56Sopenharmony_ci 737db96d56Sopenharmony_ci def test_warning(self): 747db96d56Sopenharmony_ci self.assertTrue(issubclass(sqlite.Warning, Exception), 757db96d56Sopenharmony_ci "Warning is not a subclass of Exception") 767db96d56Sopenharmony_ci 777db96d56Sopenharmony_ci def test_error(self): 787db96d56Sopenharmony_ci self.assertTrue(issubclass(sqlite.Error, Exception), 797db96d56Sopenharmony_ci "Error is not a subclass of Exception") 807db96d56Sopenharmony_ci 817db96d56Sopenharmony_ci def test_interface_error(self): 827db96d56Sopenharmony_ci self.assertTrue(issubclass(sqlite.InterfaceError, sqlite.Error), 837db96d56Sopenharmony_ci "InterfaceError is not a subclass of Error") 847db96d56Sopenharmony_ci 857db96d56Sopenharmony_ci def test_database_error(self): 867db96d56Sopenharmony_ci self.assertTrue(issubclass(sqlite.DatabaseError, sqlite.Error), 877db96d56Sopenharmony_ci "DatabaseError is not a subclass of Error") 887db96d56Sopenharmony_ci 897db96d56Sopenharmony_ci def test_data_error(self): 907db96d56Sopenharmony_ci self.assertTrue(issubclass(sqlite.DataError, sqlite.DatabaseError), 917db96d56Sopenharmony_ci "DataError is not a subclass of DatabaseError") 927db96d56Sopenharmony_ci 937db96d56Sopenharmony_ci def test_operational_error(self): 947db96d56Sopenharmony_ci self.assertTrue(issubclass(sqlite.OperationalError, sqlite.DatabaseError), 957db96d56Sopenharmony_ci "OperationalError is not a subclass of DatabaseError") 967db96d56Sopenharmony_ci 977db96d56Sopenharmony_ci def test_integrity_error(self): 987db96d56Sopenharmony_ci self.assertTrue(issubclass(sqlite.IntegrityError, sqlite.DatabaseError), 997db96d56Sopenharmony_ci "IntegrityError is not a subclass of DatabaseError") 1007db96d56Sopenharmony_ci 1017db96d56Sopenharmony_ci def test_internal_error(self): 1027db96d56Sopenharmony_ci self.assertTrue(issubclass(sqlite.InternalError, sqlite.DatabaseError), 1037db96d56Sopenharmony_ci "InternalError is not a subclass of DatabaseError") 1047db96d56Sopenharmony_ci 1057db96d56Sopenharmony_ci def test_programming_error(self): 1067db96d56Sopenharmony_ci self.assertTrue(issubclass(sqlite.ProgrammingError, sqlite.DatabaseError), 1077db96d56Sopenharmony_ci "ProgrammingError is not a subclass of DatabaseError") 1087db96d56Sopenharmony_ci 1097db96d56Sopenharmony_ci def test_not_supported_error(self): 1107db96d56Sopenharmony_ci self.assertTrue(issubclass(sqlite.NotSupportedError, 1117db96d56Sopenharmony_ci sqlite.DatabaseError), 1127db96d56Sopenharmony_ci "NotSupportedError is not a subclass of DatabaseError") 1137db96d56Sopenharmony_ci 1147db96d56Sopenharmony_ci def test_module_constants(self): 1157db96d56Sopenharmony_ci consts = [ 1167db96d56Sopenharmony_ci "SQLITE_ABORT", 1177db96d56Sopenharmony_ci "SQLITE_ALTER_TABLE", 1187db96d56Sopenharmony_ci "SQLITE_ANALYZE", 1197db96d56Sopenharmony_ci "SQLITE_ATTACH", 1207db96d56Sopenharmony_ci "SQLITE_AUTH", 1217db96d56Sopenharmony_ci "SQLITE_BUSY", 1227db96d56Sopenharmony_ci "SQLITE_CANTOPEN", 1237db96d56Sopenharmony_ci "SQLITE_CONSTRAINT", 1247db96d56Sopenharmony_ci "SQLITE_CORRUPT", 1257db96d56Sopenharmony_ci "SQLITE_CREATE_INDEX", 1267db96d56Sopenharmony_ci "SQLITE_CREATE_TABLE", 1277db96d56Sopenharmony_ci "SQLITE_CREATE_TEMP_INDEX", 1287db96d56Sopenharmony_ci "SQLITE_CREATE_TEMP_TABLE", 1297db96d56Sopenharmony_ci "SQLITE_CREATE_TEMP_TRIGGER", 1307db96d56Sopenharmony_ci "SQLITE_CREATE_TEMP_VIEW", 1317db96d56Sopenharmony_ci "SQLITE_CREATE_TRIGGER", 1327db96d56Sopenharmony_ci "SQLITE_CREATE_VIEW", 1337db96d56Sopenharmony_ci "SQLITE_CREATE_VTABLE", 1347db96d56Sopenharmony_ci "SQLITE_DELETE", 1357db96d56Sopenharmony_ci "SQLITE_DENY", 1367db96d56Sopenharmony_ci "SQLITE_DETACH", 1377db96d56Sopenharmony_ci "SQLITE_DONE", 1387db96d56Sopenharmony_ci "SQLITE_DROP_INDEX", 1397db96d56Sopenharmony_ci "SQLITE_DROP_TABLE", 1407db96d56Sopenharmony_ci "SQLITE_DROP_TEMP_INDEX", 1417db96d56Sopenharmony_ci "SQLITE_DROP_TEMP_TABLE", 1427db96d56Sopenharmony_ci "SQLITE_DROP_TEMP_TRIGGER", 1437db96d56Sopenharmony_ci "SQLITE_DROP_TEMP_VIEW", 1447db96d56Sopenharmony_ci "SQLITE_DROP_TRIGGER", 1457db96d56Sopenharmony_ci "SQLITE_DROP_VIEW", 1467db96d56Sopenharmony_ci "SQLITE_DROP_VTABLE", 1477db96d56Sopenharmony_ci "SQLITE_EMPTY", 1487db96d56Sopenharmony_ci "SQLITE_ERROR", 1497db96d56Sopenharmony_ci "SQLITE_FORMAT", 1507db96d56Sopenharmony_ci "SQLITE_FULL", 1517db96d56Sopenharmony_ci "SQLITE_FUNCTION", 1527db96d56Sopenharmony_ci "SQLITE_IGNORE", 1537db96d56Sopenharmony_ci "SQLITE_INSERT", 1547db96d56Sopenharmony_ci "SQLITE_INTERNAL", 1557db96d56Sopenharmony_ci "SQLITE_INTERRUPT", 1567db96d56Sopenharmony_ci "SQLITE_IOERR", 1577db96d56Sopenharmony_ci "SQLITE_LOCKED", 1587db96d56Sopenharmony_ci "SQLITE_MISMATCH", 1597db96d56Sopenharmony_ci "SQLITE_MISUSE", 1607db96d56Sopenharmony_ci "SQLITE_NOLFS", 1617db96d56Sopenharmony_ci "SQLITE_NOMEM", 1627db96d56Sopenharmony_ci "SQLITE_NOTADB", 1637db96d56Sopenharmony_ci "SQLITE_NOTFOUND", 1647db96d56Sopenharmony_ci "SQLITE_OK", 1657db96d56Sopenharmony_ci "SQLITE_PERM", 1667db96d56Sopenharmony_ci "SQLITE_PRAGMA", 1677db96d56Sopenharmony_ci "SQLITE_PROTOCOL", 1687db96d56Sopenharmony_ci "SQLITE_RANGE", 1697db96d56Sopenharmony_ci "SQLITE_READ", 1707db96d56Sopenharmony_ci "SQLITE_READONLY", 1717db96d56Sopenharmony_ci "SQLITE_REINDEX", 1727db96d56Sopenharmony_ci "SQLITE_ROW", 1737db96d56Sopenharmony_ci "SQLITE_SAVEPOINT", 1747db96d56Sopenharmony_ci "SQLITE_SCHEMA", 1757db96d56Sopenharmony_ci "SQLITE_SELECT", 1767db96d56Sopenharmony_ci "SQLITE_TOOBIG", 1777db96d56Sopenharmony_ci "SQLITE_TRANSACTION", 1787db96d56Sopenharmony_ci "SQLITE_UPDATE", 1797db96d56Sopenharmony_ci # Run-time limit categories 1807db96d56Sopenharmony_ci "SQLITE_LIMIT_LENGTH", 1817db96d56Sopenharmony_ci "SQLITE_LIMIT_SQL_LENGTH", 1827db96d56Sopenharmony_ci "SQLITE_LIMIT_COLUMN", 1837db96d56Sopenharmony_ci "SQLITE_LIMIT_EXPR_DEPTH", 1847db96d56Sopenharmony_ci "SQLITE_LIMIT_COMPOUND_SELECT", 1857db96d56Sopenharmony_ci "SQLITE_LIMIT_VDBE_OP", 1867db96d56Sopenharmony_ci "SQLITE_LIMIT_FUNCTION_ARG", 1877db96d56Sopenharmony_ci "SQLITE_LIMIT_ATTACHED", 1887db96d56Sopenharmony_ci "SQLITE_LIMIT_LIKE_PATTERN_LENGTH", 1897db96d56Sopenharmony_ci "SQLITE_LIMIT_VARIABLE_NUMBER", 1907db96d56Sopenharmony_ci "SQLITE_LIMIT_TRIGGER_DEPTH", 1917db96d56Sopenharmony_ci ] 1927db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 7, 17): 1937db96d56Sopenharmony_ci consts += ["SQLITE_NOTICE", "SQLITE_WARNING"] 1947db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 8, 3): 1957db96d56Sopenharmony_ci consts.append("SQLITE_RECURSIVE") 1967db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 8, 7): 1977db96d56Sopenharmony_ci consts.append("SQLITE_LIMIT_WORKER_THREADS") 1987db96d56Sopenharmony_ci consts += ["PARSE_DECLTYPES", "PARSE_COLNAMES"] 1997db96d56Sopenharmony_ci # Extended result codes 2007db96d56Sopenharmony_ci consts += [ 2017db96d56Sopenharmony_ci "SQLITE_ABORT_ROLLBACK", 2027db96d56Sopenharmony_ci "SQLITE_BUSY_RECOVERY", 2037db96d56Sopenharmony_ci "SQLITE_CANTOPEN_FULLPATH", 2047db96d56Sopenharmony_ci "SQLITE_CANTOPEN_ISDIR", 2057db96d56Sopenharmony_ci "SQLITE_CANTOPEN_NOTEMPDIR", 2067db96d56Sopenharmony_ci "SQLITE_CORRUPT_VTAB", 2077db96d56Sopenharmony_ci "SQLITE_IOERR_ACCESS", 2087db96d56Sopenharmony_ci "SQLITE_IOERR_BLOCKED", 2097db96d56Sopenharmony_ci "SQLITE_IOERR_CHECKRESERVEDLOCK", 2107db96d56Sopenharmony_ci "SQLITE_IOERR_CLOSE", 2117db96d56Sopenharmony_ci "SQLITE_IOERR_DELETE", 2127db96d56Sopenharmony_ci "SQLITE_IOERR_DELETE_NOENT", 2137db96d56Sopenharmony_ci "SQLITE_IOERR_DIR_CLOSE", 2147db96d56Sopenharmony_ci "SQLITE_IOERR_DIR_FSYNC", 2157db96d56Sopenharmony_ci "SQLITE_IOERR_FSTAT", 2167db96d56Sopenharmony_ci "SQLITE_IOERR_FSYNC", 2177db96d56Sopenharmony_ci "SQLITE_IOERR_LOCK", 2187db96d56Sopenharmony_ci "SQLITE_IOERR_NOMEM", 2197db96d56Sopenharmony_ci "SQLITE_IOERR_RDLOCK", 2207db96d56Sopenharmony_ci "SQLITE_IOERR_READ", 2217db96d56Sopenharmony_ci "SQLITE_IOERR_SEEK", 2227db96d56Sopenharmony_ci "SQLITE_IOERR_SHMLOCK", 2237db96d56Sopenharmony_ci "SQLITE_IOERR_SHMMAP", 2247db96d56Sopenharmony_ci "SQLITE_IOERR_SHMOPEN", 2257db96d56Sopenharmony_ci "SQLITE_IOERR_SHMSIZE", 2267db96d56Sopenharmony_ci "SQLITE_IOERR_SHORT_READ", 2277db96d56Sopenharmony_ci "SQLITE_IOERR_TRUNCATE", 2287db96d56Sopenharmony_ci "SQLITE_IOERR_UNLOCK", 2297db96d56Sopenharmony_ci "SQLITE_IOERR_WRITE", 2307db96d56Sopenharmony_ci "SQLITE_LOCKED_SHAREDCACHE", 2317db96d56Sopenharmony_ci "SQLITE_READONLY_CANTLOCK", 2327db96d56Sopenharmony_ci "SQLITE_READONLY_RECOVERY", 2337db96d56Sopenharmony_ci ] 2347db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 7, 16): 2357db96d56Sopenharmony_ci consts += [ 2367db96d56Sopenharmony_ci "SQLITE_CONSTRAINT_CHECK", 2377db96d56Sopenharmony_ci "SQLITE_CONSTRAINT_COMMITHOOK", 2387db96d56Sopenharmony_ci "SQLITE_CONSTRAINT_FOREIGNKEY", 2397db96d56Sopenharmony_ci "SQLITE_CONSTRAINT_FUNCTION", 2407db96d56Sopenharmony_ci "SQLITE_CONSTRAINT_NOTNULL", 2417db96d56Sopenharmony_ci "SQLITE_CONSTRAINT_PRIMARYKEY", 2427db96d56Sopenharmony_ci "SQLITE_CONSTRAINT_TRIGGER", 2437db96d56Sopenharmony_ci "SQLITE_CONSTRAINT_UNIQUE", 2447db96d56Sopenharmony_ci "SQLITE_CONSTRAINT_VTAB", 2457db96d56Sopenharmony_ci "SQLITE_READONLY_ROLLBACK", 2467db96d56Sopenharmony_ci ] 2477db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 7, 17): 2487db96d56Sopenharmony_ci consts += [ 2497db96d56Sopenharmony_ci "SQLITE_IOERR_MMAP", 2507db96d56Sopenharmony_ci "SQLITE_NOTICE_RECOVER_ROLLBACK", 2517db96d56Sopenharmony_ci "SQLITE_NOTICE_RECOVER_WAL", 2527db96d56Sopenharmony_ci ] 2537db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 8, 0): 2547db96d56Sopenharmony_ci consts += [ 2557db96d56Sopenharmony_ci "SQLITE_BUSY_SNAPSHOT", 2567db96d56Sopenharmony_ci "SQLITE_IOERR_GETTEMPPATH", 2577db96d56Sopenharmony_ci "SQLITE_WARNING_AUTOINDEX", 2587db96d56Sopenharmony_ci ] 2597db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 8, 1): 2607db96d56Sopenharmony_ci consts += ["SQLITE_CANTOPEN_CONVPATH", "SQLITE_IOERR_CONVPATH"] 2617db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 8, 2): 2627db96d56Sopenharmony_ci consts.append("SQLITE_CONSTRAINT_ROWID") 2637db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 8, 3): 2647db96d56Sopenharmony_ci consts.append("SQLITE_READONLY_DBMOVED") 2657db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 8, 7): 2667db96d56Sopenharmony_ci consts.append("SQLITE_AUTH_USER") 2677db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 9, 0): 2687db96d56Sopenharmony_ci consts.append("SQLITE_IOERR_VNODE") 2697db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 10, 0): 2707db96d56Sopenharmony_ci consts.append("SQLITE_IOERR_AUTH") 2717db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 14, 1): 2727db96d56Sopenharmony_ci consts.append("SQLITE_OK_LOAD_PERMANENTLY") 2737db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 21, 0): 2747db96d56Sopenharmony_ci consts += [ 2757db96d56Sopenharmony_ci "SQLITE_IOERR_BEGIN_ATOMIC", 2767db96d56Sopenharmony_ci "SQLITE_IOERR_COMMIT_ATOMIC", 2777db96d56Sopenharmony_ci "SQLITE_IOERR_ROLLBACK_ATOMIC", 2787db96d56Sopenharmony_ci ] 2797db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 22, 0): 2807db96d56Sopenharmony_ci consts += [ 2817db96d56Sopenharmony_ci "SQLITE_ERROR_MISSING_COLLSEQ", 2827db96d56Sopenharmony_ci "SQLITE_ERROR_RETRY", 2837db96d56Sopenharmony_ci "SQLITE_READONLY_CANTINIT", 2847db96d56Sopenharmony_ci "SQLITE_READONLY_DIRECTORY", 2857db96d56Sopenharmony_ci ] 2867db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 24, 0): 2877db96d56Sopenharmony_ci consts += ["SQLITE_CORRUPT_SEQUENCE", "SQLITE_LOCKED_VTAB"] 2887db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 25, 0): 2897db96d56Sopenharmony_ci consts += ["SQLITE_CANTOPEN_DIRTYWAL", "SQLITE_ERROR_SNAPSHOT"] 2907db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 31, 0): 2917db96d56Sopenharmony_ci consts += [ 2927db96d56Sopenharmony_ci "SQLITE_CANTOPEN_SYMLINK", 2937db96d56Sopenharmony_ci "SQLITE_CONSTRAINT_PINNED", 2947db96d56Sopenharmony_ci "SQLITE_OK_SYMLINK", 2957db96d56Sopenharmony_ci ] 2967db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 32, 0): 2977db96d56Sopenharmony_ci consts += [ 2987db96d56Sopenharmony_ci "SQLITE_BUSY_TIMEOUT", 2997db96d56Sopenharmony_ci "SQLITE_CORRUPT_INDEX", 3007db96d56Sopenharmony_ci "SQLITE_IOERR_DATA", 3017db96d56Sopenharmony_ci ] 3027db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 34, 0): 3037db96d56Sopenharmony_ci consts.append("SQLITE_IOERR_CORRUPTFS") 3047db96d56Sopenharmony_ci for const in consts: 3057db96d56Sopenharmony_ci with self.subTest(const=const): 3067db96d56Sopenharmony_ci self.assertTrue(hasattr(sqlite, const)) 3077db96d56Sopenharmony_ci 3087db96d56Sopenharmony_ci def test_error_code_on_exception(self): 3097db96d56Sopenharmony_ci err_msg = "unable to open database file" 3107db96d56Sopenharmony_ci if sys.platform.startswith("win"): 3117db96d56Sopenharmony_ci err_code = sqlite.SQLITE_CANTOPEN_ISDIR 3127db96d56Sopenharmony_ci else: 3137db96d56Sopenharmony_ci err_code = sqlite.SQLITE_CANTOPEN 3147db96d56Sopenharmony_ci 3157db96d56Sopenharmony_ci with temp_dir() as db: 3167db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.Error, err_msg) as cm: 3177db96d56Sopenharmony_ci sqlite.connect(db) 3187db96d56Sopenharmony_ci e = cm.exception 3197db96d56Sopenharmony_ci self.assertEqual(e.sqlite_errorcode, err_code) 3207db96d56Sopenharmony_ci self.assertTrue(e.sqlite_errorname.startswith("SQLITE_CANTOPEN")) 3217db96d56Sopenharmony_ci 3227db96d56Sopenharmony_ci @unittest.skipIf(sqlite.sqlite_version_info <= (3, 7, 16), 3237db96d56Sopenharmony_ci "Requires SQLite 3.7.16 or newer") 3247db96d56Sopenharmony_ci def test_extended_error_code_on_exception(self): 3257db96d56Sopenharmony_ci with memory_database() as con: 3267db96d56Sopenharmony_ci with con: 3277db96d56Sopenharmony_ci con.execute("create table t(t integer check(t > 0))") 3287db96d56Sopenharmony_ci errmsg = "constraint failed" 3297db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.IntegrityError, errmsg) as cm: 3307db96d56Sopenharmony_ci con.execute("insert into t values(-1)") 3317db96d56Sopenharmony_ci exc = cm.exception 3327db96d56Sopenharmony_ci self.assertEqual(exc.sqlite_errorcode, 3337db96d56Sopenharmony_ci sqlite.SQLITE_CONSTRAINT_CHECK) 3347db96d56Sopenharmony_ci self.assertEqual(exc.sqlite_errorname, "SQLITE_CONSTRAINT_CHECK") 3357db96d56Sopenharmony_ci 3367db96d56Sopenharmony_ci # sqlite3_enable_shared_cache() is deprecated on macOS and calling it may raise 3377db96d56Sopenharmony_ci # OperationalError on some buildbots. 3387db96d56Sopenharmony_ci @unittest.skipIf(sys.platform == "darwin", "shared cache is deprecated on macOS") 3397db96d56Sopenharmony_ci def test_shared_cache_deprecated(self): 3407db96d56Sopenharmony_ci for enable in (True, False): 3417db96d56Sopenharmony_ci with self.assertWarns(DeprecationWarning) as cm: 3427db96d56Sopenharmony_ci sqlite.enable_shared_cache(enable) 3437db96d56Sopenharmony_ci self.assertIn("dbapi.py", cm.filename) 3447db96d56Sopenharmony_ci 3457db96d56Sopenharmony_ci def test_disallow_instantiation(self): 3467db96d56Sopenharmony_ci cx = sqlite.connect(":memory:") 3477db96d56Sopenharmony_ci check_disallow_instantiation(self, type(cx("select 1"))) 3487db96d56Sopenharmony_ci check_disallow_instantiation(self, sqlite.Blob) 3497db96d56Sopenharmony_ci 3507db96d56Sopenharmony_ci def test_complete_statement(self): 3517db96d56Sopenharmony_ci self.assertFalse(sqlite.complete_statement("select t")) 3527db96d56Sopenharmony_ci self.assertTrue(sqlite.complete_statement("create table t(t);")) 3537db96d56Sopenharmony_ci 3547db96d56Sopenharmony_ci 3557db96d56Sopenharmony_ciclass ConnectionTests(unittest.TestCase): 3567db96d56Sopenharmony_ci 3577db96d56Sopenharmony_ci def setUp(self): 3587db96d56Sopenharmony_ci self.cx = sqlite.connect(":memory:") 3597db96d56Sopenharmony_ci cu = self.cx.cursor() 3607db96d56Sopenharmony_ci cu.execute("create table test(id integer primary key, name text)") 3617db96d56Sopenharmony_ci cu.execute("insert into test(name) values (?)", ("foo",)) 3627db96d56Sopenharmony_ci 3637db96d56Sopenharmony_ci def tearDown(self): 3647db96d56Sopenharmony_ci self.cx.close() 3657db96d56Sopenharmony_ci 3667db96d56Sopenharmony_ci def test_commit(self): 3677db96d56Sopenharmony_ci self.cx.commit() 3687db96d56Sopenharmony_ci 3697db96d56Sopenharmony_ci def test_commit_after_no_changes(self): 3707db96d56Sopenharmony_ci """ 3717db96d56Sopenharmony_ci A commit should also work when no changes were made to the database. 3727db96d56Sopenharmony_ci """ 3737db96d56Sopenharmony_ci self.cx.commit() 3747db96d56Sopenharmony_ci self.cx.commit() 3757db96d56Sopenharmony_ci 3767db96d56Sopenharmony_ci def test_rollback(self): 3777db96d56Sopenharmony_ci self.cx.rollback() 3787db96d56Sopenharmony_ci 3797db96d56Sopenharmony_ci def test_rollback_after_no_changes(self): 3807db96d56Sopenharmony_ci """ 3817db96d56Sopenharmony_ci A rollback should also work when no changes were made to the database. 3827db96d56Sopenharmony_ci """ 3837db96d56Sopenharmony_ci self.cx.rollback() 3847db96d56Sopenharmony_ci self.cx.rollback() 3857db96d56Sopenharmony_ci 3867db96d56Sopenharmony_ci def test_cursor(self): 3877db96d56Sopenharmony_ci cu = self.cx.cursor() 3887db96d56Sopenharmony_ci 3897db96d56Sopenharmony_ci def test_failed_open(self): 3907db96d56Sopenharmony_ci YOU_CANNOT_OPEN_THIS = "/foo/bar/bla/23534/mydb.db" 3917db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError): 3927db96d56Sopenharmony_ci sqlite.connect(YOU_CANNOT_OPEN_THIS) 3937db96d56Sopenharmony_ci 3947db96d56Sopenharmony_ci def test_close(self): 3957db96d56Sopenharmony_ci self.cx.close() 3967db96d56Sopenharmony_ci 3977db96d56Sopenharmony_ci def test_use_after_close(self): 3987db96d56Sopenharmony_ci sql = "select 1" 3997db96d56Sopenharmony_ci cu = self.cx.cursor() 4007db96d56Sopenharmony_ci res = cu.execute(sql) 4017db96d56Sopenharmony_ci self.cx.close() 4027db96d56Sopenharmony_ci self.assertRaises(sqlite.ProgrammingError, res.fetchall) 4037db96d56Sopenharmony_ci self.assertRaises(sqlite.ProgrammingError, cu.execute, sql) 4047db96d56Sopenharmony_ci self.assertRaises(sqlite.ProgrammingError, cu.executemany, sql, []) 4057db96d56Sopenharmony_ci self.assertRaises(sqlite.ProgrammingError, cu.executescript, sql) 4067db96d56Sopenharmony_ci self.assertRaises(sqlite.ProgrammingError, self.cx.execute, sql) 4077db96d56Sopenharmony_ci self.assertRaises(sqlite.ProgrammingError, 4087db96d56Sopenharmony_ci self.cx.executemany, sql, []) 4097db96d56Sopenharmony_ci self.assertRaises(sqlite.ProgrammingError, self.cx.executescript, sql) 4107db96d56Sopenharmony_ci self.assertRaises(sqlite.ProgrammingError, 4117db96d56Sopenharmony_ci self.cx.create_function, "t", 1, lambda x: x) 4127db96d56Sopenharmony_ci self.assertRaises(sqlite.ProgrammingError, self.cx.cursor) 4137db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 4147db96d56Sopenharmony_ci with self.cx: 4157db96d56Sopenharmony_ci pass 4167db96d56Sopenharmony_ci 4177db96d56Sopenharmony_ci def test_exceptions(self): 4187db96d56Sopenharmony_ci # Optional DB-API extension. 4197db96d56Sopenharmony_ci self.assertEqual(self.cx.Warning, sqlite.Warning) 4207db96d56Sopenharmony_ci self.assertEqual(self.cx.Error, sqlite.Error) 4217db96d56Sopenharmony_ci self.assertEqual(self.cx.InterfaceError, sqlite.InterfaceError) 4227db96d56Sopenharmony_ci self.assertEqual(self.cx.DatabaseError, sqlite.DatabaseError) 4237db96d56Sopenharmony_ci self.assertEqual(self.cx.DataError, sqlite.DataError) 4247db96d56Sopenharmony_ci self.assertEqual(self.cx.OperationalError, sqlite.OperationalError) 4257db96d56Sopenharmony_ci self.assertEqual(self.cx.IntegrityError, sqlite.IntegrityError) 4267db96d56Sopenharmony_ci self.assertEqual(self.cx.InternalError, sqlite.InternalError) 4277db96d56Sopenharmony_ci self.assertEqual(self.cx.ProgrammingError, sqlite.ProgrammingError) 4287db96d56Sopenharmony_ci self.assertEqual(self.cx.NotSupportedError, sqlite.NotSupportedError) 4297db96d56Sopenharmony_ci 4307db96d56Sopenharmony_ci def test_in_transaction(self): 4317db96d56Sopenharmony_ci # Can't use db from setUp because we want to test initial state. 4327db96d56Sopenharmony_ci cx = sqlite.connect(":memory:") 4337db96d56Sopenharmony_ci cu = cx.cursor() 4347db96d56Sopenharmony_ci self.assertEqual(cx.in_transaction, False) 4357db96d56Sopenharmony_ci cu.execute("create table transactiontest(id integer primary key, name text)") 4367db96d56Sopenharmony_ci self.assertEqual(cx.in_transaction, False) 4377db96d56Sopenharmony_ci cu.execute("insert into transactiontest(name) values (?)", ("foo",)) 4387db96d56Sopenharmony_ci self.assertEqual(cx.in_transaction, True) 4397db96d56Sopenharmony_ci cu.execute("select name from transactiontest where name=?", ["foo"]) 4407db96d56Sopenharmony_ci row = cu.fetchone() 4417db96d56Sopenharmony_ci self.assertEqual(cx.in_transaction, True) 4427db96d56Sopenharmony_ci cx.commit() 4437db96d56Sopenharmony_ci self.assertEqual(cx.in_transaction, False) 4447db96d56Sopenharmony_ci cu.execute("select name from transactiontest where name=?", ["foo"]) 4457db96d56Sopenharmony_ci row = cu.fetchone() 4467db96d56Sopenharmony_ci self.assertEqual(cx.in_transaction, False) 4477db96d56Sopenharmony_ci 4487db96d56Sopenharmony_ci def test_in_transaction_ro(self): 4497db96d56Sopenharmony_ci with self.assertRaises(AttributeError): 4507db96d56Sopenharmony_ci self.cx.in_transaction = True 4517db96d56Sopenharmony_ci 4527db96d56Sopenharmony_ci def test_connection_exceptions(self): 4537db96d56Sopenharmony_ci exceptions = [ 4547db96d56Sopenharmony_ci "DataError", 4557db96d56Sopenharmony_ci "DatabaseError", 4567db96d56Sopenharmony_ci "Error", 4577db96d56Sopenharmony_ci "IntegrityError", 4587db96d56Sopenharmony_ci "InterfaceError", 4597db96d56Sopenharmony_ci "NotSupportedError", 4607db96d56Sopenharmony_ci "OperationalError", 4617db96d56Sopenharmony_ci "ProgrammingError", 4627db96d56Sopenharmony_ci "Warning", 4637db96d56Sopenharmony_ci ] 4647db96d56Sopenharmony_ci for exc in exceptions: 4657db96d56Sopenharmony_ci with self.subTest(exc=exc): 4667db96d56Sopenharmony_ci self.assertTrue(hasattr(self.cx, exc)) 4677db96d56Sopenharmony_ci self.assertIs(getattr(sqlite, exc), getattr(self.cx, exc)) 4687db96d56Sopenharmony_ci 4697db96d56Sopenharmony_ci def test_interrupt_on_closed_db(self): 4707db96d56Sopenharmony_ci cx = sqlite.connect(":memory:") 4717db96d56Sopenharmony_ci cx.close() 4727db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 4737db96d56Sopenharmony_ci cx.interrupt() 4747db96d56Sopenharmony_ci 4757db96d56Sopenharmony_ci def test_interrupt(self): 4767db96d56Sopenharmony_ci self.assertIsNone(self.cx.interrupt()) 4777db96d56Sopenharmony_ci 4787db96d56Sopenharmony_ci def test_drop_unused_refs(self): 4797db96d56Sopenharmony_ci for n in range(500): 4807db96d56Sopenharmony_ci cu = self.cx.execute(f"select {n}") 4817db96d56Sopenharmony_ci self.assertEqual(cu.fetchone()[0], n) 4827db96d56Sopenharmony_ci 4837db96d56Sopenharmony_ci def test_connection_limits(self): 4847db96d56Sopenharmony_ci category = sqlite.SQLITE_LIMIT_SQL_LENGTH 4857db96d56Sopenharmony_ci saved_limit = self.cx.getlimit(category) 4867db96d56Sopenharmony_ci try: 4877db96d56Sopenharmony_ci new_limit = 10 4887db96d56Sopenharmony_ci prev_limit = self.cx.setlimit(category, new_limit) 4897db96d56Sopenharmony_ci self.assertEqual(saved_limit, prev_limit) 4907db96d56Sopenharmony_ci self.assertEqual(self.cx.getlimit(category), new_limit) 4917db96d56Sopenharmony_ci msg = "query string is too large" 4927db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.DataError, msg, 4937db96d56Sopenharmony_ci self.cx.execute, "select 1 as '16'") 4947db96d56Sopenharmony_ci finally: # restore saved limit 4957db96d56Sopenharmony_ci self.cx.setlimit(category, saved_limit) 4967db96d56Sopenharmony_ci 4977db96d56Sopenharmony_ci def test_connection_bad_limit_category(self): 4987db96d56Sopenharmony_ci msg = "'category' is out of bounds" 4997db96d56Sopenharmony_ci cat = 1111 5007db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.ProgrammingError, msg, 5017db96d56Sopenharmony_ci self.cx.getlimit, cat) 5027db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.ProgrammingError, msg, 5037db96d56Sopenharmony_ci self.cx.setlimit, cat, 0) 5047db96d56Sopenharmony_ci 5057db96d56Sopenharmony_ci def test_connection_init_bad_isolation_level(self): 5067db96d56Sopenharmony_ci msg = ( 5077db96d56Sopenharmony_ci "isolation_level string must be '', 'DEFERRED', 'IMMEDIATE', or " 5087db96d56Sopenharmony_ci "'EXCLUSIVE'" 5097db96d56Sopenharmony_ci ) 5107db96d56Sopenharmony_ci levels = ( 5117db96d56Sopenharmony_ci "BOGUS", 5127db96d56Sopenharmony_ci " ", 5137db96d56Sopenharmony_ci "DEFERRE", 5147db96d56Sopenharmony_ci "IMMEDIAT", 5157db96d56Sopenharmony_ci "EXCLUSIV", 5167db96d56Sopenharmony_ci "DEFERREDS", 5177db96d56Sopenharmony_ci "IMMEDIATES", 5187db96d56Sopenharmony_ci "EXCLUSIVES", 5197db96d56Sopenharmony_ci ) 5207db96d56Sopenharmony_ci for level in levels: 5217db96d56Sopenharmony_ci with self.subTest(level=level): 5227db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, msg): 5237db96d56Sopenharmony_ci memory_database(isolation_level=level) 5247db96d56Sopenharmony_ci with memory_database() as cx: 5257db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, msg): 5267db96d56Sopenharmony_ci cx.isolation_level = level 5277db96d56Sopenharmony_ci # Check that the default level is not changed 5287db96d56Sopenharmony_ci self.assertEqual(cx.isolation_level, "") 5297db96d56Sopenharmony_ci 5307db96d56Sopenharmony_ci def test_connection_init_good_isolation_levels(self): 5317db96d56Sopenharmony_ci for level in ("", "DEFERRED", "IMMEDIATE", "EXCLUSIVE", None): 5327db96d56Sopenharmony_ci with self.subTest(level=level): 5337db96d56Sopenharmony_ci with memory_database(isolation_level=level) as cx: 5347db96d56Sopenharmony_ci self.assertEqual(cx.isolation_level, level) 5357db96d56Sopenharmony_ci with memory_database() as cx: 5367db96d56Sopenharmony_ci self.assertEqual(cx.isolation_level, "") 5377db96d56Sopenharmony_ci cx.isolation_level = level 5387db96d56Sopenharmony_ci self.assertEqual(cx.isolation_level, level) 5397db96d56Sopenharmony_ci 5407db96d56Sopenharmony_ci def test_connection_reinit(self): 5417db96d56Sopenharmony_ci db = ":memory:" 5427db96d56Sopenharmony_ci cx = sqlite.connect(db) 5437db96d56Sopenharmony_ci cx.text_factory = bytes 5447db96d56Sopenharmony_ci cx.row_factory = sqlite.Row 5457db96d56Sopenharmony_ci cu = cx.cursor() 5467db96d56Sopenharmony_ci cu.execute("create table foo (bar)") 5477db96d56Sopenharmony_ci cu.executemany("insert into foo (bar) values (?)", 5487db96d56Sopenharmony_ci ((str(v),) for v in range(4))) 5497db96d56Sopenharmony_ci cu.execute("select bar from foo") 5507db96d56Sopenharmony_ci 5517db96d56Sopenharmony_ci rows = [r for r in cu.fetchmany(2)] 5527db96d56Sopenharmony_ci self.assertTrue(all(isinstance(r, sqlite.Row) for r in rows)) 5537db96d56Sopenharmony_ci self.assertEqual([r[0] for r in rows], [b"0", b"1"]) 5547db96d56Sopenharmony_ci 5557db96d56Sopenharmony_ci cx.__init__(db) 5567db96d56Sopenharmony_ci cx.execute("create table foo (bar)") 5577db96d56Sopenharmony_ci cx.executemany("insert into foo (bar) values (?)", 5587db96d56Sopenharmony_ci ((v,) for v in ("a", "b", "c", "d"))) 5597db96d56Sopenharmony_ci 5607db96d56Sopenharmony_ci # This uses the old database, old row factory, but new text factory 5617db96d56Sopenharmony_ci rows = [r for r in cu.fetchall()] 5627db96d56Sopenharmony_ci self.assertTrue(all(isinstance(r, sqlite.Row) for r in rows)) 5637db96d56Sopenharmony_ci self.assertEqual([r[0] for r in rows], ["2", "3"]) 5647db96d56Sopenharmony_ci 5657db96d56Sopenharmony_ci def test_connection_bad_reinit(self): 5667db96d56Sopenharmony_ci cx = sqlite.connect(":memory:") 5677db96d56Sopenharmony_ci with cx: 5687db96d56Sopenharmony_ci cx.execute("create table t(t)") 5697db96d56Sopenharmony_ci with temp_dir() as db: 5707db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.OperationalError, 5717db96d56Sopenharmony_ci "unable to open database file", 5727db96d56Sopenharmony_ci cx.__init__, db) 5737db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.ProgrammingError, 5747db96d56Sopenharmony_ci "Base Connection.__init__ not called", 5757db96d56Sopenharmony_ci cx.executemany, "insert into t values(?)", 5767db96d56Sopenharmony_ci ((v,) for v in range(3))) 5777db96d56Sopenharmony_ci 5787db96d56Sopenharmony_ci 5797db96d56Sopenharmony_ciclass UninitialisedConnectionTests(unittest.TestCase): 5807db96d56Sopenharmony_ci def setUp(self): 5817db96d56Sopenharmony_ci self.cx = sqlite.Connection.__new__(sqlite.Connection) 5827db96d56Sopenharmony_ci 5837db96d56Sopenharmony_ci def test_uninit_operations(self): 5847db96d56Sopenharmony_ci funcs = ( 5857db96d56Sopenharmony_ci lambda: self.cx.isolation_level, 5867db96d56Sopenharmony_ci lambda: self.cx.total_changes, 5877db96d56Sopenharmony_ci lambda: self.cx.in_transaction, 5887db96d56Sopenharmony_ci lambda: self.cx.iterdump(), 5897db96d56Sopenharmony_ci lambda: self.cx.cursor(), 5907db96d56Sopenharmony_ci lambda: self.cx.close(), 5917db96d56Sopenharmony_ci ) 5927db96d56Sopenharmony_ci for func in funcs: 5937db96d56Sopenharmony_ci with self.subTest(func=func): 5947db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.ProgrammingError, 5957db96d56Sopenharmony_ci "Base Connection.__init__ not called", 5967db96d56Sopenharmony_ci func) 5977db96d56Sopenharmony_ci 5987db96d56Sopenharmony_ci 5997db96d56Sopenharmony_ci@unittest.skipUnless(hasattr(sqlite.Connection, "serialize"), 6007db96d56Sopenharmony_ci "Needs SQLite serialize API") 6017db96d56Sopenharmony_ciclass SerializeTests(unittest.TestCase): 6027db96d56Sopenharmony_ci def test_serialize_deserialize(self): 6037db96d56Sopenharmony_ci with memory_database() as cx: 6047db96d56Sopenharmony_ci with cx: 6057db96d56Sopenharmony_ci cx.execute("create table t(t)") 6067db96d56Sopenharmony_ci data = cx.serialize() 6077db96d56Sopenharmony_ci 6087db96d56Sopenharmony_ci # Remove test table, verify that it was removed. 6097db96d56Sopenharmony_ci with cx: 6107db96d56Sopenharmony_ci cx.execute("drop table t") 6117db96d56Sopenharmony_ci regex = "no such table" 6127db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.OperationalError, regex): 6137db96d56Sopenharmony_ci cx.execute("select t from t") 6147db96d56Sopenharmony_ci 6157db96d56Sopenharmony_ci # Deserialize and verify that test table is restored. 6167db96d56Sopenharmony_ci cx.deserialize(data) 6177db96d56Sopenharmony_ci cx.execute("select t from t") 6187db96d56Sopenharmony_ci 6197db96d56Sopenharmony_ci def test_deserialize_wrong_args(self): 6207db96d56Sopenharmony_ci dataset = ( 6217db96d56Sopenharmony_ci (BufferError, memoryview(b"blob")[::2]), 6227db96d56Sopenharmony_ci (TypeError, []), 6237db96d56Sopenharmony_ci (TypeError, 1), 6247db96d56Sopenharmony_ci (TypeError, None), 6257db96d56Sopenharmony_ci ) 6267db96d56Sopenharmony_ci for exc, arg in dataset: 6277db96d56Sopenharmony_ci with self.subTest(exc=exc, arg=arg): 6287db96d56Sopenharmony_ci with memory_database() as cx: 6297db96d56Sopenharmony_ci self.assertRaises(exc, cx.deserialize, arg) 6307db96d56Sopenharmony_ci 6317db96d56Sopenharmony_ci def test_deserialize_corrupt_database(self): 6327db96d56Sopenharmony_ci with memory_database() as cx: 6337db96d56Sopenharmony_ci regex = "file is not a database" 6347db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.DatabaseError, regex): 6357db96d56Sopenharmony_ci cx.deserialize(b"\0\1\3") 6367db96d56Sopenharmony_ci # SQLite does not generate an error until you try to query the 6377db96d56Sopenharmony_ci # deserialized database. 6387db96d56Sopenharmony_ci cx.execute("create table fail(f)") 6397db96d56Sopenharmony_ci 6407db96d56Sopenharmony_ci @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 6417db96d56Sopenharmony_ci @bigmemtest(size=2**63, memuse=3, dry_run=False) 6427db96d56Sopenharmony_ci def test_deserialize_too_much_data_64bit(self): 6437db96d56Sopenharmony_ci with memory_database() as cx: 6447db96d56Sopenharmony_ci with self.assertRaisesRegex(OverflowError, "'data' is too large"): 6457db96d56Sopenharmony_ci cx.deserialize(b"b" * size) 6467db96d56Sopenharmony_ci 6477db96d56Sopenharmony_ci 6487db96d56Sopenharmony_ciclass OpenTests(unittest.TestCase): 6497db96d56Sopenharmony_ci _sql = "create table test(id integer)" 6507db96d56Sopenharmony_ci 6517db96d56Sopenharmony_ci def test_open_with_path_like_object(self): 6527db96d56Sopenharmony_ci """ Checks that we can successfully connect to a database using an object that 6537db96d56Sopenharmony_ci is PathLike, i.e. has __fspath__(). """ 6547db96d56Sopenharmony_ci path = FakePath(TESTFN) 6557db96d56Sopenharmony_ci self.addCleanup(unlink, path) 6567db96d56Sopenharmony_ci self.assertFalse(os.path.exists(path)) 6577db96d56Sopenharmony_ci with contextlib.closing(sqlite.connect(path)) as cx: 6587db96d56Sopenharmony_ci self.assertTrue(os.path.exists(path)) 6597db96d56Sopenharmony_ci cx.execute(self._sql) 6607db96d56Sopenharmony_ci 6617db96d56Sopenharmony_ci @unittest.skipIf(sys.platform == "win32", "skipped on Windows") 6627db96d56Sopenharmony_ci @unittest.skipIf(sys.platform == "darwin", "skipped on macOS") 6637db96d56Sopenharmony_ci @unittest.skipIf(is_emscripten or is_wasi, "not supported on Emscripten/WASI") 6647db96d56Sopenharmony_ci @unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths") 6657db96d56Sopenharmony_ci def test_open_with_undecodable_path(self): 6667db96d56Sopenharmony_ci path = TESTFN_UNDECODABLE 6677db96d56Sopenharmony_ci self.addCleanup(unlink, path) 6687db96d56Sopenharmony_ci self.assertFalse(os.path.exists(path)) 6697db96d56Sopenharmony_ci with contextlib.closing(sqlite.connect(path)) as cx: 6707db96d56Sopenharmony_ci self.assertTrue(os.path.exists(path)) 6717db96d56Sopenharmony_ci cx.execute(self._sql) 6727db96d56Sopenharmony_ci 6737db96d56Sopenharmony_ci def test_open_uri(self): 6747db96d56Sopenharmony_ci path = TESTFN 6757db96d56Sopenharmony_ci self.addCleanup(unlink, path) 6767db96d56Sopenharmony_ci uri = "file:" + urllib.parse.quote(os.fsencode(path)) 6777db96d56Sopenharmony_ci self.assertFalse(os.path.exists(path)) 6787db96d56Sopenharmony_ci with contextlib.closing(sqlite.connect(uri, uri=True)) as cx: 6797db96d56Sopenharmony_ci self.assertTrue(os.path.exists(path)) 6807db96d56Sopenharmony_ci cx.execute(self._sql) 6817db96d56Sopenharmony_ci 6827db96d56Sopenharmony_ci def test_open_unquoted_uri(self): 6837db96d56Sopenharmony_ci path = TESTFN 6847db96d56Sopenharmony_ci self.addCleanup(unlink, path) 6857db96d56Sopenharmony_ci uri = "file:" + path 6867db96d56Sopenharmony_ci self.assertFalse(os.path.exists(path)) 6877db96d56Sopenharmony_ci with contextlib.closing(sqlite.connect(uri, uri=True)) as cx: 6887db96d56Sopenharmony_ci self.assertTrue(os.path.exists(path)) 6897db96d56Sopenharmony_ci cx.execute(self._sql) 6907db96d56Sopenharmony_ci 6917db96d56Sopenharmony_ci def test_open_uri_readonly(self): 6927db96d56Sopenharmony_ci path = TESTFN 6937db96d56Sopenharmony_ci self.addCleanup(unlink, path) 6947db96d56Sopenharmony_ci uri = "file:" + urllib.parse.quote(os.fsencode(path)) + "?mode=ro" 6957db96d56Sopenharmony_ci self.assertFalse(os.path.exists(path)) 6967db96d56Sopenharmony_ci # Cannot create new DB 6977db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError): 6987db96d56Sopenharmony_ci sqlite.connect(uri, uri=True) 6997db96d56Sopenharmony_ci self.assertFalse(os.path.exists(path)) 7007db96d56Sopenharmony_ci sqlite.connect(path).close() 7017db96d56Sopenharmony_ci self.assertTrue(os.path.exists(path)) 7027db96d56Sopenharmony_ci # Cannot modify new DB 7037db96d56Sopenharmony_ci with contextlib.closing(sqlite.connect(uri, uri=True)) as cx: 7047db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError): 7057db96d56Sopenharmony_ci cx.execute(self._sql) 7067db96d56Sopenharmony_ci 7077db96d56Sopenharmony_ci @unittest.skipIf(sys.platform == "win32", "skipped on Windows") 7087db96d56Sopenharmony_ci @unittest.skipIf(sys.platform == "darwin", "skipped on macOS") 7097db96d56Sopenharmony_ci @unittest.skipIf(is_emscripten or is_wasi, "not supported on Emscripten/WASI") 7107db96d56Sopenharmony_ci @unittest.skipUnless(TESTFN_UNDECODABLE, "only works if there are undecodable paths") 7117db96d56Sopenharmony_ci def test_open_undecodable_uri(self): 7127db96d56Sopenharmony_ci path = TESTFN_UNDECODABLE 7137db96d56Sopenharmony_ci self.addCleanup(unlink, path) 7147db96d56Sopenharmony_ci uri = "file:" + urllib.parse.quote(path) 7157db96d56Sopenharmony_ci self.assertFalse(os.path.exists(path)) 7167db96d56Sopenharmony_ci with contextlib.closing(sqlite.connect(uri, uri=True)) as cx: 7177db96d56Sopenharmony_ci self.assertTrue(os.path.exists(path)) 7187db96d56Sopenharmony_ci cx.execute(self._sql) 7197db96d56Sopenharmony_ci 7207db96d56Sopenharmony_ci def test_factory_database_arg(self): 7217db96d56Sopenharmony_ci def factory(database, *args, **kwargs): 7227db96d56Sopenharmony_ci nonlocal database_arg 7237db96d56Sopenharmony_ci database_arg = database 7247db96d56Sopenharmony_ci return sqlite.Connection(":memory:", *args, **kwargs) 7257db96d56Sopenharmony_ci 7267db96d56Sopenharmony_ci for database in (TESTFN, os.fsencode(TESTFN), 7277db96d56Sopenharmony_ci FakePath(TESTFN), FakePath(os.fsencode(TESTFN))): 7287db96d56Sopenharmony_ci database_arg = None 7297db96d56Sopenharmony_ci sqlite.connect(database, factory=factory).close() 7307db96d56Sopenharmony_ci self.assertEqual(database_arg, database) 7317db96d56Sopenharmony_ci 7327db96d56Sopenharmony_ci def test_database_keyword(self): 7337db96d56Sopenharmony_ci with contextlib.closing(sqlite.connect(database=":memory:")) as cx: 7347db96d56Sopenharmony_ci self.assertEqual(type(cx), sqlite.Connection) 7357db96d56Sopenharmony_ci 7367db96d56Sopenharmony_ci 7377db96d56Sopenharmony_ciclass CursorTests(unittest.TestCase): 7387db96d56Sopenharmony_ci def setUp(self): 7397db96d56Sopenharmony_ci self.cx = sqlite.connect(":memory:") 7407db96d56Sopenharmony_ci self.cu = self.cx.cursor() 7417db96d56Sopenharmony_ci self.cu.execute( 7427db96d56Sopenharmony_ci "create table test(id integer primary key, name text, " 7437db96d56Sopenharmony_ci "income number, unique_test text unique)" 7447db96d56Sopenharmony_ci ) 7457db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values (?)", ("foo",)) 7467db96d56Sopenharmony_ci 7477db96d56Sopenharmony_ci def tearDown(self): 7487db96d56Sopenharmony_ci self.cu.close() 7497db96d56Sopenharmony_ci self.cx.close() 7507db96d56Sopenharmony_ci 7517db96d56Sopenharmony_ci def test_execute_no_args(self): 7527db96d56Sopenharmony_ci self.cu.execute("delete from test") 7537db96d56Sopenharmony_ci 7547db96d56Sopenharmony_ci def test_execute_illegal_sql(self): 7557db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError): 7567db96d56Sopenharmony_ci self.cu.execute("select asdf") 7577db96d56Sopenharmony_ci 7587db96d56Sopenharmony_ci def test_execute_multiple_statements(self): 7597db96d56Sopenharmony_ci msg = "You can only execute one statement at a time" 7607db96d56Sopenharmony_ci dataset = ( 7617db96d56Sopenharmony_ci "select 1; select 2", 7627db96d56Sopenharmony_ci "select 1; // c++ comments are not allowed", 7637db96d56Sopenharmony_ci "select 1; *not a comment", 7647db96d56Sopenharmony_ci "select 1; -*not a comment", 7657db96d56Sopenharmony_ci "select 1; /* */ a", 7667db96d56Sopenharmony_ci "select 1; /**/a", 7677db96d56Sopenharmony_ci "select 1; -", 7687db96d56Sopenharmony_ci "select 1; /", 7697db96d56Sopenharmony_ci "select 1; -\n- select 2", 7707db96d56Sopenharmony_ci """select 1; 7717db96d56Sopenharmony_ci -- comment 7727db96d56Sopenharmony_ci select 2 7737db96d56Sopenharmony_ci """, 7747db96d56Sopenharmony_ci ) 7757db96d56Sopenharmony_ci for query in dataset: 7767db96d56Sopenharmony_ci with self.subTest(query=query): 7777db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 7787db96d56Sopenharmony_ci self.cu.execute(query) 7797db96d56Sopenharmony_ci 7807db96d56Sopenharmony_ci def test_execute_with_appended_comments(self): 7817db96d56Sopenharmony_ci dataset = ( 7827db96d56Sopenharmony_ci "select 1; -- foo bar", 7837db96d56Sopenharmony_ci "select 1; --", 7847db96d56Sopenharmony_ci "select 1; /*", # Unclosed comments ending in \0 are skipped. 7857db96d56Sopenharmony_ci """ 7867db96d56Sopenharmony_ci select 5+4; 7877db96d56Sopenharmony_ci 7887db96d56Sopenharmony_ci /* 7897db96d56Sopenharmony_ci foo 7907db96d56Sopenharmony_ci */ 7917db96d56Sopenharmony_ci """, 7927db96d56Sopenharmony_ci ) 7937db96d56Sopenharmony_ci for query in dataset: 7947db96d56Sopenharmony_ci with self.subTest(query=query): 7957db96d56Sopenharmony_ci self.cu.execute(query) 7967db96d56Sopenharmony_ci 7977db96d56Sopenharmony_ci def test_execute_wrong_sql_arg(self): 7987db96d56Sopenharmony_ci with self.assertRaises(TypeError): 7997db96d56Sopenharmony_ci self.cu.execute(42) 8007db96d56Sopenharmony_ci 8017db96d56Sopenharmony_ci def test_execute_arg_int(self): 8027db96d56Sopenharmony_ci self.cu.execute("insert into test(id) values (?)", (42,)) 8037db96d56Sopenharmony_ci 8047db96d56Sopenharmony_ci def test_execute_arg_float(self): 8057db96d56Sopenharmony_ci self.cu.execute("insert into test(income) values (?)", (2500.32,)) 8067db96d56Sopenharmony_ci 8077db96d56Sopenharmony_ci def test_execute_arg_string(self): 8087db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values (?)", ("Hugo",)) 8097db96d56Sopenharmony_ci 8107db96d56Sopenharmony_ci def test_execute_arg_string_with_zero_byte(self): 8117db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values (?)", ("Hu\x00go",)) 8127db96d56Sopenharmony_ci 8137db96d56Sopenharmony_ci self.cu.execute("select name from test where id=?", (self.cu.lastrowid,)) 8147db96d56Sopenharmony_ci row = self.cu.fetchone() 8157db96d56Sopenharmony_ci self.assertEqual(row[0], "Hu\x00go") 8167db96d56Sopenharmony_ci 8177db96d56Sopenharmony_ci def test_execute_non_iterable(self): 8187db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError) as cm: 8197db96d56Sopenharmony_ci self.cu.execute("insert into test(id) values (?)", 42) 8207db96d56Sopenharmony_ci self.assertEqual(str(cm.exception), 'parameters are of unsupported type') 8217db96d56Sopenharmony_ci 8227db96d56Sopenharmony_ci def test_execute_wrong_no_of_args1(self): 8237db96d56Sopenharmony_ci # too many parameters 8247db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 8257db96d56Sopenharmony_ci self.cu.execute("insert into test(id) values (?)", (17, "Egon")) 8267db96d56Sopenharmony_ci 8277db96d56Sopenharmony_ci def test_execute_wrong_no_of_args2(self): 8287db96d56Sopenharmony_ci # too little parameters 8297db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 8307db96d56Sopenharmony_ci self.cu.execute("insert into test(id) values (?)") 8317db96d56Sopenharmony_ci 8327db96d56Sopenharmony_ci def test_execute_wrong_no_of_args3(self): 8337db96d56Sopenharmony_ci # no parameters, parameters are needed 8347db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 8357db96d56Sopenharmony_ci self.cu.execute("insert into test(id) values (?)") 8367db96d56Sopenharmony_ci 8377db96d56Sopenharmony_ci def test_execute_param_list(self): 8387db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values ('foo')") 8397db96d56Sopenharmony_ci self.cu.execute("select name from test where name=?", ["foo"]) 8407db96d56Sopenharmony_ci row = self.cu.fetchone() 8417db96d56Sopenharmony_ci self.assertEqual(row[0], "foo") 8427db96d56Sopenharmony_ci 8437db96d56Sopenharmony_ci def test_execute_param_sequence(self): 8447db96d56Sopenharmony_ci class L: 8457db96d56Sopenharmony_ci def __len__(self): 8467db96d56Sopenharmony_ci return 1 8477db96d56Sopenharmony_ci def __getitem__(self, x): 8487db96d56Sopenharmony_ci assert x == 0 8497db96d56Sopenharmony_ci return "foo" 8507db96d56Sopenharmony_ci 8517db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values ('foo')") 8527db96d56Sopenharmony_ci self.cu.execute("select name from test where name=?", L()) 8537db96d56Sopenharmony_ci row = self.cu.fetchone() 8547db96d56Sopenharmony_ci self.assertEqual(row[0], "foo") 8557db96d56Sopenharmony_ci 8567db96d56Sopenharmony_ci def test_execute_param_sequence_bad_len(self): 8577db96d56Sopenharmony_ci # Issue41662: Error in __len__() was overridden with ProgrammingError. 8587db96d56Sopenharmony_ci class L: 8597db96d56Sopenharmony_ci def __len__(self): 8607db96d56Sopenharmony_ci 1/0 8617db96d56Sopenharmony_ci def __getitem__(slf, x): 8627db96d56Sopenharmony_ci raise AssertionError 8637db96d56Sopenharmony_ci 8647db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values ('foo')") 8657db96d56Sopenharmony_ci with self.assertRaises(ZeroDivisionError): 8667db96d56Sopenharmony_ci self.cu.execute("select name from test where name=?", L()) 8677db96d56Sopenharmony_ci 8687db96d56Sopenharmony_ci def test_execute_too_many_params(self): 8697db96d56Sopenharmony_ci category = sqlite.SQLITE_LIMIT_VARIABLE_NUMBER 8707db96d56Sopenharmony_ci msg = "too many SQL variables" 8717db96d56Sopenharmony_ci with cx_limit(self.cx, category=category, limit=1): 8727db96d56Sopenharmony_ci self.cu.execute("select * from test where id=?", (1,)) 8737db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.OperationalError, msg): 8747db96d56Sopenharmony_ci self.cu.execute("select * from test where id!=? and id!=?", 8757db96d56Sopenharmony_ci (1, 2)) 8767db96d56Sopenharmony_ci 8777db96d56Sopenharmony_ci def test_execute_dict_mapping(self): 8787db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values ('foo')") 8797db96d56Sopenharmony_ci self.cu.execute("select name from test where name=:name", {"name": "foo"}) 8807db96d56Sopenharmony_ci row = self.cu.fetchone() 8817db96d56Sopenharmony_ci self.assertEqual(row[0], "foo") 8827db96d56Sopenharmony_ci 8837db96d56Sopenharmony_ci def test_execute_dict_mapping_mapping(self): 8847db96d56Sopenharmony_ci class D(dict): 8857db96d56Sopenharmony_ci def __missing__(self, key): 8867db96d56Sopenharmony_ci return "foo" 8877db96d56Sopenharmony_ci 8887db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values ('foo')") 8897db96d56Sopenharmony_ci self.cu.execute("select name from test where name=:name", D()) 8907db96d56Sopenharmony_ci row = self.cu.fetchone() 8917db96d56Sopenharmony_ci self.assertEqual(row[0], "foo") 8927db96d56Sopenharmony_ci 8937db96d56Sopenharmony_ci def test_execute_dict_mapping_too_little_args(self): 8947db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values ('foo')") 8957db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 8967db96d56Sopenharmony_ci self.cu.execute("select name from test where name=:name and id=:id", {"name": "foo"}) 8977db96d56Sopenharmony_ci 8987db96d56Sopenharmony_ci def test_execute_dict_mapping_no_args(self): 8997db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values ('foo')") 9007db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 9017db96d56Sopenharmony_ci self.cu.execute("select name from test where name=:name") 9027db96d56Sopenharmony_ci 9037db96d56Sopenharmony_ci def test_execute_dict_mapping_unnamed(self): 9047db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values ('foo')") 9057db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 9067db96d56Sopenharmony_ci self.cu.execute("select name from test where name=?", {"name": "foo"}) 9077db96d56Sopenharmony_ci 9087db96d56Sopenharmony_ci def test_close(self): 9097db96d56Sopenharmony_ci self.cu.close() 9107db96d56Sopenharmony_ci 9117db96d56Sopenharmony_ci def test_rowcount_execute(self): 9127db96d56Sopenharmony_ci self.cu.execute("delete from test") 9137db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values ('foo')") 9147db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values ('foo')") 9157db96d56Sopenharmony_ci self.cu.execute("update test set name='bar'") 9167db96d56Sopenharmony_ci self.assertEqual(self.cu.rowcount, 2) 9177db96d56Sopenharmony_ci 9187db96d56Sopenharmony_ci def test_rowcount_select(self): 9197db96d56Sopenharmony_ci """ 9207db96d56Sopenharmony_ci pysqlite does not know the rowcount of SELECT statements, because we 9217db96d56Sopenharmony_ci don't fetch all rows after executing the select statement. The rowcount 9227db96d56Sopenharmony_ci has thus to be -1. 9237db96d56Sopenharmony_ci """ 9247db96d56Sopenharmony_ci self.cu.execute("select 5 union select 6") 9257db96d56Sopenharmony_ci self.assertEqual(self.cu.rowcount, -1) 9267db96d56Sopenharmony_ci 9277db96d56Sopenharmony_ci def test_rowcount_executemany(self): 9287db96d56Sopenharmony_ci self.cu.execute("delete from test") 9297db96d56Sopenharmony_ci self.cu.executemany("insert into test(name) values (?)", [(1,), (2,), (3,)]) 9307db96d56Sopenharmony_ci self.assertEqual(self.cu.rowcount, 3) 9317db96d56Sopenharmony_ci 9327db96d56Sopenharmony_ci @unittest.skipIf(sqlite.sqlite_version_info < (3, 35, 0), 9337db96d56Sopenharmony_ci "Requires SQLite 3.35.0 or newer") 9347db96d56Sopenharmony_ci def test_rowcount_update_returning(self): 9357db96d56Sopenharmony_ci # gh-93421: rowcount is updated correctly for UPDATE...RETURNING queries 9367db96d56Sopenharmony_ci self.cu.execute("update test set name='bar' where name='foo' returning 1") 9377db96d56Sopenharmony_ci self.assertEqual(self.cu.fetchone()[0], 1) 9387db96d56Sopenharmony_ci self.assertEqual(self.cu.rowcount, 1) 9397db96d56Sopenharmony_ci 9407db96d56Sopenharmony_ci def test_rowcount_prefixed_with_comment(self): 9417db96d56Sopenharmony_ci # gh-79579: rowcount is updated even if query is prefixed with comments 9427db96d56Sopenharmony_ci self.cu.execute(""" 9437db96d56Sopenharmony_ci -- foo 9447db96d56Sopenharmony_ci insert into test(name) values ('foo'), ('foo') 9457db96d56Sopenharmony_ci """) 9467db96d56Sopenharmony_ci self.assertEqual(self.cu.rowcount, 2) 9477db96d56Sopenharmony_ci self.cu.execute(""" 9487db96d56Sopenharmony_ci /* -- messy *r /* /* ** *- *-- 9497db96d56Sopenharmony_ci */ 9507db96d56Sopenharmony_ci /* one more */ insert into test(name) values ('messy') 9517db96d56Sopenharmony_ci """) 9527db96d56Sopenharmony_ci self.assertEqual(self.cu.rowcount, 1) 9537db96d56Sopenharmony_ci self.cu.execute("/* bar */ update test set name='bar' where name='foo'") 9547db96d56Sopenharmony_ci self.assertEqual(self.cu.rowcount, 3) 9557db96d56Sopenharmony_ci 9567db96d56Sopenharmony_ci def test_rowcount_vaccuum(self): 9577db96d56Sopenharmony_ci data = ((1,), (2,), (3,)) 9587db96d56Sopenharmony_ci self.cu.executemany("insert into test(income) values(?)", data) 9597db96d56Sopenharmony_ci self.assertEqual(self.cu.rowcount, 3) 9607db96d56Sopenharmony_ci self.cx.commit() 9617db96d56Sopenharmony_ci self.cu.execute("vacuum") 9627db96d56Sopenharmony_ci self.assertEqual(self.cu.rowcount, -1) 9637db96d56Sopenharmony_ci 9647db96d56Sopenharmony_ci def test_total_changes(self): 9657db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values ('foo')") 9667db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values ('foo')") 9677db96d56Sopenharmony_ci self.assertLess(2, self.cx.total_changes, msg='total changes reported wrong value') 9687db96d56Sopenharmony_ci 9697db96d56Sopenharmony_ci # Checks for executemany: 9707db96d56Sopenharmony_ci # Sequences are required by the DB-API, iterators 9717db96d56Sopenharmony_ci # enhancements in pysqlite. 9727db96d56Sopenharmony_ci 9737db96d56Sopenharmony_ci def test_execute_many_sequence(self): 9747db96d56Sopenharmony_ci self.cu.executemany("insert into test(income) values (?)", [(x,) for x in range(100, 110)]) 9757db96d56Sopenharmony_ci 9767db96d56Sopenharmony_ci def test_execute_many_iterator(self): 9777db96d56Sopenharmony_ci class MyIter: 9787db96d56Sopenharmony_ci def __init__(self): 9797db96d56Sopenharmony_ci self.value = 5 9807db96d56Sopenharmony_ci 9817db96d56Sopenharmony_ci def __iter__(self): 9827db96d56Sopenharmony_ci return self 9837db96d56Sopenharmony_ci 9847db96d56Sopenharmony_ci def __next__(self): 9857db96d56Sopenharmony_ci if self.value == 10: 9867db96d56Sopenharmony_ci raise StopIteration 9877db96d56Sopenharmony_ci else: 9887db96d56Sopenharmony_ci self.value += 1 9897db96d56Sopenharmony_ci return (self.value,) 9907db96d56Sopenharmony_ci 9917db96d56Sopenharmony_ci self.cu.executemany("insert into test(income) values (?)", MyIter()) 9927db96d56Sopenharmony_ci 9937db96d56Sopenharmony_ci def test_execute_many_generator(self): 9947db96d56Sopenharmony_ci def mygen(): 9957db96d56Sopenharmony_ci for i in range(5): 9967db96d56Sopenharmony_ci yield (i,) 9977db96d56Sopenharmony_ci 9987db96d56Sopenharmony_ci self.cu.executemany("insert into test(income) values (?)", mygen()) 9997db96d56Sopenharmony_ci 10007db96d56Sopenharmony_ci def test_execute_many_wrong_sql_arg(self): 10017db96d56Sopenharmony_ci with self.assertRaises(TypeError): 10027db96d56Sopenharmony_ci self.cu.executemany(42, [(3,)]) 10037db96d56Sopenharmony_ci 10047db96d56Sopenharmony_ci def test_execute_many_select(self): 10057db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 10067db96d56Sopenharmony_ci self.cu.executemany("select ?", [(3,)]) 10077db96d56Sopenharmony_ci 10087db96d56Sopenharmony_ci def test_execute_many_not_iterable(self): 10097db96d56Sopenharmony_ci with self.assertRaises(TypeError): 10107db96d56Sopenharmony_ci self.cu.executemany("insert into test(income) values (?)", 42) 10117db96d56Sopenharmony_ci 10127db96d56Sopenharmony_ci def test_fetch_iter(self): 10137db96d56Sopenharmony_ci # Optional DB-API extension. 10147db96d56Sopenharmony_ci self.cu.execute("delete from test") 10157db96d56Sopenharmony_ci self.cu.execute("insert into test(id) values (?)", (5,)) 10167db96d56Sopenharmony_ci self.cu.execute("insert into test(id) values (?)", (6,)) 10177db96d56Sopenharmony_ci self.cu.execute("select id from test order by id") 10187db96d56Sopenharmony_ci lst = [] 10197db96d56Sopenharmony_ci for row in self.cu: 10207db96d56Sopenharmony_ci lst.append(row[0]) 10217db96d56Sopenharmony_ci self.assertEqual(lst[0], 5) 10227db96d56Sopenharmony_ci self.assertEqual(lst[1], 6) 10237db96d56Sopenharmony_ci 10247db96d56Sopenharmony_ci def test_fetchone(self): 10257db96d56Sopenharmony_ci self.cu.execute("select name from test") 10267db96d56Sopenharmony_ci row = self.cu.fetchone() 10277db96d56Sopenharmony_ci self.assertEqual(row[0], "foo") 10287db96d56Sopenharmony_ci row = self.cu.fetchone() 10297db96d56Sopenharmony_ci self.assertEqual(row, None) 10307db96d56Sopenharmony_ci 10317db96d56Sopenharmony_ci def test_fetchone_no_statement(self): 10327db96d56Sopenharmony_ci cur = self.cx.cursor() 10337db96d56Sopenharmony_ci row = cur.fetchone() 10347db96d56Sopenharmony_ci self.assertEqual(row, None) 10357db96d56Sopenharmony_ci 10367db96d56Sopenharmony_ci def test_array_size(self): 10377db96d56Sopenharmony_ci # must default to 1 10387db96d56Sopenharmony_ci self.assertEqual(self.cu.arraysize, 1) 10397db96d56Sopenharmony_ci 10407db96d56Sopenharmony_ci # now set to 2 10417db96d56Sopenharmony_ci self.cu.arraysize = 2 10427db96d56Sopenharmony_ci 10437db96d56Sopenharmony_ci # now make the query return 3 rows 10447db96d56Sopenharmony_ci self.cu.execute("delete from test") 10457db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values ('A')") 10467db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values ('B')") 10477db96d56Sopenharmony_ci self.cu.execute("insert into test(name) values ('C')") 10487db96d56Sopenharmony_ci self.cu.execute("select name from test") 10497db96d56Sopenharmony_ci res = self.cu.fetchmany() 10507db96d56Sopenharmony_ci 10517db96d56Sopenharmony_ci self.assertEqual(len(res), 2) 10527db96d56Sopenharmony_ci 10537db96d56Sopenharmony_ci def test_fetchmany(self): 10547db96d56Sopenharmony_ci self.cu.execute("select name from test") 10557db96d56Sopenharmony_ci res = self.cu.fetchmany(100) 10567db96d56Sopenharmony_ci self.assertEqual(len(res), 1) 10577db96d56Sopenharmony_ci res = self.cu.fetchmany(100) 10587db96d56Sopenharmony_ci self.assertEqual(res, []) 10597db96d56Sopenharmony_ci 10607db96d56Sopenharmony_ci def test_fetchmany_kw_arg(self): 10617db96d56Sopenharmony_ci """Checks if fetchmany works with keyword arguments""" 10627db96d56Sopenharmony_ci self.cu.execute("select name from test") 10637db96d56Sopenharmony_ci res = self.cu.fetchmany(size=100) 10647db96d56Sopenharmony_ci self.assertEqual(len(res), 1) 10657db96d56Sopenharmony_ci 10667db96d56Sopenharmony_ci def test_fetchall(self): 10677db96d56Sopenharmony_ci self.cu.execute("select name from test") 10687db96d56Sopenharmony_ci res = self.cu.fetchall() 10697db96d56Sopenharmony_ci self.assertEqual(len(res), 1) 10707db96d56Sopenharmony_ci res = self.cu.fetchall() 10717db96d56Sopenharmony_ci self.assertEqual(res, []) 10727db96d56Sopenharmony_ci 10737db96d56Sopenharmony_ci def test_setinputsizes(self): 10747db96d56Sopenharmony_ci self.cu.setinputsizes([3, 4, 5]) 10757db96d56Sopenharmony_ci 10767db96d56Sopenharmony_ci def test_setoutputsize(self): 10777db96d56Sopenharmony_ci self.cu.setoutputsize(5, 0) 10787db96d56Sopenharmony_ci 10797db96d56Sopenharmony_ci def test_setoutputsize_no_column(self): 10807db96d56Sopenharmony_ci self.cu.setoutputsize(42) 10817db96d56Sopenharmony_ci 10827db96d56Sopenharmony_ci def test_cursor_connection(self): 10837db96d56Sopenharmony_ci # Optional DB-API extension. 10847db96d56Sopenharmony_ci self.assertEqual(self.cu.connection, self.cx) 10857db96d56Sopenharmony_ci 10867db96d56Sopenharmony_ci def test_wrong_cursor_callable(self): 10877db96d56Sopenharmony_ci with self.assertRaises(TypeError): 10887db96d56Sopenharmony_ci def f(): pass 10897db96d56Sopenharmony_ci cur = self.cx.cursor(f) 10907db96d56Sopenharmony_ci 10917db96d56Sopenharmony_ci def test_cursor_wrong_class(self): 10927db96d56Sopenharmony_ci class Foo: pass 10937db96d56Sopenharmony_ci foo = Foo() 10947db96d56Sopenharmony_ci with self.assertRaises(TypeError): 10957db96d56Sopenharmony_ci cur = sqlite.Cursor(foo) 10967db96d56Sopenharmony_ci 10977db96d56Sopenharmony_ci def test_last_row_id_on_replace(self): 10987db96d56Sopenharmony_ci """ 10997db96d56Sopenharmony_ci INSERT OR REPLACE and REPLACE INTO should produce the same behavior. 11007db96d56Sopenharmony_ci """ 11017db96d56Sopenharmony_ci sql = '{} INTO test(id, unique_test) VALUES (?, ?)' 11027db96d56Sopenharmony_ci for statement in ('INSERT OR REPLACE', 'REPLACE'): 11037db96d56Sopenharmony_ci with self.subTest(statement=statement): 11047db96d56Sopenharmony_ci self.cu.execute(sql.format(statement), (1, 'foo')) 11057db96d56Sopenharmony_ci self.assertEqual(self.cu.lastrowid, 1) 11067db96d56Sopenharmony_ci 11077db96d56Sopenharmony_ci def test_last_row_id_on_ignore(self): 11087db96d56Sopenharmony_ci self.cu.execute( 11097db96d56Sopenharmony_ci "insert or ignore into test(unique_test) values (?)", 11107db96d56Sopenharmony_ci ('test',)) 11117db96d56Sopenharmony_ci self.assertEqual(self.cu.lastrowid, 2) 11127db96d56Sopenharmony_ci self.cu.execute( 11137db96d56Sopenharmony_ci "insert or ignore into test(unique_test) values (?)", 11147db96d56Sopenharmony_ci ('test',)) 11157db96d56Sopenharmony_ci self.assertEqual(self.cu.lastrowid, 2) 11167db96d56Sopenharmony_ci 11177db96d56Sopenharmony_ci def test_last_row_id_insert_o_r(self): 11187db96d56Sopenharmony_ci results = [] 11197db96d56Sopenharmony_ci for statement in ('FAIL', 'ABORT', 'ROLLBACK'): 11207db96d56Sopenharmony_ci sql = 'INSERT OR {} INTO test(unique_test) VALUES (?)' 11217db96d56Sopenharmony_ci with self.subTest(statement='INSERT OR {}'.format(statement)): 11227db96d56Sopenharmony_ci self.cu.execute(sql.format(statement), (statement,)) 11237db96d56Sopenharmony_ci results.append((statement, self.cu.lastrowid)) 11247db96d56Sopenharmony_ci with self.assertRaises(sqlite.IntegrityError): 11257db96d56Sopenharmony_ci self.cu.execute(sql.format(statement), (statement,)) 11267db96d56Sopenharmony_ci results.append((statement, self.cu.lastrowid)) 11277db96d56Sopenharmony_ci expected = [ 11287db96d56Sopenharmony_ci ('FAIL', 2), ('FAIL', 2), 11297db96d56Sopenharmony_ci ('ABORT', 3), ('ABORT', 3), 11307db96d56Sopenharmony_ci ('ROLLBACK', 4), ('ROLLBACK', 4), 11317db96d56Sopenharmony_ci ] 11327db96d56Sopenharmony_ci self.assertEqual(results, expected) 11337db96d56Sopenharmony_ci 11347db96d56Sopenharmony_ci def test_column_count(self): 11357db96d56Sopenharmony_ci # Check that column count is updated correctly for cached statements 11367db96d56Sopenharmony_ci select = "select * from test" 11377db96d56Sopenharmony_ci res = self.cu.execute(select) 11387db96d56Sopenharmony_ci old_count = len(res.description) 11397db96d56Sopenharmony_ci # Add a new column and execute the cached select query again 11407db96d56Sopenharmony_ci self.cu.execute("alter table test add newcol") 11417db96d56Sopenharmony_ci res = self.cu.execute(select) 11427db96d56Sopenharmony_ci new_count = len(res.description) 11437db96d56Sopenharmony_ci self.assertEqual(new_count - old_count, 1) 11447db96d56Sopenharmony_ci 11457db96d56Sopenharmony_ci def test_same_query_in_multiple_cursors(self): 11467db96d56Sopenharmony_ci cursors = [self.cx.execute("select 1") for _ in range(3)] 11477db96d56Sopenharmony_ci for cu in cursors: 11487db96d56Sopenharmony_ci self.assertEqual(cu.fetchall(), [(1,)]) 11497db96d56Sopenharmony_ci 11507db96d56Sopenharmony_ci 11517db96d56Sopenharmony_ciclass BlobTests(unittest.TestCase): 11527db96d56Sopenharmony_ci def setUp(self): 11537db96d56Sopenharmony_ci self.cx = sqlite.connect(":memory:") 11547db96d56Sopenharmony_ci self.cx.execute("create table test(b blob)") 11557db96d56Sopenharmony_ci self.data = b"this blob data string is exactly fifty bytes long!" 11567db96d56Sopenharmony_ci self.cx.execute("insert into test(b) values (?)", (self.data,)) 11577db96d56Sopenharmony_ci self.blob = self.cx.blobopen("test", "b", 1) 11587db96d56Sopenharmony_ci 11597db96d56Sopenharmony_ci def tearDown(self): 11607db96d56Sopenharmony_ci self.blob.close() 11617db96d56Sopenharmony_ci self.cx.close() 11627db96d56Sopenharmony_ci 11637db96d56Sopenharmony_ci def test_blob_is_a_blob(self): 11647db96d56Sopenharmony_ci self.assertIsInstance(self.blob, sqlite.Blob) 11657db96d56Sopenharmony_ci 11667db96d56Sopenharmony_ci def test_blob_seek_and_tell(self): 11677db96d56Sopenharmony_ci self.blob.seek(10) 11687db96d56Sopenharmony_ci self.assertEqual(self.blob.tell(), 10) 11697db96d56Sopenharmony_ci 11707db96d56Sopenharmony_ci self.blob.seek(10, SEEK_SET) 11717db96d56Sopenharmony_ci self.assertEqual(self.blob.tell(), 10) 11727db96d56Sopenharmony_ci 11737db96d56Sopenharmony_ci self.blob.seek(10, SEEK_CUR) 11747db96d56Sopenharmony_ci self.assertEqual(self.blob.tell(), 20) 11757db96d56Sopenharmony_ci 11767db96d56Sopenharmony_ci self.blob.seek(-10, SEEK_END) 11777db96d56Sopenharmony_ci self.assertEqual(self.blob.tell(), 40) 11787db96d56Sopenharmony_ci 11797db96d56Sopenharmony_ci def test_blob_seek_error(self): 11807db96d56Sopenharmony_ci msg_oor = "offset out of blob range" 11817db96d56Sopenharmony_ci msg_orig = "'origin' should be os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END" 11827db96d56Sopenharmony_ci msg_of = "seek offset results in overflow" 11837db96d56Sopenharmony_ci 11847db96d56Sopenharmony_ci dataset = ( 11857db96d56Sopenharmony_ci (ValueError, msg_oor, lambda: self.blob.seek(1000)), 11867db96d56Sopenharmony_ci (ValueError, msg_oor, lambda: self.blob.seek(-10)), 11877db96d56Sopenharmony_ci (ValueError, msg_orig, lambda: self.blob.seek(10, -1)), 11887db96d56Sopenharmony_ci (ValueError, msg_orig, lambda: self.blob.seek(10, 3)), 11897db96d56Sopenharmony_ci ) 11907db96d56Sopenharmony_ci for exc, msg, fn in dataset: 11917db96d56Sopenharmony_ci with self.subTest(exc=exc, msg=msg, fn=fn): 11927db96d56Sopenharmony_ci self.assertRaisesRegex(exc, msg, fn) 11937db96d56Sopenharmony_ci 11947db96d56Sopenharmony_ci # Force overflow errors 11957db96d56Sopenharmony_ci self.blob.seek(1, SEEK_SET) 11967db96d56Sopenharmony_ci with self.assertRaisesRegex(OverflowError, msg_of): 11977db96d56Sopenharmony_ci self.blob.seek(INT_MAX, SEEK_CUR) 11987db96d56Sopenharmony_ci with self.assertRaisesRegex(OverflowError, msg_of): 11997db96d56Sopenharmony_ci self.blob.seek(INT_MAX, SEEK_END) 12007db96d56Sopenharmony_ci 12017db96d56Sopenharmony_ci def test_blob_read(self): 12027db96d56Sopenharmony_ci buf = self.blob.read() 12037db96d56Sopenharmony_ci self.assertEqual(buf, self.data) 12047db96d56Sopenharmony_ci 12057db96d56Sopenharmony_ci def test_blob_read_oversized(self): 12067db96d56Sopenharmony_ci buf = self.blob.read(len(self.data) * 2) 12077db96d56Sopenharmony_ci self.assertEqual(buf, self.data) 12087db96d56Sopenharmony_ci 12097db96d56Sopenharmony_ci def test_blob_read_advance_offset(self): 12107db96d56Sopenharmony_ci n = 10 12117db96d56Sopenharmony_ci buf = self.blob.read(n) 12127db96d56Sopenharmony_ci self.assertEqual(buf, self.data[:n]) 12137db96d56Sopenharmony_ci self.assertEqual(self.blob.tell(), n) 12147db96d56Sopenharmony_ci 12157db96d56Sopenharmony_ci def test_blob_read_at_offset(self): 12167db96d56Sopenharmony_ci self.blob.seek(10) 12177db96d56Sopenharmony_ci self.assertEqual(self.blob.read(10), self.data[10:20]) 12187db96d56Sopenharmony_ci 12197db96d56Sopenharmony_ci def test_blob_read_error_row_changed(self): 12207db96d56Sopenharmony_ci self.cx.execute("update test set b='aaaa' where rowid=1") 12217db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError): 12227db96d56Sopenharmony_ci self.blob.read() 12237db96d56Sopenharmony_ci 12247db96d56Sopenharmony_ci def test_blob_write(self): 12257db96d56Sopenharmony_ci new_data = b"new data".ljust(50) 12267db96d56Sopenharmony_ci self.blob.write(new_data) 12277db96d56Sopenharmony_ci row = self.cx.execute("select b from test").fetchone() 12287db96d56Sopenharmony_ci self.assertEqual(row[0], new_data) 12297db96d56Sopenharmony_ci 12307db96d56Sopenharmony_ci def test_blob_write_at_offset(self): 12317db96d56Sopenharmony_ci new_data = b"c" * 25 12327db96d56Sopenharmony_ci self.blob.seek(25) 12337db96d56Sopenharmony_ci self.blob.write(new_data) 12347db96d56Sopenharmony_ci row = self.cx.execute("select b from test").fetchone() 12357db96d56Sopenharmony_ci self.assertEqual(row[0], self.data[:25] + new_data) 12367db96d56Sopenharmony_ci 12377db96d56Sopenharmony_ci def test_blob_write_advance_offset(self): 12387db96d56Sopenharmony_ci self.blob.write(b"d"*10) 12397db96d56Sopenharmony_ci self.assertEqual(self.blob.tell(), 10) 12407db96d56Sopenharmony_ci 12417db96d56Sopenharmony_ci def test_blob_write_error_length(self): 12427db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, "data longer than blob"): 12437db96d56Sopenharmony_ci self.blob.write(b"a" * 1000) 12447db96d56Sopenharmony_ci 12457db96d56Sopenharmony_ci self.blob.seek(0, SEEK_SET) 12467db96d56Sopenharmony_ci n = len(self.blob) 12477db96d56Sopenharmony_ci self.blob.write(b"a" * (n-1)) 12487db96d56Sopenharmony_ci self.blob.write(b"a") 12497db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, "data longer than blob"): 12507db96d56Sopenharmony_ci self.blob.write(b"a") 12517db96d56Sopenharmony_ci 12527db96d56Sopenharmony_ci def test_blob_write_error_row_changed(self): 12537db96d56Sopenharmony_ci self.cx.execute("update test set b='aaaa' where rowid=1") 12547db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError): 12557db96d56Sopenharmony_ci self.blob.write(b"aaa") 12567db96d56Sopenharmony_ci 12577db96d56Sopenharmony_ci def test_blob_write_error_readonly(self): 12587db96d56Sopenharmony_ci ro_blob = self.cx.blobopen("test", "b", 1, readonly=True) 12597db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.OperationalError, "readonly"): 12607db96d56Sopenharmony_ci ro_blob.write(b"aaa") 12617db96d56Sopenharmony_ci ro_blob.close() 12627db96d56Sopenharmony_ci 12637db96d56Sopenharmony_ci def test_blob_open_error(self): 12647db96d56Sopenharmony_ci dataset = ( 12657db96d56Sopenharmony_ci (("test", "b", 1), {"name": "notexisting"}), 12667db96d56Sopenharmony_ci (("notexisting", "b", 1), {}), 12677db96d56Sopenharmony_ci (("test", "notexisting", 1), {}), 12687db96d56Sopenharmony_ci (("test", "b", 2), {}), 12697db96d56Sopenharmony_ci ) 12707db96d56Sopenharmony_ci regex = "no such" 12717db96d56Sopenharmony_ci for args, kwds in dataset: 12727db96d56Sopenharmony_ci with self.subTest(args=args, kwds=kwds): 12737db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.OperationalError, regex): 12747db96d56Sopenharmony_ci self.cx.blobopen(*args, **kwds) 12757db96d56Sopenharmony_ci 12767db96d56Sopenharmony_ci def test_blob_length(self): 12777db96d56Sopenharmony_ci self.assertEqual(len(self.blob), 50) 12787db96d56Sopenharmony_ci 12797db96d56Sopenharmony_ci def test_blob_get_item(self): 12807db96d56Sopenharmony_ci self.assertEqual(self.blob[5], ord("b")) 12817db96d56Sopenharmony_ci self.assertEqual(self.blob[6], ord("l")) 12827db96d56Sopenharmony_ci self.assertEqual(self.blob[7], ord("o")) 12837db96d56Sopenharmony_ci self.assertEqual(self.blob[8], ord("b")) 12847db96d56Sopenharmony_ci self.assertEqual(self.blob[-1], ord("!")) 12857db96d56Sopenharmony_ci 12867db96d56Sopenharmony_ci def test_blob_set_item(self): 12877db96d56Sopenharmony_ci self.blob[0] = ord("b") 12887db96d56Sopenharmony_ci expected = b"b" + self.data[1:] 12897db96d56Sopenharmony_ci actual = self.cx.execute("select b from test").fetchone()[0] 12907db96d56Sopenharmony_ci self.assertEqual(actual, expected) 12917db96d56Sopenharmony_ci 12927db96d56Sopenharmony_ci def test_blob_set_item_with_offset(self): 12937db96d56Sopenharmony_ci self.blob.seek(0, SEEK_END) 12947db96d56Sopenharmony_ci self.assertEqual(self.blob.read(), b"") # verify that we're at EOB 12957db96d56Sopenharmony_ci self.blob[0] = ord("T") 12967db96d56Sopenharmony_ci self.blob[-1] = ord(".") 12977db96d56Sopenharmony_ci self.blob.seek(0, SEEK_SET) 12987db96d56Sopenharmony_ci expected = b"This blob data string is exactly fifty bytes long." 12997db96d56Sopenharmony_ci self.assertEqual(self.blob.read(), expected) 13007db96d56Sopenharmony_ci 13017db96d56Sopenharmony_ci def test_blob_set_slice_buffer_object(self): 13027db96d56Sopenharmony_ci from array import array 13037db96d56Sopenharmony_ci self.blob[0:5] = memoryview(b"12345") 13047db96d56Sopenharmony_ci self.assertEqual(self.blob[0:5], b"12345") 13057db96d56Sopenharmony_ci 13067db96d56Sopenharmony_ci self.blob[0:5] = bytearray(b"23456") 13077db96d56Sopenharmony_ci self.assertEqual(self.blob[0:5], b"23456") 13087db96d56Sopenharmony_ci 13097db96d56Sopenharmony_ci self.blob[0:5] = array("b", [1, 2, 3, 4, 5]) 13107db96d56Sopenharmony_ci self.assertEqual(self.blob[0:5], b"\x01\x02\x03\x04\x05") 13117db96d56Sopenharmony_ci 13127db96d56Sopenharmony_ci def test_blob_set_item_negative_index(self): 13137db96d56Sopenharmony_ci self.blob[-1] = 255 13147db96d56Sopenharmony_ci self.assertEqual(self.blob[-1], 255) 13157db96d56Sopenharmony_ci 13167db96d56Sopenharmony_ci def test_blob_get_slice(self): 13177db96d56Sopenharmony_ci self.assertEqual(self.blob[5:14], b"blob data") 13187db96d56Sopenharmony_ci 13197db96d56Sopenharmony_ci def test_blob_get_empty_slice(self): 13207db96d56Sopenharmony_ci self.assertEqual(self.blob[5:5], b"") 13217db96d56Sopenharmony_ci 13227db96d56Sopenharmony_ci def test_blob_get_slice_negative_index(self): 13237db96d56Sopenharmony_ci self.assertEqual(self.blob[5:-5], self.data[5:-5]) 13247db96d56Sopenharmony_ci 13257db96d56Sopenharmony_ci def test_blob_get_slice_with_skip(self): 13267db96d56Sopenharmony_ci self.assertEqual(self.blob[0:10:2], b"ti lb") 13277db96d56Sopenharmony_ci 13287db96d56Sopenharmony_ci def test_blob_set_slice(self): 13297db96d56Sopenharmony_ci self.blob[0:5] = b"12345" 13307db96d56Sopenharmony_ci expected = b"12345" + self.data[5:] 13317db96d56Sopenharmony_ci actual = self.cx.execute("select b from test").fetchone()[0] 13327db96d56Sopenharmony_ci self.assertEqual(actual, expected) 13337db96d56Sopenharmony_ci 13347db96d56Sopenharmony_ci def test_blob_set_empty_slice(self): 13357db96d56Sopenharmony_ci self.blob[0:0] = b"" 13367db96d56Sopenharmony_ci self.assertEqual(self.blob[:], self.data) 13377db96d56Sopenharmony_ci 13387db96d56Sopenharmony_ci def test_blob_set_slice_with_skip(self): 13397db96d56Sopenharmony_ci self.blob[0:10:2] = b"12345" 13407db96d56Sopenharmony_ci actual = self.cx.execute("select b from test").fetchone()[0] 13417db96d56Sopenharmony_ci expected = b"1h2s3b4o5 " + self.data[10:] 13427db96d56Sopenharmony_ci self.assertEqual(actual, expected) 13437db96d56Sopenharmony_ci 13447db96d56Sopenharmony_ci def test_blob_mapping_invalid_index_type(self): 13457db96d56Sopenharmony_ci msg = "indices must be integers" 13467db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, msg): 13477db96d56Sopenharmony_ci self.blob[5:5.5] 13487db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, msg): 13497db96d56Sopenharmony_ci self.blob[1.5] 13507db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, msg): 13517db96d56Sopenharmony_ci self.blob["a"] = b"b" 13527db96d56Sopenharmony_ci 13537db96d56Sopenharmony_ci def test_blob_get_item_error(self): 13547db96d56Sopenharmony_ci dataset = [len(self.blob), 105, -105] 13557db96d56Sopenharmony_ci for idx in dataset: 13567db96d56Sopenharmony_ci with self.subTest(idx=idx): 13577db96d56Sopenharmony_ci with self.assertRaisesRegex(IndexError, "index out of range"): 13587db96d56Sopenharmony_ci self.blob[idx] 13597db96d56Sopenharmony_ci with self.assertRaisesRegex(IndexError, "cannot fit 'int'"): 13607db96d56Sopenharmony_ci self.blob[ULLONG_MAX] 13617db96d56Sopenharmony_ci 13627db96d56Sopenharmony_ci # Provoke read error 13637db96d56Sopenharmony_ci self.cx.execute("update test set b='aaaa' where rowid=1") 13647db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError): 13657db96d56Sopenharmony_ci self.blob[0] 13667db96d56Sopenharmony_ci 13677db96d56Sopenharmony_ci def test_blob_set_item_error(self): 13687db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, "cannot be interpreted"): 13697db96d56Sopenharmony_ci self.blob[0] = b"multiple" 13707db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, "cannot be interpreted"): 13717db96d56Sopenharmony_ci self.blob[0] = b"1" 13727db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, "cannot be interpreted"): 13737db96d56Sopenharmony_ci self.blob[0] = bytearray(b"1") 13747db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"): 13757db96d56Sopenharmony_ci del self.blob[0] 13767db96d56Sopenharmony_ci with self.assertRaisesRegex(IndexError, "Blob index out of range"): 13777db96d56Sopenharmony_ci self.blob[1000] = 0 13787db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, "must be in range"): 13797db96d56Sopenharmony_ci self.blob[0] = -1 13807db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, "must be in range"): 13817db96d56Sopenharmony_ci self.blob[0] = 256 13827db96d56Sopenharmony_ci # Overflow errors are overridden with ValueError 13837db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, "must be in range"): 13847db96d56Sopenharmony_ci self.blob[0] = 2**65 13857db96d56Sopenharmony_ci 13867db96d56Sopenharmony_ci def test_blob_set_slice_error(self): 13877db96d56Sopenharmony_ci with self.assertRaisesRegex(IndexError, "wrong size"): 13887db96d56Sopenharmony_ci self.blob[5:10] = b"a" 13897db96d56Sopenharmony_ci with self.assertRaisesRegex(IndexError, "wrong size"): 13907db96d56Sopenharmony_ci self.blob[5:10] = b"a" * 1000 13917db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, "doesn't support.*deletion"): 13927db96d56Sopenharmony_ci del self.blob[5:10] 13937db96d56Sopenharmony_ci with self.assertRaisesRegex(ValueError, "step cannot be zero"): 13947db96d56Sopenharmony_ci self.blob[5:10:0] = b"12345" 13957db96d56Sopenharmony_ci with self.assertRaises(BufferError): 13967db96d56Sopenharmony_ci self.blob[5:10] = memoryview(b"abcde")[::2] 13977db96d56Sopenharmony_ci 13987db96d56Sopenharmony_ci def test_blob_sequence_not_supported(self): 13997db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, "unsupported operand"): 14007db96d56Sopenharmony_ci self.blob + self.blob 14017db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, "unsupported operand"): 14027db96d56Sopenharmony_ci self.blob * 5 14037db96d56Sopenharmony_ci with self.assertRaisesRegex(TypeError, "is not iterable"): 14047db96d56Sopenharmony_ci b"a" in self.blob 14057db96d56Sopenharmony_ci 14067db96d56Sopenharmony_ci def test_blob_context_manager(self): 14077db96d56Sopenharmony_ci data = b"a" * 50 14087db96d56Sopenharmony_ci with self.cx.blobopen("test", "b", 1) as blob: 14097db96d56Sopenharmony_ci blob.write(data) 14107db96d56Sopenharmony_ci actual = self.cx.execute("select b from test").fetchone()[0] 14117db96d56Sopenharmony_ci self.assertEqual(actual, data) 14127db96d56Sopenharmony_ci 14137db96d56Sopenharmony_ci # Check that __exit__ closed the blob 14147db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.ProgrammingError, "closed blob"): 14157db96d56Sopenharmony_ci blob.read() 14167db96d56Sopenharmony_ci 14177db96d56Sopenharmony_ci def test_blob_context_manager_reraise_exceptions(self): 14187db96d56Sopenharmony_ci class DummyException(Exception): 14197db96d56Sopenharmony_ci pass 14207db96d56Sopenharmony_ci with self.assertRaisesRegex(DummyException, "reraised"): 14217db96d56Sopenharmony_ci with self.cx.blobopen("test", "b", 1) as blob: 14227db96d56Sopenharmony_ci raise DummyException("reraised") 14237db96d56Sopenharmony_ci 14247db96d56Sopenharmony_ci 14257db96d56Sopenharmony_ci def test_blob_closed(self): 14267db96d56Sopenharmony_ci with memory_database() as cx: 14277db96d56Sopenharmony_ci cx.execute("create table test(b blob)") 14287db96d56Sopenharmony_ci cx.execute("insert into test values (zeroblob(100))") 14297db96d56Sopenharmony_ci blob = cx.blobopen("test", "b", 1) 14307db96d56Sopenharmony_ci blob.close() 14317db96d56Sopenharmony_ci 14327db96d56Sopenharmony_ci msg = "Cannot operate on a closed blob" 14337db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 14347db96d56Sopenharmony_ci blob.read() 14357db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 14367db96d56Sopenharmony_ci blob.write(b"") 14377db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 14387db96d56Sopenharmony_ci blob.seek(0) 14397db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 14407db96d56Sopenharmony_ci blob.tell() 14417db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 14427db96d56Sopenharmony_ci blob.__enter__() 14437db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 14447db96d56Sopenharmony_ci blob.__exit__(None, None, None) 14457db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 14467db96d56Sopenharmony_ci len(blob) 14477db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 14487db96d56Sopenharmony_ci blob[0] 14497db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 14507db96d56Sopenharmony_ci blob[0:1] 14517db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.ProgrammingError, msg): 14527db96d56Sopenharmony_ci blob[0] = b"" 14537db96d56Sopenharmony_ci 14547db96d56Sopenharmony_ci def test_blob_closed_db_read(self): 14557db96d56Sopenharmony_ci with memory_database() as cx: 14567db96d56Sopenharmony_ci cx.execute("create table test(b blob)") 14577db96d56Sopenharmony_ci cx.execute("insert into test(b) values (zeroblob(100))") 14587db96d56Sopenharmony_ci blob = cx.blobopen("test", "b", 1) 14597db96d56Sopenharmony_ci cx.close() 14607db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.ProgrammingError, 14617db96d56Sopenharmony_ci "Cannot operate on a closed database", 14627db96d56Sopenharmony_ci blob.read) 14637db96d56Sopenharmony_ci 14647db96d56Sopenharmony_ci def test_blob_32bit_rowid(self): 14657db96d56Sopenharmony_ci # gh-100370: we should not get an OverflowError for 32-bit rowids 14667db96d56Sopenharmony_ci with memory_database() as cx: 14677db96d56Sopenharmony_ci rowid = 2**32 14687db96d56Sopenharmony_ci cx.execute("create table t(t blob)") 14697db96d56Sopenharmony_ci cx.execute("insert into t(rowid, t) values (?, zeroblob(1))", (rowid,)) 14707db96d56Sopenharmony_ci cx.blobopen('t', 't', rowid) 14717db96d56Sopenharmony_ci 14727db96d56Sopenharmony_ci 14737db96d56Sopenharmony_ci@threading_helper.requires_working_threading() 14747db96d56Sopenharmony_ciclass ThreadTests(unittest.TestCase): 14757db96d56Sopenharmony_ci def setUp(self): 14767db96d56Sopenharmony_ci self.con = sqlite.connect(":memory:") 14777db96d56Sopenharmony_ci self.cur = self.con.cursor() 14787db96d56Sopenharmony_ci self.cur.execute("create table test(name text, b blob)") 14797db96d56Sopenharmony_ci self.cur.execute("insert into test values('blob', zeroblob(1))") 14807db96d56Sopenharmony_ci 14817db96d56Sopenharmony_ci def tearDown(self): 14827db96d56Sopenharmony_ci self.cur.close() 14837db96d56Sopenharmony_ci self.con.close() 14847db96d56Sopenharmony_ci 14857db96d56Sopenharmony_ci @threading_helper.reap_threads 14867db96d56Sopenharmony_ci def _run_test(self, fn, *args, **kwds): 14877db96d56Sopenharmony_ci def run(err): 14887db96d56Sopenharmony_ci try: 14897db96d56Sopenharmony_ci fn(*args, **kwds) 14907db96d56Sopenharmony_ci err.append("did not raise ProgrammingError") 14917db96d56Sopenharmony_ci except sqlite.ProgrammingError: 14927db96d56Sopenharmony_ci pass 14937db96d56Sopenharmony_ci except: 14947db96d56Sopenharmony_ci err.append("raised wrong exception") 14957db96d56Sopenharmony_ci 14967db96d56Sopenharmony_ci err = [] 14977db96d56Sopenharmony_ci t = threading.Thread(target=run, kwargs={"err": err}) 14987db96d56Sopenharmony_ci t.start() 14997db96d56Sopenharmony_ci t.join() 15007db96d56Sopenharmony_ci if err: 15017db96d56Sopenharmony_ci self.fail("\n".join(err)) 15027db96d56Sopenharmony_ci 15037db96d56Sopenharmony_ci def test_check_connection_thread(self): 15047db96d56Sopenharmony_ci fns = [ 15057db96d56Sopenharmony_ci lambda: self.con.cursor(), 15067db96d56Sopenharmony_ci lambda: self.con.commit(), 15077db96d56Sopenharmony_ci lambda: self.con.rollback(), 15087db96d56Sopenharmony_ci lambda: self.con.close(), 15097db96d56Sopenharmony_ci lambda: self.con.set_trace_callback(None), 15107db96d56Sopenharmony_ci lambda: self.con.set_authorizer(None), 15117db96d56Sopenharmony_ci lambda: self.con.create_collation("foo", None), 15127db96d56Sopenharmony_ci lambda: self.con.setlimit(sqlite.SQLITE_LIMIT_LENGTH, -1), 15137db96d56Sopenharmony_ci lambda: self.con.getlimit(sqlite.SQLITE_LIMIT_LENGTH), 15147db96d56Sopenharmony_ci lambda: self.con.blobopen("test", "b", 1), 15157db96d56Sopenharmony_ci ] 15167db96d56Sopenharmony_ci if hasattr(sqlite.Connection, "serialize"): 15177db96d56Sopenharmony_ci fns.append(lambda: self.con.serialize()) 15187db96d56Sopenharmony_ci fns.append(lambda: self.con.deserialize(b"")) 15197db96d56Sopenharmony_ci if sqlite.sqlite_version_info >= (3, 25, 0): 15207db96d56Sopenharmony_ci fns.append(lambda: self.con.create_window_function("foo", 0, None)) 15217db96d56Sopenharmony_ci 15227db96d56Sopenharmony_ci for fn in fns: 15237db96d56Sopenharmony_ci with self.subTest(fn=fn): 15247db96d56Sopenharmony_ci self._run_test(fn) 15257db96d56Sopenharmony_ci 15267db96d56Sopenharmony_ci def test_check_cursor_thread(self): 15277db96d56Sopenharmony_ci fns = [ 15287db96d56Sopenharmony_ci lambda: self.cur.execute("insert into test(name) values('a')"), 15297db96d56Sopenharmony_ci lambda: self.cur.close(), 15307db96d56Sopenharmony_ci lambda: self.cur.execute("select name from test"), 15317db96d56Sopenharmony_ci lambda: self.cur.fetchone(), 15327db96d56Sopenharmony_ci ] 15337db96d56Sopenharmony_ci for fn in fns: 15347db96d56Sopenharmony_ci with self.subTest(fn=fn): 15357db96d56Sopenharmony_ci self._run_test(fn) 15367db96d56Sopenharmony_ci 15377db96d56Sopenharmony_ci 15387db96d56Sopenharmony_ci @threading_helper.reap_threads 15397db96d56Sopenharmony_ci def test_dont_check_same_thread(self): 15407db96d56Sopenharmony_ci def run(con, err): 15417db96d56Sopenharmony_ci try: 15427db96d56Sopenharmony_ci con.execute("select 1") 15437db96d56Sopenharmony_ci except sqlite.Error: 15447db96d56Sopenharmony_ci err.append("multi-threading not allowed") 15457db96d56Sopenharmony_ci 15467db96d56Sopenharmony_ci con = sqlite.connect(":memory:", check_same_thread=False) 15477db96d56Sopenharmony_ci err = [] 15487db96d56Sopenharmony_ci t = threading.Thread(target=run, kwargs={"con": con, "err": err}) 15497db96d56Sopenharmony_ci t.start() 15507db96d56Sopenharmony_ci t.join() 15517db96d56Sopenharmony_ci self.assertEqual(len(err), 0, "\n".join(err)) 15527db96d56Sopenharmony_ci 15537db96d56Sopenharmony_ci 15547db96d56Sopenharmony_ciclass ConstructorTests(unittest.TestCase): 15557db96d56Sopenharmony_ci def test_date(self): 15567db96d56Sopenharmony_ci d = sqlite.Date(2004, 10, 28) 15577db96d56Sopenharmony_ci 15587db96d56Sopenharmony_ci def test_time(self): 15597db96d56Sopenharmony_ci t = sqlite.Time(12, 39, 35) 15607db96d56Sopenharmony_ci 15617db96d56Sopenharmony_ci def test_timestamp(self): 15627db96d56Sopenharmony_ci ts = sqlite.Timestamp(2004, 10, 28, 12, 39, 35) 15637db96d56Sopenharmony_ci 15647db96d56Sopenharmony_ci def test_date_from_ticks(self): 15657db96d56Sopenharmony_ci d = sqlite.DateFromTicks(42) 15667db96d56Sopenharmony_ci 15677db96d56Sopenharmony_ci def test_time_from_ticks(self): 15687db96d56Sopenharmony_ci t = sqlite.TimeFromTicks(42) 15697db96d56Sopenharmony_ci 15707db96d56Sopenharmony_ci def test_timestamp_from_ticks(self): 15717db96d56Sopenharmony_ci ts = sqlite.TimestampFromTicks(42) 15727db96d56Sopenharmony_ci 15737db96d56Sopenharmony_ci def test_binary(self): 15747db96d56Sopenharmony_ci b = sqlite.Binary(b"\0'") 15757db96d56Sopenharmony_ci 15767db96d56Sopenharmony_ciclass ExtensionTests(unittest.TestCase): 15777db96d56Sopenharmony_ci def test_script_string_sql(self): 15787db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 15797db96d56Sopenharmony_ci cur = con.cursor() 15807db96d56Sopenharmony_ci cur.executescript(""" 15817db96d56Sopenharmony_ci -- bla bla 15827db96d56Sopenharmony_ci /* a stupid comment */ 15837db96d56Sopenharmony_ci create table a(i); 15847db96d56Sopenharmony_ci insert into a(i) values (5); 15857db96d56Sopenharmony_ci """) 15867db96d56Sopenharmony_ci cur.execute("select i from a") 15877db96d56Sopenharmony_ci res = cur.fetchone()[0] 15887db96d56Sopenharmony_ci self.assertEqual(res, 5) 15897db96d56Sopenharmony_ci 15907db96d56Sopenharmony_ci def test_script_syntax_error(self): 15917db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 15927db96d56Sopenharmony_ci cur = con.cursor() 15937db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError): 15947db96d56Sopenharmony_ci cur.executescript("create table test(x); asdf; create table test2(x)") 15957db96d56Sopenharmony_ci 15967db96d56Sopenharmony_ci def test_script_error_normal(self): 15977db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 15987db96d56Sopenharmony_ci cur = con.cursor() 15997db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError): 16007db96d56Sopenharmony_ci cur.executescript("create table test(sadfsadfdsa); select foo from hurz;") 16017db96d56Sopenharmony_ci 16027db96d56Sopenharmony_ci def test_cursor_executescript_as_bytes(self): 16037db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 16047db96d56Sopenharmony_ci cur = con.cursor() 16057db96d56Sopenharmony_ci with self.assertRaises(TypeError): 16067db96d56Sopenharmony_ci cur.executescript(b"create table test(foo); insert into test(foo) values (5);") 16077db96d56Sopenharmony_ci 16087db96d56Sopenharmony_ci def test_cursor_executescript_with_null_characters(self): 16097db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 16107db96d56Sopenharmony_ci cur = con.cursor() 16117db96d56Sopenharmony_ci with self.assertRaises(ValueError): 16127db96d56Sopenharmony_ci cur.executescript(""" 16137db96d56Sopenharmony_ci create table a(i);\0 16147db96d56Sopenharmony_ci insert into a(i) values (5); 16157db96d56Sopenharmony_ci """) 16167db96d56Sopenharmony_ci 16177db96d56Sopenharmony_ci def test_cursor_executescript_with_surrogates(self): 16187db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 16197db96d56Sopenharmony_ci cur = con.cursor() 16207db96d56Sopenharmony_ci with self.assertRaises(UnicodeEncodeError): 16217db96d56Sopenharmony_ci cur.executescript(""" 16227db96d56Sopenharmony_ci create table a(s); 16237db96d56Sopenharmony_ci insert into a(s) values ('\ud8ff'); 16247db96d56Sopenharmony_ci """) 16257db96d56Sopenharmony_ci 16267db96d56Sopenharmony_ci def test_cursor_executescript_too_large_script(self): 16277db96d56Sopenharmony_ci msg = "query string is too large" 16287db96d56Sopenharmony_ci with memory_database() as cx, cx_limit(cx) as lim: 16297db96d56Sopenharmony_ci cx.executescript("select 'almost too large'".ljust(lim)) 16307db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.DataError, msg): 16317db96d56Sopenharmony_ci cx.executescript("select 'too large'".ljust(lim+1)) 16327db96d56Sopenharmony_ci 16337db96d56Sopenharmony_ci def test_cursor_executescript_tx_control(self): 16347db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 16357db96d56Sopenharmony_ci con.execute("begin") 16367db96d56Sopenharmony_ci self.assertTrue(con.in_transaction) 16377db96d56Sopenharmony_ci con.executescript("select 1") 16387db96d56Sopenharmony_ci self.assertFalse(con.in_transaction) 16397db96d56Sopenharmony_ci 16407db96d56Sopenharmony_ci def test_connection_execute(self): 16417db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 16427db96d56Sopenharmony_ci result = con.execute("select 5").fetchone()[0] 16437db96d56Sopenharmony_ci self.assertEqual(result, 5, "Basic test of Connection.execute") 16447db96d56Sopenharmony_ci 16457db96d56Sopenharmony_ci def test_connection_executemany(self): 16467db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 16477db96d56Sopenharmony_ci con.execute("create table test(foo)") 16487db96d56Sopenharmony_ci con.executemany("insert into test(foo) values (?)", [(3,), (4,)]) 16497db96d56Sopenharmony_ci result = con.execute("select foo from test order by foo").fetchall() 16507db96d56Sopenharmony_ci self.assertEqual(result[0][0], 3, "Basic test of Connection.executemany") 16517db96d56Sopenharmony_ci self.assertEqual(result[1][0], 4, "Basic test of Connection.executemany") 16527db96d56Sopenharmony_ci 16537db96d56Sopenharmony_ci def test_connection_executescript(self): 16547db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 16557db96d56Sopenharmony_ci con.executescript("create table test(foo); insert into test(foo) values (5);") 16567db96d56Sopenharmony_ci result = con.execute("select foo from test").fetchone()[0] 16577db96d56Sopenharmony_ci self.assertEqual(result, 5, "Basic test of Connection.executescript") 16587db96d56Sopenharmony_ci 16597db96d56Sopenharmony_ciclass ClosedConTests(unittest.TestCase): 16607db96d56Sopenharmony_ci def test_closed_con_cursor(self): 16617db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 16627db96d56Sopenharmony_ci con.close() 16637db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 16647db96d56Sopenharmony_ci cur = con.cursor() 16657db96d56Sopenharmony_ci 16667db96d56Sopenharmony_ci def test_closed_con_commit(self): 16677db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 16687db96d56Sopenharmony_ci con.close() 16697db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 16707db96d56Sopenharmony_ci con.commit() 16717db96d56Sopenharmony_ci 16727db96d56Sopenharmony_ci def test_closed_con_rollback(self): 16737db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 16747db96d56Sopenharmony_ci con.close() 16757db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 16767db96d56Sopenharmony_ci con.rollback() 16777db96d56Sopenharmony_ci 16787db96d56Sopenharmony_ci def test_closed_cur_execute(self): 16797db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 16807db96d56Sopenharmony_ci cur = con.cursor() 16817db96d56Sopenharmony_ci con.close() 16827db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 16837db96d56Sopenharmony_ci cur.execute("select 4") 16847db96d56Sopenharmony_ci 16857db96d56Sopenharmony_ci def test_closed_create_function(self): 16867db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 16877db96d56Sopenharmony_ci con.close() 16887db96d56Sopenharmony_ci def f(x): return 17 16897db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 16907db96d56Sopenharmony_ci con.create_function("foo", 1, f) 16917db96d56Sopenharmony_ci 16927db96d56Sopenharmony_ci def test_closed_create_aggregate(self): 16937db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 16947db96d56Sopenharmony_ci con.close() 16957db96d56Sopenharmony_ci class Agg: 16967db96d56Sopenharmony_ci def __init__(self): 16977db96d56Sopenharmony_ci pass 16987db96d56Sopenharmony_ci def step(self, x): 16997db96d56Sopenharmony_ci pass 17007db96d56Sopenharmony_ci def finalize(self): 17017db96d56Sopenharmony_ci return 17 17027db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 17037db96d56Sopenharmony_ci con.create_aggregate("foo", 1, Agg) 17047db96d56Sopenharmony_ci 17057db96d56Sopenharmony_ci def test_closed_set_authorizer(self): 17067db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 17077db96d56Sopenharmony_ci con.close() 17087db96d56Sopenharmony_ci def authorizer(*args): 17097db96d56Sopenharmony_ci return sqlite.DENY 17107db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 17117db96d56Sopenharmony_ci con.set_authorizer(authorizer) 17127db96d56Sopenharmony_ci 17137db96d56Sopenharmony_ci def test_closed_set_progress_callback(self): 17147db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 17157db96d56Sopenharmony_ci con.close() 17167db96d56Sopenharmony_ci def progress(): pass 17177db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 17187db96d56Sopenharmony_ci con.set_progress_handler(progress, 100) 17197db96d56Sopenharmony_ci 17207db96d56Sopenharmony_ci def test_closed_call(self): 17217db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 17227db96d56Sopenharmony_ci con.close() 17237db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 17247db96d56Sopenharmony_ci con() 17257db96d56Sopenharmony_ci 17267db96d56Sopenharmony_ciclass ClosedCurTests(unittest.TestCase): 17277db96d56Sopenharmony_ci def test_closed(self): 17287db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 17297db96d56Sopenharmony_ci cur = con.cursor() 17307db96d56Sopenharmony_ci cur.close() 17317db96d56Sopenharmony_ci 17327db96d56Sopenharmony_ci for method_name in ("execute", "executemany", "executescript", "fetchall", "fetchmany", "fetchone"): 17337db96d56Sopenharmony_ci if method_name in ("execute", "executescript"): 17347db96d56Sopenharmony_ci params = ("select 4 union select 5",) 17357db96d56Sopenharmony_ci elif method_name == "executemany": 17367db96d56Sopenharmony_ci params = ("insert into foo(bar) values (?)", [(3,), (4,)]) 17377db96d56Sopenharmony_ci else: 17387db96d56Sopenharmony_ci params = [] 17397db96d56Sopenharmony_ci 17407db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 17417db96d56Sopenharmony_ci method = getattr(cur, method_name) 17427db96d56Sopenharmony_ci method(*params) 17437db96d56Sopenharmony_ci 17447db96d56Sopenharmony_ci 17457db96d56Sopenharmony_ciclass SqliteOnConflictTests(unittest.TestCase): 17467db96d56Sopenharmony_ci """ 17477db96d56Sopenharmony_ci Tests for SQLite's "insert on conflict" feature. 17487db96d56Sopenharmony_ci 17497db96d56Sopenharmony_ci See https://www.sqlite.org/lang_conflict.html for details. 17507db96d56Sopenharmony_ci """ 17517db96d56Sopenharmony_ci 17527db96d56Sopenharmony_ci def setUp(self): 17537db96d56Sopenharmony_ci self.cx = sqlite.connect(":memory:") 17547db96d56Sopenharmony_ci self.cu = self.cx.cursor() 17557db96d56Sopenharmony_ci self.cu.execute(""" 17567db96d56Sopenharmony_ci CREATE TABLE test( 17577db96d56Sopenharmony_ci id INTEGER PRIMARY KEY, name TEXT, unique_name TEXT UNIQUE 17587db96d56Sopenharmony_ci ); 17597db96d56Sopenharmony_ci """) 17607db96d56Sopenharmony_ci 17617db96d56Sopenharmony_ci def tearDown(self): 17627db96d56Sopenharmony_ci self.cu.close() 17637db96d56Sopenharmony_ci self.cx.close() 17647db96d56Sopenharmony_ci 17657db96d56Sopenharmony_ci def test_on_conflict_rollback_with_explicit_transaction(self): 17667db96d56Sopenharmony_ci self.cx.isolation_level = None # autocommit mode 17677db96d56Sopenharmony_ci self.cu = self.cx.cursor() 17687db96d56Sopenharmony_ci # Start an explicit transaction. 17697db96d56Sopenharmony_ci self.cu.execute("BEGIN") 17707db96d56Sopenharmony_ci self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 17717db96d56Sopenharmony_ci self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 17727db96d56Sopenharmony_ci with self.assertRaises(sqlite.IntegrityError): 17737db96d56Sopenharmony_ci self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 17747db96d56Sopenharmony_ci # Use connection to commit. 17757db96d56Sopenharmony_ci self.cx.commit() 17767db96d56Sopenharmony_ci self.cu.execute("SELECT name, unique_name from test") 17777db96d56Sopenharmony_ci # Transaction should have rolled back and nothing should be in table. 17787db96d56Sopenharmony_ci self.assertEqual(self.cu.fetchall(), []) 17797db96d56Sopenharmony_ci 17807db96d56Sopenharmony_ci def test_on_conflict_abort_raises_with_explicit_transactions(self): 17817db96d56Sopenharmony_ci # Abort cancels the current sql statement but doesn't change anything 17827db96d56Sopenharmony_ci # about the current transaction. 17837db96d56Sopenharmony_ci self.cx.isolation_level = None # autocommit mode 17847db96d56Sopenharmony_ci self.cu = self.cx.cursor() 17857db96d56Sopenharmony_ci # Start an explicit transaction. 17867db96d56Sopenharmony_ci self.cu.execute("BEGIN") 17877db96d56Sopenharmony_ci self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 17887db96d56Sopenharmony_ci self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 17897db96d56Sopenharmony_ci with self.assertRaises(sqlite.IntegrityError): 17907db96d56Sopenharmony_ci self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 17917db96d56Sopenharmony_ci self.cx.commit() 17927db96d56Sopenharmony_ci self.cu.execute("SELECT name, unique_name FROM test") 17937db96d56Sopenharmony_ci # Expect the first two inserts to work, third to do nothing. 17947db96d56Sopenharmony_ci self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) 17957db96d56Sopenharmony_ci 17967db96d56Sopenharmony_ci def test_on_conflict_rollback_without_transaction(self): 17977db96d56Sopenharmony_ci # Start of implicit transaction 17987db96d56Sopenharmony_ci self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 17997db96d56Sopenharmony_ci self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 18007db96d56Sopenharmony_ci with self.assertRaises(sqlite.IntegrityError): 18017db96d56Sopenharmony_ci self.cu.execute("INSERT OR ROLLBACK INTO test(unique_name) VALUES ('foo')") 18027db96d56Sopenharmony_ci self.cu.execute("SELECT name, unique_name FROM test") 18037db96d56Sopenharmony_ci # Implicit transaction is rolled back on error. 18047db96d56Sopenharmony_ci self.assertEqual(self.cu.fetchall(), []) 18057db96d56Sopenharmony_ci 18067db96d56Sopenharmony_ci def test_on_conflict_abort_raises_without_transactions(self): 18077db96d56Sopenharmony_ci # Abort cancels the current sql statement but doesn't change anything 18087db96d56Sopenharmony_ci # about the current transaction. 18097db96d56Sopenharmony_ci self.cu.execute("INSERT INTO test(name) VALUES ('abort_test')") 18107db96d56Sopenharmony_ci self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 18117db96d56Sopenharmony_ci with self.assertRaises(sqlite.IntegrityError): 18127db96d56Sopenharmony_ci self.cu.execute("INSERT OR ABORT INTO test(unique_name) VALUES ('foo')") 18137db96d56Sopenharmony_ci # Make sure all other values were inserted. 18147db96d56Sopenharmony_ci self.cu.execute("SELECT name, unique_name FROM test") 18157db96d56Sopenharmony_ci self.assertEqual(self.cu.fetchall(), [('abort_test', None), (None, 'foo',)]) 18167db96d56Sopenharmony_ci 18177db96d56Sopenharmony_ci def test_on_conflict_fail(self): 18187db96d56Sopenharmony_ci self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") 18197db96d56Sopenharmony_ci with self.assertRaises(sqlite.IntegrityError): 18207db96d56Sopenharmony_ci self.cu.execute("INSERT OR FAIL INTO test(unique_name) VALUES ('foo')") 18217db96d56Sopenharmony_ci self.assertEqual(self.cu.fetchall(), []) 18227db96d56Sopenharmony_ci 18237db96d56Sopenharmony_ci def test_on_conflict_ignore(self): 18247db96d56Sopenharmony_ci self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") 18257db96d56Sopenharmony_ci # Nothing should happen. 18267db96d56Sopenharmony_ci self.cu.execute("INSERT OR IGNORE INTO test(unique_name) VALUES ('foo')") 18277db96d56Sopenharmony_ci self.cu.execute("SELECT unique_name FROM test") 18287db96d56Sopenharmony_ci self.assertEqual(self.cu.fetchall(), [('foo',)]) 18297db96d56Sopenharmony_ci 18307db96d56Sopenharmony_ci def test_on_conflict_replace(self): 18317db96d56Sopenharmony_ci self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Data!', 'foo')") 18327db96d56Sopenharmony_ci # There shouldn't be an IntegrityError exception. 18337db96d56Sopenharmony_ci self.cu.execute("INSERT OR REPLACE INTO test(name, unique_name) VALUES ('Very different data!', 'foo')") 18347db96d56Sopenharmony_ci self.cu.execute("SELECT name, unique_name FROM test") 18357db96d56Sopenharmony_ci self.assertEqual(self.cu.fetchall(), [('Very different data!', 'foo')]) 18367db96d56Sopenharmony_ci 18377db96d56Sopenharmony_ci 18387db96d56Sopenharmony_ci@requires_subprocess() 18397db96d56Sopenharmony_ciclass MultiprocessTests(unittest.TestCase): 18407db96d56Sopenharmony_ci CONNECTION_TIMEOUT = SHORT_TIMEOUT / 1000. # Defaults to 30 ms 18417db96d56Sopenharmony_ci 18427db96d56Sopenharmony_ci def tearDown(self): 18437db96d56Sopenharmony_ci unlink(TESTFN) 18447db96d56Sopenharmony_ci 18457db96d56Sopenharmony_ci def test_ctx_mgr_rollback_if_commit_failed(self): 18467db96d56Sopenharmony_ci # bpo-27334: ctx manager does not rollback if commit fails 18477db96d56Sopenharmony_ci SCRIPT = f"""if 1: 18487db96d56Sopenharmony_ci import sqlite3 18497db96d56Sopenharmony_ci def wait(): 18507db96d56Sopenharmony_ci print("started") 18517db96d56Sopenharmony_ci assert "database is locked" in input() 18527db96d56Sopenharmony_ci 18537db96d56Sopenharmony_ci cx = sqlite3.connect("{TESTFN}", timeout={self.CONNECTION_TIMEOUT}) 18547db96d56Sopenharmony_ci cx.create_function("wait", 0, wait) 18557db96d56Sopenharmony_ci with cx: 18567db96d56Sopenharmony_ci cx.execute("create table t(t)") 18577db96d56Sopenharmony_ci try: 18587db96d56Sopenharmony_ci # execute two transactions; both will try to lock the db 18597db96d56Sopenharmony_ci cx.executescript(''' 18607db96d56Sopenharmony_ci -- start a transaction and wait for parent 18617db96d56Sopenharmony_ci begin transaction; 18627db96d56Sopenharmony_ci select * from t; 18637db96d56Sopenharmony_ci select wait(); 18647db96d56Sopenharmony_ci rollback; 18657db96d56Sopenharmony_ci 18667db96d56Sopenharmony_ci -- start a new transaction; would fail if parent holds lock 18677db96d56Sopenharmony_ci begin transaction; 18687db96d56Sopenharmony_ci select * from t; 18697db96d56Sopenharmony_ci rollback; 18707db96d56Sopenharmony_ci ''') 18717db96d56Sopenharmony_ci finally: 18727db96d56Sopenharmony_ci cx.close() 18737db96d56Sopenharmony_ci """ 18747db96d56Sopenharmony_ci 18757db96d56Sopenharmony_ci # spawn child process 18767db96d56Sopenharmony_ci proc = subprocess.Popen( 18777db96d56Sopenharmony_ci [sys.executable, "-c", SCRIPT], 18787db96d56Sopenharmony_ci encoding="utf-8", 18797db96d56Sopenharmony_ci bufsize=0, 18807db96d56Sopenharmony_ci stdin=subprocess.PIPE, 18817db96d56Sopenharmony_ci stdout=subprocess.PIPE, 18827db96d56Sopenharmony_ci ) 18837db96d56Sopenharmony_ci self.addCleanup(proc.communicate) 18847db96d56Sopenharmony_ci 18857db96d56Sopenharmony_ci # wait for child process to start 18867db96d56Sopenharmony_ci self.assertEqual("started", proc.stdout.readline().strip()) 18877db96d56Sopenharmony_ci 18887db96d56Sopenharmony_ci cx = sqlite.connect(TESTFN, timeout=self.CONNECTION_TIMEOUT) 18897db96d56Sopenharmony_ci try: # context manager should correctly release the db lock 18907db96d56Sopenharmony_ci with cx: 18917db96d56Sopenharmony_ci cx.execute("insert into t values('test')") 18927db96d56Sopenharmony_ci except sqlite.OperationalError as exc: 18937db96d56Sopenharmony_ci proc.stdin.write(str(exc)) 18947db96d56Sopenharmony_ci else: 18957db96d56Sopenharmony_ci proc.stdin.write("no error") 18967db96d56Sopenharmony_ci finally: 18977db96d56Sopenharmony_ci cx.close() 18987db96d56Sopenharmony_ci 18997db96d56Sopenharmony_ci # terminate child process 19007db96d56Sopenharmony_ci self.assertIsNone(proc.returncode) 19017db96d56Sopenharmony_ci try: 19027db96d56Sopenharmony_ci proc.communicate(input="end", timeout=SHORT_TIMEOUT) 19037db96d56Sopenharmony_ci except subprocess.TimeoutExpired: 19047db96d56Sopenharmony_ci proc.kill() 19057db96d56Sopenharmony_ci proc.communicate() 19067db96d56Sopenharmony_ci raise 19077db96d56Sopenharmony_ci self.assertEqual(proc.returncode, 0) 19087db96d56Sopenharmony_ci 19097db96d56Sopenharmony_ci 19107db96d56Sopenharmony_ciif __name__ == "__main__": 19117db96d56Sopenharmony_ci unittest.main() 1912