17db96d56Sopenharmony_ci# pysqlite2/test/userfunctions.py: tests for user-defined functions and 27db96d56Sopenharmony_ci# aggregates. 37db96d56Sopenharmony_ci# 47db96d56Sopenharmony_ci# Copyright (C) 2005-2007 Gerhard Häring <gh@ghaering.de> 57db96d56Sopenharmony_ci# 67db96d56Sopenharmony_ci# This file is part of pysqlite. 77db96d56Sopenharmony_ci# 87db96d56Sopenharmony_ci# This software is provided 'as-is', without any express or implied 97db96d56Sopenharmony_ci# warranty. In no event will the authors be held liable for any damages 107db96d56Sopenharmony_ci# arising from the use of this software. 117db96d56Sopenharmony_ci# 127db96d56Sopenharmony_ci# Permission is granted to anyone to use this software for any purpose, 137db96d56Sopenharmony_ci# including commercial applications, and to alter it and redistribute it 147db96d56Sopenharmony_ci# freely, subject to the following restrictions: 157db96d56Sopenharmony_ci# 167db96d56Sopenharmony_ci# 1. The origin of this software must not be misrepresented; you must not 177db96d56Sopenharmony_ci# claim that you wrote the original software. If you use this software 187db96d56Sopenharmony_ci# in a product, an acknowledgment in the product documentation would be 197db96d56Sopenharmony_ci# appreciated but is not required. 207db96d56Sopenharmony_ci# 2. Altered source versions must be plainly marked as such, and must not be 217db96d56Sopenharmony_ci# misrepresented as being the original software. 227db96d56Sopenharmony_ci# 3. This notice may not be removed or altered from any source distribution. 237db96d56Sopenharmony_ci 247db96d56Sopenharmony_ciimport contextlib 257db96d56Sopenharmony_ciimport functools 267db96d56Sopenharmony_ciimport io 277db96d56Sopenharmony_ciimport re 287db96d56Sopenharmony_ciimport sys 297db96d56Sopenharmony_ciimport unittest 307db96d56Sopenharmony_ciimport sqlite3 as sqlite 317db96d56Sopenharmony_ci 327db96d56Sopenharmony_cifrom unittest.mock import Mock, patch 337db96d56Sopenharmony_cifrom test.support import bigmemtest, catch_unraisable_exception, gc_collect 347db96d56Sopenharmony_ci 357db96d56Sopenharmony_cifrom test.test_sqlite3.test_dbapi import cx_limit 367db96d56Sopenharmony_ci 377db96d56Sopenharmony_ci 387db96d56Sopenharmony_cidef with_tracebacks(exc, regex="", name=""): 397db96d56Sopenharmony_ci """Convenience decorator for testing callback tracebacks.""" 407db96d56Sopenharmony_ci def decorator(func): 417db96d56Sopenharmony_ci _regex = re.compile(regex) if regex else None 427db96d56Sopenharmony_ci @functools.wraps(func) 437db96d56Sopenharmony_ci def wrapper(self, *args, **kwargs): 447db96d56Sopenharmony_ci with catch_unraisable_exception() as cm: 457db96d56Sopenharmony_ci # First, run the test with traceback enabled. 467db96d56Sopenharmony_ci with check_tracebacks(self, cm, exc, _regex, name): 477db96d56Sopenharmony_ci func(self, *args, **kwargs) 487db96d56Sopenharmony_ci 497db96d56Sopenharmony_ci # Then run the test with traceback disabled. 507db96d56Sopenharmony_ci func(self, *args, **kwargs) 517db96d56Sopenharmony_ci return wrapper 527db96d56Sopenharmony_ci return decorator 537db96d56Sopenharmony_ci 547db96d56Sopenharmony_ci 557db96d56Sopenharmony_ci@contextlib.contextmanager 567db96d56Sopenharmony_cidef check_tracebacks(self, cm, exc, regex, obj_name): 577db96d56Sopenharmony_ci """Convenience context manager for testing callback tracebacks.""" 587db96d56Sopenharmony_ci sqlite.enable_callback_tracebacks(True) 597db96d56Sopenharmony_ci try: 607db96d56Sopenharmony_ci buf = io.StringIO() 617db96d56Sopenharmony_ci with contextlib.redirect_stderr(buf): 627db96d56Sopenharmony_ci yield 637db96d56Sopenharmony_ci 647db96d56Sopenharmony_ci self.assertEqual(cm.unraisable.exc_type, exc) 657db96d56Sopenharmony_ci if regex: 667db96d56Sopenharmony_ci msg = str(cm.unraisable.exc_value) 677db96d56Sopenharmony_ci self.assertIsNotNone(regex.search(msg)) 687db96d56Sopenharmony_ci if obj_name: 697db96d56Sopenharmony_ci self.assertEqual(cm.unraisable.object.__name__, obj_name) 707db96d56Sopenharmony_ci finally: 717db96d56Sopenharmony_ci sqlite.enable_callback_tracebacks(False) 727db96d56Sopenharmony_ci 737db96d56Sopenharmony_ci 747db96d56Sopenharmony_cidef func_returntext(): 757db96d56Sopenharmony_ci return "foo" 767db96d56Sopenharmony_cidef func_returntextwithnull(): 777db96d56Sopenharmony_ci return "1\x002" 787db96d56Sopenharmony_cidef func_returnunicode(): 797db96d56Sopenharmony_ci return "bar" 807db96d56Sopenharmony_cidef func_returnint(): 817db96d56Sopenharmony_ci return 42 827db96d56Sopenharmony_cidef func_returnfloat(): 837db96d56Sopenharmony_ci return 3.14 847db96d56Sopenharmony_cidef func_returnnull(): 857db96d56Sopenharmony_ci return None 867db96d56Sopenharmony_cidef func_returnblob(): 877db96d56Sopenharmony_ci return b"blob" 887db96d56Sopenharmony_cidef func_returnlonglong(): 897db96d56Sopenharmony_ci return 1<<31 907db96d56Sopenharmony_cidef func_raiseexception(): 917db96d56Sopenharmony_ci 5/0 927db96d56Sopenharmony_cidef func_memoryerror(): 937db96d56Sopenharmony_ci raise MemoryError 947db96d56Sopenharmony_cidef func_overflowerror(): 957db96d56Sopenharmony_ci raise OverflowError 967db96d56Sopenharmony_ci 977db96d56Sopenharmony_ciclass AggrNoStep: 987db96d56Sopenharmony_ci def __init__(self): 997db96d56Sopenharmony_ci pass 1007db96d56Sopenharmony_ci 1017db96d56Sopenharmony_ci def finalize(self): 1027db96d56Sopenharmony_ci return 1 1037db96d56Sopenharmony_ci 1047db96d56Sopenharmony_ciclass AggrNoFinalize: 1057db96d56Sopenharmony_ci def __init__(self): 1067db96d56Sopenharmony_ci pass 1077db96d56Sopenharmony_ci 1087db96d56Sopenharmony_ci def step(self, x): 1097db96d56Sopenharmony_ci pass 1107db96d56Sopenharmony_ci 1117db96d56Sopenharmony_ciclass AggrExceptionInInit: 1127db96d56Sopenharmony_ci def __init__(self): 1137db96d56Sopenharmony_ci 5/0 1147db96d56Sopenharmony_ci 1157db96d56Sopenharmony_ci def step(self, x): 1167db96d56Sopenharmony_ci pass 1177db96d56Sopenharmony_ci 1187db96d56Sopenharmony_ci def finalize(self): 1197db96d56Sopenharmony_ci pass 1207db96d56Sopenharmony_ci 1217db96d56Sopenharmony_ciclass AggrExceptionInStep: 1227db96d56Sopenharmony_ci def __init__(self): 1237db96d56Sopenharmony_ci pass 1247db96d56Sopenharmony_ci 1257db96d56Sopenharmony_ci def step(self, x): 1267db96d56Sopenharmony_ci 5/0 1277db96d56Sopenharmony_ci 1287db96d56Sopenharmony_ci def finalize(self): 1297db96d56Sopenharmony_ci return 42 1307db96d56Sopenharmony_ci 1317db96d56Sopenharmony_ciclass AggrExceptionInFinalize: 1327db96d56Sopenharmony_ci def __init__(self): 1337db96d56Sopenharmony_ci pass 1347db96d56Sopenharmony_ci 1357db96d56Sopenharmony_ci def step(self, x): 1367db96d56Sopenharmony_ci pass 1377db96d56Sopenharmony_ci 1387db96d56Sopenharmony_ci def finalize(self): 1397db96d56Sopenharmony_ci 5/0 1407db96d56Sopenharmony_ci 1417db96d56Sopenharmony_ciclass AggrCheckType: 1427db96d56Sopenharmony_ci def __init__(self): 1437db96d56Sopenharmony_ci self.val = None 1447db96d56Sopenharmony_ci 1457db96d56Sopenharmony_ci def step(self, whichType, val): 1467db96d56Sopenharmony_ci theType = {"str": str, "int": int, "float": float, "None": type(None), 1477db96d56Sopenharmony_ci "blob": bytes} 1487db96d56Sopenharmony_ci self.val = int(theType[whichType] is type(val)) 1497db96d56Sopenharmony_ci 1507db96d56Sopenharmony_ci def finalize(self): 1517db96d56Sopenharmony_ci return self.val 1527db96d56Sopenharmony_ci 1537db96d56Sopenharmony_ciclass AggrCheckTypes: 1547db96d56Sopenharmony_ci def __init__(self): 1557db96d56Sopenharmony_ci self.val = 0 1567db96d56Sopenharmony_ci 1577db96d56Sopenharmony_ci def step(self, whichType, *vals): 1587db96d56Sopenharmony_ci theType = {"str": str, "int": int, "float": float, "None": type(None), 1597db96d56Sopenharmony_ci "blob": bytes} 1607db96d56Sopenharmony_ci for val in vals: 1617db96d56Sopenharmony_ci self.val += int(theType[whichType] is type(val)) 1627db96d56Sopenharmony_ci 1637db96d56Sopenharmony_ci def finalize(self): 1647db96d56Sopenharmony_ci return self.val 1657db96d56Sopenharmony_ci 1667db96d56Sopenharmony_ciclass AggrSum: 1677db96d56Sopenharmony_ci def __init__(self): 1687db96d56Sopenharmony_ci self.val = 0.0 1697db96d56Sopenharmony_ci 1707db96d56Sopenharmony_ci def step(self, val): 1717db96d56Sopenharmony_ci self.val += val 1727db96d56Sopenharmony_ci 1737db96d56Sopenharmony_ci def finalize(self): 1747db96d56Sopenharmony_ci return self.val 1757db96d56Sopenharmony_ci 1767db96d56Sopenharmony_ciclass AggrText: 1777db96d56Sopenharmony_ci def __init__(self): 1787db96d56Sopenharmony_ci self.txt = "" 1797db96d56Sopenharmony_ci def step(self, txt): 1807db96d56Sopenharmony_ci self.txt = self.txt + txt 1817db96d56Sopenharmony_ci def finalize(self): 1827db96d56Sopenharmony_ci return self.txt 1837db96d56Sopenharmony_ci 1847db96d56Sopenharmony_ci 1857db96d56Sopenharmony_ciclass FunctionTests(unittest.TestCase): 1867db96d56Sopenharmony_ci def setUp(self): 1877db96d56Sopenharmony_ci self.con = sqlite.connect(":memory:") 1887db96d56Sopenharmony_ci 1897db96d56Sopenharmony_ci self.con.create_function("returntext", 0, func_returntext) 1907db96d56Sopenharmony_ci self.con.create_function("returntextwithnull", 0, func_returntextwithnull) 1917db96d56Sopenharmony_ci self.con.create_function("returnunicode", 0, func_returnunicode) 1927db96d56Sopenharmony_ci self.con.create_function("returnint", 0, func_returnint) 1937db96d56Sopenharmony_ci self.con.create_function("returnfloat", 0, func_returnfloat) 1947db96d56Sopenharmony_ci self.con.create_function("returnnull", 0, func_returnnull) 1957db96d56Sopenharmony_ci self.con.create_function("returnblob", 0, func_returnblob) 1967db96d56Sopenharmony_ci self.con.create_function("returnlonglong", 0, func_returnlonglong) 1977db96d56Sopenharmony_ci self.con.create_function("returnnan", 0, lambda: float("nan")) 1987db96d56Sopenharmony_ci self.con.create_function("returntoolargeint", 0, lambda: 1 << 65) 1997db96d56Sopenharmony_ci self.con.create_function("return_noncont_blob", 0, 2007db96d56Sopenharmony_ci lambda: memoryview(b"blob")[::2]) 2017db96d56Sopenharmony_ci self.con.create_function("raiseexception", 0, func_raiseexception) 2027db96d56Sopenharmony_ci self.con.create_function("memoryerror", 0, func_memoryerror) 2037db96d56Sopenharmony_ci self.con.create_function("overflowerror", 0, func_overflowerror) 2047db96d56Sopenharmony_ci 2057db96d56Sopenharmony_ci self.con.create_function("isblob", 1, lambda x: isinstance(x, bytes)) 2067db96d56Sopenharmony_ci self.con.create_function("isnone", 1, lambda x: x is None) 2077db96d56Sopenharmony_ci self.con.create_function("spam", -1, lambda *x: len(x)) 2087db96d56Sopenharmony_ci self.con.execute("create table test(t text)") 2097db96d56Sopenharmony_ci 2107db96d56Sopenharmony_ci def tearDown(self): 2117db96d56Sopenharmony_ci self.con.close() 2127db96d56Sopenharmony_ci 2137db96d56Sopenharmony_ci def test_func_error_on_create(self): 2147db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError): 2157db96d56Sopenharmony_ci self.con.create_function("bla", -100, lambda x: 2*x) 2167db96d56Sopenharmony_ci 2177db96d56Sopenharmony_ci def test_func_too_many_args(self): 2187db96d56Sopenharmony_ci category = sqlite.SQLITE_LIMIT_FUNCTION_ARG 2197db96d56Sopenharmony_ci msg = "too many arguments on function" 2207db96d56Sopenharmony_ci with cx_limit(self.con, category=category, limit=1): 2217db96d56Sopenharmony_ci self.con.execute("select abs(-1)"); 2227db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.OperationalError, msg): 2237db96d56Sopenharmony_ci self.con.execute("select max(1, 2)"); 2247db96d56Sopenharmony_ci 2257db96d56Sopenharmony_ci def test_func_ref_count(self): 2267db96d56Sopenharmony_ci def getfunc(): 2277db96d56Sopenharmony_ci def f(): 2287db96d56Sopenharmony_ci return 1 2297db96d56Sopenharmony_ci return f 2307db96d56Sopenharmony_ci f = getfunc() 2317db96d56Sopenharmony_ci globals()["foo"] = f 2327db96d56Sopenharmony_ci # self.con.create_function("reftest", 0, getfunc()) 2337db96d56Sopenharmony_ci self.con.create_function("reftest", 0, f) 2347db96d56Sopenharmony_ci cur = self.con.cursor() 2357db96d56Sopenharmony_ci cur.execute("select reftest()") 2367db96d56Sopenharmony_ci 2377db96d56Sopenharmony_ci def test_func_return_text(self): 2387db96d56Sopenharmony_ci cur = self.con.cursor() 2397db96d56Sopenharmony_ci cur.execute("select returntext()") 2407db96d56Sopenharmony_ci val = cur.fetchone()[0] 2417db96d56Sopenharmony_ci self.assertEqual(type(val), str) 2427db96d56Sopenharmony_ci self.assertEqual(val, "foo") 2437db96d56Sopenharmony_ci 2447db96d56Sopenharmony_ci def test_func_return_text_with_null_char(self): 2457db96d56Sopenharmony_ci cur = self.con.cursor() 2467db96d56Sopenharmony_ci res = cur.execute("select returntextwithnull()").fetchone()[0] 2477db96d56Sopenharmony_ci self.assertEqual(type(res), str) 2487db96d56Sopenharmony_ci self.assertEqual(res, "1\x002") 2497db96d56Sopenharmony_ci 2507db96d56Sopenharmony_ci def test_func_return_unicode(self): 2517db96d56Sopenharmony_ci cur = self.con.cursor() 2527db96d56Sopenharmony_ci cur.execute("select returnunicode()") 2537db96d56Sopenharmony_ci val = cur.fetchone()[0] 2547db96d56Sopenharmony_ci self.assertEqual(type(val), str) 2557db96d56Sopenharmony_ci self.assertEqual(val, "bar") 2567db96d56Sopenharmony_ci 2577db96d56Sopenharmony_ci def test_func_return_int(self): 2587db96d56Sopenharmony_ci cur = self.con.cursor() 2597db96d56Sopenharmony_ci cur.execute("select returnint()") 2607db96d56Sopenharmony_ci val = cur.fetchone()[0] 2617db96d56Sopenharmony_ci self.assertEqual(type(val), int) 2627db96d56Sopenharmony_ci self.assertEqual(val, 42) 2637db96d56Sopenharmony_ci 2647db96d56Sopenharmony_ci def test_func_return_float(self): 2657db96d56Sopenharmony_ci cur = self.con.cursor() 2667db96d56Sopenharmony_ci cur.execute("select returnfloat()") 2677db96d56Sopenharmony_ci val = cur.fetchone()[0] 2687db96d56Sopenharmony_ci self.assertEqual(type(val), float) 2697db96d56Sopenharmony_ci if val < 3.139 or val > 3.141: 2707db96d56Sopenharmony_ci self.fail("wrong value") 2717db96d56Sopenharmony_ci 2727db96d56Sopenharmony_ci def test_func_return_null(self): 2737db96d56Sopenharmony_ci cur = self.con.cursor() 2747db96d56Sopenharmony_ci cur.execute("select returnnull()") 2757db96d56Sopenharmony_ci val = cur.fetchone()[0] 2767db96d56Sopenharmony_ci self.assertEqual(type(val), type(None)) 2777db96d56Sopenharmony_ci self.assertEqual(val, None) 2787db96d56Sopenharmony_ci 2797db96d56Sopenharmony_ci def test_func_return_blob(self): 2807db96d56Sopenharmony_ci cur = self.con.cursor() 2817db96d56Sopenharmony_ci cur.execute("select returnblob()") 2827db96d56Sopenharmony_ci val = cur.fetchone()[0] 2837db96d56Sopenharmony_ci self.assertEqual(type(val), bytes) 2847db96d56Sopenharmony_ci self.assertEqual(val, b"blob") 2857db96d56Sopenharmony_ci 2867db96d56Sopenharmony_ci def test_func_return_long_long(self): 2877db96d56Sopenharmony_ci cur = self.con.cursor() 2887db96d56Sopenharmony_ci cur.execute("select returnlonglong()") 2897db96d56Sopenharmony_ci val = cur.fetchone()[0] 2907db96d56Sopenharmony_ci self.assertEqual(val, 1<<31) 2917db96d56Sopenharmony_ci 2927db96d56Sopenharmony_ci def test_func_return_nan(self): 2937db96d56Sopenharmony_ci cur = self.con.cursor() 2947db96d56Sopenharmony_ci cur.execute("select returnnan()") 2957db96d56Sopenharmony_ci self.assertIsNone(cur.fetchone()[0]) 2967db96d56Sopenharmony_ci 2977db96d56Sopenharmony_ci def test_func_return_too_large_int(self): 2987db96d56Sopenharmony_ci cur = self.con.cursor() 2997db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.DataError, "string or blob too big", 3007db96d56Sopenharmony_ci self.con.execute, "select returntoolargeint()") 3017db96d56Sopenharmony_ci 3027db96d56Sopenharmony_ci @with_tracebacks(ZeroDivisionError, name="func_raiseexception") 3037db96d56Sopenharmony_ci def test_func_exception(self): 3047db96d56Sopenharmony_ci cur = self.con.cursor() 3057db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError) as cm: 3067db96d56Sopenharmony_ci cur.execute("select raiseexception()") 3077db96d56Sopenharmony_ci cur.fetchone() 3087db96d56Sopenharmony_ci self.assertEqual(str(cm.exception), 'user-defined function raised exception') 3097db96d56Sopenharmony_ci 3107db96d56Sopenharmony_ci @with_tracebacks(MemoryError, name="func_memoryerror") 3117db96d56Sopenharmony_ci def test_func_memory_error(self): 3127db96d56Sopenharmony_ci cur = self.con.cursor() 3137db96d56Sopenharmony_ci with self.assertRaises(MemoryError): 3147db96d56Sopenharmony_ci cur.execute("select memoryerror()") 3157db96d56Sopenharmony_ci cur.fetchone() 3167db96d56Sopenharmony_ci 3177db96d56Sopenharmony_ci @with_tracebacks(OverflowError, name="func_overflowerror") 3187db96d56Sopenharmony_ci def test_func_overflow_error(self): 3197db96d56Sopenharmony_ci cur = self.con.cursor() 3207db96d56Sopenharmony_ci with self.assertRaises(sqlite.DataError): 3217db96d56Sopenharmony_ci cur.execute("select overflowerror()") 3227db96d56Sopenharmony_ci cur.fetchone() 3237db96d56Sopenharmony_ci 3247db96d56Sopenharmony_ci def test_any_arguments(self): 3257db96d56Sopenharmony_ci cur = self.con.cursor() 3267db96d56Sopenharmony_ci cur.execute("select spam(?, ?)", (1, 2)) 3277db96d56Sopenharmony_ci val = cur.fetchone()[0] 3287db96d56Sopenharmony_ci self.assertEqual(val, 2) 3297db96d56Sopenharmony_ci 3307db96d56Sopenharmony_ci def test_empty_blob(self): 3317db96d56Sopenharmony_ci cur = self.con.execute("select isblob(x'')") 3327db96d56Sopenharmony_ci self.assertTrue(cur.fetchone()[0]) 3337db96d56Sopenharmony_ci 3347db96d56Sopenharmony_ci def test_nan_float(self): 3357db96d56Sopenharmony_ci cur = self.con.execute("select isnone(?)", (float("nan"),)) 3367db96d56Sopenharmony_ci # SQLite has no concept of nan; it is converted to NULL 3377db96d56Sopenharmony_ci self.assertTrue(cur.fetchone()[0]) 3387db96d56Sopenharmony_ci 3397db96d56Sopenharmony_ci def test_too_large_int(self): 3407db96d56Sopenharmony_ci err = "Python int too large to convert to SQLite INTEGER" 3417db96d56Sopenharmony_ci self.assertRaisesRegex(OverflowError, err, self.con.execute, 3427db96d56Sopenharmony_ci "select spam(?)", (1 << 65,)) 3437db96d56Sopenharmony_ci 3447db96d56Sopenharmony_ci def test_non_contiguous_blob(self): 3457db96d56Sopenharmony_ci self.assertRaisesRegex(BufferError, 3467db96d56Sopenharmony_ci "underlying buffer is not C-contiguous", 3477db96d56Sopenharmony_ci self.con.execute, "select spam(?)", 3487db96d56Sopenharmony_ci (memoryview(b"blob")[::2],)) 3497db96d56Sopenharmony_ci 3507db96d56Sopenharmony_ci @with_tracebacks(BufferError, regex="buffer.*contiguous") 3517db96d56Sopenharmony_ci def test_return_non_contiguous_blob(self): 3527db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError): 3537db96d56Sopenharmony_ci cur = self.con.execute("select return_noncont_blob()") 3547db96d56Sopenharmony_ci cur.fetchone() 3557db96d56Sopenharmony_ci 3567db96d56Sopenharmony_ci def test_param_surrogates(self): 3577db96d56Sopenharmony_ci self.assertRaisesRegex(UnicodeEncodeError, "surrogates not allowed", 3587db96d56Sopenharmony_ci self.con.execute, "select spam(?)", 3597db96d56Sopenharmony_ci ("\ud803\ude6d",)) 3607db96d56Sopenharmony_ci 3617db96d56Sopenharmony_ci def test_func_params(self): 3627db96d56Sopenharmony_ci results = [] 3637db96d56Sopenharmony_ci def append_result(arg): 3647db96d56Sopenharmony_ci results.append((arg, type(arg))) 3657db96d56Sopenharmony_ci self.con.create_function("test_params", 1, append_result) 3667db96d56Sopenharmony_ci 3677db96d56Sopenharmony_ci dataset = [ 3687db96d56Sopenharmony_ci (42, int), 3697db96d56Sopenharmony_ci (-1, int), 3707db96d56Sopenharmony_ci (1234567890123456789, int), 3717db96d56Sopenharmony_ci (4611686018427387905, int), # 63-bit int with non-zero low bits 3727db96d56Sopenharmony_ci (3.14, float), 3737db96d56Sopenharmony_ci (float('inf'), float), 3747db96d56Sopenharmony_ci ("text", str), 3757db96d56Sopenharmony_ci ("1\x002", str), 3767db96d56Sopenharmony_ci ("\u02e2q\u02e1\u2071\u1d57\u1d49", str), 3777db96d56Sopenharmony_ci (b"blob", bytes), 3787db96d56Sopenharmony_ci (bytearray(range(2)), bytes), 3797db96d56Sopenharmony_ci (memoryview(b"blob"), bytes), 3807db96d56Sopenharmony_ci (None, type(None)), 3817db96d56Sopenharmony_ci ] 3827db96d56Sopenharmony_ci for val, _ in dataset: 3837db96d56Sopenharmony_ci cur = self.con.execute("select test_params(?)", (val,)) 3847db96d56Sopenharmony_ci cur.fetchone() 3857db96d56Sopenharmony_ci self.assertEqual(dataset, results) 3867db96d56Sopenharmony_ci 3877db96d56Sopenharmony_ci # Regarding deterministic functions: 3887db96d56Sopenharmony_ci # 3897db96d56Sopenharmony_ci # Between 3.8.3 and 3.15.0, deterministic functions were only used to 3907db96d56Sopenharmony_ci # optimize inner loops, so for those versions we can only test if the 3917db96d56Sopenharmony_ci # sqlite machinery has factored out a call or not. From 3.15.0 and onward, 3927db96d56Sopenharmony_ci # deterministic functions were permitted in WHERE clauses of partial 3937db96d56Sopenharmony_ci # indices, which allows testing based on syntax, iso. the query optimizer. 3947db96d56Sopenharmony_ci @unittest.skipIf(sqlite.sqlite_version_info < (3, 8, 3), "Requires SQLite 3.8.3 or higher") 3957db96d56Sopenharmony_ci def test_func_non_deterministic(self): 3967db96d56Sopenharmony_ci mock = Mock(return_value=None) 3977db96d56Sopenharmony_ci self.con.create_function("nondeterministic", 0, mock, deterministic=False) 3987db96d56Sopenharmony_ci if sqlite.sqlite_version_info < (3, 15, 0): 3997db96d56Sopenharmony_ci self.con.execute("select nondeterministic() = nondeterministic()") 4007db96d56Sopenharmony_ci self.assertEqual(mock.call_count, 2) 4017db96d56Sopenharmony_ci else: 4027db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError): 4037db96d56Sopenharmony_ci self.con.execute("create index t on test(t) where nondeterministic() is not null") 4047db96d56Sopenharmony_ci 4057db96d56Sopenharmony_ci @unittest.skipIf(sqlite.sqlite_version_info < (3, 8, 3), "Requires SQLite 3.8.3 or higher") 4067db96d56Sopenharmony_ci def test_func_deterministic(self): 4077db96d56Sopenharmony_ci mock = Mock(return_value=None) 4087db96d56Sopenharmony_ci self.con.create_function("deterministic", 0, mock, deterministic=True) 4097db96d56Sopenharmony_ci if sqlite.sqlite_version_info < (3, 15, 0): 4107db96d56Sopenharmony_ci self.con.execute("select deterministic() = deterministic()") 4117db96d56Sopenharmony_ci self.assertEqual(mock.call_count, 1) 4127db96d56Sopenharmony_ci else: 4137db96d56Sopenharmony_ci try: 4147db96d56Sopenharmony_ci self.con.execute("create index t on test(t) where deterministic() is not null") 4157db96d56Sopenharmony_ci except sqlite.OperationalError: 4167db96d56Sopenharmony_ci self.fail("Unexpected failure while creating partial index") 4177db96d56Sopenharmony_ci 4187db96d56Sopenharmony_ci @unittest.skipIf(sqlite.sqlite_version_info >= (3, 8, 3), "SQLite < 3.8.3 needed") 4197db96d56Sopenharmony_ci def test_func_deterministic_not_supported(self): 4207db96d56Sopenharmony_ci with self.assertRaises(sqlite.NotSupportedError): 4217db96d56Sopenharmony_ci self.con.create_function("deterministic", 0, int, deterministic=True) 4227db96d56Sopenharmony_ci 4237db96d56Sopenharmony_ci def test_func_deterministic_keyword_only(self): 4247db96d56Sopenharmony_ci with self.assertRaises(TypeError): 4257db96d56Sopenharmony_ci self.con.create_function("deterministic", 0, int, True) 4267db96d56Sopenharmony_ci 4277db96d56Sopenharmony_ci def test_function_destructor_via_gc(self): 4287db96d56Sopenharmony_ci # See bpo-44304: The destructor of the user function can 4297db96d56Sopenharmony_ci # crash if is called without the GIL from the gc functions 4307db96d56Sopenharmony_ci dest = sqlite.connect(':memory:') 4317db96d56Sopenharmony_ci def md5sum(t): 4327db96d56Sopenharmony_ci return 4337db96d56Sopenharmony_ci 4347db96d56Sopenharmony_ci dest.create_function("md5", 1, md5sum) 4357db96d56Sopenharmony_ci x = dest("create table lang (name, first_appeared)") 4367db96d56Sopenharmony_ci del md5sum, dest 4377db96d56Sopenharmony_ci 4387db96d56Sopenharmony_ci y = [x] 4397db96d56Sopenharmony_ci y.append(y) 4407db96d56Sopenharmony_ci 4417db96d56Sopenharmony_ci del x,y 4427db96d56Sopenharmony_ci gc_collect() 4437db96d56Sopenharmony_ci 4447db96d56Sopenharmony_ci @with_tracebacks(OverflowError) 4457db96d56Sopenharmony_ci def test_func_return_too_large_int(self): 4467db96d56Sopenharmony_ci cur = self.con.cursor() 4477db96d56Sopenharmony_ci for value in 2**63, -2**63-1, 2**64: 4487db96d56Sopenharmony_ci self.con.create_function("largeint", 0, lambda value=value: value) 4497db96d56Sopenharmony_ci with self.assertRaises(sqlite.DataError): 4507db96d56Sopenharmony_ci cur.execute("select largeint()") 4517db96d56Sopenharmony_ci 4527db96d56Sopenharmony_ci @with_tracebacks(UnicodeEncodeError, "surrogates not allowed", "chr") 4537db96d56Sopenharmony_ci def test_func_return_text_with_surrogates(self): 4547db96d56Sopenharmony_ci cur = self.con.cursor() 4557db96d56Sopenharmony_ci self.con.create_function("pychr", 1, chr) 4567db96d56Sopenharmony_ci for value in 0xd8ff, 0xdcff: 4577db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError): 4587db96d56Sopenharmony_ci cur.execute("select pychr(?)", (value,)) 4597db96d56Sopenharmony_ci 4607db96d56Sopenharmony_ci @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 4617db96d56Sopenharmony_ci @bigmemtest(size=2**31, memuse=3, dry_run=False) 4627db96d56Sopenharmony_ci def test_func_return_too_large_text(self, size): 4637db96d56Sopenharmony_ci cur = self.con.cursor() 4647db96d56Sopenharmony_ci for size in 2**31-1, 2**31: 4657db96d56Sopenharmony_ci self.con.create_function("largetext", 0, lambda size=size: "b" * size) 4667db96d56Sopenharmony_ci with self.assertRaises(sqlite.DataError): 4677db96d56Sopenharmony_ci cur.execute("select largetext()") 4687db96d56Sopenharmony_ci 4697db96d56Sopenharmony_ci @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform') 4707db96d56Sopenharmony_ci @bigmemtest(size=2**31, memuse=2, dry_run=False) 4717db96d56Sopenharmony_ci def test_func_return_too_large_blob(self, size): 4727db96d56Sopenharmony_ci cur = self.con.cursor() 4737db96d56Sopenharmony_ci for size in 2**31-1, 2**31: 4747db96d56Sopenharmony_ci self.con.create_function("largeblob", 0, lambda size=size: b"b" * size) 4757db96d56Sopenharmony_ci with self.assertRaises(sqlite.DataError): 4767db96d56Sopenharmony_ci cur.execute("select largeblob()") 4777db96d56Sopenharmony_ci 4787db96d56Sopenharmony_ci def test_func_return_illegal_value(self): 4797db96d56Sopenharmony_ci self.con.create_function("badreturn", 0, lambda: self) 4807db96d56Sopenharmony_ci msg = "user-defined function raised exception" 4817db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.OperationalError, msg, 4827db96d56Sopenharmony_ci self.con.execute, "select badreturn()") 4837db96d56Sopenharmony_ci 4847db96d56Sopenharmony_ci 4857db96d56Sopenharmony_ciclass WindowSumInt: 4867db96d56Sopenharmony_ci def __init__(self): 4877db96d56Sopenharmony_ci self.count = 0 4887db96d56Sopenharmony_ci 4897db96d56Sopenharmony_ci def step(self, value): 4907db96d56Sopenharmony_ci self.count += value 4917db96d56Sopenharmony_ci 4927db96d56Sopenharmony_ci def value(self): 4937db96d56Sopenharmony_ci return self.count 4947db96d56Sopenharmony_ci 4957db96d56Sopenharmony_ci def inverse(self, value): 4967db96d56Sopenharmony_ci self.count -= value 4977db96d56Sopenharmony_ci 4987db96d56Sopenharmony_ci def finalize(self): 4997db96d56Sopenharmony_ci return self.count 5007db96d56Sopenharmony_ci 5017db96d56Sopenharmony_ciclass BadWindow(Exception): 5027db96d56Sopenharmony_ci pass 5037db96d56Sopenharmony_ci 5047db96d56Sopenharmony_ci 5057db96d56Sopenharmony_ci@unittest.skipIf(sqlite.sqlite_version_info < (3, 25, 0), 5067db96d56Sopenharmony_ci "Requires SQLite 3.25.0 or newer") 5077db96d56Sopenharmony_ciclass WindowFunctionTests(unittest.TestCase): 5087db96d56Sopenharmony_ci def setUp(self): 5097db96d56Sopenharmony_ci self.con = sqlite.connect(":memory:") 5107db96d56Sopenharmony_ci self.cur = self.con.cursor() 5117db96d56Sopenharmony_ci 5127db96d56Sopenharmony_ci # Test case taken from https://www.sqlite.org/windowfunctions.html#udfwinfunc 5137db96d56Sopenharmony_ci values = [ 5147db96d56Sopenharmony_ci ("a", 4), 5157db96d56Sopenharmony_ci ("b", 5), 5167db96d56Sopenharmony_ci ("c", 3), 5177db96d56Sopenharmony_ci ("d", 8), 5187db96d56Sopenharmony_ci ("e", 1), 5197db96d56Sopenharmony_ci ] 5207db96d56Sopenharmony_ci with self.con: 5217db96d56Sopenharmony_ci self.con.execute("create table test(x, y)") 5227db96d56Sopenharmony_ci self.con.executemany("insert into test values(?, ?)", values) 5237db96d56Sopenharmony_ci self.expected = [ 5247db96d56Sopenharmony_ci ("a", 9), 5257db96d56Sopenharmony_ci ("b", 12), 5267db96d56Sopenharmony_ci ("c", 16), 5277db96d56Sopenharmony_ci ("d", 12), 5287db96d56Sopenharmony_ci ("e", 9), 5297db96d56Sopenharmony_ci ] 5307db96d56Sopenharmony_ci self.query = """ 5317db96d56Sopenharmony_ci select x, %s(y) over ( 5327db96d56Sopenharmony_ci order by x rows between 1 preceding and 1 following 5337db96d56Sopenharmony_ci ) as sum_y 5347db96d56Sopenharmony_ci from test order by x 5357db96d56Sopenharmony_ci """ 5367db96d56Sopenharmony_ci self.con.create_window_function("sumint", 1, WindowSumInt) 5377db96d56Sopenharmony_ci 5387db96d56Sopenharmony_ci def test_win_sum_int(self): 5397db96d56Sopenharmony_ci self.cur.execute(self.query % "sumint") 5407db96d56Sopenharmony_ci self.assertEqual(self.cur.fetchall(), self.expected) 5417db96d56Sopenharmony_ci 5427db96d56Sopenharmony_ci def test_win_error_on_create(self): 5437db96d56Sopenharmony_ci self.assertRaises(sqlite.ProgrammingError, 5447db96d56Sopenharmony_ci self.con.create_window_function, 5457db96d56Sopenharmony_ci "shouldfail", -100, WindowSumInt) 5467db96d56Sopenharmony_ci 5477db96d56Sopenharmony_ci @with_tracebacks(BadWindow) 5487db96d56Sopenharmony_ci def test_win_exception_in_method(self): 5497db96d56Sopenharmony_ci for meth in "__init__", "step", "value", "inverse": 5507db96d56Sopenharmony_ci with self.subTest(meth=meth): 5517db96d56Sopenharmony_ci with patch.object(WindowSumInt, meth, side_effect=BadWindow): 5527db96d56Sopenharmony_ci name = f"exc_{meth}" 5537db96d56Sopenharmony_ci self.con.create_window_function(name, 1, WindowSumInt) 5547db96d56Sopenharmony_ci msg = f"'{meth}' method raised error" 5557db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.OperationalError, msg): 5567db96d56Sopenharmony_ci self.cur.execute(self.query % name) 5577db96d56Sopenharmony_ci self.cur.fetchall() 5587db96d56Sopenharmony_ci 5597db96d56Sopenharmony_ci @with_tracebacks(BadWindow) 5607db96d56Sopenharmony_ci def test_win_exception_in_finalize(self): 5617db96d56Sopenharmony_ci # Note: SQLite does not (as of version 3.38.0) propagate finalize 5627db96d56Sopenharmony_ci # callback errors to sqlite3_step(); this implies that OperationalError 5637db96d56Sopenharmony_ci # is _not_ raised. 5647db96d56Sopenharmony_ci with patch.object(WindowSumInt, "finalize", side_effect=BadWindow): 5657db96d56Sopenharmony_ci name = f"exception_in_finalize" 5667db96d56Sopenharmony_ci self.con.create_window_function(name, 1, WindowSumInt) 5677db96d56Sopenharmony_ci self.cur.execute(self.query % name) 5687db96d56Sopenharmony_ci self.cur.fetchall() 5697db96d56Sopenharmony_ci 5707db96d56Sopenharmony_ci @with_tracebacks(AttributeError) 5717db96d56Sopenharmony_ci def test_win_missing_method(self): 5727db96d56Sopenharmony_ci class MissingValue: 5737db96d56Sopenharmony_ci def step(self, x): pass 5747db96d56Sopenharmony_ci def inverse(self, x): pass 5757db96d56Sopenharmony_ci def finalize(self): return 42 5767db96d56Sopenharmony_ci 5777db96d56Sopenharmony_ci class MissingInverse: 5787db96d56Sopenharmony_ci def step(self, x): pass 5797db96d56Sopenharmony_ci def value(self): return 42 5807db96d56Sopenharmony_ci def finalize(self): return 42 5817db96d56Sopenharmony_ci 5827db96d56Sopenharmony_ci class MissingStep: 5837db96d56Sopenharmony_ci def value(self): return 42 5847db96d56Sopenharmony_ci def inverse(self, x): pass 5857db96d56Sopenharmony_ci def finalize(self): return 42 5867db96d56Sopenharmony_ci 5877db96d56Sopenharmony_ci dataset = ( 5887db96d56Sopenharmony_ci ("step", MissingStep), 5897db96d56Sopenharmony_ci ("value", MissingValue), 5907db96d56Sopenharmony_ci ("inverse", MissingInverse), 5917db96d56Sopenharmony_ci ) 5927db96d56Sopenharmony_ci for meth, cls in dataset: 5937db96d56Sopenharmony_ci with self.subTest(meth=meth, cls=cls): 5947db96d56Sopenharmony_ci name = f"exc_{meth}" 5957db96d56Sopenharmony_ci self.con.create_window_function(name, 1, cls) 5967db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.OperationalError, 5977db96d56Sopenharmony_ci f"'{meth}' method not defined"): 5987db96d56Sopenharmony_ci self.cur.execute(self.query % name) 5997db96d56Sopenharmony_ci self.cur.fetchall() 6007db96d56Sopenharmony_ci 6017db96d56Sopenharmony_ci @with_tracebacks(AttributeError) 6027db96d56Sopenharmony_ci def test_win_missing_finalize(self): 6037db96d56Sopenharmony_ci # Note: SQLite does not (as of version 3.38.0) propagate finalize 6047db96d56Sopenharmony_ci # callback errors to sqlite3_step(); this implies that OperationalError 6057db96d56Sopenharmony_ci # is _not_ raised. 6067db96d56Sopenharmony_ci class MissingFinalize: 6077db96d56Sopenharmony_ci def step(self, x): pass 6087db96d56Sopenharmony_ci def value(self): return 42 6097db96d56Sopenharmony_ci def inverse(self, x): pass 6107db96d56Sopenharmony_ci 6117db96d56Sopenharmony_ci name = "missing_finalize" 6127db96d56Sopenharmony_ci self.con.create_window_function(name, 1, MissingFinalize) 6137db96d56Sopenharmony_ci self.cur.execute(self.query % name) 6147db96d56Sopenharmony_ci self.cur.fetchall() 6157db96d56Sopenharmony_ci 6167db96d56Sopenharmony_ci def test_win_clear_function(self): 6177db96d56Sopenharmony_ci self.con.create_window_function("sumint", 1, None) 6187db96d56Sopenharmony_ci self.assertRaises(sqlite.OperationalError, self.cur.execute, 6197db96d56Sopenharmony_ci self.query % "sumint") 6207db96d56Sopenharmony_ci 6217db96d56Sopenharmony_ci def test_win_redefine_function(self): 6227db96d56Sopenharmony_ci # Redefine WindowSumInt; adjust the expected results accordingly. 6237db96d56Sopenharmony_ci class Redefined(WindowSumInt): 6247db96d56Sopenharmony_ci def step(self, value): self.count += value * 2 6257db96d56Sopenharmony_ci def inverse(self, value): self.count -= value * 2 6267db96d56Sopenharmony_ci expected = [(v[0], v[1]*2) for v in self.expected] 6277db96d56Sopenharmony_ci 6287db96d56Sopenharmony_ci self.con.create_window_function("sumint", 1, Redefined) 6297db96d56Sopenharmony_ci self.cur.execute(self.query % "sumint") 6307db96d56Sopenharmony_ci self.assertEqual(self.cur.fetchall(), expected) 6317db96d56Sopenharmony_ci 6327db96d56Sopenharmony_ci def test_win_error_value_return(self): 6337db96d56Sopenharmony_ci class ErrorValueReturn: 6347db96d56Sopenharmony_ci def __init__(self): pass 6357db96d56Sopenharmony_ci def step(self, x): pass 6367db96d56Sopenharmony_ci def value(self): return 1 << 65 6377db96d56Sopenharmony_ci 6387db96d56Sopenharmony_ci self.con.create_window_function("err_val_ret", 1, ErrorValueReturn) 6397db96d56Sopenharmony_ci self.assertRaisesRegex(sqlite.DataError, "string or blob too big", 6407db96d56Sopenharmony_ci self.cur.execute, self.query % "err_val_ret") 6417db96d56Sopenharmony_ci 6427db96d56Sopenharmony_ci 6437db96d56Sopenharmony_ciclass AggregateTests(unittest.TestCase): 6447db96d56Sopenharmony_ci def setUp(self): 6457db96d56Sopenharmony_ci self.con = sqlite.connect(":memory:") 6467db96d56Sopenharmony_ci cur = self.con.cursor() 6477db96d56Sopenharmony_ci cur.execute(""" 6487db96d56Sopenharmony_ci create table test( 6497db96d56Sopenharmony_ci t text, 6507db96d56Sopenharmony_ci i integer, 6517db96d56Sopenharmony_ci f float, 6527db96d56Sopenharmony_ci n, 6537db96d56Sopenharmony_ci b blob 6547db96d56Sopenharmony_ci ) 6557db96d56Sopenharmony_ci """) 6567db96d56Sopenharmony_ci cur.execute("insert into test(t, i, f, n, b) values (?, ?, ?, ?, ?)", 6577db96d56Sopenharmony_ci ("foo", 5, 3.14, None, memoryview(b"blob"),)) 6587db96d56Sopenharmony_ci 6597db96d56Sopenharmony_ci self.con.create_aggregate("nostep", 1, AggrNoStep) 6607db96d56Sopenharmony_ci self.con.create_aggregate("nofinalize", 1, AggrNoFinalize) 6617db96d56Sopenharmony_ci self.con.create_aggregate("excInit", 1, AggrExceptionInInit) 6627db96d56Sopenharmony_ci self.con.create_aggregate("excStep", 1, AggrExceptionInStep) 6637db96d56Sopenharmony_ci self.con.create_aggregate("excFinalize", 1, AggrExceptionInFinalize) 6647db96d56Sopenharmony_ci self.con.create_aggregate("checkType", 2, AggrCheckType) 6657db96d56Sopenharmony_ci self.con.create_aggregate("checkTypes", -1, AggrCheckTypes) 6667db96d56Sopenharmony_ci self.con.create_aggregate("mysum", 1, AggrSum) 6677db96d56Sopenharmony_ci self.con.create_aggregate("aggtxt", 1, AggrText) 6687db96d56Sopenharmony_ci 6697db96d56Sopenharmony_ci def tearDown(self): 6707db96d56Sopenharmony_ci #self.cur.close() 6717db96d56Sopenharmony_ci #self.con.close() 6727db96d56Sopenharmony_ci pass 6737db96d56Sopenharmony_ci 6747db96d56Sopenharmony_ci def test_aggr_error_on_create(self): 6757db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError): 6767db96d56Sopenharmony_ci self.con.create_function("bla", -100, AggrSum) 6777db96d56Sopenharmony_ci 6787db96d56Sopenharmony_ci @with_tracebacks(AttributeError, name="AggrNoStep") 6797db96d56Sopenharmony_ci def test_aggr_no_step(self): 6807db96d56Sopenharmony_ci cur = self.con.cursor() 6817db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError) as cm: 6827db96d56Sopenharmony_ci cur.execute("select nostep(t) from test") 6837db96d56Sopenharmony_ci self.assertEqual(str(cm.exception), 6847db96d56Sopenharmony_ci "user-defined aggregate's 'step' method not defined") 6857db96d56Sopenharmony_ci 6867db96d56Sopenharmony_ci def test_aggr_no_finalize(self): 6877db96d56Sopenharmony_ci cur = self.con.cursor() 6887db96d56Sopenharmony_ci msg = "user-defined aggregate's 'finalize' method not defined" 6897db96d56Sopenharmony_ci with self.assertRaisesRegex(sqlite.OperationalError, msg): 6907db96d56Sopenharmony_ci cur.execute("select nofinalize(t) from test") 6917db96d56Sopenharmony_ci val = cur.fetchone()[0] 6927db96d56Sopenharmony_ci 6937db96d56Sopenharmony_ci @with_tracebacks(ZeroDivisionError, name="AggrExceptionInInit") 6947db96d56Sopenharmony_ci def test_aggr_exception_in_init(self): 6957db96d56Sopenharmony_ci cur = self.con.cursor() 6967db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError) as cm: 6977db96d56Sopenharmony_ci cur.execute("select excInit(t) from test") 6987db96d56Sopenharmony_ci val = cur.fetchone()[0] 6997db96d56Sopenharmony_ci self.assertEqual(str(cm.exception), "user-defined aggregate's '__init__' method raised error") 7007db96d56Sopenharmony_ci 7017db96d56Sopenharmony_ci @with_tracebacks(ZeroDivisionError, name="AggrExceptionInStep") 7027db96d56Sopenharmony_ci def test_aggr_exception_in_step(self): 7037db96d56Sopenharmony_ci cur = self.con.cursor() 7047db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError) as cm: 7057db96d56Sopenharmony_ci cur.execute("select excStep(t) from test") 7067db96d56Sopenharmony_ci val = cur.fetchone()[0] 7077db96d56Sopenharmony_ci self.assertEqual(str(cm.exception), "user-defined aggregate's 'step' method raised error") 7087db96d56Sopenharmony_ci 7097db96d56Sopenharmony_ci @with_tracebacks(ZeroDivisionError, name="AggrExceptionInFinalize") 7107db96d56Sopenharmony_ci def test_aggr_exception_in_finalize(self): 7117db96d56Sopenharmony_ci cur = self.con.cursor() 7127db96d56Sopenharmony_ci with self.assertRaises(sqlite.OperationalError) as cm: 7137db96d56Sopenharmony_ci cur.execute("select excFinalize(t) from test") 7147db96d56Sopenharmony_ci val = cur.fetchone()[0] 7157db96d56Sopenharmony_ci self.assertEqual(str(cm.exception), "user-defined aggregate's 'finalize' method raised error") 7167db96d56Sopenharmony_ci 7177db96d56Sopenharmony_ci def test_aggr_check_param_str(self): 7187db96d56Sopenharmony_ci cur = self.con.cursor() 7197db96d56Sopenharmony_ci cur.execute("select checkTypes('str', ?, ?)", ("foo", str())) 7207db96d56Sopenharmony_ci val = cur.fetchone()[0] 7217db96d56Sopenharmony_ci self.assertEqual(val, 2) 7227db96d56Sopenharmony_ci 7237db96d56Sopenharmony_ci def test_aggr_check_param_int(self): 7247db96d56Sopenharmony_ci cur = self.con.cursor() 7257db96d56Sopenharmony_ci cur.execute("select checkType('int', ?)", (42,)) 7267db96d56Sopenharmony_ci val = cur.fetchone()[0] 7277db96d56Sopenharmony_ci self.assertEqual(val, 1) 7287db96d56Sopenharmony_ci 7297db96d56Sopenharmony_ci def test_aggr_check_params_int(self): 7307db96d56Sopenharmony_ci cur = self.con.cursor() 7317db96d56Sopenharmony_ci cur.execute("select checkTypes('int', ?, ?)", (42, 24)) 7327db96d56Sopenharmony_ci val = cur.fetchone()[0] 7337db96d56Sopenharmony_ci self.assertEqual(val, 2) 7347db96d56Sopenharmony_ci 7357db96d56Sopenharmony_ci def test_aggr_check_param_float(self): 7367db96d56Sopenharmony_ci cur = self.con.cursor() 7377db96d56Sopenharmony_ci cur.execute("select checkType('float', ?)", (3.14,)) 7387db96d56Sopenharmony_ci val = cur.fetchone()[0] 7397db96d56Sopenharmony_ci self.assertEqual(val, 1) 7407db96d56Sopenharmony_ci 7417db96d56Sopenharmony_ci def test_aggr_check_param_none(self): 7427db96d56Sopenharmony_ci cur = self.con.cursor() 7437db96d56Sopenharmony_ci cur.execute("select checkType('None', ?)", (None,)) 7447db96d56Sopenharmony_ci val = cur.fetchone()[0] 7457db96d56Sopenharmony_ci self.assertEqual(val, 1) 7467db96d56Sopenharmony_ci 7477db96d56Sopenharmony_ci def test_aggr_check_param_blob(self): 7487db96d56Sopenharmony_ci cur = self.con.cursor() 7497db96d56Sopenharmony_ci cur.execute("select checkType('blob', ?)", (memoryview(b"blob"),)) 7507db96d56Sopenharmony_ci val = cur.fetchone()[0] 7517db96d56Sopenharmony_ci self.assertEqual(val, 1) 7527db96d56Sopenharmony_ci 7537db96d56Sopenharmony_ci def test_aggr_check_aggr_sum(self): 7547db96d56Sopenharmony_ci cur = self.con.cursor() 7557db96d56Sopenharmony_ci cur.execute("delete from test") 7567db96d56Sopenharmony_ci cur.executemany("insert into test(i) values (?)", [(10,), (20,), (30,)]) 7577db96d56Sopenharmony_ci cur.execute("select mysum(i) from test") 7587db96d56Sopenharmony_ci val = cur.fetchone()[0] 7597db96d56Sopenharmony_ci self.assertEqual(val, 60) 7607db96d56Sopenharmony_ci 7617db96d56Sopenharmony_ci def test_aggr_no_match(self): 7627db96d56Sopenharmony_ci cur = self.con.execute("select mysum(i) from (select 1 as i) where i == 0") 7637db96d56Sopenharmony_ci val = cur.fetchone()[0] 7647db96d56Sopenharmony_ci self.assertIsNone(val) 7657db96d56Sopenharmony_ci 7667db96d56Sopenharmony_ci def test_aggr_text(self): 7677db96d56Sopenharmony_ci cur = self.con.cursor() 7687db96d56Sopenharmony_ci for txt in ["foo", "1\x002"]: 7697db96d56Sopenharmony_ci with self.subTest(txt=txt): 7707db96d56Sopenharmony_ci cur.execute("select aggtxt(?) from test", (txt,)) 7717db96d56Sopenharmony_ci val = cur.fetchone()[0] 7727db96d56Sopenharmony_ci self.assertEqual(val, txt) 7737db96d56Sopenharmony_ci 7747db96d56Sopenharmony_ci 7757db96d56Sopenharmony_ciclass AuthorizerTests(unittest.TestCase): 7767db96d56Sopenharmony_ci @staticmethod 7777db96d56Sopenharmony_ci def authorizer_cb(action, arg1, arg2, dbname, source): 7787db96d56Sopenharmony_ci if action != sqlite.SQLITE_SELECT: 7797db96d56Sopenharmony_ci return sqlite.SQLITE_DENY 7807db96d56Sopenharmony_ci if arg2 == 'c2' or arg1 == 't2': 7817db96d56Sopenharmony_ci return sqlite.SQLITE_DENY 7827db96d56Sopenharmony_ci return sqlite.SQLITE_OK 7837db96d56Sopenharmony_ci 7847db96d56Sopenharmony_ci def setUp(self): 7857db96d56Sopenharmony_ci self.con = sqlite.connect(":memory:") 7867db96d56Sopenharmony_ci self.con.executescript(""" 7877db96d56Sopenharmony_ci create table t1 (c1, c2); 7887db96d56Sopenharmony_ci create table t2 (c1, c2); 7897db96d56Sopenharmony_ci insert into t1 (c1, c2) values (1, 2); 7907db96d56Sopenharmony_ci insert into t2 (c1, c2) values (4, 5); 7917db96d56Sopenharmony_ci """) 7927db96d56Sopenharmony_ci 7937db96d56Sopenharmony_ci # For our security test: 7947db96d56Sopenharmony_ci self.con.execute("select c2 from t2") 7957db96d56Sopenharmony_ci 7967db96d56Sopenharmony_ci self.con.set_authorizer(self.authorizer_cb) 7977db96d56Sopenharmony_ci 7987db96d56Sopenharmony_ci def tearDown(self): 7997db96d56Sopenharmony_ci pass 8007db96d56Sopenharmony_ci 8017db96d56Sopenharmony_ci def test_table_access(self): 8027db96d56Sopenharmony_ci with self.assertRaises(sqlite.DatabaseError) as cm: 8037db96d56Sopenharmony_ci self.con.execute("select * from t2") 8047db96d56Sopenharmony_ci self.assertIn('prohibited', str(cm.exception)) 8057db96d56Sopenharmony_ci 8067db96d56Sopenharmony_ci def test_column_access(self): 8077db96d56Sopenharmony_ci with self.assertRaises(sqlite.DatabaseError) as cm: 8087db96d56Sopenharmony_ci self.con.execute("select c2 from t1") 8097db96d56Sopenharmony_ci self.assertIn('prohibited', str(cm.exception)) 8107db96d56Sopenharmony_ci 8117db96d56Sopenharmony_ci def test_clear_authorizer(self): 8127db96d56Sopenharmony_ci self.con.set_authorizer(None) 8137db96d56Sopenharmony_ci self.con.execute("select * from t2") 8147db96d56Sopenharmony_ci self.con.execute("select c2 from t1") 8157db96d56Sopenharmony_ci 8167db96d56Sopenharmony_ci 8177db96d56Sopenharmony_ciclass AuthorizerRaiseExceptionTests(AuthorizerTests): 8187db96d56Sopenharmony_ci @staticmethod 8197db96d56Sopenharmony_ci def authorizer_cb(action, arg1, arg2, dbname, source): 8207db96d56Sopenharmony_ci if action != sqlite.SQLITE_SELECT: 8217db96d56Sopenharmony_ci raise ValueError 8227db96d56Sopenharmony_ci if arg2 == 'c2' or arg1 == 't2': 8237db96d56Sopenharmony_ci raise ValueError 8247db96d56Sopenharmony_ci return sqlite.SQLITE_OK 8257db96d56Sopenharmony_ci 8267db96d56Sopenharmony_ci @with_tracebacks(ValueError, name="authorizer_cb") 8277db96d56Sopenharmony_ci def test_table_access(self): 8287db96d56Sopenharmony_ci super().test_table_access() 8297db96d56Sopenharmony_ci 8307db96d56Sopenharmony_ci @with_tracebacks(ValueError, name="authorizer_cb") 8317db96d56Sopenharmony_ci def test_column_access(self): 8327db96d56Sopenharmony_ci super().test_table_access() 8337db96d56Sopenharmony_ci 8347db96d56Sopenharmony_ciclass AuthorizerIllegalTypeTests(AuthorizerTests): 8357db96d56Sopenharmony_ci @staticmethod 8367db96d56Sopenharmony_ci def authorizer_cb(action, arg1, arg2, dbname, source): 8377db96d56Sopenharmony_ci if action != sqlite.SQLITE_SELECT: 8387db96d56Sopenharmony_ci return 0.0 8397db96d56Sopenharmony_ci if arg2 == 'c2' or arg1 == 't2': 8407db96d56Sopenharmony_ci return 0.0 8417db96d56Sopenharmony_ci return sqlite.SQLITE_OK 8427db96d56Sopenharmony_ci 8437db96d56Sopenharmony_ciclass AuthorizerLargeIntegerTests(AuthorizerTests): 8447db96d56Sopenharmony_ci @staticmethod 8457db96d56Sopenharmony_ci def authorizer_cb(action, arg1, arg2, dbname, source): 8467db96d56Sopenharmony_ci if action != sqlite.SQLITE_SELECT: 8477db96d56Sopenharmony_ci return 2**32 8487db96d56Sopenharmony_ci if arg2 == 'c2' or arg1 == 't2': 8497db96d56Sopenharmony_ci return 2**32 8507db96d56Sopenharmony_ci return sqlite.SQLITE_OK 8517db96d56Sopenharmony_ci 8527db96d56Sopenharmony_ci 8537db96d56Sopenharmony_ciif __name__ == "__main__": 8547db96d56Sopenharmony_ci unittest.main() 855