xref: /third_party/python/Lib/test/test_zlib.py (revision 7db96d56)
17db96d56Sopenharmony_ciimport unittest
27db96d56Sopenharmony_cifrom test import support
37db96d56Sopenharmony_cifrom test.support import import_helper
47db96d56Sopenharmony_ciimport binascii
57db96d56Sopenharmony_ciimport copy
67db96d56Sopenharmony_ciimport os
77db96d56Sopenharmony_ciimport pickle
87db96d56Sopenharmony_ciimport random
97db96d56Sopenharmony_ciimport sys
107db96d56Sopenharmony_cifrom test.support import bigmemtest, _1G, _4G
117db96d56Sopenharmony_ci
127db96d56Sopenharmony_ci
137db96d56Sopenharmony_cizlib = import_helper.import_module('zlib')
147db96d56Sopenharmony_ci
157db96d56Sopenharmony_cirequires_Compress_copy = unittest.skipUnless(
167db96d56Sopenharmony_ci        hasattr(zlib.compressobj(), "copy"),
177db96d56Sopenharmony_ci        'requires Compress.copy()')
187db96d56Sopenharmony_cirequires_Decompress_copy = unittest.skipUnless(
197db96d56Sopenharmony_ci        hasattr(zlib.decompressobj(), "copy"),
207db96d56Sopenharmony_ci        'requires Decompress.copy()')
217db96d56Sopenharmony_ci
227db96d56Sopenharmony_ci# bpo-46623: On s390x, when a hardware accelerator is used, using different
237db96d56Sopenharmony_ci# ways to compress data with zlib can produce different compressed data.
247db96d56Sopenharmony_ci# Simplified test_pair() code:
257db96d56Sopenharmony_ci#
267db96d56Sopenharmony_ci#   def func1(data):
277db96d56Sopenharmony_ci#       return zlib.compress(data)
287db96d56Sopenharmony_ci#
#   def func2(data):
307db96d56Sopenharmony_ci#       co = zlib.compressobj()
317db96d56Sopenharmony_ci#       x1 = co.compress(data)
327db96d56Sopenharmony_ci#       x2 = co.flush()
337db96d56Sopenharmony_ci#       return x1 + x2
347db96d56Sopenharmony_ci#
357db96d56Sopenharmony_ci# On s390x if zlib uses a hardware accelerator, func1() creates a single
367db96d56Sopenharmony_ci# "final" compressed block whereas func2() produces 3 compressed blocks (the
377db96d56Sopenharmony_ci# last one is a final block). On other platforms with no accelerator, func1()
387db96d56Sopenharmony_ci# and func2() produce the same compressed data made of a single (final)
397db96d56Sopenharmony_ci# compressed block.
407db96d56Sopenharmony_ci#
417db96d56Sopenharmony_ci# Only the compressed data is different, the decompression returns the original
427db96d56Sopenharmony_ci# data:
437db96d56Sopenharmony_ci#
447db96d56Sopenharmony_ci#   zlib.decompress(func1(data)) == zlib.decompress(func2(data)) == data
457db96d56Sopenharmony_ci#
467db96d56Sopenharmony_ci# Make the assumption that s390x always has an accelerator to simplify the skip
477db96d56Sopenharmony_ci# condition. Windows doesn't have os.uname() but it doesn't support s390x.
# os.uname() is checked with hasattr() first because it does not exist on
# every platform (e.g. Windows).
_ON_S390X = hasattr(os, 'uname') and os.uname().machine == 's390x'
skip_on_s390x = unittest.skipIf(_ON_S390X, 'skipped on s390x')
507db96d56Sopenharmony_ci
517db96d56Sopenharmony_ci
class VersionTestCase(unittest.TestCase):
    """Sanity check on the zlib version reported by the module."""

    def test_library_version(self):
        # Only the first character (the major version) of the runtime and
        # compile-time version strings is compared.  Minor versions are not
        # guaranteed to match, even on the build machine, and the API is
        # stable across them, so comparing more would cause spurious
        # failures.
        runtime_major = zlib.ZLIB_RUNTIME_VERSION[0]
        compiled_major = zlib.ZLIB_VERSION[0]
        self.assertEqual(runtime_major, compiled_major)
617db96d56Sopenharmony_ci
627db96d56Sopenharmony_ci
class ChecksumTestCase(unittest.TestCase):
    """Tests for the zlib.crc32() and zlib.adler32() checksum functions."""

    def test_crc32start(self):
        # Omitting the start value must be the same as passing 0.
        empty = b""
        self.assertEqual(zlib.crc32(empty), zlib.crc32(empty, 0))
        self.assertTrue(zlib.crc32(b"abc", 0xffffffff))

    def test_crc32empty(self):
        # With no data, the start value comes back unchanged.
        for start in (0, 1, 432):
            self.assertEqual(zlib.crc32(b"", start), start)

    def test_adler32start(self):
        # Omitting the start value must be the same as passing 1.
        empty = b""
        self.assertEqual(zlib.adler32(empty), zlib.adler32(empty, 1))
        self.assertTrue(zlib.adler32(b"abc", 0xffffffff))

    def test_adler32empty(self):
        # With no data, the start value comes back unchanged.
        for start in (0, 1, 432):
            self.assertEqual(zlib.adler32(b"", start), start)

    def test_penguins(self):
        # Known checksums of a fixed word, with explicit start values ...
        word = b"penguin"
        self.assertEqual(zlib.crc32(word, 0), 0x0e5c1a120)
        self.assertEqual(zlib.crc32(word, 1), 0x43b6aa94)
        self.assertEqual(zlib.adler32(word, 0), 0x0bcf02f6)
        self.assertEqual(zlib.adler32(word, 1), 0x0bd602f7)

        # ... and with the default start values.
        self.assertEqual(zlib.crc32(word), zlib.crc32(word, 0))
        self.assertEqual(zlib.adler32(word), zlib.adler32(word, 1))

    def test_crc32_adler32_unsigned(self):
        # Results are expected as unsigned integers: some of the values
        # below exceed 2**31 - 1 and must not come back negative.
        alphabet = b'abcdefghijklmnop'
        self.assertEqual(zlib.crc32(alphabet), 2486878355)
        self.assertEqual(zlib.crc32(b'spam'), 1138425661)
        self.assertEqual(zlib.adler32(alphabet + alphabet), 3573550353)
        self.assertEqual(zlib.adler32(b'spam'), 72286642)

    def test_same_as_binascii_crc32(self):
        # zlib.crc32() and binascii.crc32() must agree.
        alphabet = b'abcdefghijklmnop'
        expected = 2486878355
        self.assertEqual(binascii.crc32(alphabet), expected)
        self.assertEqual(zlib.crc32(alphabet), expected)
        self.assertEqual(binascii.crc32(b'spam'), zlib.crc32(b'spam'))
1067db96d56Sopenharmony_ci
1077db96d56Sopenharmony_ci
1087db96d56Sopenharmony_ci# Issue #10276 - check that inputs >=4 GiB are handled correctly.
1097db96d56Sopenharmony_ciclass ChecksumBigBufferTestCase(unittest.TestCase):
1107db96d56Sopenharmony_ci
1117db96d56Sopenharmony_ci    @bigmemtest(size=_4G + 4, memuse=1, dry_run=False)
1127db96d56Sopenharmony_ci    def test_big_buffer(self, size):
1137db96d56Sopenharmony_ci        data = b"nyan" * (_1G + 1)
1147db96d56Sopenharmony_ci        self.assertEqual(zlib.crc32(data), 1044521549)
1157db96d56Sopenharmony_ci        self.assertEqual(zlib.adler32(data), 2256789997)
1167db96d56Sopenharmony_ci
1177db96d56Sopenharmony_ci
1187db96d56Sopenharmony_ciclass ExceptionTestCase(unittest.TestCase):
1197db96d56Sopenharmony_ci    # make sure we generate some expected errors
1207db96d56Sopenharmony_ci    def test_badlevel(self):
1217db96d56Sopenharmony_ci        # specifying compression level out of range causes an error
1227db96d56Sopenharmony_ci        # (but -1 is Z_DEFAULT_COMPRESSION and apparently the zlib
1237db96d56Sopenharmony_ci        # accepts 0 too)
1247db96d56Sopenharmony_ci        self.assertRaises(zlib.error, zlib.compress, b'ERROR', 10)
1257db96d56Sopenharmony_ci
1267db96d56Sopenharmony_ci    def test_badargs(self):
1277db96d56Sopenharmony_ci        self.assertRaises(TypeError, zlib.adler32)
1287db96d56Sopenharmony_ci        self.assertRaises(TypeError, zlib.crc32)
1297db96d56Sopenharmony_ci        self.assertRaises(TypeError, zlib.compress)
1307db96d56Sopenharmony_ci        self.assertRaises(TypeError, zlib.decompress)
1317db96d56Sopenharmony_ci        for arg in (42, None, '', 'abc', (), []):
1327db96d56Sopenharmony_ci            self.assertRaises(TypeError, zlib.adler32, arg)
1337db96d56Sopenharmony_ci            self.assertRaises(TypeError, zlib.crc32, arg)
1347db96d56Sopenharmony_ci            self.assertRaises(TypeError, zlib.compress, arg)
1357db96d56Sopenharmony_ci            self.assertRaises(TypeError, zlib.decompress, arg)
1367db96d56Sopenharmony_ci
1377db96d56Sopenharmony_ci    def test_badcompressobj(self):
1387db96d56Sopenharmony_ci        # verify failure on building compress object with bad params
1397db96d56Sopenharmony_ci        self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0)
1407db96d56Sopenharmony_ci        # specifying total bits too large causes an error
1417db96d56Sopenharmony_ci        self.assertRaises(ValueError,
1427db96d56Sopenharmony_ci                zlib.compressobj, 1, zlib.DEFLATED, zlib.MAX_WBITS + 1)
1437db96d56Sopenharmony_ci
1447db96d56Sopenharmony_ci    def test_baddecompressobj(self):
1457db96d56Sopenharmony_ci        # verify failure on building decompress object with bad params
1467db96d56Sopenharmony_ci        self.assertRaises(ValueError, zlib.decompressobj, -1)
1477db96d56Sopenharmony_ci
1487db96d56Sopenharmony_ci    def test_decompressobj_badflush(self):
1497db96d56Sopenharmony_ci        # verify failure on calling decompressobj.flush with bad params
1507db96d56Sopenharmony_ci        self.assertRaises(ValueError, zlib.decompressobj().flush, 0)
1517db96d56Sopenharmony_ci        self.assertRaises(ValueError, zlib.decompressobj().flush, -1)
1527db96d56Sopenharmony_ci
1537db96d56Sopenharmony_ci    @support.cpython_only
1547db96d56Sopenharmony_ci    def test_overflow(self):
1557db96d56Sopenharmony_ci        with self.assertRaisesRegex(OverflowError, 'int too large'):
1567db96d56Sopenharmony_ci            zlib.decompress(b'', 15, sys.maxsize + 1)
1577db96d56Sopenharmony_ci        with self.assertRaisesRegex(OverflowError, 'int too large'):
1587db96d56Sopenharmony_ci            zlib.decompressobj().decompress(b'', sys.maxsize + 1)
1597db96d56Sopenharmony_ci        with self.assertRaisesRegex(OverflowError, 'int too large'):
1607db96d56Sopenharmony_ci            zlib.decompressobj().flush(sys.maxsize + 1)
1617db96d56Sopenharmony_ci
1627db96d56Sopenharmony_ci    @support.cpython_only
1637db96d56Sopenharmony_ci    def test_disallow_instantiation(self):
1647db96d56Sopenharmony_ci        # Ensure that the type disallows instantiation (bpo-43916)
1657db96d56Sopenharmony_ci        support.check_disallow_instantiation(self, type(zlib.compressobj()))
1667db96d56Sopenharmony_ci        support.check_disallow_instantiation(self, type(zlib.decompressobj()))
1677db96d56Sopenharmony_ci
1687db96d56Sopenharmony_ci
class BaseCompressTestCase:
    """Mixin with helpers for the large-buffer (de)compression tests.

    The class using it must provide assertEqual(), as unittest.TestCase does.
    """

    def check_big_compress_buffer(self, size, compress_func):
        # Build more than `size` bytes by repeating 10 MiB of random data.
        # The assumption is that zlib's memory is not big enough to exploit
        # such spread out redundancy.
        mib = 1024 * 1024
        seed_block = random.randbytes(10 * mib)
        repeats = size // len(seed_block) + 1
        payload = seed_block * repeats
        # Only the expanded buffer needs to stay alive from here on.
        del seed_block
        try:
            compress_func(payload)
        finally:
            # Release memory
            payload = None

    def check_big_decompress_buffer(self, size, decompress_func):
        # Compress `size` copies of b'x', then check that decompress_func
        # gives back exactly that.
        raw = b'x' * size
        try:
            packed = zlib.compress(raw, 1)
        finally:
            # Release memory
            raw = None
        restored = decompress_func(packed)
        # Sanity check: right length, and nothing but b'x' bytes.
        try:
            self.assertEqual(len(restored), size)
            self.assertEqual(len(restored.strip(b'x')), 0)
        finally:
            restored = None
1977db96d56Sopenharmony_ci
1987db96d56Sopenharmony_ci
class CompressTestCase(BaseCompressTestCase, unittest.TestCase):
    """Tests for one-shot (whole message) compress() and decompress()."""

    def test_speech(self):
        # Plain round trip.
        packed = zlib.compress(HAMLET_SCENE)
        self.assertEqual(zlib.decompress(packed), HAMLET_SCENE)

    def test_keywords(self):
        packed = zlib.compress(HAMLET_SCENE, level=3)
        self.assertEqual(zlib.decompress(packed), HAMLET_SCENE)
        # Passing the payload by keyword must be rejected.
        with self.assertRaises(TypeError):
            zlib.compress(data=HAMLET_SCENE, level=3)
        unpacked = zlib.decompress(packed,
                                   wbits=zlib.MAX_WBITS,
                                   bufsize=zlib.DEF_BUF_SIZE)
        self.assertEqual(unpacked, HAMLET_SCENE)

    @skip_on_s390x
    def test_speech128(self):
        # Compress more data, given as bytes and as bytearray.
        big = HAMLET_SCENE * 128
        packed = zlib.compress(big)
        self.assertEqual(zlib.compress(bytearray(big)), packed)
        for blob in (packed, bytearray(packed)):
            self.assertEqual(zlib.decompress(blob), big)

    def test_incomplete_stream(self):
        # A useful error message is given for a truncated stream.
        truncated = zlib.compress(HAMLET_SCENE)[:-1]
        with self.assertRaisesRegex(
                zlib.error,
                "Error -5 while decompressing data: incomplete or truncated stream"):
            zlib.decompress(truncated)

    # Memory use of the following functions takes into account overallocation

    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
    def test_big_compress_buffer(self, size):
        def compress_level_1(payload):
            return zlib.compress(payload, 1)
        self.check_big_compress_buffer(size, compress_level_1)

    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
    def test_big_decompress_buffer(self, size):
        self.check_big_decompress_buffer(size, zlib.decompress)

    @bigmemtest(size=_4G, memuse=1)
    def test_large_bufsize(self, size):
        # Test decompress(bufsize) parameter greater than the internal limit
        original = HAMLET_SCENE * 10
        packed = zlib.compress(original, 1)
        self.assertEqual(zlib.decompress(packed, 15, size), original)

    def test_custom_bufsize(self):
        # bufsize given as a CustomInt instance instead of a plain int.
        original = HAMLET_SCENE * 10
        packed = zlib.compress(original, 1)
        self.assertEqual(zlib.decompress(packed, 15, CustomInt()), original)

    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        raw = b'x' * size
        try:
            stored = zlib.compress(raw, 0)
            self.assertEqual(zlib.decompress(stored), raw)
        finally:
            # Release memory
            stored = raw = None
2637db96d56Sopenharmony_ci
2647db96d56Sopenharmony_ci
2657db96d56Sopenharmony_ciclass CompressObjectTestCase(BaseCompressTestCase, unittest.TestCase):
2667db96d56Sopenharmony_ci    # Test compression object
2677db96d56Sopenharmony_ci    @skip_on_s390x
2687db96d56Sopenharmony_ci    def test_pair(self):
2697db96d56Sopenharmony_ci        # straightforward compress/decompress objects
2707db96d56Sopenharmony_ci        datasrc = HAMLET_SCENE * 128
2717db96d56Sopenharmony_ci        datazip = zlib.compress(datasrc)
2727db96d56Sopenharmony_ci        # should compress both bytes and bytearray data
2737db96d56Sopenharmony_ci        for data in (datasrc, bytearray(datasrc)):
2747db96d56Sopenharmony_ci            co = zlib.compressobj()
2757db96d56Sopenharmony_ci            x1 = co.compress(data)
2767db96d56Sopenharmony_ci            x2 = co.flush()
2777db96d56Sopenharmony_ci            self.assertRaises(zlib.error, co.flush) # second flush should not work
2787db96d56Sopenharmony_ci            self.assertEqual(x1 + x2, datazip)
2797db96d56Sopenharmony_ci        for v1, v2 in ((x1, x2), (bytearray(x1), bytearray(x2))):
2807db96d56Sopenharmony_ci            dco = zlib.decompressobj()
2817db96d56Sopenharmony_ci            y1 = dco.decompress(v1 + v2)
2827db96d56Sopenharmony_ci            y2 = dco.flush()
2837db96d56Sopenharmony_ci            self.assertEqual(data, y1 + y2)
2847db96d56Sopenharmony_ci            self.assertIsInstance(dco.unconsumed_tail, bytes)
2857db96d56Sopenharmony_ci            self.assertIsInstance(dco.unused_data, bytes)
2867db96d56Sopenharmony_ci
2877db96d56Sopenharmony_ci    def test_keywords(self):
2887db96d56Sopenharmony_ci        level = 2
2897db96d56Sopenharmony_ci        method = zlib.DEFLATED
2907db96d56Sopenharmony_ci        wbits = -12
2917db96d56Sopenharmony_ci        memLevel = 9
2927db96d56Sopenharmony_ci        strategy = zlib.Z_FILTERED
2937db96d56Sopenharmony_ci        co = zlib.compressobj(level=level,
2947db96d56Sopenharmony_ci                              method=method,
2957db96d56Sopenharmony_ci                              wbits=wbits,
2967db96d56Sopenharmony_ci                              memLevel=memLevel,
2977db96d56Sopenharmony_ci                              strategy=strategy,
2987db96d56Sopenharmony_ci                              zdict=b"")
2997db96d56Sopenharmony_ci        do = zlib.decompressobj(wbits=wbits, zdict=b"")
3007db96d56Sopenharmony_ci        with self.assertRaises(TypeError):
3017db96d56Sopenharmony_ci            co.compress(data=HAMLET_SCENE)
3027db96d56Sopenharmony_ci        with self.assertRaises(TypeError):
3037db96d56Sopenharmony_ci            do.decompress(data=zlib.compress(HAMLET_SCENE))
3047db96d56Sopenharmony_ci        x = co.compress(HAMLET_SCENE) + co.flush()
3057db96d56Sopenharmony_ci        y = do.decompress(x, max_length=len(HAMLET_SCENE)) + do.flush()
3067db96d56Sopenharmony_ci        self.assertEqual(HAMLET_SCENE, y)
3077db96d56Sopenharmony_ci
3087db96d56Sopenharmony_ci    def test_compressoptions(self):
3097db96d56Sopenharmony_ci        # specify lots of options to compressobj()
3107db96d56Sopenharmony_ci        level = 2
3117db96d56Sopenharmony_ci        method = zlib.DEFLATED
3127db96d56Sopenharmony_ci        wbits = -12
3137db96d56Sopenharmony_ci        memLevel = 9
3147db96d56Sopenharmony_ci        strategy = zlib.Z_FILTERED
3157db96d56Sopenharmony_ci        co = zlib.compressobj(level, method, wbits, memLevel, strategy)
3167db96d56Sopenharmony_ci        x1 = co.compress(HAMLET_SCENE)
3177db96d56Sopenharmony_ci        x2 = co.flush()
3187db96d56Sopenharmony_ci        dco = zlib.decompressobj(wbits)
3197db96d56Sopenharmony_ci        y1 = dco.decompress(x1 + x2)
3207db96d56Sopenharmony_ci        y2 = dco.flush()
3217db96d56Sopenharmony_ci        self.assertEqual(HAMLET_SCENE, y1 + y2)
3227db96d56Sopenharmony_ci
3237db96d56Sopenharmony_ci    def test_compressincremental(self):
3247db96d56Sopenharmony_ci        # compress object in steps, decompress object as one-shot
3257db96d56Sopenharmony_ci        data = HAMLET_SCENE * 128
3267db96d56Sopenharmony_ci        co = zlib.compressobj()
3277db96d56Sopenharmony_ci        bufs = []
3287db96d56Sopenharmony_ci        for i in range(0, len(data), 256):
3297db96d56Sopenharmony_ci            bufs.append(co.compress(data[i:i+256]))
3307db96d56Sopenharmony_ci        bufs.append(co.flush())
3317db96d56Sopenharmony_ci        combuf = b''.join(bufs)
3327db96d56Sopenharmony_ci
3337db96d56Sopenharmony_ci        dco = zlib.decompressobj()
3347db96d56Sopenharmony_ci        y1 = dco.decompress(b''.join(bufs))
3357db96d56Sopenharmony_ci        y2 = dco.flush()
3367db96d56Sopenharmony_ci        self.assertEqual(data, y1 + y2)
3377db96d56Sopenharmony_ci
3387db96d56Sopenharmony_ci    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
3397db96d56Sopenharmony_ci        # compress object in steps, decompress object in steps
3407db96d56Sopenharmony_ci        source = source or HAMLET_SCENE
3417db96d56Sopenharmony_ci        data = source * 128
3427db96d56Sopenharmony_ci        co = zlib.compressobj()
3437db96d56Sopenharmony_ci        bufs = []
3447db96d56Sopenharmony_ci        for i in range(0, len(data), cx):
3457db96d56Sopenharmony_ci            bufs.append(co.compress(data[i:i+cx]))
3467db96d56Sopenharmony_ci        bufs.append(co.flush())
3477db96d56Sopenharmony_ci        combuf = b''.join(bufs)
3487db96d56Sopenharmony_ci
3497db96d56Sopenharmony_ci        decombuf = zlib.decompress(combuf)
3507db96d56Sopenharmony_ci        # Test type of return value
3517db96d56Sopenharmony_ci        self.assertIsInstance(decombuf, bytes)
3527db96d56Sopenharmony_ci
3537db96d56Sopenharmony_ci        self.assertEqual(data, decombuf)
3547db96d56Sopenharmony_ci
3557db96d56Sopenharmony_ci        dco = zlib.decompressobj()
3567db96d56Sopenharmony_ci        bufs = []
3577db96d56Sopenharmony_ci        for i in range(0, len(combuf), dcx):
3587db96d56Sopenharmony_ci            bufs.append(dco.decompress(combuf[i:i+dcx]))
3597db96d56Sopenharmony_ci            self.assertEqual(b'', dco.unconsumed_tail, ########
3607db96d56Sopenharmony_ci                             "(A) uct should be b'': not %d long" %
3617db96d56Sopenharmony_ci                                       len(dco.unconsumed_tail))
3627db96d56Sopenharmony_ci            self.assertEqual(b'', dco.unused_data)
3637db96d56Sopenharmony_ci        if flush:
3647db96d56Sopenharmony_ci            bufs.append(dco.flush())
3657db96d56Sopenharmony_ci        else:
3667db96d56Sopenharmony_ci            while True:
3677db96d56Sopenharmony_ci                chunk = dco.decompress(b'')
3687db96d56Sopenharmony_ci                if chunk:
3697db96d56Sopenharmony_ci                    bufs.append(chunk)
3707db96d56Sopenharmony_ci                else:
3717db96d56Sopenharmony_ci                    break
3727db96d56Sopenharmony_ci        self.assertEqual(b'', dco.unconsumed_tail, ########
3737db96d56Sopenharmony_ci                         "(B) uct should be b'': not %d long" %
3747db96d56Sopenharmony_ci                                       len(dco.unconsumed_tail))
3757db96d56Sopenharmony_ci        self.assertEqual(b'', dco.unused_data)
3767db96d56Sopenharmony_ci        self.assertEqual(data, b''.join(bufs))
3777db96d56Sopenharmony_ci        # Failure means: "decompressobj with init options failed"
3787db96d56Sopenharmony_ci
3797db96d56Sopenharmony_ci    def test_decompincflush(self):
3807db96d56Sopenharmony_ci        self.test_decompinc(flush=True)
3817db96d56Sopenharmony_ci
3827db96d56Sopenharmony_ci    def test_decompimax(self, source=None, cx=256, dcx=64):
3837db96d56Sopenharmony_ci        # compress in steps, decompress in length-restricted steps
3847db96d56Sopenharmony_ci        source = source or HAMLET_SCENE
3857db96d56Sopenharmony_ci        # Check a decompression object with max_length specified
3867db96d56Sopenharmony_ci        data = source * 128
3877db96d56Sopenharmony_ci        co = zlib.compressobj()
3887db96d56Sopenharmony_ci        bufs = []
3897db96d56Sopenharmony_ci        for i in range(0, len(data), cx):
3907db96d56Sopenharmony_ci            bufs.append(co.compress(data[i:i+cx]))
3917db96d56Sopenharmony_ci        bufs.append(co.flush())
3927db96d56Sopenharmony_ci        combuf = b''.join(bufs)
3937db96d56Sopenharmony_ci        self.assertEqual(data, zlib.decompress(combuf),
3947db96d56Sopenharmony_ci                         'compressed data failure')
3957db96d56Sopenharmony_ci
3967db96d56Sopenharmony_ci        dco = zlib.decompressobj()
3977db96d56Sopenharmony_ci        bufs = []
3987db96d56Sopenharmony_ci        cb = combuf
3997db96d56Sopenharmony_ci        while cb:
4007db96d56Sopenharmony_ci            #max_length = 1 + len(cb)//10
4017db96d56Sopenharmony_ci            chunk = dco.decompress(cb, dcx)
4027db96d56Sopenharmony_ci            self.assertFalse(len(chunk) > dcx,
4037db96d56Sopenharmony_ci                    'chunk too big (%d>%d)' % (len(chunk), dcx))
4047db96d56Sopenharmony_ci            bufs.append(chunk)
4057db96d56Sopenharmony_ci            cb = dco.unconsumed_tail
4067db96d56Sopenharmony_ci        bufs.append(dco.flush())
4077db96d56Sopenharmony_ci        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
4087db96d56Sopenharmony_ci
4097db96d56Sopenharmony_ci    def test_decompressmaxlen(self, flush=False):
4107db96d56Sopenharmony_ci        # Check a decompression object with max_length specified
4117db96d56Sopenharmony_ci        data = HAMLET_SCENE * 128
4127db96d56Sopenharmony_ci        co = zlib.compressobj()
4137db96d56Sopenharmony_ci        bufs = []
4147db96d56Sopenharmony_ci        for i in range(0, len(data), 256):
4157db96d56Sopenharmony_ci            bufs.append(co.compress(data[i:i+256]))
4167db96d56Sopenharmony_ci        bufs.append(co.flush())
4177db96d56Sopenharmony_ci        combuf = b''.join(bufs)
4187db96d56Sopenharmony_ci        self.assertEqual(data, zlib.decompress(combuf),
4197db96d56Sopenharmony_ci                         'compressed data failure')
4207db96d56Sopenharmony_ci
4217db96d56Sopenharmony_ci        dco = zlib.decompressobj()
4227db96d56Sopenharmony_ci        bufs = []
4237db96d56Sopenharmony_ci        cb = combuf
4247db96d56Sopenharmony_ci        while cb:
4257db96d56Sopenharmony_ci            max_length = 1 + len(cb)//10
4267db96d56Sopenharmony_ci            chunk = dco.decompress(cb, max_length)
4277db96d56Sopenharmony_ci            self.assertFalse(len(chunk) > max_length,
4287db96d56Sopenharmony_ci                        'chunk too big (%d>%d)' % (len(chunk),max_length))
4297db96d56Sopenharmony_ci            bufs.append(chunk)
4307db96d56Sopenharmony_ci            cb = dco.unconsumed_tail
4317db96d56Sopenharmony_ci        if flush:
4327db96d56Sopenharmony_ci            bufs.append(dco.flush())
4337db96d56Sopenharmony_ci        else:
4347db96d56Sopenharmony_ci            while chunk:
4357db96d56Sopenharmony_ci                chunk = dco.decompress(b'', max_length)
4367db96d56Sopenharmony_ci                self.assertFalse(len(chunk) > max_length,
4377db96d56Sopenharmony_ci                            'chunk too big (%d>%d)' % (len(chunk),max_length))
4387db96d56Sopenharmony_ci                bufs.append(chunk)
4397db96d56Sopenharmony_ci        self.assertEqual(data, b''.join(bufs), 'Wrong data retrieved')
4407db96d56Sopenharmony_ci
4417db96d56Sopenharmony_ci    def test_decompressmaxlenflush(self):
4427db96d56Sopenharmony_ci        self.test_decompressmaxlen(flush=True)
4437db96d56Sopenharmony_ci
4447db96d56Sopenharmony_ci    def test_maxlenmisc(self):
4457db96d56Sopenharmony_ci        # Misc tests of max_length
4467db96d56Sopenharmony_ci        dco = zlib.decompressobj()
4477db96d56Sopenharmony_ci        self.assertRaises(ValueError, dco.decompress, b"", -1)
4487db96d56Sopenharmony_ci        self.assertEqual(b'', dco.unconsumed_tail)
4497db96d56Sopenharmony_ci
4507db96d56Sopenharmony_ci    def test_maxlen_large(self):
4517db96d56Sopenharmony_ci        # Sizes up to sys.maxsize should be accepted, although zlib is
4527db96d56Sopenharmony_ci        # internally limited to expressing sizes with unsigned int
4537db96d56Sopenharmony_ci        data = HAMLET_SCENE * 10
4547db96d56Sopenharmony_ci        self.assertGreater(len(data), zlib.DEF_BUF_SIZE)
4557db96d56Sopenharmony_ci        compressed = zlib.compress(data, 1)
4567db96d56Sopenharmony_ci        dco = zlib.decompressobj()
4577db96d56Sopenharmony_ci        self.assertEqual(dco.decompress(compressed, sys.maxsize), data)
4587db96d56Sopenharmony_ci
4597db96d56Sopenharmony_ci    def test_maxlen_custom(self):
4607db96d56Sopenharmony_ci        data = HAMLET_SCENE * 10
4617db96d56Sopenharmony_ci        compressed = zlib.compress(data, 1)
4627db96d56Sopenharmony_ci        dco = zlib.decompressobj()
4637db96d56Sopenharmony_ci        self.assertEqual(dco.decompress(compressed, CustomInt()), data[:100])
4647db96d56Sopenharmony_ci
4657db96d56Sopenharmony_ci    def test_clear_unconsumed_tail(self):
4667db96d56Sopenharmony_ci        # Issue #12050: calling decompress() without providing max_length
4677db96d56Sopenharmony_ci        # should clear the unconsumed_tail attribute.
4687db96d56Sopenharmony_ci        cdata = b"x\x9cKLJ\x06\x00\x02M\x01"    # "abc"
4697db96d56Sopenharmony_ci        dco = zlib.decompressobj()
4707db96d56Sopenharmony_ci        ddata = dco.decompress(cdata, 1)
4717db96d56Sopenharmony_ci        ddata += dco.decompress(dco.unconsumed_tail)
4727db96d56Sopenharmony_ci        self.assertEqual(dco.unconsumed_tail, b"")
4737db96d56Sopenharmony_ci
4747db96d56Sopenharmony_ci    def test_flushes(self):
4757db96d56Sopenharmony_ci        # Test flush() with the various options, using all the
4767db96d56Sopenharmony_ci        # different levels in order to provide more variations.
4777db96d56Sopenharmony_ci        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH',
4787db96d56Sopenharmony_ci                    'Z_PARTIAL_FLUSH']
4797db96d56Sopenharmony_ci
4807db96d56Sopenharmony_ci        ver = tuple(int(v) for v in zlib.ZLIB_RUNTIME_VERSION.split('.'))
4817db96d56Sopenharmony_ci        # Z_BLOCK has a known failure prior to 1.2.5.3
4827db96d56Sopenharmony_ci        if ver >= (1, 2, 5, 3):
4837db96d56Sopenharmony_ci            sync_opt.append('Z_BLOCK')
4847db96d56Sopenharmony_ci
4857db96d56Sopenharmony_ci        sync_opt = [getattr(zlib, opt) for opt in sync_opt
4867db96d56Sopenharmony_ci                    if hasattr(zlib, opt)]
4877db96d56Sopenharmony_ci        data = HAMLET_SCENE * 8
4887db96d56Sopenharmony_ci
4897db96d56Sopenharmony_ci        for sync in sync_opt:
4907db96d56Sopenharmony_ci            for level in range(10):
4917db96d56Sopenharmony_ci                try:
4927db96d56Sopenharmony_ci                    obj = zlib.compressobj( level )
4937db96d56Sopenharmony_ci                    a = obj.compress( data[:3000] )
4947db96d56Sopenharmony_ci                    b = obj.flush( sync )
4957db96d56Sopenharmony_ci                    c = obj.compress( data[3000:] )
4967db96d56Sopenharmony_ci                    d = obj.flush()
4977db96d56Sopenharmony_ci                except:
4987db96d56Sopenharmony_ci                    print("Error for flush mode={}, level={}"
4997db96d56Sopenharmony_ci                          .format(sync, level))
5007db96d56Sopenharmony_ci                    raise
5017db96d56Sopenharmony_ci                self.assertEqual(zlib.decompress(b''.join([a,b,c,d])),
5027db96d56Sopenharmony_ci                                 data, ("Decompress failed: flush "
5037db96d56Sopenharmony_ci                                        "mode=%i, level=%i") % (sync, level))
5047db96d56Sopenharmony_ci                del obj
5057db96d56Sopenharmony_ci
5067db96d56Sopenharmony_ci    @unittest.skipUnless(hasattr(zlib, 'Z_SYNC_FLUSH'),
5077db96d56Sopenharmony_ci                         'requires zlib.Z_SYNC_FLUSH')
5087db96d56Sopenharmony_ci    def test_odd_flush(self):
5097db96d56Sopenharmony_ci        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
5107db96d56Sopenharmony_ci        import random
5117db96d56Sopenharmony_ci        # Testing on 17K of "random" data
5127db96d56Sopenharmony_ci
5137db96d56Sopenharmony_ci        # Create compressor and decompressor objects
5147db96d56Sopenharmony_ci        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
5157db96d56Sopenharmony_ci        dco = zlib.decompressobj()
5167db96d56Sopenharmony_ci
5177db96d56Sopenharmony_ci        # Try 17K of data
5187db96d56Sopenharmony_ci        # generate random data stream
5197db96d56Sopenharmony_ci        try:
5207db96d56Sopenharmony_ci            # In 2.3 and later, WichmannHill is the RNG of the bug report
5217db96d56Sopenharmony_ci            gen = random.WichmannHill()
5227db96d56Sopenharmony_ci        except AttributeError:
5237db96d56Sopenharmony_ci            try:
5247db96d56Sopenharmony_ci                # 2.2 called it Random
5257db96d56Sopenharmony_ci                gen = random.Random()
5267db96d56Sopenharmony_ci            except AttributeError:
5277db96d56Sopenharmony_ci                # others might simply have a single RNG
5287db96d56Sopenharmony_ci                gen = random
5297db96d56Sopenharmony_ci        gen.seed(1)
5307db96d56Sopenharmony_ci        data = gen.randbytes(17 * 1024)
5317db96d56Sopenharmony_ci
5327db96d56Sopenharmony_ci        # compress, sync-flush, and decompress
5337db96d56Sopenharmony_ci        first = co.compress(data)
5347db96d56Sopenharmony_ci        second = co.flush(zlib.Z_SYNC_FLUSH)
5357db96d56Sopenharmony_ci        expanded = dco.decompress(first + second)
5367db96d56Sopenharmony_ci
5377db96d56Sopenharmony_ci        # if decompressed data is different from the input data, choke.
5387db96d56Sopenharmony_ci        self.assertEqual(expanded, data, "17K random source doesn't match")
5397db96d56Sopenharmony_ci
5407db96d56Sopenharmony_ci    def test_empty_flush(self):
5417db96d56Sopenharmony_ci        # Test that calling .flush() on unused objects works.
5427db96d56Sopenharmony_ci        # (Bug #1083110 -- calling .flush() on decompress objects
5437db96d56Sopenharmony_ci        # caused a core dump.)
5447db96d56Sopenharmony_ci
5457db96d56Sopenharmony_ci        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
5467db96d56Sopenharmony_ci        self.assertTrue(co.flush())  # Returns a zlib header
5477db96d56Sopenharmony_ci        dco = zlib.decompressobj()
5487db96d56Sopenharmony_ci        self.assertEqual(dco.flush(), b"") # Returns nothing
5497db96d56Sopenharmony_ci
5507db96d56Sopenharmony_ci    def test_dictionary(self):
5517db96d56Sopenharmony_ci        h = HAMLET_SCENE
5527db96d56Sopenharmony_ci        # Build a simulated dictionary out of the words in HAMLET.
5537db96d56Sopenharmony_ci        words = h.split()
5547db96d56Sopenharmony_ci        random.shuffle(words)
5557db96d56Sopenharmony_ci        zdict = b''.join(words)
5567db96d56Sopenharmony_ci        # Use it to compress HAMLET.
5577db96d56Sopenharmony_ci        co = zlib.compressobj(zdict=zdict)
5587db96d56Sopenharmony_ci        cd = co.compress(h) + co.flush()
5597db96d56Sopenharmony_ci        # Verify that it will decompress with the dictionary.
5607db96d56Sopenharmony_ci        dco = zlib.decompressobj(zdict=zdict)
5617db96d56Sopenharmony_ci        self.assertEqual(dco.decompress(cd) + dco.flush(), h)
5627db96d56Sopenharmony_ci        # Verify that it fails when not given the dictionary.
5637db96d56Sopenharmony_ci        dco = zlib.decompressobj()
5647db96d56Sopenharmony_ci        self.assertRaises(zlib.error, dco.decompress, cd)
5657db96d56Sopenharmony_ci
5667db96d56Sopenharmony_ci    def test_dictionary_streaming(self):
5677db96d56Sopenharmony_ci        # This simulates the reuse of a compressor object for compressing
5687db96d56Sopenharmony_ci        # several separate data streams.
5697db96d56Sopenharmony_ci        co = zlib.compressobj(zdict=HAMLET_SCENE)
5707db96d56Sopenharmony_ci        do = zlib.decompressobj(zdict=HAMLET_SCENE)
5717db96d56Sopenharmony_ci        piece = HAMLET_SCENE[1000:1500]
5727db96d56Sopenharmony_ci        d0 = co.compress(piece) + co.flush(zlib.Z_SYNC_FLUSH)
5737db96d56Sopenharmony_ci        d1 = co.compress(piece[100:]) + co.flush(zlib.Z_SYNC_FLUSH)
5747db96d56Sopenharmony_ci        d2 = co.compress(piece[:-100]) + co.flush(zlib.Z_SYNC_FLUSH)
5757db96d56Sopenharmony_ci        self.assertEqual(do.decompress(d0), piece)
5767db96d56Sopenharmony_ci        self.assertEqual(do.decompress(d1), piece[100:])
5777db96d56Sopenharmony_ci        self.assertEqual(do.decompress(d2), piece[:-100])
5787db96d56Sopenharmony_ci
5797db96d56Sopenharmony_ci    def test_decompress_incomplete_stream(self):
5807db96d56Sopenharmony_ci        # This is 'foo', deflated
5817db96d56Sopenharmony_ci        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'
5827db96d56Sopenharmony_ci        # For the record
5837db96d56Sopenharmony_ci        self.assertEqual(zlib.decompress(x), b'foo')
5847db96d56Sopenharmony_ci        self.assertRaises(zlib.error, zlib.decompress, x[:-5])
5857db96d56Sopenharmony_ci        # Omitting the stream end works with decompressor objects
5867db96d56Sopenharmony_ci        # (see issue #8672).
5877db96d56Sopenharmony_ci        dco = zlib.decompressobj()
5887db96d56Sopenharmony_ci        y = dco.decompress(x[:-5])
5897db96d56Sopenharmony_ci        y += dco.flush()
5907db96d56Sopenharmony_ci        self.assertEqual(y, b'foo')
5917db96d56Sopenharmony_ci
5927db96d56Sopenharmony_ci    def test_decompress_eof(self):
5937db96d56Sopenharmony_ci        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
5947db96d56Sopenharmony_ci        dco = zlib.decompressobj()
5957db96d56Sopenharmony_ci        self.assertFalse(dco.eof)
5967db96d56Sopenharmony_ci        dco.decompress(x[:-5])
5977db96d56Sopenharmony_ci        self.assertFalse(dco.eof)
5987db96d56Sopenharmony_ci        dco.decompress(x[-5:])
5997db96d56Sopenharmony_ci        self.assertTrue(dco.eof)
6007db96d56Sopenharmony_ci        dco.flush()
6017db96d56Sopenharmony_ci        self.assertTrue(dco.eof)
6027db96d56Sopenharmony_ci
6037db96d56Sopenharmony_ci    def test_decompress_eof_incomplete_stream(self):
6047db96d56Sopenharmony_ci        x = b'x\x9cK\xcb\xcf\x07\x00\x02\x82\x01E'  # 'foo'
6057db96d56Sopenharmony_ci        dco = zlib.decompressobj()
6067db96d56Sopenharmony_ci        self.assertFalse(dco.eof)
6077db96d56Sopenharmony_ci        dco.decompress(x[:-5])
6087db96d56Sopenharmony_ci        self.assertFalse(dco.eof)
6097db96d56Sopenharmony_ci        dco.flush()
6107db96d56Sopenharmony_ci        self.assertFalse(dco.eof)
6117db96d56Sopenharmony_ci
6127db96d56Sopenharmony_ci    def test_decompress_unused_data(self):
6137db96d56Sopenharmony_ci        # Repeated calls to decompress() after EOF should accumulate data in
6147db96d56Sopenharmony_ci        # dco.unused_data, instead of just storing the arg to the last call.
6157db96d56Sopenharmony_ci        source = b'abcdefghijklmnopqrstuvwxyz'
6167db96d56Sopenharmony_ci        remainder = b'0123456789'
6177db96d56Sopenharmony_ci        y = zlib.compress(source)
6187db96d56Sopenharmony_ci        x = y + remainder
6197db96d56Sopenharmony_ci        for maxlen in 0, 1000:
6207db96d56Sopenharmony_ci            for step in 1, 2, len(y), len(x):
6217db96d56Sopenharmony_ci                dco = zlib.decompressobj()
6227db96d56Sopenharmony_ci                data = b''
6237db96d56Sopenharmony_ci                for i in range(0, len(x), step):
6247db96d56Sopenharmony_ci                    if i < len(y):
6257db96d56Sopenharmony_ci                        self.assertEqual(dco.unused_data, b'')
6267db96d56Sopenharmony_ci                    if maxlen == 0:
6277db96d56Sopenharmony_ci                        data += dco.decompress(x[i : i + step])
6287db96d56Sopenharmony_ci                        self.assertEqual(dco.unconsumed_tail, b'')
6297db96d56Sopenharmony_ci                    else:
6307db96d56Sopenharmony_ci                        data += dco.decompress(
6317db96d56Sopenharmony_ci                                dco.unconsumed_tail + x[i : i + step], maxlen)
6327db96d56Sopenharmony_ci                data += dco.flush()
6337db96d56Sopenharmony_ci                self.assertTrue(dco.eof)
6347db96d56Sopenharmony_ci                self.assertEqual(data, source)
6357db96d56Sopenharmony_ci                self.assertEqual(dco.unconsumed_tail, b'')
6367db96d56Sopenharmony_ci                self.assertEqual(dco.unused_data, remainder)
6377db96d56Sopenharmony_ci
6387db96d56Sopenharmony_ci    # issue27164
6397db96d56Sopenharmony_ci    def test_decompress_raw_with_dictionary(self):
6407db96d56Sopenharmony_ci        zdict = b'abcdefghijklmnopqrstuvwxyz'
6417db96d56Sopenharmony_ci        co = zlib.compressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
6427db96d56Sopenharmony_ci        comp = co.compress(zdict) + co.flush()
6437db96d56Sopenharmony_ci        dco = zlib.decompressobj(wbits=-zlib.MAX_WBITS, zdict=zdict)
6447db96d56Sopenharmony_ci        uncomp = dco.decompress(comp) + dco.flush()
6457db96d56Sopenharmony_ci        self.assertEqual(zdict, uncomp)
6467db96d56Sopenharmony_ci
    def test_flush_with_freed_input(self):
        """flush() must not depend on the input object of the last call."""
        # Issue #16411: decompressor accesses input to last decompress() call
        # in flush(), even if this object has been freed in the meanwhile.
        input1 = b'abcdefghijklmnopqrstuvwxyz'
        input2 = b'QWERTYUIOPASDFGHJKLZXCVBNM'
        data = zlib.compress(input1)
        dco = zlib.decompressobj()
        # max_length=1: only one byte is returned here, the rest is left for
        # flush() below.
        dco.decompress(data, 1)
        # Drop the only reference to the compressed input, then create a new
        # bytes object before flushing.  The statement order is the point of
        # the test, so it must not be rearranged.
        del data
        data = zlib.compress(input2)
        self.assertEqual(dco.flush(), input1[1:])
6587db96d56Sopenharmony_ci
6597db96d56Sopenharmony_ci    @bigmemtest(size=_4G, memuse=1)
6607db96d56Sopenharmony_ci    def test_flush_large_length(self, size):
6617db96d56Sopenharmony_ci        # Test flush(length) parameter greater than internal limit UINT_MAX
6627db96d56Sopenharmony_ci        input = HAMLET_SCENE * 10
6637db96d56Sopenharmony_ci        data = zlib.compress(input, 1)
6647db96d56Sopenharmony_ci        dco = zlib.decompressobj()
6657db96d56Sopenharmony_ci        dco.decompress(data, 1)
6667db96d56Sopenharmony_ci        self.assertEqual(dco.flush(size), input[1:])
6677db96d56Sopenharmony_ci
6687db96d56Sopenharmony_ci    def test_flush_custom_length(self):
6697db96d56Sopenharmony_ci        input = HAMLET_SCENE * 10
6707db96d56Sopenharmony_ci        data = zlib.compress(input, 1)
6717db96d56Sopenharmony_ci        dco = zlib.decompressobj()
6727db96d56Sopenharmony_ci        dco.decompress(data, 1)
6737db96d56Sopenharmony_ci        self.assertEqual(dco.flush(CustomInt()), input[1:])
6747db96d56Sopenharmony_ci
6757db96d56Sopenharmony_ci    @requires_Compress_copy
6767db96d56Sopenharmony_ci    def test_compresscopy(self):
6777db96d56Sopenharmony_ci        # Test copying a compression object
6787db96d56Sopenharmony_ci        data0 = HAMLET_SCENE
6797db96d56Sopenharmony_ci        data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii")
6807db96d56Sopenharmony_ci        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
6817db96d56Sopenharmony_ci            c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
6827db96d56Sopenharmony_ci            bufs0 = []
6837db96d56Sopenharmony_ci            bufs0.append(c0.compress(data0))
6847db96d56Sopenharmony_ci
6857db96d56Sopenharmony_ci            c1 = func(c0)
6867db96d56Sopenharmony_ci            bufs1 = bufs0[:]
6877db96d56Sopenharmony_ci
6887db96d56Sopenharmony_ci            bufs0.append(c0.compress(data0))
6897db96d56Sopenharmony_ci            bufs0.append(c0.flush())
6907db96d56Sopenharmony_ci            s0 = b''.join(bufs0)
6917db96d56Sopenharmony_ci
6927db96d56Sopenharmony_ci            bufs1.append(c1.compress(data1))
6937db96d56Sopenharmony_ci            bufs1.append(c1.flush())
6947db96d56Sopenharmony_ci            s1 = b''.join(bufs1)
6957db96d56Sopenharmony_ci
6967db96d56Sopenharmony_ci            self.assertEqual(zlib.decompress(s0),data0+data0)
6977db96d56Sopenharmony_ci            self.assertEqual(zlib.decompress(s1),data0+data1)
6987db96d56Sopenharmony_ci
6997db96d56Sopenharmony_ci    @requires_Compress_copy
7007db96d56Sopenharmony_ci    def test_badcompresscopy(self):
7017db96d56Sopenharmony_ci        # Test copying a compression object in an inconsistent state
7027db96d56Sopenharmony_ci        c = zlib.compressobj()
7037db96d56Sopenharmony_ci        c.compress(HAMLET_SCENE)
7047db96d56Sopenharmony_ci        c.flush()
7057db96d56Sopenharmony_ci        self.assertRaises(ValueError, c.copy)
7067db96d56Sopenharmony_ci        self.assertRaises(ValueError, copy.copy, c)
7077db96d56Sopenharmony_ci        self.assertRaises(ValueError, copy.deepcopy, c)
7087db96d56Sopenharmony_ci
7097db96d56Sopenharmony_ci    @requires_Decompress_copy
7107db96d56Sopenharmony_ci    def test_decompresscopy(self):
7117db96d56Sopenharmony_ci        # Test copying a decompression object
7127db96d56Sopenharmony_ci        data = HAMLET_SCENE
7137db96d56Sopenharmony_ci        comp = zlib.compress(data)
7147db96d56Sopenharmony_ci        # Test type of return value
7157db96d56Sopenharmony_ci        self.assertIsInstance(comp, bytes)
7167db96d56Sopenharmony_ci
7177db96d56Sopenharmony_ci        for func in lambda c: c.copy(), copy.copy, copy.deepcopy:
7187db96d56Sopenharmony_ci            d0 = zlib.decompressobj()
7197db96d56Sopenharmony_ci            bufs0 = []
7207db96d56Sopenharmony_ci            bufs0.append(d0.decompress(comp[:32]))
7217db96d56Sopenharmony_ci
7227db96d56Sopenharmony_ci            d1 = func(d0)
7237db96d56Sopenharmony_ci            bufs1 = bufs0[:]
7247db96d56Sopenharmony_ci
7257db96d56Sopenharmony_ci            bufs0.append(d0.decompress(comp[32:]))
7267db96d56Sopenharmony_ci            s0 = b''.join(bufs0)
7277db96d56Sopenharmony_ci
7287db96d56Sopenharmony_ci            bufs1.append(d1.decompress(comp[32:]))
7297db96d56Sopenharmony_ci            s1 = b''.join(bufs1)
7307db96d56Sopenharmony_ci
7317db96d56Sopenharmony_ci            self.assertEqual(s0,s1)
7327db96d56Sopenharmony_ci            self.assertEqual(s0,data)
7337db96d56Sopenharmony_ci
7347db96d56Sopenharmony_ci    @requires_Decompress_copy
7357db96d56Sopenharmony_ci    def test_baddecompresscopy(self):
7367db96d56Sopenharmony_ci        # Test copying a compression object in an inconsistent state
7377db96d56Sopenharmony_ci        data = zlib.compress(HAMLET_SCENE)
7387db96d56Sopenharmony_ci        d = zlib.decompressobj()
7397db96d56Sopenharmony_ci        d.decompress(data)
7407db96d56Sopenharmony_ci        d.flush()
7417db96d56Sopenharmony_ci        self.assertRaises(ValueError, d.copy)
7427db96d56Sopenharmony_ci        self.assertRaises(ValueError, copy.copy, d)
7437db96d56Sopenharmony_ci        self.assertRaises(ValueError, copy.deepcopy, d)
7447db96d56Sopenharmony_ci
7457db96d56Sopenharmony_ci    def test_compresspickle(self):
7467db96d56Sopenharmony_ci        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
7477db96d56Sopenharmony_ci            with self.assertRaises((TypeError, pickle.PicklingError)):
7487db96d56Sopenharmony_ci                pickle.dumps(zlib.compressobj(zlib.Z_BEST_COMPRESSION), proto)
7497db96d56Sopenharmony_ci
7507db96d56Sopenharmony_ci    def test_decompresspickle(self):
7517db96d56Sopenharmony_ci        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
7527db96d56Sopenharmony_ci            with self.assertRaises((TypeError, pickle.PicklingError)):
7537db96d56Sopenharmony_ci                pickle.dumps(zlib.decompressobj(), proto)
7547db96d56Sopenharmony_ci
7557db96d56Sopenharmony_ci    # Memory use of the following functions takes into account overallocation
7567db96d56Sopenharmony_ci
7577db96d56Sopenharmony_ci    @bigmemtest(size=_1G + 1024 * 1024, memuse=3)
7587db96d56Sopenharmony_ci    def test_big_compress_buffer(self, size):
7597db96d56Sopenharmony_ci        c = zlib.compressobj(1)
7607db96d56Sopenharmony_ci        compress = lambda s: c.compress(s) + c.flush()
7617db96d56Sopenharmony_ci        self.check_big_compress_buffer(size, compress)
7627db96d56Sopenharmony_ci
7637db96d56Sopenharmony_ci    @bigmemtest(size=_1G + 1024 * 1024, memuse=2)
7647db96d56Sopenharmony_ci    def test_big_decompress_buffer(self, size):
7657db96d56Sopenharmony_ci        d = zlib.decompressobj()
7667db96d56Sopenharmony_ci        decompress = lambda s: d.decompress(s) + d.flush()
7677db96d56Sopenharmony_ci        self.check_big_decompress_buffer(size, decompress)
7687db96d56Sopenharmony_ci
    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=4)
    def test_64bit_compress(self, size):
        """Round-trip ``size`` bytes through compress/decompress objects.

        ``size`` is supplied by the bigmemtest decorator (requested as
        _4G + 100); the test is skipped unless sys.maxsize > 2**32.
        """
        data = b'x' * size
        co = zlib.compressobj(0)
        do = zlib.decompressobj()
        try:
            comp = co.compress(data) + co.flush()
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(uncomp, data)
        finally:
            # Drop the references to the large buffers whether or not the
            # steps above succeeded.
            comp = uncomp = data = None
7817db96d56Sopenharmony_ci
    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=3)
    def test_large_unused_data(self, size):
        """``size`` trailing bytes after a stream all land in unused_data.

        ``size`` is supplied by the bigmemtest decorator (requested as
        _4G + 100); the test is skipped unless sys.maxsize > 2**32.
        """
        data = b'abcdefghijklmnop'
        unused = b'x' * size
        # A complete compressed stream followed by the large trailing block.
        comp = zlib.compress(data) + unused
        do = zlib.decompressobj()
        try:
            uncomp = do.decompress(comp) + do.flush()
            self.assertEqual(unused, do.unused_data)
            self.assertEqual(uncomp, data)
        finally:
            # Drop the references to the large buffers (and to the
            # decompressor) whether or not the steps above succeeded.
            unused = comp = do = None
7957db96d56Sopenharmony_ci
    @unittest.skipUnless(sys.maxsize > 2**32, 'requires 64bit platform')
    @bigmemtest(size=_4G + 100, memuse=5)
    def test_large_unconsumed_tail(self, size):
        """decompress(comp, 1) plus flush() recovers all ``size`` bytes.

        ``size`` is supplied by the bigmemtest decorator (requested as
        _4G + 100); the test is skipped unless sys.maxsize > 2**32.
        """
        data = b'x' * size
        do = zlib.decompressobj()
        try:
            comp = zlib.compress(data, 0)
            # max_length=1 for decompress(); flush() must deliver the rest.
            uncomp = do.decompress(comp, 1) + do.flush()
            self.assertEqual(uncomp, data)
            self.assertEqual(do.unconsumed_tail, b'')
        finally:
            # Drop the references to the large buffers whether or not the
            # steps above succeeded.
            comp = uncomp = data = None
8087db96d56Sopenharmony_ci
8097db96d56Sopenharmony_ci    def test_wbits(self):
8107db96d56Sopenharmony_ci        # wbits=0 only supported since zlib v1.2.3.5
8117db96d56Sopenharmony_ci        # Register "1.2.3" as "1.2.3.0"
8127db96d56Sopenharmony_ci        # or "1.2.0-linux","1.2.0.f","1.2.0.f-linux"
8137db96d56Sopenharmony_ci        v = zlib.ZLIB_RUNTIME_VERSION.split('-', 1)[0].split('.')
8147db96d56Sopenharmony_ci        if len(v) < 4:
8157db96d56Sopenharmony_ci            v.append('0')
8167db96d56Sopenharmony_ci        elif not v[-1].isnumeric():
8177db96d56Sopenharmony_ci            v[-1] = '0'
8187db96d56Sopenharmony_ci
8197db96d56Sopenharmony_ci        v = tuple(map(int, v))
8207db96d56Sopenharmony_ci        supports_wbits_0 = v >= (1, 2, 3, 5)
8217db96d56Sopenharmony_ci
8227db96d56Sopenharmony_ci        co = zlib.compressobj(level=1, wbits=15)
8237db96d56Sopenharmony_ci        zlib15 = co.compress(HAMLET_SCENE) + co.flush()
8247db96d56Sopenharmony_ci        self.assertEqual(zlib.decompress(zlib15, 15), HAMLET_SCENE)
8257db96d56Sopenharmony_ci        if supports_wbits_0:
8267db96d56Sopenharmony_ci            self.assertEqual(zlib.decompress(zlib15, 0), HAMLET_SCENE)
8277db96d56Sopenharmony_ci        self.assertEqual(zlib.decompress(zlib15, 32 + 15), HAMLET_SCENE)
8287db96d56Sopenharmony_ci        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
8297db96d56Sopenharmony_ci            zlib.decompress(zlib15, 14)
8307db96d56Sopenharmony_ci        dco = zlib.decompressobj(wbits=32 + 15)
8317db96d56Sopenharmony_ci        self.assertEqual(dco.decompress(zlib15), HAMLET_SCENE)
8327db96d56Sopenharmony_ci        dco = zlib.decompressobj(wbits=14)
8337db96d56Sopenharmony_ci        with self.assertRaisesRegex(zlib.error, 'invalid window size'):
8347db96d56Sopenharmony_ci            dco.decompress(zlib15)
8357db96d56Sopenharmony_ci
8367db96d56Sopenharmony_ci        co = zlib.compressobj(level=1, wbits=9)
8377db96d56Sopenharmony_ci        zlib9 = co.compress(HAMLET_SCENE) + co.flush()
8387db96d56Sopenharmony_ci        self.assertEqual(zlib.decompress(zlib9, 9), HAMLET_SCENE)
8397db96d56Sopenharmony_ci        self.assertEqual(zlib.decompress(zlib9, 15), HAMLET_SCENE)
8407db96d56Sopenharmony_ci        if supports_wbits_0:
8417db96d56Sopenharmony_ci            self.assertEqual(zlib.decompress(zlib9, 0), HAMLET_SCENE)
8427db96d56Sopenharmony_ci        self.assertEqual(zlib.decompress(zlib9, 32 + 9), HAMLET_SCENE)
8437db96d56Sopenharmony_ci        dco = zlib.decompressobj(wbits=32 + 9)
8447db96d56Sopenharmony_ci        self.assertEqual(dco.decompress(zlib9), HAMLET_SCENE)
8457db96d56Sopenharmony_ci
8467db96d56Sopenharmony_ci        co = zlib.compressobj(level=1, wbits=-15)
8477db96d56Sopenharmony_ci        deflate15 = co.compress(HAMLET_SCENE) + co.flush()
8487db96d56Sopenharmony_ci        self.assertEqual(zlib.decompress(deflate15, -15), HAMLET_SCENE)
8497db96d56Sopenharmony_ci        dco = zlib.decompressobj(wbits=-15)
8507db96d56Sopenharmony_ci        self.assertEqual(dco.decompress(deflate15), HAMLET_SCENE)
8517db96d56Sopenharmony_ci
8527db96d56Sopenharmony_ci        co = zlib.compressobj(level=1, wbits=-9)
8537db96d56Sopenharmony_ci        deflate9 = co.compress(HAMLET_SCENE) + co.flush()
8547db96d56Sopenharmony_ci        self.assertEqual(zlib.decompress(deflate9, -9), HAMLET_SCENE)
8557db96d56Sopenharmony_ci        self.assertEqual(zlib.decompress(deflate9, -15), HAMLET_SCENE)
8567db96d56Sopenharmony_ci        dco = zlib.decompressobj(wbits=-9)
8577db96d56Sopenharmony_ci        self.assertEqual(dco.decompress(deflate9), HAMLET_SCENE)
8587db96d56Sopenharmony_ci
8597db96d56Sopenharmony_ci        co = zlib.compressobj(level=1, wbits=16 + 15)
8607db96d56Sopenharmony_ci        gzip = co.compress(HAMLET_SCENE) + co.flush()
8617db96d56Sopenharmony_ci        self.assertEqual(zlib.decompress(gzip, 16 + 15), HAMLET_SCENE)
8627db96d56Sopenharmony_ci        self.assertEqual(zlib.decompress(gzip, 32 + 15), HAMLET_SCENE)
8637db96d56Sopenharmony_ci        dco = zlib.decompressobj(32 + 15)
8647db96d56Sopenharmony_ci        self.assertEqual(dco.decompress(gzip), HAMLET_SCENE)
8657db96d56Sopenharmony_ci
8667db96d56Sopenharmony_ci        for wbits in (-15, 15, 31):
8677db96d56Sopenharmony_ci            with self.subTest(wbits=wbits):
8687db96d56Sopenharmony_ci                expected = HAMLET_SCENE
8697db96d56Sopenharmony_ci                actual = zlib.decompress(
8707db96d56Sopenharmony_ci                    zlib.compress(HAMLET_SCENE, wbits=wbits), wbits=wbits
8717db96d56Sopenharmony_ci                )
8727db96d56Sopenharmony_ci                self.assertEqual(expected, actual)
8737db96d56Sopenharmony_ci
def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source.

    source is split on newlines and may be either str or bytes-like
    (bytes, bytearray); the returned lines have the same type.  Lines are
    drawn with replacement, so a line may appear more than once.

    If seed is not None, generator is seeded with it first, which makes
    the selection reproducible.  generator must provide seed() and
    choice(); it defaults to the random module itself.
    """
    if seed is not None:
        generator.seed(seed)
    # Pick a separator matching the source type: splitting bytes with a str
    # separator raises TypeError.
    newline = b'\n' if isinstance(source, (bytes, bytearray)) else '\n'
    sources = source.split(newline)
    return [generator.choice(sources) for _ in range(number)]
8807db96d56Sopenharmony_ci
8817db96d56Sopenharmony_ci
# Sample text (a bytes literal) used by the tests in this file as input data.
# Some tests slice it at fixed offsets, so its exact content -- including
# the leading whitespace of every line -- must not be changed.
HAMLET_SCENE = b"""
LAERTES

       O, fear me not.
       I stay too long: but here my father comes.

       Enter POLONIUS

       A double blessing is a double grace,
       Occasion smiles upon a second leave.

LORD POLONIUS

       Yet here, Laertes! aboard, aboard, for shame!
       The wind sits in the shoulder of your sail,
       And you are stay'd for. There; my blessing with thee!
       And these few precepts in thy memory
       See thou character. Give thy thoughts no tongue,
       Nor any unproportioned thought his act.
       Be thou familiar, but by no means vulgar.
       Those friends thou hast, and their adoption tried,
       Grapple them to thy soul with hoops of steel;
       But do not dull thy palm with entertainment
       Of each new-hatch'd, unfledged comrade. Beware
       Of entrance to a quarrel, but being in,
       Bear't that the opposed may beware of thee.
       Give every man thy ear, but few thy voice;
       Take each man's censure, but reserve thy judgment.
       Costly thy habit as thy purse can buy,
       But not express'd in fancy; rich, not gaudy;
       For the apparel oft proclaims the man,
       And they in France of the best rank and station
       Are of a most select and generous chief in that.
       Neither a borrower nor a lender be;
       For loan oft loses both itself and friend,
       And borrowing dulls the edge of husbandry.
       This above all: to thine ownself be true,
       And it must follow, as the night the day,
       Thou canst not then be false to any man.
       Farewell: my blessing season this in thee!

LAERTES

       Most humbly do I take my leave, my lord.

LORD POLONIUS

       The time invites you; go; your servants tend.

LAERTES

       Farewell, Ophelia; and remember well
       What I have said to you.

OPHELIA

       'Tis in my memory lock'd,
       And you yourself shall keep the key of it.

LAERTES

       Farewell.
"""
9457db96d56Sopenharmony_ci
9467db96d56Sopenharmony_ci
class CustomInt:
    """Object usable wherever an integer index is accepted.

    It is not an int itself; it only implements __index__, which always
    yields 100.
    """

    def __index__(self):
        """Return the fixed integer value 100."""
        fixed_value = 100
        return fixed_value
9507db96d56Sopenharmony_ci
9517db96d56Sopenharmony_ci
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
954