xref: /third_party/python/Lib/test/test_gzip.py (revision 7db96d56)
17db96d56Sopenharmony_ci"""Test script for the gzip module.
27db96d56Sopenharmony_ci"""
37db96d56Sopenharmony_ci
47db96d56Sopenharmony_ciimport array
57db96d56Sopenharmony_ciimport functools
67db96d56Sopenharmony_ciimport io
77db96d56Sopenharmony_ciimport os
87db96d56Sopenharmony_ciimport pathlib
97db96d56Sopenharmony_ciimport struct
107db96d56Sopenharmony_ciimport sys
117db96d56Sopenharmony_ciimport unittest
127db96d56Sopenharmony_cifrom subprocess import PIPE, Popen
137db96d56Sopenharmony_cifrom test.support import import_helper
147db96d56Sopenharmony_cifrom test.support import os_helper
157db96d56Sopenharmony_cifrom test.support import _4G, bigmemtest, requires_subprocess
167db96d56Sopenharmony_cifrom test.support.script_helper import assert_python_ok, assert_python_failure
177db96d56Sopenharmony_ci
# Import gzip through the test-support helper instead of a plain import
# statement.  NOTE(review): per the test.support documentation this skips
# the tests when the module is unavailable -- confirm.
gzip = import_helper.import_module('gzip')

# First sample payload: a short fragment of C source text, used throughout
# the tests as uncompressed input.
data1 = b"""  int length=DEFAULTALLOC, err = Z_OK;
  PyObject *RetVal;
  int flushmode = Z_FINISH;
  unsigned long start_total_out;

"""

# Second sample payload: a few C comment lines, distinct from data1.
data2 = b"""/* zlibmodule.c -- gzip-compatible data compression */
/* See http://www.gzip.org/zlib/
/* See http://www.winimage.com/zLibDll for Windows */
"""


# Absolute form of the shared scratch file name with a '-gzdir' suffix.
TEMPDIR = os.path.abspath(os_helper.TESTFN) + '-gzdir'
347db96d56Sopenharmony_ci
357db96d56Sopenharmony_ci
class UnseekableIO(io.BytesIO):
    """In-memory byte stream that rejects every positioning request.

    Reads and writes behave as in io.BytesIO; seekable() reports False
    and both tell() and seek() raise io.UnsupportedOperation.
    """

    def seekable(self):
        # Advertise the stream as non-seekable.
        return False

    def tell(self):
        # Position queries are not supported.
        raise io.UnsupportedOperation

    def seek(self, *args):
        # Repositioning is not supported, whatever the arguments.
        raise io.UnsupportedOperation
457db96d56Sopenharmony_ci
467db96d56Sopenharmony_ci
class BaseTest(unittest.TestCase):
    """Common fixture: remove the scratch file before and after each test."""

    # Path of the scratch file the tests write to and read from.
    filename = os_helper.TESTFN

    def _remove_scratch_file(self):
        # self.filename is looked up at call time, so a test that rebinds
        # it on the instance gets its own file removed.
        os_helper.unlink(self.filename)

    def setUp(self):
        self._remove_scratch_file()

    def tearDown(self):
        self._remove_scratch_file()
557db96d56Sopenharmony_ci
567db96d56Sopenharmony_ci
577db96d56Sopenharmony_ciclass TestGzip(BaseTest):
587db96d56Sopenharmony_ci    def write_and_read_back(self, data, mode='b'):
597db96d56Sopenharmony_ci        b_data = bytes(data)
607db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'w'+mode) as f:
617db96d56Sopenharmony_ci            l = f.write(data)
627db96d56Sopenharmony_ci        self.assertEqual(l, len(b_data))
637db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'r'+mode) as f:
647db96d56Sopenharmony_ci            self.assertEqual(f.read(), b_data)
657db96d56Sopenharmony_ci
667db96d56Sopenharmony_ci    def test_write(self):
677db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'wb') as f:
687db96d56Sopenharmony_ci            f.write(data1 * 50)
697db96d56Sopenharmony_ci
707db96d56Sopenharmony_ci            # Try flush and fileno.
717db96d56Sopenharmony_ci            f.flush()
727db96d56Sopenharmony_ci            f.fileno()
737db96d56Sopenharmony_ci            if hasattr(os, 'fsync'):
747db96d56Sopenharmony_ci                os.fsync(f.fileno())
757db96d56Sopenharmony_ci            f.close()
767db96d56Sopenharmony_ci
777db96d56Sopenharmony_ci        # Test multiple close() calls.
787db96d56Sopenharmony_ci        f.close()
797db96d56Sopenharmony_ci
807db96d56Sopenharmony_ci    def test_write_read_with_pathlike_file(self):
817db96d56Sopenharmony_ci        filename = pathlib.Path(self.filename)
827db96d56Sopenharmony_ci        with gzip.GzipFile(filename, 'w') as f:
837db96d56Sopenharmony_ci            f.write(data1 * 50)
847db96d56Sopenharmony_ci        self.assertIsInstance(f.name, str)
857db96d56Sopenharmony_ci        with gzip.GzipFile(filename, 'a') as f:
867db96d56Sopenharmony_ci            f.write(data1)
877db96d56Sopenharmony_ci        with gzip.GzipFile(filename) as f:
887db96d56Sopenharmony_ci            d = f.read()
897db96d56Sopenharmony_ci        self.assertEqual(d, data1 * 51)
907db96d56Sopenharmony_ci        self.assertIsInstance(f.name, str)
917db96d56Sopenharmony_ci
927db96d56Sopenharmony_ci    # The following test_write_xy methods test that write accepts
937db96d56Sopenharmony_ci    # the corresponding bytes-like object type as input
947db96d56Sopenharmony_ci    # and that the data written equals bytes(xy) in all cases.
957db96d56Sopenharmony_ci    def test_write_memoryview(self):
967db96d56Sopenharmony_ci        self.write_and_read_back(memoryview(data1 * 50))
977db96d56Sopenharmony_ci        m = memoryview(bytes(range(256)))
987db96d56Sopenharmony_ci        data = m.cast('B', shape=[8,8,4])
997db96d56Sopenharmony_ci        self.write_and_read_back(data)
1007db96d56Sopenharmony_ci
1017db96d56Sopenharmony_ci    def test_write_bytearray(self):
1027db96d56Sopenharmony_ci        self.write_and_read_back(bytearray(data1 * 50))
1037db96d56Sopenharmony_ci
1047db96d56Sopenharmony_ci    def test_write_array(self):
1057db96d56Sopenharmony_ci        self.write_and_read_back(array.array('I', data1 * 40))
1067db96d56Sopenharmony_ci
1077db96d56Sopenharmony_ci    def test_write_incompatible_type(self):
1087db96d56Sopenharmony_ci        # Test that non-bytes-like types raise TypeError.
1097db96d56Sopenharmony_ci        # Issue #21560: attempts to write incompatible types
1107db96d56Sopenharmony_ci        # should not affect the state of the fileobject
1117db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'wb') as f:
1127db96d56Sopenharmony_ci            with self.assertRaises(TypeError):
1137db96d56Sopenharmony_ci                f.write('')
1147db96d56Sopenharmony_ci            with self.assertRaises(TypeError):
1157db96d56Sopenharmony_ci                f.write([])
1167db96d56Sopenharmony_ci            f.write(data1)
1177db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'rb') as f:
1187db96d56Sopenharmony_ci            self.assertEqual(f.read(), data1)
1197db96d56Sopenharmony_ci
1207db96d56Sopenharmony_ci    def test_read(self):
1217db96d56Sopenharmony_ci        self.test_write()
1227db96d56Sopenharmony_ci        # Try reading.
1237db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'r') as f:
1247db96d56Sopenharmony_ci            d = f.read()
1257db96d56Sopenharmony_ci        self.assertEqual(d, data1*50)
1267db96d56Sopenharmony_ci
1277db96d56Sopenharmony_ci    def test_read1(self):
1287db96d56Sopenharmony_ci        self.test_write()
1297db96d56Sopenharmony_ci        blocks = []
1307db96d56Sopenharmony_ci        nread = 0
1317db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'r') as f:
1327db96d56Sopenharmony_ci            while True:
1337db96d56Sopenharmony_ci                d = f.read1()
1347db96d56Sopenharmony_ci                if not d:
1357db96d56Sopenharmony_ci                    break
1367db96d56Sopenharmony_ci                blocks.append(d)
1377db96d56Sopenharmony_ci                nread += len(d)
1387db96d56Sopenharmony_ci                # Check that position was updated correctly (see issue10791).
1397db96d56Sopenharmony_ci                self.assertEqual(f.tell(), nread)
1407db96d56Sopenharmony_ci        self.assertEqual(b''.join(blocks), data1 * 50)
1417db96d56Sopenharmony_ci
1427db96d56Sopenharmony_ci    @bigmemtest(size=_4G, memuse=1)
1437db96d56Sopenharmony_ci    def test_read_large(self, size):
1447db96d56Sopenharmony_ci        # Read chunk size over UINT_MAX should be supported, despite zlib's
1457db96d56Sopenharmony_ci        # limitation per low-level call
1467db96d56Sopenharmony_ci        compressed = gzip.compress(data1, compresslevel=1)
1477db96d56Sopenharmony_ci        f = gzip.GzipFile(fileobj=io.BytesIO(compressed), mode='rb')
1487db96d56Sopenharmony_ci        self.assertEqual(f.read(size), data1)
1497db96d56Sopenharmony_ci
1507db96d56Sopenharmony_ci    def test_io_on_closed_object(self):
1517db96d56Sopenharmony_ci        # Test that I/O operations on closed GzipFile objects raise a
1527db96d56Sopenharmony_ci        # ValueError, just like the corresponding functions on file objects.
1537db96d56Sopenharmony_ci
1547db96d56Sopenharmony_ci        # Write to a file, open it for reading, then close it.
1557db96d56Sopenharmony_ci        self.test_write()
1567db96d56Sopenharmony_ci        f = gzip.GzipFile(self.filename, 'r')
1577db96d56Sopenharmony_ci        fileobj = f.fileobj
1587db96d56Sopenharmony_ci        self.assertFalse(fileobj.closed)
1597db96d56Sopenharmony_ci        f.close()
1607db96d56Sopenharmony_ci        self.assertTrue(fileobj.closed)
1617db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
1627db96d56Sopenharmony_ci            f.read(1)
1637db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
1647db96d56Sopenharmony_ci            f.seek(0)
1657db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
1667db96d56Sopenharmony_ci            f.tell()
1677db96d56Sopenharmony_ci        # Open the file for writing, then close it.
1687db96d56Sopenharmony_ci        f = gzip.GzipFile(self.filename, 'w')
1697db96d56Sopenharmony_ci        fileobj = f.fileobj
1707db96d56Sopenharmony_ci        self.assertFalse(fileobj.closed)
1717db96d56Sopenharmony_ci        f.close()
1727db96d56Sopenharmony_ci        self.assertTrue(fileobj.closed)
1737db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
1747db96d56Sopenharmony_ci            f.write(b'')
1757db96d56Sopenharmony_ci        with self.assertRaises(ValueError):
1767db96d56Sopenharmony_ci            f.flush()
1777db96d56Sopenharmony_ci
1787db96d56Sopenharmony_ci    def test_append(self):
1797db96d56Sopenharmony_ci        self.test_write()
1807db96d56Sopenharmony_ci        # Append to the previous file
1817db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'ab') as f:
1827db96d56Sopenharmony_ci            f.write(data2 * 15)
1837db96d56Sopenharmony_ci
1847db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'rb') as f:
1857db96d56Sopenharmony_ci            d = f.read()
1867db96d56Sopenharmony_ci        self.assertEqual(d, (data1*50) + (data2*15))
1877db96d56Sopenharmony_ci
1887db96d56Sopenharmony_ci    def test_many_append(self):
1897db96d56Sopenharmony_ci        # Bug #1074261 was triggered when reading a file that contained
1907db96d56Sopenharmony_ci        # many, many members.  Create such a file and verify that reading it
1917db96d56Sopenharmony_ci        # works.
1927db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'wb', 9) as f:
1937db96d56Sopenharmony_ci            f.write(b'a')
1947db96d56Sopenharmony_ci        for i in range(0, 200):
1957db96d56Sopenharmony_ci            with gzip.GzipFile(self.filename, "ab", 9) as f: # append
1967db96d56Sopenharmony_ci                f.write(b'a')
1977db96d56Sopenharmony_ci
1987db96d56Sopenharmony_ci        # Try reading the file
1997db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, "rb") as zgfile:
2007db96d56Sopenharmony_ci            contents = b""
2017db96d56Sopenharmony_ci            while 1:
2027db96d56Sopenharmony_ci                ztxt = zgfile.read(8192)
2037db96d56Sopenharmony_ci                contents += ztxt
2047db96d56Sopenharmony_ci                if not ztxt: break
2057db96d56Sopenharmony_ci        self.assertEqual(contents, b'a'*201)
2067db96d56Sopenharmony_ci
2077db96d56Sopenharmony_ci    def test_exclusive_write(self):
2087db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'xb') as f:
2097db96d56Sopenharmony_ci            f.write(data1 * 50)
2107db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'rb') as f:
2117db96d56Sopenharmony_ci            self.assertEqual(f.read(), data1 * 50)
2127db96d56Sopenharmony_ci        with self.assertRaises(FileExistsError):
2137db96d56Sopenharmony_ci            gzip.GzipFile(self.filename, 'xb')
2147db96d56Sopenharmony_ci
2157db96d56Sopenharmony_ci    def test_buffered_reader(self):
2167db96d56Sopenharmony_ci        # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
2177db96d56Sopenharmony_ci        # performance.
2187db96d56Sopenharmony_ci        self.test_write()
2197db96d56Sopenharmony_ci
2207db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'rb') as f:
2217db96d56Sopenharmony_ci            with io.BufferedReader(f) as r:
2227db96d56Sopenharmony_ci                lines = [line for line in r]
2237db96d56Sopenharmony_ci
2247db96d56Sopenharmony_ci        self.assertEqual(lines, 50 * data1.splitlines(keepends=True))
2257db96d56Sopenharmony_ci
2267db96d56Sopenharmony_ci    def test_readline(self):
2277db96d56Sopenharmony_ci        self.test_write()
2287db96d56Sopenharmony_ci        # Try .readline() with varying line lengths
2297db96d56Sopenharmony_ci
2307db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'rb') as f:
2317db96d56Sopenharmony_ci            line_length = 0
2327db96d56Sopenharmony_ci            while 1:
2337db96d56Sopenharmony_ci                L = f.readline(line_length)
2347db96d56Sopenharmony_ci                if not L and line_length != 0: break
2357db96d56Sopenharmony_ci                self.assertTrue(len(L) <= line_length)
2367db96d56Sopenharmony_ci                line_length = (line_length + 1) % 50
2377db96d56Sopenharmony_ci
2387db96d56Sopenharmony_ci    def test_readlines(self):
2397db96d56Sopenharmony_ci        self.test_write()
2407db96d56Sopenharmony_ci        # Try .readlines()
2417db96d56Sopenharmony_ci
2427db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'rb') as f:
2437db96d56Sopenharmony_ci            L = f.readlines()
2447db96d56Sopenharmony_ci
2457db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'rb') as f:
2467db96d56Sopenharmony_ci            while 1:
2477db96d56Sopenharmony_ci                L = f.readlines(150)
2487db96d56Sopenharmony_ci                if L == []: break
2497db96d56Sopenharmony_ci
2507db96d56Sopenharmony_ci    def test_seek_read(self):
2517db96d56Sopenharmony_ci        self.test_write()
2527db96d56Sopenharmony_ci        # Try seek, read test
2537db96d56Sopenharmony_ci
2547db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename) as f:
2557db96d56Sopenharmony_ci            while 1:
2567db96d56Sopenharmony_ci                oldpos = f.tell()
2577db96d56Sopenharmony_ci                line1 = f.readline()
2587db96d56Sopenharmony_ci                if not line1: break
2597db96d56Sopenharmony_ci                newpos = f.tell()
2607db96d56Sopenharmony_ci                f.seek(oldpos)  # negative seek
2617db96d56Sopenharmony_ci                if len(line1)>10:
2627db96d56Sopenharmony_ci                    amount = 10
2637db96d56Sopenharmony_ci                else:
2647db96d56Sopenharmony_ci                    amount = len(line1)
2657db96d56Sopenharmony_ci                line2 = f.read(amount)
2667db96d56Sopenharmony_ci                self.assertEqual(line1[:amount], line2)
2677db96d56Sopenharmony_ci                f.seek(newpos)  # positive seek
2687db96d56Sopenharmony_ci
2697db96d56Sopenharmony_ci    def test_seek_whence(self):
2707db96d56Sopenharmony_ci        self.test_write()
2717db96d56Sopenharmony_ci        # Try seek(whence=1), read test
2727db96d56Sopenharmony_ci
2737db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename) as f:
2747db96d56Sopenharmony_ci            f.read(10)
2757db96d56Sopenharmony_ci            f.seek(10, whence=1)
2767db96d56Sopenharmony_ci            y = f.read(10)
2777db96d56Sopenharmony_ci        self.assertEqual(y, data1[20:30])
2787db96d56Sopenharmony_ci
2797db96d56Sopenharmony_ci    def test_seek_write(self):
2807db96d56Sopenharmony_ci        # Try seek, write test
2817db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'w') as f:
2827db96d56Sopenharmony_ci            for pos in range(0, 256, 16):
2837db96d56Sopenharmony_ci                f.seek(pos)
2847db96d56Sopenharmony_ci                f.write(b'GZ\n')
2857db96d56Sopenharmony_ci
2867db96d56Sopenharmony_ci    def test_mode(self):
2877db96d56Sopenharmony_ci        self.test_write()
2887db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'r') as f:
2897db96d56Sopenharmony_ci            self.assertEqual(f.myfileobj.mode, 'rb')
2907db96d56Sopenharmony_ci        os_helper.unlink(self.filename)
2917db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'x') as f:
2927db96d56Sopenharmony_ci            self.assertEqual(f.myfileobj.mode, 'xb')
2937db96d56Sopenharmony_ci
2947db96d56Sopenharmony_ci    def test_1647484(self):
2957db96d56Sopenharmony_ci        for mode in ('wb', 'rb'):
2967db96d56Sopenharmony_ci            with gzip.GzipFile(self.filename, mode) as f:
2977db96d56Sopenharmony_ci                self.assertTrue(hasattr(f, "name"))
2987db96d56Sopenharmony_ci                self.assertEqual(f.name, self.filename)
2997db96d56Sopenharmony_ci
3007db96d56Sopenharmony_ci    def test_paddedfile_getattr(self):
3017db96d56Sopenharmony_ci        self.test_write()
3027db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'rb') as f:
3037db96d56Sopenharmony_ci            self.assertTrue(hasattr(f.fileobj, "name"))
3047db96d56Sopenharmony_ci            self.assertEqual(f.fileobj.name, self.filename)
3057db96d56Sopenharmony_ci
3067db96d56Sopenharmony_ci    def test_mtime(self):
3077db96d56Sopenharmony_ci        mtime = 123456789
3087db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
3097db96d56Sopenharmony_ci            fWrite.write(data1)
3107db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename) as fRead:
3117db96d56Sopenharmony_ci            self.assertTrue(hasattr(fRead, 'mtime'))
3127db96d56Sopenharmony_ci            self.assertIsNone(fRead.mtime)
3137db96d56Sopenharmony_ci            dataRead = fRead.read()
3147db96d56Sopenharmony_ci            self.assertEqual(dataRead, data1)
3157db96d56Sopenharmony_ci            self.assertEqual(fRead.mtime, mtime)
3167db96d56Sopenharmony_ci
3177db96d56Sopenharmony_ci    def test_metadata(self):
3187db96d56Sopenharmony_ci        mtime = 123456789
3197db96d56Sopenharmony_ci
3207db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
3217db96d56Sopenharmony_ci            fWrite.write(data1)
3227db96d56Sopenharmony_ci
3237db96d56Sopenharmony_ci        with open(self.filename, 'rb') as fRead:
3247db96d56Sopenharmony_ci            # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
3257db96d56Sopenharmony_ci
3267db96d56Sopenharmony_ci            idBytes = fRead.read(2)
3277db96d56Sopenharmony_ci            self.assertEqual(idBytes, b'\x1f\x8b') # gzip ID
3287db96d56Sopenharmony_ci
3297db96d56Sopenharmony_ci            cmByte = fRead.read(1)
3307db96d56Sopenharmony_ci            self.assertEqual(cmByte, b'\x08') # deflate
3317db96d56Sopenharmony_ci
3327db96d56Sopenharmony_ci            try:
3337db96d56Sopenharmony_ci                expectedname = self.filename.encode('Latin-1') + b'\x00'
3347db96d56Sopenharmony_ci                expectedflags = b'\x08' # only the FNAME flag is set
3357db96d56Sopenharmony_ci            except UnicodeEncodeError:
3367db96d56Sopenharmony_ci                expectedname = b''
3377db96d56Sopenharmony_ci                expectedflags = b'\x00'
3387db96d56Sopenharmony_ci
3397db96d56Sopenharmony_ci            flagsByte = fRead.read(1)
3407db96d56Sopenharmony_ci            self.assertEqual(flagsByte, expectedflags)
3417db96d56Sopenharmony_ci
3427db96d56Sopenharmony_ci            mtimeBytes = fRead.read(4)
3437db96d56Sopenharmony_ci            self.assertEqual(mtimeBytes, struct.pack('<i', mtime)) # little-endian
3447db96d56Sopenharmony_ci
3457db96d56Sopenharmony_ci            xflByte = fRead.read(1)
3467db96d56Sopenharmony_ci            self.assertEqual(xflByte, b'\x02') # maximum compression
3477db96d56Sopenharmony_ci
3487db96d56Sopenharmony_ci            osByte = fRead.read(1)
3497db96d56Sopenharmony_ci            self.assertEqual(osByte, b'\xff') # OS "unknown" (OS-independent)
3507db96d56Sopenharmony_ci
3517db96d56Sopenharmony_ci            # Since the FNAME flag is set, the zero-terminated filename follows.
3527db96d56Sopenharmony_ci            # RFC 1952 specifies that this is the name of the input file, if any.
3537db96d56Sopenharmony_ci            # However, the gzip module defaults to storing the name of the output
3547db96d56Sopenharmony_ci            # file in this field.
3557db96d56Sopenharmony_ci            nameBytes = fRead.read(len(expectedname))
3567db96d56Sopenharmony_ci            self.assertEqual(nameBytes, expectedname)
3577db96d56Sopenharmony_ci
3587db96d56Sopenharmony_ci            # Since no other flags were set, the header ends here.
3597db96d56Sopenharmony_ci            # Rather than process the compressed data, let's seek to the trailer.
3607db96d56Sopenharmony_ci            fRead.seek(os.stat(self.filename).st_size - 8)
3617db96d56Sopenharmony_ci
3627db96d56Sopenharmony_ci            crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1]
3637db96d56Sopenharmony_ci            self.assertEqual(crc32Bytes, b'\xaf\xd7d\x83')
3647db96d56Sopenharmony_ci
3657db96d56Sopenharmony_ci            isizeBytes = fRead.read(4)
3667db96d56Sopenharmony_ci            self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))
3677db96d56Sopenharmony_ci
3687db96d56Sopenharmony_ci    def test_metadata_ascii_name(self):
3697db96d56Sopenharmony_ci        self.filename = os_helper.TESTFN_ASCII
3707db96d56Sopenharmony_ci        self.test_metadata()
3717db96d56Sopenharmony_ci
3727db96d56Sopenharmony_ci    def test_compresslevel_metadata(self):
3737db96d56Sopenharmony_ci        # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
3747db96d56Sopenharmony_ci        # specifically, discussion of XFL in section 2.3.1
3757db96d56Sopenharmony_ci        cases = [
3767db96d56Sopenharmony_ci            ('fast', 1, b'\x04'),
3777db96d56Sopenharmony_ci            ('best', 9, b'\x02'),
3787db96d56Sopenharmony_ci            ('tradeoff', 6, b'\x00'),
3797db96d56Sopenharmony_ci        ]
3807db96d56Sopenharmony_ci        xflOffset = 8
3817db96d56Sopenharmony_ci
3827db96d56Sopenharmony_ci        for (name, level, expectedXflByte) in cases:
3837db96d56Sopenharmony_ci            with self.subTest(name):
3847db96d56Sopenharmony_ci                fWrite = gzip.GzipFile(self.filename, 'w', compresslevel=level)
3857db96d56Sopenharmony_ci                with fWrite:
3867db96d56Sopenharmony_ci                    fWrite.write(data1)
3877db96d56Sopenharmony_ci                with open(self.filename, 'rb') as fRead:
3887db96d56Sopenharmony_ci                    fRead.seek(xflOffset)
3897db96d56Sopenharmony_ci                    xflByte = fRead.read(1)
3907db96d56Sopenharmony_ci                    self.assertEqual(xflByte, expectedXflByte)
3917db96d56Sopenharmony_ci
3927db96d56Sopenharmony_ci    def test_with_open(self):
3937db96d56Sopenharmony_ci        # GzipFile supports the context management protocol
3947db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, "wb") as f:
3957db96d56Sopenharmony_ci            f.write(b"xxx")
3967db96d56Sopenharmony_ci        f = gzip.GzipFile(self.filename, "rb")
3977db96d56Sopenharmony_ci        f.close()
3987db96d56Sopenharmony_ci        try:
3997db96d56Sopenharmony_ci            with f:
4007db96d56Sopenharmony_ci                pass
4017db96d56Sopenharmony_ci        except ValueError:
4027db96d56Sopenharmony_ci            pass
4037db96d56Sopenharmony_ci        else:
4047db96d56Sopenharmony_ci            self.fail("__enter__ on a closed file didn't raise an exception")
4057db96d56Sopenharmony_ci        try:
4067db96d56Sopenharmony_ci            with gzip.GzipFile(self.filename, "wb") as f:
4077db96d56Sopenharmony_ci                1/0
4087db96d56Sopenharmony_ci        except ZeroDivisionError:
4097db96d56Sopenharmony_ci            pass
4107db96d56Sopenharmony_ci        else:
4117db96d56Sopenharmony_ci            self.fail("1/0 didn't raise an exception")
4127db96d56Sopenharmony_ci
4137db96d56Sopenharmony_ci    def test_zero_padded_file(self):
4147db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, "wb") as f:
4157db96d56Sopenharmony_ci            f.write(data1 * 50)
4167db96d56Sopenharmony_ci
4177db96d56Sopenharmony_ci        # Pad the file with zeroes
4187db96d56Sopenharmony_ci        with open(self.filename, "ab") as f:
4197db96d56Sopenharmony_ci            f.write(b"\x00" * 50)
4207db96d56Sopenharmony_ci
4217db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, "rb") as f:
4227db96d56Sopenharmony_ci            d = f.read()
4237db96d56Sopenharmony_ci            self.assertEqual(d, data1 * 50, "Incorrect data in file")
4247db96d56Sopenharmony_ci
4257db96d56Sopenharmony_ci    def test_gzip_BadGzipFile_exception(self):
4267db96d56Sopenharmony_ci        self.assertTrue(issubclass(gzip.BadGzipFile, OSError))
4277db96d56Sopenharmony_ci
4287db96d56Sopenharmony_ci    def test_bad_gzip_file(self):
4297db96d56Sopenharmony_ci        with open(self.filename, 'wb') as file:
4307db96d56Sopenharmony_ci            file.write(data1 * 50)
4317db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'r') as file:
4327db96d56Sopenharmony_ci            self.assertRaises(gzip.BadGzipFile, file.readlines)
4337db96d56Sopenharmony_ci
4347db96d56Sopenharmony_ci    def test_non_seekable_file(self):
4357db96d56Sopenharmony_ci        uncompressed = data1 * 50
4367db96d56Sopenharmony_ci        buf = UnseekableIO()
4377db96d56Sopenharmony_ci        with gzip.GzipFile(fileobj=buf, mode="wb") as f:
4387db96d56Sopenharmony_ci            f.write(uncompressed)
4397db96d56Sopenharmony_ci        compressed = buf.getvalue()
4407db96d56Sopenharmony_ci        buf = UnseekableIO(compressed)
4417db96d56Sopenharmony_ci        with gzip.GzipFile(fileobj=buf, mode="rb") as f:
4427db96d56Sopenharmony_ci            self.assertEqual(f.read(), uncompressed)
4437db96d56Sopenharmony_ci
4447db96d56Sopenharmony_ci    def test_peek(self):
4457db96d56Sopenharmony_ci        uncompressed = data1 * 200
4467db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, "wb") as f:
4477db96d56Sopenharmony_ci            f.write(uncompressed)
4487db96d56Sopenharmony_ci
4497db96d56Sopenharmony_ci        def sizes():
4507db96d56Sopenharmony_ci            while True:
4517db96d56Sopenharmony_ci                for n in range(5, 50, 10):
4527db96d56Sopenharmony_ci                    yield n
4537db96d56Sopenharmony_ci
4547db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, "rb") as f:
4557db96d56Sopenharmony_ci            f.max_read_chunk = 33
4567db96d56Sopenharmony_ci            nread = 0
4577db96d56Sopenharmony_ci            for n in sizes():
4587db96d56Sopenharmony_ci                s = f.peek(n)
4597db96d56Sopenharmony_ci                if s == b'':
4607db96d56Sopenharmony_ci                    break
4617db96d56Sopenharmony_ci                self.assertEqual(f.read(len(s)), s)
4627db96d56Sopenharmony_ci                nread += len(s)
4637db96d56Sopenharmony_ci            self.assertEqual(f.read(100), b'')
4647db96d56Sopenharmony_ci            self.assertEqual(nread, len(uncompressed))
4657db96d56Sopenharmony_ci
4667db96d56Sopenharmony_ci    def test_textio_readlines(self):
4677db96d56Sopenharmony_ci        # Issue #10791: TextIOWrapper.readlines() fails when wrapping GzipFile.
4687db96d56Sopenharmony_ci        lines = (data1 * 50).decode("ascii").splitlines(keepends=True)
4697db96d56Sopenharmony_ci        self.test_write()
4707db96d56Sopenharmony_ci        with gzip.GzipFile(self.filename, 'r') as f:
4717db96d56Sopenharmony_ci            with io.TextIOWrapper(f, encoding="ascii") as t:
4727db96d56Sopenharmony_ci                self.assertEqual(t.readlines(), lines)
4737db96d56Sopenharmony_ci
4747db96d56Sopenharmony_ci    def test_fileobj_from_fdopen(self):
4757db96d56Sopenharmony_ci        # Issue #13781: Opening a GzipFile for writing fails when using a
4767db96d56Sopenharmony_ci        # fileobj created with os.fdopen().
4777db96d56Sopenharmony_ci        fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
4787db96d56Sopenharmony_ci        with os.fdopen(fd, "wb") as f:
4797db96d56Sopenharmony_ci            with gzip.GzipFile(fileobj=f, mode="w") as g:
4807db96d56Sopenharmony_ci                pass
4817db96d56Sopenharmony_ci
4827db96d56Sopenharmony_ci    def test_fileobj_mode(self):
4837db96d56Sopenharmony_ci        gzip.GzipFile(self.filename, "wb").close()
4847db96d56Sopenharmony_ci        with open(self.filename, "r+b") as f:
4857db96d56Sopenharmony_ci            with gzip.GzipFile(fileobj=f, mode='r') as g:
4867db96d56Sopenharmony_ci                self.assertEqual(g.mode, gzip.READ)
4877db96d56Sopenharmony_ci            with gzip.GzipFile(fileobj=f, mode='w') as g:
4887db96d56Sopenharmony_ci                self.assertEqual(g.mode, gzip.WRITE)
4897db96d56Sopenharmony_ci            with gzip.GzipFile(fileobj=f, mode='a') as g:
4907db96d56Sopenharmony_ci                self.assertEqual(g.mode, gzip.WRITE)
4917db96d56Sopenharmony_ci            with gzip.GzipFile(fileobj=f, mode='x') as g:
4927db96d56Sopenharmony_ci                self.assertEqual(g.mode, gzip.WRITE)
4937db96d56Sopenharmony_ci            with self.assertRaises(ValueError):
4947db96d56Sopenharmony_ci                gzip.GzipFile(fileobj=f, mode='z')
4957db96d56Sopenharmony_ci        for mode in "rb", "r+b":
4967db96d56Sopenharmony_ci            with open(self.filename, mode) as f:
4977db96d56Sopenharmony_ci                with gzip.GzipFile(fileobj=f) as g:
4987db96d56Sopenharmony_ci                    self.assertEqual(g.mode, gzip.READ)
4997db96d56Sopenharmony_ci        for mode in "wb", "ab", "xb":
5007db96d56Sopenharmony_ci            if "x" in mode:
5017db96d56Sopenharmony_ci                os_helper.unlink(self.filename)
5027db96d56Sopenharmony_ci            with open(self.filename, mode) as f:
5037db96d56Sopenharmony_ci                with self.assertWarns(FutureWarning):
5047db96d56Sopenharmony_ci                    g = gzip.GzipFile(fileobj=f)
5057db96d56Sopenharmony_ci                with g:
5067db96d56Sopenharmony_ci                    self.assertEqual(g.mode, gzip.WRITE)
5077db96d56Sopenharmony_ci
5087db96d56Sopenharmony_ci    def test_bytes_filename(self):
5097db96d56Sopenharmony_ci        str_filename = self.filename
5107db96d56Sopenharmony_ci        try:
5117db96d56Sopenharmony_ci            bytes_filename = str_filename.encode("ascii")
5127db96d56Sopenharmony_ci        except UnicodeEncodeError:
5137db96d56Sopenharmony_ci            self.skipTest("Temporary file name needs to be ASCII")
5147db96d56Sopenharmony_ci        with gzip.GzipFile(bytes_filename, "wb") as f:
5157db96d56Sopenharmony_ci            f.write(data1 * 50)
5167db96d56Sopenharmony_ci        with gzip.GzipFile(bytes_filename, "rb") as f:
5177db96d56Sopenharmony_ci            self.assertEqual(f.read(), data1 * 50)
5187db96d56Sopenharmony_ci        # Sanity check that we are actually operating on the right file.
5197db96d56Sopenharmony_ci        with gzip.GzipFile(str_filename, "rb") as f:
5207db96d56Sopenharmony_ci            self.assertEqual(f.read(), data1 * 50)
5217db96d56Sopenharmony_ci
def test_decompress_limited(self):
    """Decompressed data buffering should be limited"""
    # 2e6 zero bytes must compress to less than one default I/O buffer,
    # otherwise the bound checked below would not be meaningful.
    bomb = gzip.compress(b'\0' * int(2e6), compresslevel=9)
    self.assertLess(len(bomb), io.DEFAULT_BUFFER_SIZE)

    # Context managers close the reader and its backing buffer even when
    # one of the assertions fails.
    with io.BytesIO(bomb) as bomb_stream, \
            gzip.GzipFile(fileobj=bomb_stream) as decomp:
        self.assertEqual(decomp.read(1), b'\0')
        # Requesting a single byte may decompress at most that byte plus
        # one buffer's worth of data.
        max_decomp = 1 + io.DEFAULT_BUFFER_SIZE
        self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
            "Excessive amount of data was decompressed")
5337db96d56Sopenharmony_ci
5347db96d56Sopenharmony_ci    # Testing compress/decompress shortcut functions
5357db96d56Sopenharmony_ci
def test_compress(self):
    """gzip.compress() returns bytes that GzipFile reads back unchanged.

    Every sample is compressed with the default level and with the levels
    1, 6 and 9 passed positionally.
    """
    for data in [data1, data2]:
        for args in [(), (1,), (6,), (9,)]:
            # Report each combination on its own, as test_compress_mtime
            # does, so one failing level does not hide the others.
            with self.subTest(data=data, args=args):
                datac = gzip.compress(data, *args)
                self.assertEqual(type(datac), bytes)
                with gzip.GzipFile(fileobj=io.BytesIO(datac), mode="rb") as f:
                    self.assertEqual(f.read(), data)
5437db96d56Sopenharmony_ci
def test_compress_mtime(self):
    """The mtime handed to gzip.compress() is the one GzipFile reports on reading."""
    expected_mtime = 123456789
    level_args = [(), (1,), (6,), (9,)]
    for data in [data1, data2]:
        for args in level_args:
            with self.subTest(data=data, args=args):
                blob = gzip.compress(data, *args, mtime=expected_mtime)
                self.assertEqual(type(blob), bytes)
                with gzip.GzipFile(fileobj=io.BytesIO(blob), mode="rb") as reader:
                    reader.read(1)  # to set mtime attribute
                    self.assertEqual(reader.mtime, expected_mtime)
5547db96d56Sopenharmony_ci
def test_compress_correct_level(self):
    """Level 0 output contains the input verbatim; level 1 output does not."""
    # gzip.compress calls with mtime == 0 take a different code path.
    for mtime in (0, 42):
        with self.subTest(mtime=mtime):
            # Produce both outputs before asserting on either of them.
            stored, deflated = [
                gzip.compress(data1, compresslevel=level, mtime=mtime)
                for level in (0, 1)
            ]
            self.assertIn(data1, stored)
            self.assertNotIn(data1, deflated)
5637db96d56Sopenharmony_ci
def test_decompress(self):
    """gzip.decompress() inverts both a GzipFile write and gzip.compress()."""
    for payload in (data1, data2):
        sink = io.BytesIO()
        with gzip.GzipFile(fileobj=sink, mode="wb") as writer:
            writer.write(payload)
        via_gzipfile = sink.getvalue()
        self.assertEqual(gzip.decompress(via_gzipfile), payload)
        # Roundtrip with compress
        via_compress = gzip.compress(payload)
        self.assertEqual(gzip.decompress(via_compress), payload)
5737db96d56Sopenharmony_ci
def test_decompress_truncated_trailer(self):
    """decompress() raises EOFError when the last 4 bytes of the stream are cut off."""
    blob = gzip.compress(data1)
    clipped = blob[:-4]
    with self.assertRaises(EOFError):
        gzip.decompress(clipped)
5777db96d56Sopenharmony_ci
def test_decompress_missing_trailer(self):
    """decompress() raises EOFError when the last 8 bytes of the stream are cut off."""
    blob = gzip.compress(data1)
    clipped = blob[:-8]
    with self.assertRaises(EOFError):
        gzip.decompress(clipped)
5817db96d56Sopenharmony_ci
def test_read_truncated(self):
    """Reading a stream with a missing trailer or a cut-off header raises EOFError."""
    payload = data1 * 50
    # Drop the CRC (4 bytes) and file size (4 bytes).
    clipped = gzip.compress(payload)[:-8]

    # An unbounded read fails.
    with gzip.GzipFile(fileobj=io.BytesIO(clipped)) as reader:
        with self.assertRaises(EOFError):
            reader.read()

    # The whole payload can still be read; only the read after it fails.
    with gzip.GzipFile(fileobj=io.BytesIO(clipped)) as reader:
        self.assertEqual(reader.read(len(payload)), payload)
        with self.assertRaises(EOFError):
            reader.read(1)

    # Incomplete 10-byte header.
    for header_len in range(2, 10):
        partial_header = io.BytesIO(clipped[:header_len])
        with gzip.GzipFile(fileobj=partial_header) as reader:
            with self.assertRaises(EOFError):
                reader.read(1)
5957db96d56Sopenharmony_ci
def test_read_with_extra(self):
    """A stream whose header carries an extra field decompresses to b'Test'."""
    # Gzip data with an extra field
    leading_bytes = b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
    extra_bytes = b'\x05\x00Extra'
    remaining_bytes = b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00'
    stream = io.BytesIO(leading_bytes + extra_bytes + remaining_bytes)
    with gzip.GzipFile(fileobj=stream) as reader:
        self.assertEqual(reader.read(), b'Test')
6037db96d56Sopenharmony_ci
def test_prepend_error(self):
    """Calling prepend() without an argument on the reader's file wrapper must not raise."""
    # See issue #20875
    with gzip.open(self.filename, "wb") as writer:
        writer.write(data1)
    with gzip.open(self.filename, "rb") as reader:
        wrapped_file = reader._buffer.raw._fp
        wrapped_file.prepend()
6107db96d56Sopenharmony_ci
def test_issue44439(self):
    """write() and tell() report the byte length of an array, not its item count."""
    items = array.array('Q', [1, 2, 3, 4, 5])
    byte_count = items.itemsize * len(items)

    with gzip.GzipFile(fileobj=io.BytesIO(), mode='w') as writer:
        written = writer.write(items)
        self.assertEqual(written, byte_count)
        self.assertEqual(writer.tell(), byte_count)
6187db96d56Sopenharmony_ci
6197db96d56Sopenharmony_ci
class TestOpen(BaseTest):
    """Tests for the module-level gzip.open() function."""

    def test_binary_modes(self):
        """Round-trip data through the explicit binary modes wb, rb, ab and xb."""
        payload = data1 * 50

        with gzip.open(self.filename, "wb") as gz:
            gz.write(payload)
        with open(self.filename, "rb") as raw:
            self.assertEqual(gzip.decompress(raw.read()), payload)

        with gzip.open(self.filename, "rb") as gz:
            self.assertEqual(gz.read(), payload)

        # After appending, the file decompresses to two copies of the payload.
        with gzip.open(self.filename, "ab") as gz:
            gz.write(payload)
        with open(self.filename, "rb") as raw:
            self.assertEqual(gzip.decompress(raw.read()), payload * 2)

        # Exclusive creation refuses the existing file, then works once the
        # file has been removed.
        self.assertRaises(FileExistsError, gzip.open, self.filename, "xb")
        os_helper.unlink(self.filename)
        with gzip.open(self.filename, "xb") as gz:
            gz.write(payload)
        with open(self.filename, "rb") as raw:
            self.assertEqual(gzip.decompress(raw.read()), payload)

    def test_pathlike_file(self):
        """gzip.open() accepts a pathlib.Path for writing, appending and reading."""
        path = pathlib.Path(self.filename)
        with gzip.open(path, "wb") as gz:
            gz.write(data1 * 50)
        with gzip.open(path, "ab") as gz:
            gz.write(data1)
        with gzip.open(path) as gz:
            contents = gz.read()
            self.assertEqual(contents, data1 * 51)

    def test_implicit_binary_modes(self):
        """Modes w, r, a and x behave as binary modes."""
        # Test implicit binary modes (no "b" or "t" in mode string).
        payload = data1 * 50

        with gzip.open(self.filename, "w") as gz:
            gz.write(payload)
        with open(self.filename, "rb") as raw:
            self.assertEqual(gzip.decompress(raw.read()), payload)

        with gzip.open(self.filename, "r") as gz:
            self.assertEqual(gz.read(), payload)

        with gzip.open(self.filename, "a") as gz:
            gz.write(payload)
        with open(self.filename, "rb") as raw:
            self.assertEqual(gzip.decompress(raw.read()), payload * 2)

        self.assertRaises(FileExistsError, gzip.open, self.filename, "x")
        os_helper.unlink(self.filename)
        with gzip.open(self.filename, "x") as gz:
            gz.write(payload)
        with open(self.filename, "rb") as raw:
            self.assertEqual(gzip.decompress(raw.read()), payload)

    def test_text_modes(self):
        """Text modes wt, rt and at round-trip str data with newline translation."""
        text = data1.decode("ascii") * 50
        # What is expected in the file: "\n" replaced by the platform's
        # line separator.
        on_disk = text.replace("\n", os.linesep)

        with gzip.open(self.filename, "wt", encoding="ascii") as gz:
            gz.write(text)
        with open(self.filename, "rb") as raw:
            decoded = gzip.decompress(raw.read()).decode("ascii")
            self.assertEqual(decoded, on_disk)

        with gzip.open(self.filename, "rt", encoding="ascii") as gz:
            self.assertEqual(gz.read(), text)

        with gzip.open(self.filename, "at", encoding="ascii") as gz:
            gz.write(text)
        with open(self.filename, "rb") as raw:
            decoded = gzip.decompress(raw.read()).decode("ascii")
            self.assertEqual(decoded, on_disk * 2)

    def test_fileobj(self):
        """gzip.open() reads from a file object in the modes r, rb and rt."""
        raw_bytes = data1 * 50
        as_text = raw_bytes.decode("ascii")
        compressed = gzip.compress(raw_bytes)
        for mode in ("r", "rb"):
            with gzip.open(io.BytesIO(compressed), mode) as gz:
                self.assertEqual(gz.read(), raw_bytes)
        with gzip.open(io.BytesIO(compressed), "rt", encoding="ascii") as gz:
            self.assertEqual(gz.read(), as_text)

    def test_bad_params(self):
        """Invalid file arguments and mode/keyword combinations are rejected."""
        # Test invalid parameter combinations.
        self.assertRaises(TypeError, gzip.open, 123.456)
        for bad_mode in ("wbt", "xbt"):
            self.assertRaises(ValueError, gzip.open, self.filename, bad_mode)
        # encoding, errors and newline are each rejected together with "rb".
        for text_option in ({"encoding": "utf-8"},
                            {"errors": "ignore"},
                            {"newline": "\n"}):
            self.assertRaises(ValueError, gzip.open, self.filename, "rb",
                              **text_option)

    def test_encoding(self):
        """Text written as UTF-16 is stored as UTF-16 and reads back unchanged."""
        # Test non-default encoding.
        text = data1.decode("ascii") * 50
        on_disk = text.replace("\n", os.linesep)
        with gzip.open(self.filename, "wt", encoding="utf-16") as gz:
            gz.write(text)
        with open(self.filename, "rb") as raw:
            decoded = gzip.decompress(raw.read()).decode("utf-16")
            self.assertEqual(decoded, on_disk)
        with gzip.open(self.filename, "rt", encoding="utf-16") as gz:
            self.assertEqual(gz.read(), text)

    def test_encoding_error_handler(self):
        """errors="ignore" drops the byte that cannot be decoded as ASCII."""
        # Test with non-default encoding error handler.
        with gzip.open(self.filename, "wb") as gz:
            gz.write(b"foo\xffbar")
        reader = gzip.open(self.filename, "rt", encoding="ascii",
                           errors="ignore")
        with reader as gz:
            self.assertEqual(gz.read(), "foobar")

    def test_newline(self):
        """With newline="\\r" on reading, text written with "\\n" comes back as one line."""
        # Test with explicit newline (universal newline mode disabled).
        text = data1.decode("ascii") * 50
        with gzip.open(self.filename, "wt", encoding="ascii", newline="\n") as gz:
            gz.write(text)
        with gzip.open(self.filename, "rt", encoding="ascii", newline="\r") as gz:
            lines = gz.readlines()
            self.assertEqual(lines, [text])
7547db96d56Sopenharmony_ci
7557db96d56Sopenharmony_ci
def create_and_remove_directory(directory):
    """Return a decorator that makes *directory* exist while the callable runs.

    The directory is created with os.makedirs() before each call and removed
    with os_helper.rmtree() afterwards, whether the call returns or raises.
    """
    def decorate(function):
        @functools.wraps(function)
        def run_with_directory(*args, **kwargs):
            os.makedirs(directory)
            try:
                outcome = function(*args, **kwargs)
            finally:
                os_helper.rmtree(directory)
            return outcome
        return run_with_directory
    return decorate
7677db96d56Sopenharmony_ci
7687db96d56Sopenharmony_ci
class TestCommandLine(unittest.TestCase):
    """Tests for running the gzip module as a script (``python -m gzip``)."""

    # Payload that the tests compress and decompress.
    data = b'This is a simple test with gzip'

    @requires_subprocess()
    def test_decompress_stdin_stdout(self):
        """``-d`` without a file name decompresses stdin to stdout."""
        with io.BytesIO() as buffer:
            with gzip.GzipFile(fileobj=buffer, mode='wb') as gz:
                gz.write(self.data)
            compressed = buffer.getvalue()

            command = (sys.executable, '-m', 'gzip', '-d')
            with Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
                out, err = proc.communicate(compressed)

        self.assertEqual(err, b'')
        self.assertEqual(out, self.data)

    @create_and_remove_directory(TEMPDIR)
    def test_decompress_infile_outfile(self):
        """``-d FILE.gz`` writes FILE next to the input and keeps the input."""
        gzipname = os.path.join(TEMPDIR, 'testgzip.gz')
        self.assertFalse(os.path.exists(gzipname))

        with gzip.open(gzipname, mode='wb') as archive:
            archive.write(self.data)
        rc, out, err = assert_python_ok('-m', 'gzip', '-d', gzipname)

        plain_name = os.path.join(TEMPDIR, "testgzip")
        with open(plain_name, "rb") as unpacked:
            self.assertEqual(unpacked.read(), self.data)

        self.assertTrue(os.path.exists(gzipname))
        self.assertEqual(rc, 0)
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    def test_decompress_infile_outfile_error(self):
        """``-d`` fails with exit status 1 for a name that does not end in .gz."""
        rc, out, err = assert_python_failure('-m', 'gzip', '-d', 'thisisatest.out')
        expected_message = b"filename doesn't end in .gz: 'thisisatest.out'"
        self.assertEqual(expected_message, err.strip())
        self.assertEqual(rc, 1)
        self.assertEqual(out, b'')

    @requires_subprocess()
    @create_and_remove_directory(TEMPDIR)
    def test_compress_stdin_outfile(self):
        """Without arguments, stdin is compressed and written to stdout."""
        command = (sys.executable, '-m', 'gzip')
        with Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE) as proc:
            out, err = proc.communicate(self.data)

        self.assertEqual(err, b'')
        # Only the first two bytes of the output are checked.
        self.assertEqual(out[:2], b"\x1f\x8b")

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile_default(self):
        """Compressing FILE with no level option creates FILE.gz."""
        source_name = os.path.join(TEMPDIR, 'testgzip')
        gzipname = source_name + '.gz'
        self.assertFalse(os.path.exists(gzipname))

        with open(source_name, 'wb') as source:
            source.write(self.data)

        rc, out, err = assert_python_ok('-m', 'gzip', source_name)

        self.assertTrue(os.path.exists(gzipname))
        self.assertEqual(out, b'')
        self.assertEqual(err, b'')

    @create_and_remove_directory(TEMPDIR)
    def test_compress_infile_outfile(self):
        """Compressing FILE with --fast or --best creates FILE.gz."""
        for compress_level in ('--fast', '--best'):
            with self.subTest(compress_level=compress_level):
                source_name = os.path.join(TEMPDIR, 'testgzip')
                gzipname = source_name + '.gz'
                self.assertFalse(os.path.exists(gzipname))

                with open(source_name, 'wb') as source:
                    source.write(self.data)

                rc, out, err = assert_python_ok(
                    '-m', 'gzip', compress_level, source_name)

                self.assertTrue(os.path.exists(gzipname))
                self.assertEqual(out, b'')
                self.assertEqual(err, b'')
                # Remove the archive so the next level starts without one.
                os.remove(gzipname)
                self.assertFalse(os.path.exists(gzipname))

    def test_compress_fast_best_are_exclusive(self):
        """--fast and --best cannot be given together."""
        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '--best')
        expected = b"error: argument --best: not allowed with argument --fast"
        self.assertIn(expected, err)
        self.assertEqual(out, b'')

    def test_decompress_cannot_have_flags_compression(self):
        """-d cannot be combined with the compression option --fast."""
        rc, out, err = assert_python_failure('-m', 'gzip', '--fast', '-d')
        expected = b'error: argument -d/--decompress: not allowed with argument --fast'
        self.assertIn(expected, err)
        self.assertEqual(out, b'')
8617db96d56Sopenharmony_ci
8627db96d56Sopenharmony_ci
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
865