17db96d56Sopenharmony_ci# pysqlite2/test/regression.py: pysqlite regression tests 27db96d56Sopenharmony_ci# 37db96d56Sopenharmony_ci# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de> 47db96d56Sopenharmony_ci# 57db96d56Sopenharmony_ci# This file is part of pysqlite. 67db96d56Sopenharmony_ci# 77db96d56Sopenharmony_ci# This software is provided 'as-is', without any express or implied 87db96d56Sopenharmony_ci# warranty. In no event will the authors be held liable for any damages 97db96d56Sopenharmony_ci# arising from the use of this software. 107db96d56Sopenharmony_ci# 117db96d56Sopenharmony_ci# Permission is granted to anyone to use this software for any purpose, 127db96d56Sopenharmony_ci# including commercial applications, and to alter it and redistribute it 137db96d56Sopenharmony_ci# freely, subject to the following restrictions: 147db96d56Sopenharmony_ci# 157db96d56Sopenharmony_ci# 1. The origin of this software must not be misrepresented; you must not 167db96d56Sopenharmony_ci# claim that you wrote the original software. If you use this software 177db96d56Sopenharmony_ci# in a product, an acknowledgment in the product documentation would be 187db96d56Sopenharmony_ci# appreciated but is not required. 197db96d56Sopenharmony_ci# 2. Altered source versions must be plainly marked as such, and must not be 207db96d56Sopenharmony_ci# misrepresented as being the original software. 217db96d56Sopenharmony_ci# 3. This notice may not be removed or altered from any source distribution. 227db96d56Sopenharmony_ci 237db96d56Sopenharmony_ciimport datetime 247db96d56Sopenharmony_ciimport unittest 257db96d56Sopenharmony_ciimport sqlite3 as sqlite 267db96d56Sopenharmony_ciimport weakref 277db96d56Sopenharmony_ciimport functools 287db96d56Sopenharmony_ci 297db96d56Sopenharmony_cifrom test import support 307db96d56Sopenharmony_cifrom unittest.mock import patch 317db96d56Sopenharmony_cifrom test.test_sqlite3.test_dbapi import memory_database, cx_limit 327db96d56Sopenharmony_ci 337db96d56Sopenharmony_ci 347db96d56Sopenharmony_ciclass RegressionTests(unittest.TestCase): 357db96d56Sopenharmony_ci def setUp(self): 367db96d56Sopenharmony_ci self.con = sqlite.connect(":memory:") 377db96d56Sopenharmony_ci 387db96d56Sopenharmony_ci def tearDown(self): 397db96d56Sopenharmony_ci self.con.close() 407db96d56Sopenharmony_ci 417db96d56Sopenharmony_ci def test_pragma_user_version(self): 427db96d56Sopenharmony_ci # This used to crash pysqlite because this pragma command returns NULL for the column name 437db96d56Sopenharmony_ci cur = self.con.cursor() 447db96d56Sopenharmony_ci cur.execute("pragma user_version") 457db96d56Sopenharmony_ci 467db96d56Sopenharmony_ci def test_pragma_schema_version(self): 477db96d56Sopenharmony_ci # This still crashed pysqlite <= 2.2.1 487db96d56Sopenharmony_ci con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) 497db96d56Sopenharmony_ci try: 507db96d56Sopenharmony_ci cur = self.con.cursor() 517db96d56Sopenharmony_ci cur.execute("pragma schema_version") 527db96d56Sopenharmony_ci finally: 537db96d56Sopenharmony_ci cur.close() 547db96d56Sopenharmony_ci con.close() 557db96d56Sopenharmony_ci 567db96d56Sopenharmony_ci def test_statement_reset(self): 577db96d56Sopenharmony_ci # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are 587db96d56Sopenharmony_ci # reset before a rollback, but only those that are still in the 597db96d56Sopenharmony_ci # statement cache. The others are not accessible from the connection object. 607db96d56Sopenharmony_ci con = sqlite.connect(":memory:", cached_statements=5) 617db96d56Sopenharmony_ci cursors = [con.cursor() for x in range(5)] 627db96d56Sopenharmony_ci cursors[0].execute("create table test(x)") 637db96d56Sopenharmony_ci for i in range(10): 647db96d56Sopenharmony_ci cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)]) 657db96d56Sopenharmony_ci 667db96d56Sopenharmony_ci for i in range(5): 677db96d56Sopenharmony_ci cursors[i].execute(" " * i + "select x from test") 687db96d56Sopenharmony_ci 697db96d56Sopenharmony_ci con.rollback() 707db96d56Sopenharmony_ci 717db96d56Sopenharmony_ci def test_column_name_with_spaces(self): 727db96d56Sopenharmony_ci cur = self.con.cursor() 737db96d56Sopenharmony_ci cur.execute('select 1 as "foo bar [datetime]"') 747db96d56Sopenharmony_ci self.assertEqual(cur.description[0][0], "foo bar [datetime]") 757db96d56Sopenharmony_ci 767db96d56Sopenharmony_ci cur.execute('select 1 as "foo baz"') 777db96d56Sopenharmony_ci self.assertEqual(cur.description[0][0], "foo baz") 787db96d56Sopenharmony_ci 797db96d56Sopenharmony_ci def test_statement_finalization_on_close_db(self): 807db96d56Sopenharmony_ci # pysqlite versions <= 2.3.3 only finalized statements in the statement 817db96d56Sopenharmony_ci # cache when closing the database. statements that were still 827db96d56Sopenharmony_ci # referenced in cursors weren't closed and could provoke " 837db96d56Sopenharmony_ci # "OperationalError: Unable to close due to unfinalised statements". 847db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 857db96d56Sopenharmony_ci cursors = [] 867db96d56Sopenharmony_ci # default statement cache size is 100 877db96d56Sopenharmony_ci for i in range(105): 887db96d56Sopenharmony_ci cur = con.cursor() 897db96d56Sopenharmony_ci cursors.append(cur) 907db96d56Sopenharmony_ci cur.execute("select 1 x union select " + str(i)) 917db96d56Sopenharmony_ci con.close() 927db96d56Sopenharmony_ci 937db96d56Sopenharmony_ci def test_on_conflict_rollback(self): 947db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 957db96d56Sopenharmony_ci con.execute("create table foo(x, unique(x) on conflict rollback)") 967db96d56Sopenharmony_ci con.execute("insert into foo(x) values (1)") 977db96d56Sopenharmony_ci try: 987db96d56Sopenharmony_ci con.execute("insert into foo(x) values (1)") 997db96d56Sopenharmony_ci except sqlite.DatabaseError: 1007db96d56Sopenharmony_ci pass 1017db96d56Sopenharmony_ci con.execute("insert into foo(x) values (2)") 1027db96d56Sopenharmony_ci try: 1037db96d56Sopenharmony_ci con.commit() 1047db96d56Sopenharmony_ci except sqlite.OperationalError: 1057db96d56Sopenharmony_ci self.fail("pysqlite knew nothing about the implicit ROLLBACK") 1067db96d56Sopenharmony_ci 1077db96d56Sopenharmony_ci def test_workaround_for_buggy_sqlite_transfer_bindings(self): 1087db96d56Sopenharmony_ci """ 1097db96d56Sopenharmony_ci pysqlite would crash with older SQLite versions unless 1107db96d56Sopenharmony_ci a workaround is implemented. 1117db96d56Sopenharmony_ci """ 1127db96d56Sopenharmony_ci self.con.execute("create table foo(bar)") 1137db96d56Sopenharmony_ci self.con.execute("drop table foo") 1147db96d56Sopenharmony_ci self.con.execute("create table foo(bar)") 1157db96d56Sopenharmony_ci 1167db96d56Sopenharmony_ci def test_empty_statement(self): 1177db96d56Sopenharmony_ci """ 1187db96d56Sopenharmony_ci pysqlite used to segfault with SQLite versions 3.5.x. These return NULL 1197db96d56Sopenharmony_ci for "no-operation" statements 1207db96d56Sopenharmony_ci """ 1217db96d56Sopenharmony_ci self.con.execute("") 1227db96d56Sopenharmony_ci 1237db96d56Sopenharmony_ci def test_type_map_usage(self): 1247db96d56Sopenharmony_ci """ 1257db96d56Sopenharmony_ci pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling 1267db96d56Sopenharmony_ci a statement. This test exhibits the problem. 1277db96d56Sopenharmony_ci """ 1287db96d56Sopenharmony_ci SELECT = "select * from foo" 1297db96d56Sopenharmony_ci con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES) 1307db96d56Sopenharmony_ci cur = con.cursor() 1317db96d56Sopenharmony_ci cur.execute("create table foo(bar timestamp)") 1327db96d56Sopenharmony_ci cur.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),)) 1337db96d56Sopenharmony_ci cur.execute(SELECT) 1347db96d56Sopenharmony_ci cur.execute("drop table foo") 1357db96d56Sopenharmony_ci cur.execute("create table foo(bar integer)") 1367db96d56Sopenharmony_ci cur.execute("insert into foo(bar) values (5)") 1377db96d56Sopenharmony_ci cur.execute(SELECT) 1387db96d56Sopenharmony_ci 1397db96d56Sopenharmony_ci def test_bind_mutating_list(self): 1407db96d56Sopenharmony_ci # Issue41662: Crash when mutate a list of parameters during iteration. 1417db96d56Sopenharmony_ci class X: 1427db96d56Sopenharmony_ci def __conform__(self, protocol): 1437db96d56Sopenharmony_ci parameters.clear() 1447db96d56Sopenharmony_ci return "..." 1457db96d56Sopenharmony_ci parameters = [X(), 0] 1467db96d56Sopenharmony_ci con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES) 1477db96d56Sopenharmony_ci con.execute("create table foo(bar X, baz integer)") 1487db96d56Sopenharmony_ci # Should not crash 1497db96d56Sopenharmony_ci with self.assertRaises(IndexError): 1507db96d56Sopenharmony_ci con.execute("insert into foo(bar, baz) values (?, ?)", parameters) 1517db96d56Sopenharmony_ci 1527db96d56Sopenharmony_ci def test_error_msg_decode_error(self): 1537db96d56Sopenharmony_ci # When porting the module to Python 3.0, the error message about 1547db96d56Sopenharmony_ci # decoding errors disappeared. This verifies they're back again. 1557db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError) as cm: 1567db96d56Sopenharmony_ci self.con.execute("select 'xxx' || ? || 'yyy' colname", 1577db96d56Sopenharmony_ci (bytes(bytearray([250])),)).fetchone() 1587db96d56Sopenharmony_ci msg = "Could not decode to UTF-8 column 'colname' with text 'xxx" 1597db96d56Sopenharmony_ci self.assertIn(msg, str(cm.exception)) 1607db96d56Sopenharmony_ci 1617db96d56Sopenharmony_ci def test_register_adapter(self): 1627db96d56Sopenharmony_ci """ 1637db96d56Sopenharmony_ci See issue 3312. 1647db96d56Sopenharmony_ci """ 1657db96d56Sopenharmony_ci self.assertRaises(TypeError, sqlite.register_adapter, {}, None) 1667db96d56Sopenharmony_ci 1677db96d56Sopenharmony_ci def test_set_isolation_level(self): 1687db96d56Sopenharmony_ci # See issue 27881. 1697db96d56Sopenharmony_ci class CustomStr(str): 1707db96d56Sopenharmony_ci def upper(self): 1717db96d56Sopenharmony_ci return None 1727db96d56Sopenharmony_ci def __del__(self): 1737db96d56Sopenharmony_ci con.isolation_level = "" 1747db96d56Sopenharmony_ci 1757db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 1767db96d56Sopenharmony_ci con.isolation_level = None 1777db96d56Sopenharmony_ci for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE": 1787db96d56Sopenharmony_ci with self.subTest(level=level): 1797db96d56Sopenharmony_ci con.isolation_level = level 1807db96d56Sopenharmony_ci con.isolation_level = level.lower() 1817db96d56Sopenharmony_ci con.isolation_level = level.capitalize() 1827db96d56Sopenharmony_ci con.isolation_level = CustomStr(level) 1837db96d56Sopenharmony_ci 1847db96d56Sopenharmony_ci # setting isolation_level failure should not alter previous state 1857db96d56Sopenharmony_ci con.isolation_level = None 1867db96d56Sopenharmony_ci con.isolation_level = "DEFERRED" 1877db96d56Sopenharmony_ci pairs = [ 1887db96d56Sopenharmony_ci (1, TypeError), (b'', TypeError), ("abc", ValueError), 1897db96d56Sopenharmony_ci ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError), 1907db96d56Sopenharmony_ci ] 1917db96d56Sopenharmony_ci for value, exc in pairs: 1927db96d56Sopenharmony_ci with self.subTest(level=value): 1937db96d56Sopenharmony_ci with self.assertRaises(exc): 1947db96d56Sopenharmony_ci con.isolation_level = value 1957db96d56Sopenharmony_ci self.assertEqual(con.isolation_level, "DEFERRED") 1967db96d56Sopenharmony_ci 1977db96d56Sopenharmony_ci def test_cursor_constructor_call_check(self): 1987db96d56Sopenharmony_ci """ 1997db96d56Sopenharmony_ci Verifies that cursor methods check whether base class __init__ was 2007db96d56Sopenharmony_ci called. 2017db96d56Sopenharmony_ci """ 2027db96d56Sopenharmony_ci class Cursor(sqlite.Cursor): 2037db96d56Sopenharmony_ci def __init__(self, con): 2047db96d56Sopenharmony_ci pass 2057db96d56Sopenharmony_ci 2067db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 2077db96d56Sopenharmony_ci cur = Cursor(con) 2087db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 2097db96d56Sopenharmony_ci cur.execute("select 4+5").fetchall() 2107db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.ProgrammingError, 2117db96d56Sopenharmony_ci r'^Base Cursor\.__init__ not called\.$'): 2127db96d56Sopenharmony_ci cur.close() 2137db96d56Sopenharmony_ci 2147db96d56Sopenharmony_ci def test_str_subclass(self): 2157db96d56Sopenharmony_ci """ 2167db96d56Sopenharmony_ci The Python 3.0 port of the module didn't cope with values of subclasses of str. 2177db96d56Sopenharmony_ci """ 2187db96d56Sopenharmony_ci class MyStr(str): pass 2197db96d56Sopenharmony_ci self.con.execute("select ?", (MyStr("abc"),)) 2207db96d56Sopenharmony_ci 2217db96d56Sopenharmony_ci def test_connection_constructor_call_check(self): 2227db96d56Sopenharmony_ci """ 2237db96d56Sopenharmony_ci Verifies that connection methods check whether base class __init__ was 2247db96d56Sopenharmony_ci called. 2257db96d56Sopenharmony_ci """ 2267db96d56Sopenharmony_ci class Connection(sqlite.Connection): 2277db96d56Sopenharmony_ci def __init__(self, name): 2287db96d56Sopenharmony_ci pass 2297db96d56Sopenharmony_ci 2307db96d56Sopenharmony_ci con = Connection(":memory:") 2317db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 2327db96d56Sopenharmony_ci cur = con.cursor() 2337db96d56Sopenharmony_ci 2347db96d56Sopenharmony_ci def test_auto_commit(self): 2357db96d56Sopenharmony_ci """ 2367db96d56Sopenharmony_ci Verifies that creating a connection in autocommit mode works. 2377db96d56Sopenharmony_ci 2.5.3 introduced a regression so that these could no longer 2387db96d56Sopenharmony_ci be created. 2397db96d56Sopenharmony_ci """ 2407db96d56Sopenharmony_ci con = sqlite.connect(":memory:", isolation_level=None) 2417db96d56Sopenharmony_ci 2427db96d56Sopenharmony_ci def test_pragma_autocommit(self): 2437db96d56Sopenharmony_ci """ 2447db96d56Sopenharmony_ci Verifies that running a PRAGMA statement that does an autocommit does 2457db96d56Sopenharmony_ci work. This did not work in 2.5.3/2.5.4. 2467db96d56Sopenharmony_ci """ 2477db96d56Sopenharmony_ci cur = self.con.cursor() 2487db96d56Sopenharmony_ci cur.execute("create table foo(bar)") 2497db96d56Sopenharmony_ci cur.execute("insert into foo(bar) values (5)") 2507db96d56Sopenharmony_ci 2517db96d56Sopenharmony_ci cur.execute("pragma page_size") 2527db96d56Sopenharmony_ci row = cur.fetchone() 2537db96d56Sopenharmony_ci 2547db96d56Sopenharmony_ci def test_connection_call(self): 2557db96d56Sopenharmony_ci """ 2567db96d56Sopenharmony_ci Call a connection with a non-string SQL request: check error handling 2577db96d56Sopenharmony_ci of the statement constructor. 2587db96d56Sopenharmony_ci """ 2597db96d56Sopenharmony_ci self.assertRaises(TypeError, self.con, b"select 1") 2607db96d56Sopenharmony_ci 2617db96d56Sopenharmony_ci def test_collation(self): 2627db96d56Sopenharmony_ci def collation_cb(a, b): 2637db96d56Sopenharmony_ci return 1 2647db96d56Sopenharmony_ci self.assertRaises(UnicodeEncodeError, self.con.create_collation, 2657db96d56Sopenharmony_ci # Lone surrogate cannot be encoded to the default encoding (utf8) 2667db96d56Sopenharmony_ci "\uDC80", collation_cb) 2677db96d56Sopenharmony_ci 2687db96d56Sopenharmony_ci def test_recursive_cursor_use(self): 2697db96d56Sopenharmony_ci """ 2707db96d56Sopenharmony_ci http://bugs.python.org/issue10811 2717db96d56Sopenharmony_ci 2727db96d56Sopenharmony_ci Recursively using a cursor, such as when reusing it from a generator led to segfaults. 2737db96d56Sopenharmony_ci Now we catch recursive cursor usage and raise a ProgrammingError. 2747db96d56Sopenharmony_ci """ 2757db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 2767db96d56Sopenharmony_ci 2777db96d56Sopenharmony_ci cur = con.cursor() 2787db96d56Sopenharmony_ci cur.execute("create table a (bar)") 2797db96d56Sopenharmony_ci cur.execute("create table b (baz)") 2807db96d56Sopenharmony_ci 2817db96d56Sopenharmony_ci def foo(): 2827db96d56Sopenharmony_ci cur.execute("insert into a (bar) values (?)", (1,)) 2837db96d56Sopenharmony_ci yield 1 2847db96d56Sopenharmony_ci 2857db96d56Sopenharmony_ci with self.assertRaises(sqlite.ProgrammingError): 2867db96d56Sopenharmony_ci cur.executemany("insert into b (baz) values (?)", 2877db96d56Sopenharmony_ci ((i,) for i in foo())) 2887db96d56Sopenharmony_ci 2897db96d56Sopenharmony_ci def test_convert_timestamp_microsecond_padding(self): 2907db96d56Sopenharmony_ci """ 2917db96d56Sopenharmony_ci http://bugs.python.org/issue14720 2927db96d56Sopenharmony_ci 2937db96d56Sopenharmony_ci The microsecond parsing of convert_timestamp() should pad with zeros, 2947db96d56Sopenharmony_ci since the microsecond string "456" actually represents "456000". 2957db96d56Sopenharmony_ci """ 2967db96d56Sopenharmony_ci 2977db96d56Sopenharmony_ci con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) 2987db96d56Sopenharmony_ci cur = con.cursor() 2997db96d56Sopenharmony_ci cur.execute("CREATE TABLE t (x TIMESTAMP)") 3007db96d56Sopenharmony_ci 3017db96d56Sopenharmony_ci # Microseconds should be 456000 3027db96d56Sopenharmony_ci cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')") 3037db96d56Sopenharmony_ci 3047db96d56Sopenharmony_ci # Microseconds should be truncated to 123456 3057db96d56Sopenharmony_ci cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')") 3067db96d56Sopenharmony_ci 3077db96d56Sopenharmony_ci cur.execute("SELECT * FROM t") 3087db96d56Sopenharmony_ci values = [x[0] for x in cur.fetchall()] 3097db96d56Sopenharmony_ci 3107db96d56Sopenharmony_ci self.assertEqual(values, [ 3117db96d56Sopenharmony_ci datetime.datetime(2012, 4, 4, 15, 6, 0, 456000), 3127db96d56Sopenharmony_ci datetime.datetime(2012, 4, 4, 15, 6, 0, 123456), 3137db96d56Sopenharmony_ci ]) 3147db96d56Sopenharmony_ci 3157db96d56Sopenharmony_ci def test_invalid_isolation_level_type(self): 3167db96d56Sopenharmony_ci # isolation level is a string, not an integer 3177db96d56Sopenharmony_ci self.assertRaises(TypeError, 3187db96d56Sopenharmony_ci sqlite.connect, ":memory:", isolation_level=123) 3197db96d56Sopenharmony_ci 3207db96d56Sopenharmony_ci 3217db96d56Sopenharmony_ci def test_null_character(self): 3227db96d56Sopenharmony_ci # Issue #21147 3237db96d56Sopenharmony_ci cur = self.con.cursor() 3247db96d56Sopenharmony_ci queries = ["\0select 1", "select 1\0"] 3257db96d56Sopenharmony_ci for query in queries: 3267db96d56Sopenharmony_ci with self.subTest(query=query): 3277db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.ProgrammingError, "null char", 3287db96d56Sopenharmony_ci self.con.execute, query) 3297db96d56Sopenharmony_ci with self.subTest(query=query): 3307db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.ProgrammingError, "null char", 3317db96d56Sopenharmony_ci cur.execute, query) 3327db96d56Sopenharmony_ci 3337db96d56Sopenharmony_ci def test_surrogates(self): 3347db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 3357db96d56Sopenharmony_ci self.assertRaises(UnicodeEncodeError, con, "select '\ud8ff'") 3367db96d56Sopenharmony_ci self.assertRaises(UnicodeEncodeError, con, "select '\udcff'") 3377db96d56Sopenharmony_ci cur = con.cursor() 3387db96d56Sopenharmony_ci self.assertRaises(UnicodeEncodeError, cur.execute, "select '\ud8ff'") 3397db96d56Sopenharmony_ci self.assertRaises(UnicodeEncodeError, cur.execute, "select '\udcff'") 3407db96d56Sopenharmony_ci 3417db96d56Sopenharmony_ci def test_large_sql(self): 3427db96d56Sopenharmony_ci msg = "query string is too large" 3437db96d56Sopenharmony_ci with memory_database() as cx, cx_limit(cx) as lim: 3447db96d56Sopenharmony_ci cu = cx.cursor() 3457db96d56Sopenharmony_ci 3467db96d56Sopenharmony_ci cx("select 1".ljust(lim)) 3477db96d56Sopenharmony_ci # use a different SQL statement; don't reuse from the LRU cache 3487db96d56Sopenharmony_ci cu.execute("select 2".ljust(lim)) 3497db96d56Sopenharmony_ci 3507db96d56Sopenharmony_ci sql = "select 3".ljust(lim+1) 3517db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.DataError, msg, cx, sql) 3527db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.DataError, msg, cu.execute, sql) 3537db96d56Sopenharmony_ci 3547db96d56Sopenharmony_ci def test_commit_cursor_reset(self): 3557db96d56Sopenharmony_ci """ 3567db96d56Sopenharmony_ci Connection.commit() did reset cursors, which made sqlite3 3577db96d56Sopenharmony_ci to return rows multiple times when fetched from cursors 3587db96d56Sopenharmony_ci after commit. See issues 10513 and 23129 for details. 3597db96d56Sopenharmony_ci """ 3607db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 3617db96d56Sopenharmony_ci con.executescript(""" 3627db96d56Sopenharmony_ci create table t(c); 3637db96d56Sopenharmony_ci create table t2(c); 3647db96d56Sopenharmony_ci insert into t values(0); 3657db96d56Sopenharmony_ci insert into t values(1); 3667db96d56Sopenharmony_ci insert into t values(2); 3677db96d56Sopenharmony_ci """) 3687db96d56Sopenharmony_ci 3697db96d56Sopenharmony_ci self.assertEqual(con.isolation_level, "") 3707db96d56Sopenharmony_ci 3717db96d56Sopenharmony_ci counter = 0 3727db96d56Sopenharmony_ci for i, row in enumerate(con.execute("select c from t")): 3737db96d56Sopenharmony_ci with self.subTest(i=i, row=row): 3747db96d56Sopenharmony_ci con.execute("insert into t2(c) values (?)", (i,)) 3757db96d56Sopenharmony_ci con.commit() 3767db96d56Sopenharmony_ci if counter == 0: 3777db96d56Sopenharmony_ci self.assertEqual(row[0], 0) 3787db96d56Sopenharmony_ci elif counter == 1: 3797db96d56Sopenharmony_ci self.assertEqual(row[0], 1) 3807db96d56Sopenharmony_ci elif counter == 2: 3817db96d56Sopenharmony_ci self.assertEqual(row[0], 2) 3827db96d56Sopenharmony_ci counter += 1 3837db96d56Sopenharmony_ci self.assertEqual(counter, 3, "should have returned exactly three rows") 3847db96d56Sopenharmony_ci 3857db96d56Sopenharmony_ci def test_bpo31770(self): 3867db96d56Sopenharmony_ci """ 3877db96d56Sopenharmony_ci The interpreter shouldn't crash in case Cursor.__init__() is called 3887db96d56Sopenharmony_ci more than once. 3897db96d56Sopenharmony_ci """ 3907db96d56Sopenharmony_ci def callback(*args): 3917db96d56Sopenharmony_ci pass 3927db96d56Sopenharmony_ci con = sqlite.connect(":memory:") 3937db96d56Sopenharmony_ci cur = sqlite.Cursor(con) 3947db96d56Sopenharmony_ci ref = weakref.ref(cur, callback) 3957db96d56Sopenharmony_ci cur.__init__(con) 3967db96d56Sopenharmony_ci del cur 3977db96d56Sopenharmony_ci # The interpreter shouldn't crash when ref is collected. 3987db96d56Sopenharmony_ci del ref 3997db96d56Sopenharmony_ci support.gc_collect() 4007db96d56Sopenharmony_ci 4017db96d56Sopenharmony_ci def test_del_isolation_level_segfault(self): 4027db96d56Sopenharmony_ci with self.assertRaises(AttributeError): 4037db96d56Sopenharmony_ci del self.con.isolation_level 4047db96d56Sopenharmony_ci 4057db96d56Sopenharmony_ci def test_bpo37347(self): 4067db96d56Sopenharmony_ci class Printer: 4077db96d56Sopenharmony_ci def log(self, *args): 4087db96d56Sopenharmony_ci return sqlite.SQLITE_OK 4097db96d56Sopenharmony_ci 4107db96d56Sopenharmony_ci for method in [self.con.set_trace_callback, 4117db96d56Sopenharmony_ci functools.partial(self.con.set_progress_handler, n=1), 4127db96d56Sopenharmony_ci self.con.set_authorizer]: 4137db96d56Sopenharmony_ci printer_instance = Printer() 4147db96d56Sopenharmony_ci method(printer_instance.log) 4157db96d56Sopenharmony_ci method(printer_instance.log) 4167db96d56Sopenharmony_ci self.con.execute("select 1") # trigger seg fault 4177db96d56Sopenharmony_ci method(None) 4187db96d56Sopenharmony_ci 4197db96d56Sopenharmony_ci def test_return_empty_bytestring(self): 4207db96d56Sopenharmony_ci cur = self.con.execute("select X''") 4217db96d56Sopenharmony_ci val = cur.fetchone()[0] 4227db96d56Sopenharmony_ci self.assertEqual(val, b'') 4237db96d56Sopenharmony_ci 4247db96d56Sopenharmony_ci def test_table_lock_cursor_replace_stmt(self): 4257db96d56Sopenharmony_ci with memory_database() as con: 4267db96d56Sopenharmony_ci cur = con.cursor() 4277db96d56Sopenharmony_ci cur.execute("create table t(t)") 4287db96d56Sopenharmony_ci cur.executemany("insert into t values(?)", 4297db96d56Sopenharmony_ci ((v,) for v in range(5))) 4307db96d56Sopenharmony_ci con.commit() 4317db96d56Sopenharmony_ci cur.execute("select t from t") 4327db96d56Sopenharmony_ci cur.execute("drop table t") 4337db96d56Sopenharmony_ci con.commit() 4347db96d56Sopenharmony_ci 4357db96d56Sopenharmony_ci def test_table_lock_cursor_dealloc(self): 4367db96d56Sopenharmony_ci with memory_database() as con: 4377db96d56Sopenharmony_ci con.execute("create table t(t)") 4387db96d56Sopenharmony_ci con.executemany("insert into t values(?)", 4397db96d56Sopenharmony_ci ((v,) for v in range(5))) 4407db96d56Sopenharmony_ci con.commit() 4417db96d56Sopenharmony_ci cur = con.execute("select t from t") 4427db96d56Sopenharmony_ci del cur 4437db96d56Sopenharmony_ci con.execute("drop table t") 4447db96d56Sopenharmony_ci con.commit() 4457db96d56Sopenharmony_ci 4467db96d56Sopenharmony_ci def test_table_lock_cursor_non_readonly_select(self): 4477db96d56Sopenharmony_ci with memory_database() as con: 4487db96d56Sopenharmony_ci con.execute("create table t(t)") 4497db96d56Sopenharmony_ci con.executemany("insert into t values(?)", 4507db96d56Sopenharmony_ci ((v,) for v in range(5))) 4517db96d56Sopenharmony_ci con.commit() 4527db96d56Sopenharmony_ci def dup(v): 4537db96d56Sopenharmony_ci con.execute("insert into t values(?)", (v,)) 4547db96d56Sopenharmony_ci return 4557db96d56Sopenharmony_ci con.create_function("dup", 1, dup) 4567db96d56Sopenharmony_ci cur = con.execute("select dup(t) from t") 4577db96d56Sopenharmony_ci del cur 4587db96d56Sopenharmony_ci con.execute("drop table t") 4597db96d56Sopenharmony_ci con.commit() 4607db96d56Sopenharmony_ci 4617db96d56Sopenharmony_ci def test_executescript_step_through_select(self): 4627db96d56Sopenharmony_ci with memory_database() as con: 4637db96d56Sopenharmony_ci values = [(v,) for v in range(5)] 4647db96d56Sopenharmony_ci with con: 4657db96d56Sopenharmony_ci con.execute("create table t(t)") 4667db96d56Sopenharmony_ci con.executemany("insert into t values(?)", values) 4677db96d56Sopenharmony_ci steps = [] 4687db96d56Sopenharmony_ci con.create_function("step", 1, lambda x: steps.append((x,))) 4697db96d56Sopenharmony_ci con.executescript("select step(t) from t") 4707db96d56Sopenharmony_ci self.assertEqual(steps, values) 4717db96d56Sopenharmony_ci 4727db96d56Sopenharmony_ci def test_custom_cursor_object_crash_gh_99886(self): 4737db96d56Sopenharmony_ci # This test segfaults on GH-99886 4747db96d56Sopenharmony_ci class MyCursor(sqlite.Cursor): 4757db96d56Sopenharmony_ci def __init__(self, *args, **kwargs): 4767db96d56Sopenharmony_ci super().__init__(*args, **kwargs) 4777db96d56Sopenharmony_ci # this can go before or after the super call; doesn't matter 4787db96d56Sopenharmony_ci self.some_attr = None 4797db96d56Sopenharmony_ci 4807db96d56Sopenharmony_ci with memory_database() as con: 4817db96d56Sopenharmony_ci cur = con.cursor(MyCursor) 4827db96d56Sopenharmony_ci cur.close() 4837db96d56Sopenharmony_ci del cur 4847db96d56Sopenharmony_ci 4857db96d56Sopenharmony_ciclass RecursiveUseOfCursors(unittest.TestCase): 4867db96d56Sopenharmony_ci # GH-80254: sqlite3 should not segfault for recursive use of cursors. 4877db96d56Sopenharmony_ci msg = "Recursive use of cursors not allowed" 4887db96d56Sopenharmony_ci 4897db96d56Sopenharmony_ci def setUp(self): 4907db96d56Sopenharmony_ci self.con = sqlite.connect(":memory:", 4917db96d56Sopenharmony_ci detect_types=sqlite.PARSE_COLNAMES) 4927db96d56Sopenharmony_ci self.cur = self.con.cursor() 4937db96d56Sopenharmony_ci self.cur.execute("create table test(x foo)") 4947db96d56Sopenharmony_ci self.cur.executemany("insert into test(x) values (?)", 4957db96d56Sopenharmony_ci [("foo",), ("bar",)]) 4967db96d56Sopenharmony_ci 4977db96d56Sopenharmony_ci def tearDown(self): 4987db96d56Sopenharmony_ci self.cur.close() 4997db96d56Sopenharmony_ci self.con.close() 5007db96d56Sopenharmony_ci 5017db96d56Sopenharmony_ci def test_recursive_cursor_init(self): 5027db96d56Sopenharmony_ci conv = lambda x: self.cur.__init__(self.con) 5037db96d56Sopenharmony_ci with patch.dict(sqlite.converters, {"INIT": conv}): 5047db96d56Sopenharmony_ci self.cur.execute(f'select x as "x [INIT]", x from test') 5057db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.ProgrammingError, self.msg, 5067db96d56Sopenharmony_ci self.cur.fetchall) 5077db96d56Sopenharmony_ci 5087db96d56Sopenharmony_ci def test_recursive_cursor_close(self): 5097db96d56Sopenharmony_ci conv = lambda x: self.cur.close() 5107db96d56Sopenharmony_ci with patch.dict(sqlite.converters, {"CLOSE": conv}): 5117db96d56Sopenharmony_ci self.cur.execute(f'select x as "x [CLOSE]", x from test') 5127db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.ProgrammingError, self.msg, 5137db96d56Sopenharmony_ci self.cur.fetchall) 5147db96d56Sopenharmony_ci 5157db96d56Sopenharmony_ci def test_recursive_cursor_iter(self): 5167db96d56Sopenharmony_ci conv = lambda x, l=[]: self.cur.fetchone() if l else l.append(None) 5177db96d56Sopenharmony_ci with patch.dict(sqlite.converters, {"ITER": conv}): 5187db96d56Sopenharmony_ci self.cur.execute(f'select x as "x [ITER]", x from test') 5197db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.ProgrammingError, self.msg, 5207db96d56Sopenharmony_ci self.cur.fetchall) 5217db96d56Sopenharmony_ci 5227db96d56Sopenharmony_ci 5237db96d56Sopenharmony_ciif __name__ == "__main__": 5247db96d56Sopenharmony_ci unittest.main() 525