17db96d56Sopenharmony_ci# pysqlite2/test/userfunctions.py: tests for user-defined functions and
27db96d56Sopenharmony_ci#                                  aggregates.
37db96d56Sopenharmony_ci#
47db96d56Sopenharmony_ci# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de>
57db96d56Sopenharmony_ci#
67db96d56Sopenharmony_ci# This file is part of pysqlite.
77db96d56Sopenharmony_ci#
87db96d56Sopenharmony_ci# This software is provided 'as-is', without any express or implied
97db96d56Sopenharmony_ci# warranty.  In no event will the authors be held liable for any damages
107db96d56Sopenharmony_ci# arising from the use of this software.
117db96d56Sopenharmony_ci#
127db96d56Sopenharmony_ci# Permission is granted to anyone to use this software for any purpose,
137db96d56Sopenharmony_ci# including commercial applications, and to alter it and redistribute it
147db96d56Sopenharmony_ci# freely, subject to the following restrictions:
157db96d56Sopenharmony_ci#
167db96d56Sopenharmony_ci# 1. The origin of this software must not be misrepresented; you must not
177db96d56Sopenharmony_ci#    claim that you wrote the original software. If you use this software
187db96d56Sopenharmony_ci#    in a product, an acknowledgment in the product documentation would be
197db96d56Sopenharmony_ci#    appreciated but is not required.
207db96d56Sopenharmony_ci# 2. Altered source versions must be plainly marked as such, and must not be
217db96d56Sopenharmony_ci#    misrepresented as being the original software.
227db96d56Sopenharmony_ci# 3. This notice may not be removed or altered from any source distribution.
237db96d56Sopenharmony_ci
247db96d56Sopenharmony_ciimport contextlib
257db96d56Sopenharmony_ciimport functools
267db96d56Sopenharmony_ciimport io
277db96d56Sopenharmony_ciimport re
287db96d56Sopenharmony_ciimport sys
297db96d56Sopenharmony_ciimport unittest
307db96d56Sopenharmony_ciimport sqlite3 as sqlite
317db96d56Sopenharmony_ci
327db96d56Sopenharmony_cifrom unittest.mock import Mock, patch
337db96d56Sopenharmony_cifrom test.support import bigmemtest, catch_unraisable_exception, gc_collect
347db96d56Sopenharmony_ci
357db96d56Sopenharmony_cifrom test.test_sqlite3.test_dbapi import cx_limit
367db96d56Sopenharmony_ci
377db96d56Sopenharmony_ci
def with_tracebacks(exc, regex="", name=""):
    """Return a decorator that runs a test with and without tracebacks.

    The decorated test method is called twice.  The first call happens
    inside ``catch_unraisable_exception()`` and ``check_tracebacks()``, which
    verify the unraisable exception against *exc*, the optional *regex*
    (matched against the exception message) and the optional *name* (the
    ``__name__`` of the object that raised).  The second call runs the test
    again outside those context managers.
    """
    def decorator(func):
        # An empty regex means "do not check the message".
        pattern = re.compile(regex) if regex else None

        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            # Pass 1: callback tracebacks enabled, unraisable hook inspected.
            with catch_unraisable_exception() as cm, \
                 check_tracebacks(self, cm, exc, pattern, name):
                func(self, *args, **kwargs)

            # Pass 2: callback tracebacks disabled again.
            func(self, *args, **kwargs)

        return wrapper

    return decorator
537db96d56Sopenharmony_ci
547db96d56Sopenharmony_ci
@contextlib.contextmanager
def check_tracebacks(self, cm, exc, regex, obj_name):
    """Context manager that validates the unraisable exception held by *cm*.

    Callback tracebacks are switched on for the duration of the ``with``
    body and always switched off again on exit.  While the body runs,
    ``sys.stderr`` is redirected to an in-memory buffer.  If the body
    finishes without raising, ``cm.unraisable`` is checked:

    * its ``exc_type`` must equal *exc*;
    * if *regex* is truthy, it must match ``str(exc_value)``;
    * if *obj_name* is truthy, it must equal ``object.__name__``.

    *self* is the test case whose assertion methods are used.
    """
    sqlite.enable_callback_tracebacks(True)
    try:
        # The captured stderr text is discarded; only the hook data is checked.
        with contextlib.redirect_stderr(io.StringIO()):
            yield

        unraisable = cm.unraisable
        self.assertEqual(unraisable.exc_type, exc)
        if regex:
            self.assertIsNotNone(regex.search(str(unraisable.exc_value)))
        if obj_name:
            self.assertEqual(unraisable.object.__name__, obj_name)
    finally:
        sqlite.enable_callback_tracebacks(False)
727db96d56Sopenharmony_ci
737db96d56Sopenharmony_ci
def func_returntext():
    """Return a short text value."""
    return "foo"


def func_returntextwithnull():
    """Return a text value with an embedded NUL character."""
    return "1\x002"


def func_returnunicode():
    """Return a second, different text value."""
    return "bar"


def func_returnint():
    """Return a small integer."""
    return 42


def func_returnfloat():
    """Return a float."""
    return 3.14


def func_returnnull():
    """Return ``None``."""
    return None


def func_returnblob():
    """Return a bytes object."""
    return b"blob"


def func_returnlonglong():
    """Return 2**31."""
    return 1 << 31


def func_raiseexception():
    """Raise ZeroDivisionError by dividing by zero."""
    5 / 0


def func_memoryerror():
    """Raise MemoryError."""
    raise MemoryError


def func_overflowerror():
    """Raise OverflowError."""
    raise OverflowError
967db96d56Sopenharmony_ci
class AggrNoStep:
    """Aggregate-style class that defines ``finalize`` but no ``step``."""

    def __init__(self):
        """Nothing to initialise."""

    def finalize(self):
        """Return the constant 1."""
        return 1
1037db96d56Sopenharmony_ci
class AggrNoFinalize:
    """Aggregate-style class that defines ``step`` but no ``finalize``."""

    def __init__(self):
        """Nothing to initialise."""

    def step(self, x):
        """Accept one value and ignore it."""
1107db96d56Sopenharmony_ci
class AggrExceptionInInit:
    """Aggregate-style class whose constructor raises ZeroDivisionError."""

    def __init__(self):
        5 / 0

    def step(self, x):
        """Accept one value and ignore it."""

    def finalize(self):
        """Return ``None``."""
1207db96d56Sopenharmony_ci
class AggrExceptionInStep:
    """Aggregate-style class whose ``step`` raises ZeroDivisionError."""

    def __init__(self):
        """Nothing to initialise."""

    def step(self, x):
        5 / 0

    def finalize(self):
        """Return the constant 42."""
        return 42
1307db96d56Sopenharmony_ci
class AggrExceptionInFinalize:
    """Aggregate-style class whose ``finalize`` raises ZeroDivisionError."""

    def __init__(self):
        """Nothing to initialise."""

    def step(self, x):
        """Accept one value and ignore it."""

    def finalize(self):
        5 / 0
1407db96d56Sopenharmony_ci
class AggrCheckType:
    """Aggregate that records whether the last value had the named type.

    ``finalize`` returns 1 if the most recent ``step`` value was exactly of
    the type named by its first argument, 0 if it was not, and ``None`` if
    ``step`` was never called.
    """

    def __init__(self):
        self.val = None

    def step(self, whichType, val):
        """Compare ``type(val)`` with the type named by *whichType*."""
        expected = {
            "str": str,
            "int": int,
            "float": float,
            "None": type(None),
            "blob": bytes,
        }[whichType]
        # Exact type identity, not isinstance(): subclasses do not count.
        self.val = int(type(val) is expected)

    def finalize(self):
        """Return the result of the last comparison."""
        return self.val
1527db96d56Sopenharmony_ci
class AggrCheckTypes:
    """Aggregate that counts how many values had the named type.

    Every ``step`` call adds, for each extra argument, 1 if the argument is
    exactly of the type named by the first argument and 0 otherwise.
    """

    def __init__(self):
        self.val = 0

    def step(self, whichType, *vals):
        """Add the number of *vals* whose type is the one named *whichType*."""
        by_name = {
            "str": str,
            "int": int,
            "float": float,
            "None": type(None),
            "blob": bytes,
        }
        # The lookup stays inside the generator so that, with no values,
        # an unknown type name is never looked up.
        self.val += sum(int(by_name[whichType] is type(item)) for item in vals)

    def finalize(self):
        """Return the running count."""
        return self.val
1657db96d56Sopenharmony_ci
class AggrSum:
    """Aggregate that adds up its inputs, starting from 0.0."""

    def __init__(self):
        self.val = 0.0

    def step(self, val):
        """Add *val* to the running total."""
        self.val = self.val + val

    def finalize(self):
        """Return the running total."""
        return self.val
1757db96d56Sopenharmony_ci
class AggrText:
    """Aggregate that concatenates its text inputs in order."""

    def __init__(self):
        self.txt = ""

    def step(self, txt):
        """Append *txt* to the accumulated text."""
        self.txt += txt

    def finalize(self):
        """Return the accumulated text."""
        return self.txt
1837db96d56Sopenharmony_ci
1847db96d56Sopenharmony_ci
class FunctionTests(unittest.TestCase):
    """Tests for scalar SQL functions registered with create_function().

    Test methods carry ``#`` comments rather than docstrings so that
    unittest's verbose output keeps showing the method names.
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")

        # Zero-argument functions, one per kind of return value / failure.
        self.con.create_function("returntext", 0, func_returntext)
        self.con.create_function("returntextwithnull", 0, func_returntextwithnull)
        self.con.create_function("returnunicode", 0, func_returnunicode)
        self.con.create_function("returnint", 0, func_returnint)
        self.con.create_function("returnfloat", 0, func_returnfloat)
        self.con.create_function("returnnull", 0, func_returnnull)
        self.con.create_function("returnblob", 0, func_returnblob)
        self.con.create_function("returnlonglong", 0, func_returnlonglong)
        self.con.create_function("returnnan", 0, lambda: float("nan"))
        self.con.create_function("returntoolargeint", 0, lambda: 1 << 65)
        self.con.create_function("return_noncont_blob", 0,
                                 lambda: memoryview(b"blob")[::2])
        self.con.create_function("raiseexception", 0, func_raiseexception)
        self.con.create_function("memoryerror", 0, func_memoryerror)
        self.con.create_function("overflowerror", 0, func_overflowerror)

        # Functions that inspect the arguments they receive.
        self.con.create_function("isblob", 1, lambda x: isinstance(x, bytes))
        self.con.create_function("isnone", 1, lambda x: x is None)
        self.con.create_function("spam", -1, lambda *x: len(x))
        self.con.execute("create table test(t text)")

    def tearDown(self):
        self.con.close()

    def test_func_error_on_create(self):
        # An argument count of -100 is rejected at registration time.
        with self.assertRaises(sqlite.OperationalError):
            self.con.create_function("bla", -100, lambda x: 2*x)

    def test_func_too_many_args(self):
        # With the argument limit lowered to 1, a one-argument call still
        # works while a two-argument call is rejected.
        category = sqlite.SQLITE_LIMIT_FUNCTION_ARG
        msg = "too many arguments on function"
        with cx_limit(self.con, category=category, limit=1):
            self.con.execute("select abs(-1)")
            with self.assertRaisesRegex(sqlite.OperationalError, msg):
                self.con.execute("select max(1, 2)")

    def test_func_ref_count(self):
        def getfunc():
            def f():
                return 1
            return f
        f = getfunc()
        # NOTE(review): the extra module-global reference looks deliberate
        # (it keeps f alive beyond this test) -- confirm before removing.
        globals()["foo"] = f
        self.con.create_function("reftest", 0, f)
        cur = self.con.cursor()
        cur.execute("select reftest()")

    def test_func_return_text(self):
        cur = self.con.cursor()
        cur.execute("select returntext()")
        val = cur.fetchone()[0]
        self.assertEqual(type(val), str)
        self.assertEqual(val, "foo")

    def test_func_return_text_with_null_char(self):
        # The embedded NUL must not truncate the returned string.
        cur = self.con.cursor()
        res = cur.execute("select returntextwithnull()").fetchone()[0]
        self.assertEqual(type(res), str)
        self.assertEqual(res, "1\x002")

    def test_func_return_unicode(self):
        cur = self.con.cursor()
        cur.execute("select returnunicode()")
        val = cur.fetchone()[0]
        self.assertEqual(type(val), str)
        self.assertEqual(val, "bar")

    def test_func_return_int(self):
        cur = self.con.cursor()
        cur.execute("select returnint()")
        val = cur.fetchone()[0]
        self.assertEqual(type(val), int)
        self.assertEqual(val, 42)

    def test_func_return_float(self):
        cur = self.con.cursor()
        cur.execute("select returnfloat()")
        val = cur.fetchone()[0]
        self.assertEqual(type(val), float)
        # Same tolerance as the former manual 3.139..3.141 range check, but
        # with a useful failure message and without letting NaN slip through.
        self.assertAlmostEqual(val, 3.14, delta=0.001)

    def test_func_return_null(self):
        cur = self.con.cursor()
        cur.execute("select returnnull()")
        val = cur.fetchone()[0]
        self.assertIsNone(val)

    def test_func_return_blob(self):
        cur = self.con.cursor()
        cur.execute("select returnblob()")
        val = cur.fetchone()[0]
        self.assertEqual(type(val), bytes)
        self.assertEqual(val, b"blob")

    def test_func_return_long_long(self):
        cur = self.con.cursor()
        cur.execute("select returnlonglong()")
        val = cur.fetchone()[0]
        self.assertEqual(val, 1<<31)

    def test_func_return_nan(self):
        # A NaN returned by the function is fetched back as None.
        cur = self.con.cursor()
        cur.execute("select returnnan()")
        self.assertIsNone(cur.fetchone()[0])

    def test_func_return_too_large_int_message(self):
        # This test used to be named test_func_return_too_large_int, the
        # same as a later method in this class, so the later definition
        # replaced it and it never ran.  It has its own name now.
        self.assertRaisesRegex(sqlite.DataError, "string or blob too big",
                               self.con.execute, "select returntoolargeint()")

    @with_tracebacks(ZeroDivisionError, name="func_raiseexception")
    def test_func_exception(self):
        cur = self.con.cursor()
        with self.assertRaises(sqlite.OperationalError) as cm:
            cur.execute("select raiseexception()")
            cur.fetchone()
        self.assertEqual(str(cm.exception), 'user-defined function raised exception')

    @with_tracebacks(MemoryError, name="func_memoryerror")
    def test_func_memory_error(self):
        cur = self.con.cursor()
        with self.assertRaises(MemoryError):
            cur.execute("select memoryerror()")
            cur.fetchone()

    @with_tracebacks(OverflowError, name="func_overflowerror")
    def test_func_overflow_error(self):
        cur = self.con.cursor()
        with self.assertRaises(sqlite.DataError):
            cur.execute("select overflowerror()")
            cur.fetchone()

    def test_any_arguments(self):
        # "spam" is registered with -1 arguments and reports how many it got.
        cur = self.con.cursor()
        cur.execute("select spam(?, ?)", (1, 2))
        val = cur.fetchone()[0]
        self.assertEqual(val, 2)

    def test_empty_blob(self):
        cur = self.con.execute("select isblob(x'')")
        self.assertTrue(cur.fetchone()[0])

    def test_nan_float(self):
        cur = self.con.execute("select isnone(?)", (float("nan"),))
        # SQLite has no concept of nan; it is converted to NULL
        self.assertTrue(cur.fetchone()[0])

    def test_too_large_int(self):
        err = "Python int too large to convert to SQLite INTEGER"
        self.assertRaisesRegex(OverflowError, err, self.con.execute,
                               "select spam(?)", (1 << 65,))

    def test_non_contiguous_blob(self):
        self.assertRaisesRegex(BufferError,
                               "underlying buffer is not C-contiguous",
                               self.con.execute, "select spam(?)",
                               (memoryview(b"blob")[::2],))

    @with_tracebacks(BufferError, regex="buffer.*contiguous")
    def test_return_non_contiguous_blob(self):
        with self.assertRaises(sqlite.OperationalError):
            cur = self.con.execute("select return_noncont_blob()")
            cur.fetchone()

    def test_param_surrogates(self):
        self.assertRaisesRegex(UnicodeEncodeError, "surrogates not allowed",
                               self.con.execute, "select spam(?)",
                               ("\ud803\ude6d",))

    def test_func_params(self):
        # Each bound parameter must reach the function with the listed
        # value and exact type, in order.
        results = []
        def append_result(arg):
            results.append((arg, type(arg)))
        self.con.create_function("test_params", 1, append_result)

        dataset = [
            (42, int),
            (-1, int),
            (1234567890123456789, int),
            (4611686018427387905, int),  # 63-bit int with non-zero low bits
            (3.14, float),
            (float('inf'), float),
            ("text", str),
            ("1\x002", str),
            ("\u02e2q\u02e1\u2071\u1d57\u1d49", str),
            (b"blob", bytes),
            (bytearray(range(2)), bytes),
            (memoryview(b"blob"), bytes),
            (None, type(None)),
        ]
        for val, _ in dataset:
            cur = self.con.execute("select test_params(?)", (val,))
            cur.fetchone()
        self.assertEqual(dataset, results)

    # Regarding deterministic functions:
    #
    # Between 3.8.3 and 3.15.0, deterministic functions were only used to
    # optimize inner loops, so for those versions we can only test if the
    # sqlite machinery has factored out a call or not. From 3.15.0 and onward,
    # deterministic functions were permitted in WHERE clauses of partial
    # indices, which allows testing based on syntax instead of the query
    # optimizer.
    @unittest.skipIf(sqlite.sqlite_version_info < (3, 8, 3), "Requires SQLite 3.8.3 or higher")
    def test_func_non_deterministic(self):
        mock = Mock(return_value=None)
        self.con.create_function("nondeterministic", 0, mock, deterministic=False)
        if sqlite.sqlite_version_info < (3, 15, 0):
            self.con.execute("select nondeterministic() = nondeterministic()")
            self.assertEqual(mock.call_count, 2)
        else:
            with self.assertRaises(sqlite.OperationalError):
                self.con.execute("create index t on test(t) where nondeterministic() is not null")

    @unittest.skipIf(sqlite.sqlite_version_info < (3, 8, 3), "Requires SQLite 3.8.3 or higher")
    def test_func_deterministic(self):
        mock = Mock(return_value=None)
        self.con.create_function("deterministic", 0, mock, deterministic=True)
        if sqlite.sqlite_version_info < (3, 15, 0):
            self.con.execute("select deterministic() = deterministic()")
            self.assertEqual(mock.call_count, 1)
        else:
            try:
                self.con.execute("create index t on test(t) where deterministic() is not null")
            except sqlite.OperationalError:
                self.fail("Unexpected failure while creating partial index")

    @unittest.skipIf(sqlite.sqlite_version_info >= (3, 8, 3), "SQLite < 3.8.3 needed")
    def test_func_deterministic_not_supported(self):
        with self.assertRaises(sqlite.NotSupportedError):
            self.con.create_function("deterministic", 0, int, deterministic=True)

    def test_func_deterministic_keyword_only(self):
        # Passing the flag positionally must be rejected.
        with self.assertRaises(TypeError):
            self.con.create_function("deterministic", 0, int, True)

    def test_function_destructor_via_gc(self):
        # See bpo-44304: The destructor of the user function can
        # crash if is called without the GIL from the gc functions
        dest = sqlite.connect(':memory:')
        def md5sum(t):
            return

        dest.create_function("md5", 1, md5sum)
        x = dest("create table lang (name, first_appeared)")
        del md5sum, dest

        # Put the remaining object into a reference cycle so that only the
        # garbage collector can free it.
        y = [x]
        y.append(y)

        del x,y
        gc_collect()

    @with_tracebacks(OverflowError)
    def test_func_return_too_large_int(self):
        # Values just outside the signed 64-bit range, on both sides.
        cur = self.con.cursor()
        for value in 2**63, -2**63-1, 2**64:
            self.con.create_function("largeint", 0, lambda value=value: value)
            with self.assertRaises(sqlite.DataError):
                cur.execute("select largeint()")

    @with_tracebacks(UnicodeEncodeError, "surrogates not allowed", "chr")
    def test_func_return_text_with_surrogates(self):
        cur = self.con.cursor()
        self.con.create_function("pychr", 1, chr)
        for value in 0xd8ff, 0xdcff:
            with self.assertRaises(sqlite.OperationalError):
                cur.execute("select pychr(?)", (value,))

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=2**31, memuse=3, dry_run=False)
    def test_func_return_too_large_text(self, size):
        cur = self.con.cursor()
        # The loop variable has its own name so it no longer overwrites the
        # "size" argument supplied by bigmemtest.
        for length in 2**31-1, 2**31:
            self.con.create_function("largetext", 0,
                                     lambda length=length: "b" * length)
            with self.assertRaises(sqlite.DataError):
                cur.execute("select largetext()")

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=2**31, memuse=2, dry_run=False)
    def test_func_return_too_large_blob(self, size):
        cur = self.con.cursor()
        # The loop variable has its own name so it no longer overwrites the
        # "size" argument supplied by bigmemtest.
        for length in 2**31-1, 2**31:
            self.con.create_function("largeblob", 0,
                                     lambda length=length: b"b" * length)
            with self.assertRaises(sqlite.DataError):
                cur.execute("select largeblob()")

    def test_func_return_illegal_value(self):
        # Returning the test case itself, an arbitrary non-SQL object.
        self.con.create_function("badreturn", 0, lambda: self)
        msg = "user-defined function raised exception"
        self.assertRaisesRegex(sqlite.OperationalError, msg,
                               self.con.execute, "select badreturn()")
4837db96d56Sopenharmony_ci
4847db96d56Sopenharmony_ci
class WindowSumInt:
    """Window aggregate that keeps a running sum of the values in the frame."""

    def __init__(self):
        self.count = 0

    def step(self, value):
        """Add *value* to the running sum."""
        self.count = self.count + value

    def value(self):
        """Return the current sum."""
        return self.count

    def inverse(self, value):
        """Subtract *value* from the running sum."""
        self.count = self.count - value

    def finalize(self):
        """Return the final sum."""
        return self.count
5007db96d56Sopenharmony_ci
class BadWindow(Exception):
    """Exception type used by the window function tests."""
5037db96d56Sopenharmony_ci
5047db96d56Sopenharmony_ci
@unittest.skipIf(sqlite.sqlite_version_info < (3, 25, 0),
                 "Requires SQLite 3.25.0 or newer")
class WindowFunctionTests(unittest.TestCase):
    """Tests for user-defined aggregate window functions.

    Each test runs against a fresh in-memory database that holds a small
    ``test(x, y)`` table and has WindowSumInt registered as "sumint".
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        self.cur = self.con.cursor()

        # Test case taken from https://www.sqlite.org/windowfunctions.html#udfwinfunc
        values = [
            ("a", 4),
            ("b", 5),
            ("c", 3),
            ("d", 8),
            ("e", 1),
        ]
        with self.con:
            self.con.execute("create table test(x, y)")
            self.con.executemany("insert into test values(?, ?)", values)
        # Sum of y over the previous, current and next row (ordered by x).
        self.expected = [
            ("a", 9),
            ("b", 12),
            ("c", 16),
            ("d", 12),
            ("e", 9),
        ]
        # The "%s" placeholder is filled with the name of the window function
        # under test.
        self.query = """
            select x, %s(y) over (
                order by x rows between 1 preceding and 1 following
            ) as sum_y
            from test order by x
        """
        self.con.create_window_function("sumint", 1, WindowSumInt)

    def tearDown(self):
        # Release the cursor and the connection explicitly instead of
        # leaving them to the garbage collector.
        self.cur.close()
        self.con.close()

    def test_win_sum_int(self):
        self.cur.execute(self.query % "sumint")
        self.assertEqual(self.cur.fetchall(), self.expected)

    def test_win_error_on_create(self):
        # An invalid number of arguments is rejected at registration time.
        self.assertRaises(sqlite.ProgrammingError,
                          self.con.create_window_function,
                          "shouldfail", -100, WindowSumInt)

    @with_tracebacks(BadWindow)
    def test_win_exception_in_method(self):
        # Each patched method raises BadWindow; the query must then fail
        # with an OperationalError that names the offending method.
        for meth in "__init__", "step", "value", "inverse":
            with self.subTest(meth=meth):
                with patch.object(WindowSumInt, meth, side_effect=BadWindow):
                    name = f"exc_{meth}"
                    self.con.create_window_function(name, 1, WindowSumInt)
                    msg = f"'{meth}' method raised error"
                    with self.assertRaisesRegex(sqlite.OperationalError, msg):
                        self.cur.execute(self.query % name)
                        self.cur.fetchall()

    @with_tracebacks(BadWindow)
    def test_win_exception_in_finalize(self):
        # Note: SQLite does not (as of version 3.38.0) propagate finalize
        # callback errors to sqlite3_step(); this implies that OperationalError
        # is _not_ raised.
        with patch.object(WindowSumInt, "finalize", side_effect=BadWindow):
            name = "exception_in_finalize"
            self.con.create_window_function(name, 1, WindowSumInt)
            self.cur.execute(self.query % name)
            self.cur.fetchall()

    @with_tracebacks(AttributeError)
    def test_win_missing_method(self):
        # Each helper class below lacks exactly one of step/value/inverse.
        class MissingValue:
            def step(self, x): pass
            def inverse(self, x): pass
            def finalize(self): return 42

        class MissingInverse:
            def step(self, x): pass
            def value(self): return 42
            def finalize(self): return 42

        class MissingStep:
            def value(self): return 42
            def inverse(self, x): pass
            def finalize(self): return 42

        dataset = (
            ("step", MissingStep),
            ("value", MissingValue),
            ("inverse", MissingInverse),
        )
        for meth, cls in dataset:
            with self.subTest(meth=meth, cls=cls):
                name = f"exc_{meth}"
                self.con.create_window_function(name, 1, cls)
                with self.assertRaisesRegex(sqlite.OperationalError,
                                            f"'{meth}' method not defined"):
                    self.cur.execute(self.query % name)
                    self.cur.fetchall()

    @with_tracebacks(AttributeError)
    def test_win_missing_finalize(self):
        # Note: SQLite does not (as of version 3.38.0) propagate finalize
        # callback errors to sqlite3_step(); this implies that OperationalError
        # is _not_ raised.
        class MissingFinalize:
            def step(self, x): pass
            def value(self): return 42
            def inverse(self, x): pass

        name = "missing_finalize"
        self.con.create_window_function(name, 1, MissingFinalize)
        self.cur.execute(self.query % name)
        self.cur.fetchall()

    def test_win_clear_function(self):
        # Passing None as the class removes the function registered in
        # setUp(), so the query can no longer be executed.
        self.con.create_window_function("sumint", 1, None)
        self.assertRaises(sqlite.OperationalError, self.cur.execute,
                          self.query % "sumint")

    def test_win_redefine_function(self):
        # Redefine WindowSumInt; adjust the expected results accordingly.
        class Redefined(WindowSumInt):
            def step(self, value): self.count += value * 2
            def inverse(self, value): self.count -= value * 2
        expected = [(v[0], v[1]*2) for v in self.expected]

        self.con.create_window_function("sumint", 1, Redefined)
        self.cur.execute(self.query % "sumint")
        self.assertEqual(self.cur.fetchall(), expected)

    def test_win_error_value_return(self):
        # value() returns 1 << 65; executing the query is expected to fail
        # with DataError.
        class ErrorValueReturn:
            def __init__(self): pass
            def step(self, x): pass
            def value(self): return 1 << 65

        self.con.create_window_function("err_val_ret", 1, ErrorValueReturn)
        self.assertRaisesRegex(sqlite.DataError, "string or blob too big",
                               self.cur.execute, self.query % "err_val_ret")
6417db96d56Sopenharmony_ci
6427db96d56Sopenharmony_ci
class AggregateTests(unittest.TestCase):
    """Tests for user-defined aggregates registered via create_aggregate().

    Each test runs against a fresh in-memory database with a single-row
    ``test`` table and a fixed set of aggregates registered in setUp().
    """

    def setUp(self):
        self.con = sqlite.connect(":memory:")
        cur = self.con.cursor()
        cur.execute("""
            create table test(
                t text,
                i integer,
                f float,
                n,
                b blob
                )
            """)
        cur.execute("insert into test(t, i, f, n, b) values (?, ?, ?, ?, ?)",
            ("foo", 5, 3.14, None, memoryview(b"blob"),))

        self.con.create_aggregate("nostep", 1, AggrNoStep)
        self.con.create_aggregate("nofinalize", 1, AggrNoFinalize)
        self.con.create_aggregate("excInit", 1, AggrExceptionInInit)
        self.con.create_aggregate("excStep", 1, AggrExceptionInStep)
        self.con.create_aggregate("excFinalize", 1, AggrExceptionInFinalize)
        self.con.create_aggregate("checkType", 2, AggrCheckType)
        self.con.create_aggregate("checkTypes", -1, AggrCheckTypes)
        self.con.create_aggregate("mysum", 1, AggrSum)
        self.con.create_aggregate("aggtxt", 1, AggrText)

    def tearDown(self):
        # Close the connection explicitly instead of leaving it to the
        # garbage collector.
        self.con.close()

    def test_aggr_error_on_create(self):
        # An out-of-range argument count must be rejected when the
        # aggregate is registered.
        with self.assertRaises(sqlite.OperationalError):
            self.con.create_aggregate("bla", -100, AggrSum)

    @with_tracebacks(AttributeError, name="AggrNoStep")
    def test_aggr_no_step(self):
        cur = self.con.cursor()
        with self.assertRaises(sqlite.OperationalError) as cm:
            cur.execute("select nostep(t) from test")
        self.assertEqual(str(cm.exception),
                         "user-defined aggregate's 'step' method not defined")

    def test_aggr_no_finalize(self):
        cur = self.con.cursor()
        msg = "user-defined aggregate's 'finalize' method not defined"
        with self.assertRaisesRegex(sqlite.OperationalError, msg):
            cur.execute("select nofinalize(t) from test")
            cur.fetchone()

    @with_tracebacks(ZeroDivisionError, name="AggrExceptionInInit")
    def test_aggr_exception_in_init(self):
        cur = self.con.cursor()
        with self.assertRaises(sqlite.OperationalError) as cm:
            cur.execute("select excInit(t) from test")
            cur.fetchone()
        self.assertEqual(
            str(cm.exception),
            "user-defined aggregate's '__init__' method raised error")

    @with_tracebacks(ZeroDivisionError, name="AggrExceptionInStep")
    def test_aggr_exception_in_step(self):
        cur = self.con.cursor()
        with self.assertRaises(sqlite.OperationalError) as cm:
            cur.execute("select excStep(t) from test")
            cur.fetchone()
        self.assertEqual(
            str(cm.exception),
            "user-defined aggregate's 'step' method raised error")

    @with_tracebacks(ZeroDivisionError, name="AggrExceptionInFinalize")
    def test_aggr_exception_in_finalize(self):
        cur = self.con.cursor()
        with self.assertRaises(sqlite.OperationalError) as cm:
            cur.execute("select excFinalize(t) from test")
            cur.fetchone()
        self.assertEqual(
            str(cm.exception),
            "user-defined aggregate's 'finalize' method raised error")

    def test_aggr_check_param_str(self):
        cur = self.con.cursor()
        cur.execute("select checkTypes('str', ?, ?)", ("foo", str()))
        val = cur.fetchone()[0]
        self.assertEqual(val, 2)

    def test_aggr_check_param_int(self):
        cur = self.con.cursor()
        cur.execute("select checkType('int', ?)", (42,))
        val = cur.fetchone()[0]
        self.assertEqual(val, 1)

    def test_aggr_check_params_int(self):
        cur = self.con.cursor()
        cur.execute("select checkTypes('int', ?, ?)", (42, 24))
        val = cur.fetchone()[0]
        self.assertEqual(val, 2)

    def test_aggr_check_param_float(self):
        cur = self.con.cursor()
        cur.execute("select checkType('float', ?)", (3.14,))
        val = cur.fetchone()[0]
        self.assertEqual(val, 1)

    def test_aggr_check_param_none(self):
        cur = self.con.cursor()
        cur.execute("select checkType('None', ?)", (None,))
        val = cur.fetchone()[0]
        self.assertEqual(val, 1)

    def test_aggr_check_param_blob(self):
        cur = self.con.cursor()
        cur.execute("select checkType('blob', ?)", (memoryview(b"blob"),))
        val = cur.fetchone()[0]
        self.assertEqual(val, 1)

    def test_aggr_check_aggr_sum(self):
        cur = self.con.cursor()
        cur.execute("delete from test")
        cur.executemany("insert into test(i) values (?)", [(10,), (20,), (30,)])
        cur.execute("select mysum(i) from test")
        val = cur.fetchone()[0]
        self.assertEqual(val, 60)

    def test_aggr_no_match(self):
        # The where clause filters out every row, so the aggregate sees no
        # input; the result is expected to be NULL.
        cur = self.con.execute("select mysum(i) from (select 1 as i) where i == 0")
        val = cur.fetchone()[0]
        self.assertIsNone(val)

    def test_aggr_text(self):
        # The second value contains an embedded NUL character.
        cur = self.con.cursor()
        for txt in ["foo", "1\x002"]:
            with self.subTest(txt=txt):
                cur.execute("select aggtxt(?) from test", (txt,))
                val = cur.fetchone()[0]
                self.assertEqual(val, txt)
7737db96d56Sopenharmony_ci
7747db96d56Sopenharmony_ci
7757db96d56Sopenharmony_ciclass AuthorizerTests(unittest.TestCase):
7767db96d56Sopenharmony_ci    @staticmethod
7777db96d56Sopenharmony_ci    def authorizer_cb(action, arg1, arg2, dbname, source):
7787db96d56Sopenharmony_ci        if action != sqlite.SQLITE_SELECT:
7797db96d56Sopenharmony_ci            return sqlite.SQLITE_DENY
7807db96d56Sopenharmony_ci        if arg2 == 'c2' or arg1 == 't2':
7817db96d56Sopenharmony_ci            return sqlite.SQLITE_DENY
7827db96d56Sopenharmony_ci        return sqlite.SQLITE_OK
7837db96d56Sopenharmony_ci
7847db96d56Sopenharmony_ci    def setUp(self):
7857db96d56Sopenharmony_ci        self.con = sqlite.connect(":memory:")
7867db96d56Sopenharmony_ci        self.con.executescript("""
7877db96d56Sopenharmony_ci            create table t1 (c1, c2);
7887db96d56Sopenharmony_ci            create table t2 (c1, c2);
7897db96d56Sopenharmony_ci            insert into t1 (c1, c2) values (1, 2);
7907db96d56Sopenharmony_ci            insert into t2 (c1, c2) values (4, 5);
7917db96d56Sopenharmony_ci            """)
7927db96d56Sopenharmony_ci
7937db96d56Sopenharmony_ci        # For our security test:
7947db96d56Sopenharmony_ci        self.con.execute("select c2 from t2")
7957db96d56Sopenharmony_ci
7967db96d56Sopenharmony_ci        self.con.set_authorizer(self.authorizer_cb)
7977db96d56Sopenharmony_ci
7987db96d56Sopenharmony_ci    def tearDown(self):
7997db96d56Sopenharmony_ci        pass
8007db96d56Sopenharmony_ci
8017db96d56Sopenharmony_ci    def test_table_access(self):
8027db96d56Sopenharmony_ci        with self.assertRaises(sqlite.DatabaseError) as cm:
8037db96d56Sopenharmony_ci            self.con.execute("select * from t2")
8047db96d56Sopenharmony_ci        self.assertIn('prohibited', str(cm.exception))
8057db96d56Sopenharmony_ci
8067db96d56Sopenharmony_ci    def test_column_access(self):
8077db96d56Sopenharmony_ci        with self.assertRaises(sqlite.DatabaseError) as cm:
8087db96d56Sopenharmony_ci            self.con.execute("select c2 from t1")
8097db96d56Sopenharmony_ci        self.assertIn('prohibited', str(cm.exception))
8107db96d56Sopenharmony_ci
8117db96d56Sopenharmony_ci    def test_clear_authorizer(self):
8127db96d56Sopenharmony_ci        self.con.set_authorizer(None)
8137db96d56Sopenharmony_ci        self.con.execute("select * from t2")
8147db96d56Sopenharmony_ci        self.con.execute("select c2 from t1")
8157db96d56Sopenharmony_ci
8167db96d56Sopenharmony_ci
class AuthorizerRaiseExceptionTests(AuthorizerTests):
    """Re-run the authorizer tests with a callback that raises ValueError."""

    @staticmethod
    def authorizer_cb(action, arg1, arg2, dbname, source):
        # Raise where the base class callback returns SQLITE_DENY.
        if action != sqlite.SQLITE_SELECT:
            raise ValueError
        if arg2 == 'c2' or arg1 == 't2':
            raise ValueError
        return sqlite.SQLITE_OK

    @with_tracebacks(ValueError, name="authorizer_cb")
    def test_table_access(self):
        super().test_table_access()

    @with_tracebacks(ValueError, name="authorizer_cb")
    def test_column_access(self):
        # Delegate to the column test (not the table test) so that column
        # access is actually exercised with the raising callback.
        super().test_column_access()
8337db96d56Sopenharmony_ci
class AuthorizerIllegalTypeTests(AuthorizerTests):
    """Re-run the authorizer tests with a callback that returns a float."""

    @staticmethod
    def authorizer_cb(action, arg1, arg2, dbname, source):
        # Same refusal conditions as the base class callback, but answer
        # with 0.0 instead of SQLITE_DENY.
        refuse = (
            action != sqlite.SQLITE_SELECT
            or arg2 == 'c2'
            or arg1 == 't2'
        )
        return 0.0 if refuse else sqlite.SQLITE_OK
8427db96d56Sopenharmony_ci
class AuthorizerLargeIntegerTests(AuthorizerTests):
    """Re-run the authorizer tests with a callback that returns 2**32."""

    @staticmethod
    def authorizer_cb(action, arg1, arg2, dbname, source):
        # Same refusal conditions as the base class callback, but answer
        # with 2**32 instead of SQLITE_DENY.
        refuse = (
            action != sqlite.SQLITE_SELECT
            or arg2 == 'c2'
            or arg1 == 't2'
        )
        return 2**32 if refuse else sqlite.SQLITE_OK
8517db96d56Sopenharmony_ci
8527db96d56Sopenharmony_ci
# Run the tests through unittest when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()
855