# pysqlite2/test/regression.py: pysqlite regression tests
#
# Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de>
#
# This file is part of pysqlite.
#
# This software is provided 'as-is', without any express or implied
# warranty.  In no event will the authors be held liable for any damages
# arising from the use of this software.
#
# Permission is granted to anyone to use this software for any purpose,
# including commercial applications, and to alter it and redistribute it
# freely, subject to the following restrictions:
#
# 1. The origin of this software must not be misrepresented; you must not
#    claim that you wrote the original software. If you use this software
#    in a product, an acknowledgment in the product documentation would be
#    appreciated but is not required.
# 2. Altered source versions must be plainly marked as such, and must not be
#    misrepresented as being the original software.
# 3. This notice may not be removed or altered from any source distribution.

import contextlib
import datetime
import unittest
import sqlite3 as sqlite
import weakref
import functools

from test import support
from unittest.mock import patch
from test.test_sqlite3.test_dbapi import memory_database, cx_limit


class RegressionTests(unittest.TestCase):
    """Regression tests for crashes and bugs fixed in pysqlite / sqlite3.

    ``setUp`` opens a fresh in-memory database as ``self.con`` and
    ``tearDown`` closes it.  Tests that open a connection of their own
    (because they need other connect() options, or because the connection
    itself is what is being tested) are responsible for closing it, so that
    nothing here relies on garbage collection to release a database.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")

    def tearDown(self):
        self.con.close()

    def test_pragma_user_version(self):
        # This used to crash pysqlite because this pragma command returns
        # NULL for the column name.
        cur = self.con.cursor()
        cur.execute("pragma user_version")

    def test_pragma_schema_version(self):
        # This still crashed pysqlite <= 2.2.1.
        # The pragma has to be executed on the connection opened with
        # detect_types=PARSE_COLNAMES; running it on self.con would leave
        # that connection completely unexercised.
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES)
        with contextlib.closing(con), contextlib.closing(con.cursor()) as cur:
            cur.execute("pragma schema_version")

    def test_statement_reset(self):
        # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are
        # reset before a rollback, but only those that are still in the
        # statement cache. The others are not accessible from the connection
        # object.
        con = sqlite.connect(":memory:", cached_statements=5)
        with contextlib.closing(con):
            cursors = [con.cursor() for _ in range(5)]
            cursors[0].execute("create table test(x)")
            rows = [(x,) for x in range(10)]
            for _ in range(10):
                cursors[0].executemany("insert into test(x) values (?)", rows)

            # A different number of leading spaces gives every cursor its
            # own SQL text for the otherwise identical query.
            for i, cur in enumerate(cursors):
                cur.execute(" " * i + "select x from test")

            con.rollback()

    def test_column_name_with_spaces(self):
        cur = self.con.cursor()
        cur.execute('select 1 as "foo bar [datetime]"')
        self.assertEqual(cur.description[0][0], "foo bar [datetime]")

        cur.execute('select 1 as "foo baz"')
        self.assertEqual(cur.description[0][0], "foo baz")

    def test_statement_finalization_on_close_db(self):
        # pysqlite versions <= 2.3.3 only finalized statements in the
        # statement cache when closing the database. Statements that were
        # still referenced in cursors weren't closed and could provoke
        # "OperationalError: Unable to close due to unfinalised statements".
        con = sqlite.connect(":memory:")
        cursors = []
        # default statement cache size is 100
        for i in range(105):
            cur = con.cursor()
            # Keep every cursor alive so its statement is still referenced
            # when the connection is closed below.
            cursors.append(cur)
            cur.execute("select 1 x union select " + str(i))
        con.close()

    def test_on_conflict_rollback(self):
        with memory_database() as con:
            con.execute("create table foo(x, unique(x) on conflict rollback)")
            con.execute("insert into foo(x) values (1)")
            # The duplicate value violates the unique constraint, which makes
            # SQLite roll the transaction back on its own.
            with self.assertRaises(sqlite.DatabaseError):
                con.execute("insert into foo(x) values (1)")
            con.execute("insert into foo(x) values (2)")
            try:
                con.commit()
            except sqlite.OperationalError:
                self.fail("pysqlite knew nothing about the implicit ROLLBACK")

    def test_workaround_for_buggy_sqlite_transfer_bindings(self):
        """
        pysqlite would crash with older SQLite versions unless
        a workaround is implemented.
        """
        self.con.execute("create table foo(bar)")
        self.con.execute("drop table foo")
        self.con.execute("create table foo(bar)")

    def test_empty_statement(self):
        """
        pysqlite used to segfault with SQLite versions 3.5.x. These return NULL
        for "no-operation" statements
        """
        self.con.execute("")

    def test_type_map_usage(self):
        """
        pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling
        a statement. This test exhibits the problem.
        """
        SELECT = "select * from foo"
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        with contextlib.closing(con):
            cur = con.cursor()
            cur.execute("create table foo(bar timestamp)")
            cur.execute("insert into foo(bar) values (?)",
                        (datetime.datetime.now(),))
            cur.execute(SELECT)
            # Recreate the table with another declared type and run the very
            # same SELECT text again.
            cur.execute("drop table foo")
            cur.execute("create table foo(bar integer)")
            cur.execute("insert into foo(bar) values (5)")
            cur.execute(SELECT)

    def test_bind_mutating_list(self):
        # Issue41662: Crash when mutate a list of parameters during iteration.
        class X:
            def __conform__(self, protocol):
                parameters.clear()
                return "..."
        parameters = [X(), 0]
        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        with contextlib.closing(con):
            con.execute("create table foo(bar X, baz integer)")
            # Should not crash
            with self.assertRaises(IndexError):
                con.execute("insert into foo(bar, baz) values (?, ?)",
                            parameters)

    def test_error_msg_decode_error(self):
        # When porting the module to Python 3.0, the error message about
        # decoding errors disappeared. This verifies they're back again.
        with self.assertRaises(sqlite.OperationalError) as cm:
            # 0xFA on its own is not valid UTF-8.
            self.con.execute("select 'xxx' || ? || 'yyy' colname",
                             (b"\xfa",)).fetchone()
        msg = "Could not decode to UTF-8 column 'colname' with text 'xxx"
        self.assertIn(msg, str(cm.exception))

    def test_register_adapter(self):
        """
        See issue 3312.
        """
        self.assertRaises(TypeError, sqlite.register_adapter, {}, None)

    def test_set_isolation_level(self):
        # See issue 27881.
        class CustomStr(str):
            def upper(self):
                return None
            def __del__(self):
                # Re-enter the isolation_level setter while the instance is
                # being destroyed.
                con.isolation_level = ""

        con = sqlite.connect(":memory:")
        self.addCleanup(con.close)
        con.isolation_level = None
        for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE":
            with self.subTest(level=level):
                con.isolation_level = level
                con.isolation_level = level.lower()
                con.isolation_level = level.capitalize()
                con.isolation_level = CustomStr(level)

        # setting isolation_level failure should not alter previous state
        con.isolation_level = None
        con.isolation_level = "DEFERRED"
        pairs = [
            (1, TypeError), (b'', TypeError), ("abc", ValueError),
            ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError),
        ]
        for value, exc in pairs:
            with self.subTest(level=value):
                with self.assertRaises(exc):
                    con.isolation_level = value
                self.assertEqual(con.isolation_level, "DEFERRED")

    def test_cursor_constructor_call_check(self):
        """
        Verifies that cursor methods check whether base class __init__ was
        called.
        """
        class Cursor(sqlite.Cursor):
            def __init__(self, con):
                # Deliberately skip sqlite.Cursor.__init__().
                pass

        with memory_database() as con:
            cur = Cursor(con)
            with self.assertRaises(sqlite.ProgrammingError):
                cur.execute("select 4+5").fetchall()
            with self.assertRaisesRegex(sqlite.ProgrammingError,
                                        r'^Base Cursor\.__init__ not called\.$'):
                cur.close()

    def test_str_subclass(self):
        """
        The Python 3.0 port of the module didn't cope with values of subclasses of str.
        """
        class MyStr(str): pass
        self.con.execute("select ?", (MyStr("abc"),))

    def test_connection_constructor_call_check(self):
        """
        Verifies that connection methods check whether base class __init__ was
        called.
        """
        class Connection(sqlite.Connection):
            def __init__(self, name):
                # Deliberately skip sqlite.Connection.__init__().
                pass

        con = Connection(":memory:")
        with self.assertRaises(sqlite.ProgrammingError):
            con.cursor()

    def test_auto_commit(self):
        """
        Verifies that creating a connection in autocommit mode works.
        2.5.3 introduced a regression so that these could no longer
        be created.
        """
        # Creating the connection is the whole test; close it right away.
        sqlite.connect(":memory:", isolation_level=None).close()

    def test_pragma_autocommit(self):
        """
        Verifies that running a PRAGMA statement that does an autocommit does
        work. This did not work in 2.5.3/2.5.4.
        """
        cur = self.con.cursor()
        cur.execute("create table foo(bar)")
        cur.execute("insert into foo(bar) values (5)")

        cur.execute("pragma page_size")
        # Only fetching matters here, not the value that comes back.
        cur.fetchone()

    def test_connection_call(self):
        """
        Call a connection with a non-string SQL request: check error handling
        of the statement constructor.
        """
        self.assertRaises(TypeError, self.con, b"select 1")

    def test_collation(self):
        def collation_cb(a, b):
            return 1
        self.assertRaises(UnicodeEncodeError, self.con.create_collation,
            # Lone surrogate cannot be encoded to the default encoding (utf8)
            "\uDC80", collation_cb)

    def test_recursive_cursor_use(self):
        """
        http://bugs.python.org/issue10811

        Recursively using a cursor, such as when reusing it from a generator led to segfaults.
        Now we catch recursive cursor usage and raise a ProgrammingError.
        """
        with memory_database() as con:
            cur = con.cursor()
            cur.execute("create table a (bar)")
            cur.execute("create table b (baz)")

            def foo():
                # Uses the cursor that is busy running executemany() below.
                cur.execute("insert into a (bar) values (?)", (1,))
                yield 1

            with self.assertRaises(sqlite.ProgrammingError):
                cur.executemany("insert into b (baz) values (?)",
                                ((i,) for i in foo()))

    def test_convert_timestamp_microsecond_padding(self):
        """
        http://bugs.python.org/issue14720

        The microsecond parsing of convert_timestamp() should pad with zeros,
        since the microsecond string "456" actually represents "456000".
        """

        con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES)
        with contextlib.closing(con):
            cur = con.cursor()
            cur.execute("CREATE TABLE t (x TIMESTAMP)")

            # Microseconds should be 456000
            cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')")

            # Microseconds should be truncated to 123456
            cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')")

            cur.execute("SELECT * FROM t")
            values = [x[0] for x in cur.fetchall()]

        self.assertEqual(values, [
            datetime.datetime(2012, 4, 4, 15, 6, 0, 456000),
            datetime.datetime(2012, 4, 4, 15, 6, 0, 123456),
        ])

    def test_invalid_isolation_level_type(self):
        # isolation level is a string, not an integer
        self.assertRaises(TypeError,
                          sqlite.connect, ":memory:", isolation_level=123)

    def test_null_character(self):
        # Issue #21147
        cur = self.con.cursor()
        executors = [
            ("Connection.execute", self.con.execute),
            ("Cursor.execute", cur.execute),
        ]
        for query in "\0select 1", "select 1\0":
            for api, execute in executors:
                # Label the sub-test with the API so that a failure says
                # which of the two entry points misbehaved.
                with self.subTest(query=query, api=api):
                    self.assertRaisesRegex(sqlite.ProgrammingError,
                                           "null char", execute, query)

    def test_surrogates(self):
        with memory_database() as con:
            self.assertRaises(UnicodeEncodeError, con, "select '\ud8ff'")
            self.assertRaises(UnicodeEncodeError, con, "select '\udcff'")
            cur = con.cursor()
            self.assertRaises(UnicodeEncodeError, cur.execute, "select '\ud8ff'")
            self.assertRaises(UnicodeEncodeError, cur.execute, "select '\udcff'")

    def test_large_sql(self):
        msg = "query string is too large"
        with memory_database() as cx, cx_limit(cx) as lim:
            cu = cx.cursor()

            # Statements padded to exactly the limit are accepted ...
            cx("select 1".ljust(lim))
            # use a different SQL statement; don't reuse from the LRU cache
            cu.execute("select 2".ljust(lim))

            # ... one character more is rejected.
            sql = "select 3".ljust(lim+1)
            self.assertRaisesRegex(sqlite.DataError, msg, cx, sql)
            self.assertRaisesRegex(sqlite.DataError, msg, cu.execute, sql)

    def test_commit_cursor_reset(self):
        """
        Connection.commit() did reset cursors, which made sqlite3
        return rows multiple times when fetched from cursors
        after commit. See issues 10513 and 23129 for details.
        """
        with memory_database() as con:
            con.executescript("""
            create table t(c);
            create table t2(c);
            insert into t values(0);
            insert into t values(1);
            insert into t values(2);
            """)

            self.assertEqual(con.isolation_level, "")

            counter = 0
            for i, row in enumerate(con.execute("select c from t")):
                with self.subTest(i=i, row=row):
                    con.execute("insert into t2(c) values (?)", (i,))
                    con.commit()
                    # The first three rows must come back as 0, 1, 2; any
                    # further row is caught by the final count check.
                    if counter < 3:
                        self.assertEqual(row[0], counter)
                    counter += 1
            self.assertEqual(counter, 3,
                             "should have returned exactly three rows")

    def test_bpo31770(self):
        """
        The interpreter shouldn't crash in case Cursor.__init__() is called
        more than once.
        """
        def callback(*args):
            pass
        with memory_database() as con:
            cur = sqlite.Cursor(con)
            ref = weakref.ref(cur, callback)
            cur.__init__(con)
            del cur
            # The interpreter shouldn't crash when ref is collected.
            del ref
            support.gc_collect()

    def test_del_isolation_level_segfault(self):
        with self.assertRaises(AttributeError):
            del self.con.isolation_level

    def test_bpo37347(self):
        class Printer:
            def log(self, *args):
                return sqlite.SQLITE_OK

        for method in [self.con.set_trace_callback,
                       functools.partial(self.con.set_progress_handler, n=1),
                       self.con.set_authorizer]:
            printer_instance = Printer()
            # Install the same bound method twice before running a query.
            method(printer_instance.log)
            method(printer_instance.log)
            self.con.execute("select 1")  # trigger seg fault
            method(None)

    def test_return_empty_bytestring(self):
        cur = self.con.execute("select X''")
        val = cur.fetchone()[0]
        self.assertEqual(val, b'')

    def test_table_lock_cursor_replace_stmt(self):
        with memory_database() as con:
            cur = con.cursor()
            cur.execute("create table t(t)")
            cur.executemany("insert into t values(?)",
                            ((v,) for v in range(5)))
            con.commit()
            # The unfetched SELECT is replaced by the DROP on the same cursor.
            cur.execute("select t from t")
            cur.execute("drop table t")
            con.commit()

    def test_table_lock_cursor_dealloc(self):
        with memory_database() as con:
            con.execute("create table t(t)")
            con.executemany("insert into t values(?)",
                            ((v,) for v in range(5)))
            con.commit()
            # Drop the cursor with rows still unfetched before dropping the
            # table.
            cur = con.execute("select t from t")
            del cur
            con.execute("drop table t")
            con.commit()

    def test_table_lock_cursor_non_readonly_select(self):
        with memory_database() as con:
            con.execute("create table t(t)")
            con.executemany("insert into t values(?)",
                            ((v,) for v in range(5)))
            con.commit()
            def dup(v):
                # Writes to the table from inside the SELECT below.
                con.execute("insert into t values(?)", (v,))
                return
            con.create_function("dup", 1, dup)
            cur = con.execute("select dup(t) from t")
            del cur
            con.execute("drop table t")
            con.commit()

    def test_executescript_step_through_select(self):
        with memory_database() as con:
            values = [(v,) for v in range(5)]
            with con:
                con.execute("create table t(t)")
                con.executemany("insert into t values(?)", values)
            steps = []
            # Record every value the SELECT is stepped through.
            con.create_function("step", 1, lambda x: steps.append((x,)))
            con.executescript("select step(t) from t")
            self.assertEqual(steps, values)

    def test_custom_cursor_object_crash_gh_99886(self):
        # This test segfaults on GH-99886
        class MyCursor(sqlite.Cursor):
            def __init__(self, *args, **kwargs):
                super().__init__(*args, **kwargs)
                # this can go before or after the super call; doesn't matter
                self.some_attr = None

        with memory_database() as con:
            cur = con.cursor(MyCursor)
            cur.close()
            del cur

4857db96d56Sopenharmony_ciclass RecursiveUseOfCursors(unittest.TestCase):
4867db96d56Sopenharmony_ci    # GH-80254: sqlite3 should not segfault for recursive use of cursors.
4877db96d56Sopenharmony_ci    msg = "Recursive use of cursors not allowed"
4887db96d56Sopenharmony_ci
4897db96d56Sopenharmony_ci    def setUp(self):
4907db96d56Sopenharmony_ci        self.con = sqlite.connect(":memory:",
4917db96d56Sopenharmony_ci                                  detect_types=sqlite.PARSE_COLNAMES)
4927db96d56Sopenharmony_ci        self.cur = self.con.cursor()
4937db96d56Sopenharmony_ci        self.cur.execute("create table test(x foo)")
4947db96d56Sopenharmony_ci        self.cur.executemany("insert into test(x) values (?)",
4957db96d56Sopenharmony_ci                             [("foo",), ("bar",)])
4967db96d56Sopenharmony_ci
4977db96d56Sopenharmony_ci    def tearDown(self):
4987db96d56Sopenharmony_ci        self.cur.close()
4997db96d56Sopenharmony_ci        self.con.close()
5007db96d56Sopenharmony_ci
5017db96d56Sopenharmony_ci    def test_recursive_cursor_init(self):
5027db96d56Sopenharmony_ci        conv = lambda x: self.cur.__init__(self.con)
5037db96d56Sopenharmony_ci        with patch.dict(sqlite.converters, {"INIT": conv}):
5047db96d56Sopenharmony_ci            self.cur.execute(f'select x as "x [INIT]", x from test')
5057db96d56Sopenharmony_ci            self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
5067db96d56Sopenharmony_ci                                   self.cur.fetchall)
5077db96d56Sopenharmony_ci
5087db96d56Sopenharmony_ci    def test_recursive_cursor_close(self):
5097db96d56Sopenharmony_ci        conv = lambda x: self.cur.close()
5107db96d56Sopenharmony_ci        with patch.dict(sqlite.converters, {"CLOSE": conv}):
5117db96d56Sopenharmony_ci            self.cur.execute(f'select x as "x [CLOSE]", x from test')
5127db96d56Sopenharmony_ci            self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
5137db96d56Sopenharmony_ci                                   self.cur.fetchall)
5147db96d56Sopenharmony_ci
5157db96d56Sopenharmony_ci    def test_recursive_cursor_iter(self):
5167db96d56Sopenharmony_ci        conv = lambda x, l=[]: self.cur.fetchone() if l else l.append(None)
5177db96d56Sopenharmony_ci        with patch.dict(sqlite.converters, {"ITER": conv}):
5187db96d56Sopenharmony_ci            self.cur.execute(f'select x as "x [ITER]", x from test')
5197db96d56Sopenharmony_ci            self.assertRaisesRegex(sqlite.ProgrammingError, self.msg,
5207db96d56Sopenharmony_ci                                   self.cur.fetchall)
5217db96d56Sopenharmony_ci
5227db96d56Sopenharmony_ci
if __name__ == "__main__":
    # Run the tests in this module when the file is executed as a script.
    unittest.main()