xref: /third_party/python/Lib/test/test_tarfile.py (revision 7db96d56)
1import sys
2import os
3import io
4from hashlib import sha256
5from contextlib import contextmanager
6from random import Random
7import pathlib
8import shutil
9import re
10import warnings
11import stat
12
13import unittest
14import unittest.mock
15import tarfile
16
17from test import support
18from test.support import os_helper
19from test.support import script_helper
20from test.support import warnings_helper
21
22# Check for our compression modules.
23try:
24    import gzip
25except ImportError:
26    gzip = None
27try:
28    import zlib
29except ImportError:
30    zlib = None
31try:
32    import bz2
33except ImportError:
34    bz2 = None
35try:
36    import lzma
37except ImportError:
38    lzma = None
39
def sha256sum(data):
    """Return the hexadecimal SHA-256 digest of the bytes-like *data*."""
    hasher = sha256()
    hasher.update(data)
    return hasher.hexdigest()
42
# Scratch locations used by the tests, all derived from os_helper.TESTFN.
TEMPDIR = f"{os.path.abspath(os_helper.TESTFN)}-tardir"
tarextdir = f"{TEMPDIR}-extract-test"

# Reference archive located through support.findfile(); the remaining paths
# are file names inside TEMPDIR.
tarname = support.findfile("testtar.tar")
gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
xzname = os.path.join(TEMPDIR, "testtar.tar.xz")
tmpname = os.path.join(TEMPDIR, "tmp.tar")
dotlessname = os.path.join(TEMPDIR, "testtar")

# Expected SHA-256 hex digests; sha256_regtype is the digest the tests
# compare against the data of the "ustar/regtype" member.
sha256_regtype = "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce"
sha256_sparse = "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b"
58
59
class TarTest:
    """Mixin holding the settings for a plain (uncompressed) tar archive.

    The compression mixins override these attributes; classes using the
    ``mode`` property must also define ``prefix``.
    """

    tarname = tarname
    suffix = ''
    # Callables used to open the raw file and the archive, respectively.
    open = io.FileIO
    taropen = tarfile.TarFile.taropen

    @property
    def mode(self):
        """Mode string for tarfile.open(): ``prefix`` followed by ``suffix``."""
        return "".join((self.prefix, self.suffix))
69
@support.requires_gzip()
class GzipTest:
    """Mixin holding the settings for the gzip-compressed archive."""

    tarname = gzipname
    suffix = 'gz'
    # gzip is None when the module could not be imported.
    open = None if gzip is None else gzip.GzipFile
    taropen = tarfile.TarFile.gzopen
76
@support.requires_bz2()
class Bz2Test:
    """Mixin holding the settings for the bzip2-compressed archive."""

    tarname = bz2name
    suffix = 'bz2'
    # bz2 is None when the module could not be imported.
    open = None if bz2 is None else bz2.BZ2File
    taropen = tarfile.TarFile.bz2open
83
@support.requires_lzma()
class LzmaTest:
    """Mixin holding the settings for the xz-compressed archive."""

    tarname = xzname
    suffix = 'xz'
    # lzma is None when the module could not be imported.
    open = None if lzma is None else lzma.LZMAFile
    taropen = tarfile.TarFile.xzopen
90
91
class ReadTest(TarTest):
    """Base for tests that read ``self.tarname`` through ``self.tar``."""

    prefix = "r:"

    def setUp(self):
        # Open the archive for each test, decoding names as ISO 8859-1.
        self.tar = tarfile.open(
            self.tarname,
            mode=self.mode,
            encoding="iso8859-1",
        )

    def tearDown(self):
        # Release the archive opened by setUp().
        self.tar.close()
102
103
104class UstarReadTest(ReadTest, unittest.TestCase):
105
106    def test_fileobj_regular_file(self):
107        tarinfo = self.tar.getmember("ustar/regtype")
108        with self.tar.extractfile(tarinfo) as fobj:
109            data = fobj.read()
110            self.assertEqual(len(data), tarinfo.size,
111                    "regular file extraction failed")
112            self.assertEqual(sha256sum(data), sha256_regtype,
113                    "regular file extraction failed")
114
115    def test_fileobj_readlines(self):
116        self.tar.extract("ustar/regtype", TEMPDIR, filter='data')
117        tarinfo = self.tar.getmember("ustar/regtype")
118        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
119            lines1 = fobj1.readlines()
120
121        with self.tar.extractfile(tarinfo) as fobj:
122            fobj2 = io.TextIOWrapper(fobj)
123            lines2 = fobj2.readlines()
124            self.assertEqual(lines1, lines2,
125                    "fileobj.readlines() failed")
126            self.assertEqual(len(lines2), 114,
127                    "fileobj.readlines() failed")
128            self.assertEqual(lines2[83],
129                    "I will gladly admit that Python is not the fastest "
130                    "running scripting language.\n",
131                    "fileobj.readlines() failed")
132
133    def test_fileobj_iter(self):
134        self.tar.extract("ustar/regtype", TEMPDIR, filter='data')
135        tarinfo = self.tar.getmember("ustar/regtype")
136        with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1:
137            lines1 = fobj1.readlines()
138        with self.tar.extractfile(tarinfo) as fobj2:
139            lines2 = list(io.TextIOWrapper(fobj2))
140            self.assertEqual(lines1, lines2,
141                    "fileobj.__iter__() failed")
142
143    def test_fileobj_seek(self):
144        self.tar.extract("ustar/regtype", TEMPDIR,
145                         filter='data')
146        with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
147            data = fobj.read()
148
149        tarinfo = self.tar.getmember("ustar/regtype")
150        with self.tar.extractfile(tarinfo) as fobj:
151            text = fobj.read()
152            fobj.seek(0)
153            self.assertEqual(0, fobj.tell(),
154                         "seek() to file's start failed")
155            fobj.seek(2048, 0)
156            self.assertEqual(2048, fobj.tell(),
157                         "seek() to absolute position failed")
158            fobj.seek(-1024, 1)
159            self.assertEqual(1024, fobj.tell(),
160                         "seek() to negative relative position failed")
161            fobj.seek(1024, 1)
162            self.assertEqual(2048, fobj.tell(),
163                         "seek() to positive relative position failed")
164            s = fobj.read(10)
165            self.assertEqual(s, data[2048:2058],
166                         "read() after seek failed")
167            fobj.seek(0, 2)
168            self.assertEqual(tarinfo.size, fobj.tell(),
169                         "seek() to file's end failed")
170            self.assertEqual(fobj.read(), b"",
171                         "read() at file's end did not return empty string")
172            fobj.seek(-tarinfo.size, 2)
173            self.assertEqual(0, fobj.tell(),
174                         "relative seek() to file's end failed")
175            fobj.seek(512)
176            s1 = fobj.readlines()
177            fobj.seek(512)
178            s2 = fobj.readlines()
179            self.assertEqual(s1, s2,
180                         "readlines() after seek failed")
181            fobj.seek(0)
182            self.assertEqual(len(fobj.readline()), fobj.tell(),
183                         "tell() after readline() failed")
184            fobj.seek(512)
185            self.assertEqual(len(fobj.readline()) + 512, fobj.tell(),
186                         "tell() after seek() and readline() failed")
187            fobj.seek(0)
188            line = fobj.readline()
189            self.assertEqual(fobj.read(), data[len(line):],
190                         "read() after readline() failed")
191
192    def test_fileobj_text(self):
193        with self.tar.extractfile("ustar/regtype") as fobj:
194            fobj = io.TextIOWrapper(fobj)
195            data = fobj.read().encode("iso8859-1")
196            self.assertEqual(sha256sum(data), sha256_regtype)
197            try:
198                fobj.seek(100)
199            except AttributeError:
200                # Issue #13815: seek() complained about a missing
201                # flush() method.
202                self.fail("seeking failed in text mode")
203
204    # Test if symbolic and hard links are resolved by extractfile().  The
205    # test link members each point to a regular member whose data is
206    # supposed to be exported.
207    def _test_fileobj_link(self, lnktype, regtype):
208        with self.tar.extractfile(lnktype) as a, \
209             self.tar.extractfile(regtype) as b:
210            self.assertEqual(a.name, b.name)
211
212    def test_fileobj_link1(self):
213        self._test_fileobj_link("ustar/lnktype", "ustar/regtype")
214
215    def test_fileobj_link2(self):
216        self._test_fileobj_link("./ustar/linktest2/lnktype",
217                                "ustar/linktest1/regtype")
218
219    def test_fileobj_symlink1(self):
220        self._test_fileobj_link("ustar/symtype", "ustar/regtype")
221
222    def test_fileobj_symlink2(self):
223        self._test_fileobj_link("./ustar/linktest2/symtype",
224                                "ustar/linktest1/regtype")
225
226    def test_issue14160(self):
227        self._test_fileobj_link("symtype2", "ustar/regtype")
228
229    def test_add_dir_getmember(self):
230        # bpo-21987
231        self.add_dir_and_getmember('bar')
232        self.add_dir_and_getmember('a'*101)
233
234    @unittest.skipUnless(hasattr(os, "getuid") and hasattr(os, "getgid"),
235                         "Missing getuid or getgid implementation")
236    def add_dir_and_getmember(self, name):
237        def filter(tarinfo):
238            tarinfo.uid = tarinfo.gid = 100
239            return tarinfo
240
241        with os_helper.temp_cwd():
242            with tarfile.open(tmpname, 'w') as tar:
243                tar.format = tarfile.USTAR_FORMAT
244                try:
245                    os.mkdir(name)
246                    tar.add(name, filter=filter)
247                finally:
248                    os.rmdir(name)
249            with tarfile.open(tmpname) as tar:
250                self.assertEqual(
251                    tar.getmember(name),
252                    tar.getmember(name + '/')
253                )
254
class GzipUstarReadTest(GzipTest, UstarReadTest):
    """UstarReadTest cases run with the GzipTest archive settings."""
257
class Bz2UstarReadTest(Bz2Test, UstarReadTest):
    """UstarReadTest cases run with the Bz2Test archive settings."""
260
class LzmaUstarReadTest(LzmaTest, UstarReadTest):
    """UstarReadTest cases run with the LzmaTest archive settings."""
263
264
265class ListTest(ReadTest, unittest.TestCase):
266
267    # Override setUp to use default encoding (UTF-8)
268    def setUp(self):
269        self.tar = tarfile.open(self.tarname, mode=self.mode)
270
271    def test_list(self):
272        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
273        with support.swap_attr(sys, 'stdout', tio):
274            self.tar.list(verbose=False)
275        out = tio.detach().getvalue()
276        self.assertIn(b'ustar/conttype', out)
277        self.assertIn(b'ustar/regtype', out)
278        self.assertIn(b'ustar/lnktype', out)
279        self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out)
280        self.assertIn(b'./ustar/linktest2/symtype', out)
281        self.assertIn(b'./ustar/linktest2/lnktype', out)
282        # Make sure it puts trailing slash for directory
283        self.assertIn(b'ustar/dirtype/', out)
284        self.assertIn(b'ustar/dirtype-with-size/', out)
285        # Make sure it is able to print unencodable characters
286        def conv(b):
287            s = b.decode(self.tar.encoding, 'surrogateescape')
288            return s.encode('ascii', 'backslashreplace')
289        self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
290        self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-'
291                           b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
292        self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-'
293                           b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out)
294        self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out)
295        self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out)
296        # Make sure it prints files separated by one newline without any
297        # 'ls -l'-like accessories if verbose flag is not being used
298        # ...
299        # ustar/conttype
300        # ustar/regtype
301        # ...
302        self.assertRegex(out, br'ustar/conttype ?\r?\n'
303                              br'ustar/regtype ?\r?\n')
304        # Make sure it does not print the source of link without verbose flag
305        self.assertNotIn(b'link to', out)
306        self.assertNotIn(b'->', out)
307
308    def test_list_verbose(self):
309        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
310        with support.swap_attr(sys, 'stdout', tio):
311            self.tar.list(verbose=True)
312        out = tio.detach().getvalue()
313        # Make sure it prints files separated by one newline with 'ls -l'-like
314        # accessories if verbose flag is being used
315        # ...
316        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/conttype
317        # ?rw-r--r-- tarfile/tarfile     7011 2003-01-06 07:19:43 ustar/regtype
318        # ...
319        self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 '
320                               br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
321                               br'ustar/\w+type ?\r?\n') * 2)
322        # Make sure it prints the source of link with verbose flag
323        self.assertIn(b'ustar/symtype -> regtype', out)
324        self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out)
325        self.assertIn(b'./ustar/linktest2/lnktype link to '
326                      b'./ustar/linktest1/regtype', out)
327        self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' +
328                      (b'/123' * 125) + b'/longname', out)
329        self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' +
330                      (b'/123' * 125) + b'/longname', out)
331
332    def test_list_members(self):
333        tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
334        def members(tar):
335            for tarinfo in tar.getmembers():
336                if 'reg' in tarinfo.name:
337                    yield tarinfo
338        with support.swap_attr(sys, 'stdout', tio):
339            self.tar.list(verbose=False, members=members(self.tar))
340        out = tio.detach().getvalue()
341        self.assertIn(b'ustar/regtype', out)
342        self.assertNotIn(b'ustar/conttype', out)
343
344
class GzipListTest(GzipTest, ListTest):
    """ListTest cases run with the GzipTest archive settings."""
347
348
class Bz2ListTest(Bz2Test, ListTest):
    """ListTest cases run with the Bz2Test archive settings."""
351
352
class LzmaListTest(LzmaTest, ListTest):
    """ListTest cases run with the LzmaTest archive settings."""
355
356
357class CommonReadTest(ReadTest):
358
359    def test_is_tarfile_erroneous(self):
360        with open(tmpname, "wb"):
361            pass
362
363        # is_tarfile works on filenames
364        self.assertFalse(tarfile.is_tarfile(tmpname))
365
366        # is_tarfile works on path-like objects
367        self.assertFalse(tarfile.is_tarfile(pathlib.Path(tmpname)))
368
369        # is_tarfile works on file objects
370        with open(tmpname, "rb") as fobj:
371            self.assertFalse(tarfile.is_tarfile(fobj))
372
373        # is_tarfile works on file-like objects
374        self.assertFalse(tarfile.is_tarfile(io.BytesIO(b"invalid")))
375
376    def test_is_tarfile_valid(self):
377        # is_tarfile works on filenames
378        self.assertTrue(tarfile.is_tarfile(self.tarname))
379
380        # is_tarfile works on path-like objects
381        self.assertTrue(tarfile.is_tarfile(pathlib.Path(self.tarname)))
382
383        # is_tarfile works on file objects
384        with open(self.tarname, "rb") as fobj:
385            self.assertTrue(tarfile.is_tarfile(fobj))
386
387        # is_tarfile works on file-like objects
388        with open(self.tarname, "rb") as fobj:
389            self.assertTrue(tarfile.is_tarfile(io.BytesIO(fobj.read())))
390
391    def test_is_tarfile_keeps_position(self):
392        # Test for issue44289: tarfile.is_tarfile() modifies
393        # file object's current position
394        with open(self.tarname, "rb") as fobj:
395            tarfile.is_tarfile(fobj)
396            self.assertEqual(fobj.tell(), 0)
397
398        with open(self.tarname, "rb") as fobj:
399            file_like = io.BytesIO(fobj.read())
400            tarfile.is_tarfile(file_like)
401            self.assertEqual(file_like.tell(), 0)
402
403    def test_empty_tarfile(self):
404        # Test for issue6123: Allow opening empty archives.
405        # This test checks if tarfile.open() is able to open an empty tar
406        # archive successfully. Note that an empty tar archive is not the
407        # same as an empty file!
408        with tarfile.open(tmpname, self.mode.replace("r", "w")):
409            pass
410        try:
411            tar = tarfile.open(tmpname, self.mode)
412            tar.getnames()
413        except tarfile.ReadError:
414            self.fail("tarfile.open() failed on empty archive")
415        else:
416            self.assertListEqual(tar.getmembers(), [])
417        finally:
418            tar.close()
419
420    def test_non_existent_tarfile(self):
421        # Test for issue11513: prevent non-existent gzipped tarfiles raising
422        # multiple exceptions.
423        with self.assertRaisesRegex(FileNotFoundError, "xxx"):
424            tarfile.open("xxx", self.mode)
425
426    def test_null_tarfile(self):
427        # Test for issue6123: Allow opening empty archives.
428        # This test guarantees that tarfile.open() does not treat an empty
429        # file as an empty tar archive.
430        with open(tmpname, "wb"):
431            pass
432        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
433        self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)
434
435    def test_ignore_zeros(self):
436        # Test TarFile's ignore_zeros option.
437        # generate 512 pseudorandom bytes
438        data = Random(0).randbytes(512)
439        for char in (b'\0', b'a'):
440            # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
441            # are ignored correctly.
442            with self.open(tmpname, "w") as fobj:
443                fobj.write(char * 1024)
444                tarinfo = tarfile.TarInfo("foo")
445                tarinfo.size = len(data)
446                fobj.write(tarinfo.tobuf())
447                fobj.write(data)
448
449            tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
450            try:
451                self.assertListEqual(tar.getnames(), ["foo"],
452                    "ignore_zeros=True should have skipped the %r-blocks" %
453                    char)
454            finally:
455                tar.close()
456
457    def test_premature_end_of_archive(self):
458        for size in (512, 600, 1024, 1200):
459            with tarfile.open(tmpname, "w:") as tar:
460                t = tarfile.TarInfo("foo")
461                t.size = 1024
462                tar.addfile(t, io.BytesIO(b"a" * 1024))
463
464            with open(tmpname, "r+b") as fobj:
465                fobj.truncate(size)
466
467            with tarfile.open(tmpname) as tar:
468                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
469                    for t in tar:
470                        pass
471
472            with tarfile.open(tmpname) as tar:
473                t = tar.next()
474
475                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
476                    tar.extract(t, TEMPDIR, filter='data')
477
478                with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
479                    tar.extractfile(t).read()
480
481    def test_length_zero_header(self):
482        # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail
483        # with an exception
484        with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"):
485            with tarfile.open(support.findfile('recursion.tar')) as tar:
486                pass
487
488class MiscReadTestBase(CommonReadTest):
489    def requires_name_attribute(self):
490        pass
491
492    def test_no_name_argument(self):
493        self.requires_name_attribute()
494        with open(self.tarname, "rb") as fobj:
495            self.assertIsInstance(fobj.name, str)
496            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
497                self.assertIsInstance(tar.name, str)
498                self.assertEqual(tar.name, os.path.abspath(fobj.name))
499
500    def test_no_name_attribute(self):
501        with open(self.tarname, "rb") as fobj:
502            data = fobj.read()
503        fobj = io.BytesIO(data)
504        self.assertRaises(AttributeError, getattr, fobj, "name")
505        tar = tarfile.open(fileobj=fobj, mode=self.mode)
506        self.assertIsNone(tar.name)
507
508    def test_empty_name_attribute(self):
509        with open(self.tarname, "rb") as fobj:
510            data = fobj.read()
511        fobj = io.BytesIO(data)
512        fobj.name = ""
513        with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
514            self.assertIsNone(tar.name)
515
516    def test_int_name_attribute(self):
517        # Issue 21044: tarfile.open() should handle fileobj with an integer
518        # 'name' attribute.
519        fd = os.open(self.tarname, os.O_RDONLY)
520        with open(fd, 'rb') as fobj:
521            self.assertIsInstance(fobj.name, int)
522            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
523                self.assertIsNone(tar.name)
524
525    def test_bytes_name_attribute(self):
526        self.requires_name_attribute()
527        tarname = os.fsencode(self.tarname)
528        with open(tarname, 'rb') as fobj:
529            self.assertIsInstance(fobj.name, bytes)
530            with tarfile.open(fileobj=fobj, mode=self.mode) as tar:
531                self.assertIsInstance(tar.name, bytes)
532                self.assertEqual(tar.name, os.path.abspath(fobj.name))
533
534    def test_pathlike_name(self):
535        tarname = pathlib.Path(self.tarname)
536        with tarfile.open(tarname, mode=self.mode) as tar:
537            self.assertIsInstance(tar.name, str)
538            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
539        with self.taropen(tarname) as tar:
540            self.assertIsInstance(tar.name, str)
541            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
542        with tarfile.TarFile.open(tarname, mode=self.mode) as tar:
543            self.assertIsInstance(tar.name, str)
544            self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
545        if self.suffix == '':
546            with tarfile.TarFile(tarname, mode='r') as tar:
547                self.assertIsInstance(tar.name, str)
548                self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname)))
549
550    def test_illegal_mode_arg(self):
551        with open(tmpname, 'wb'):
552            pass
553        with self.assertRaisesRegex(ValueError, 'mode must be '):
554            tar = self.taropen(tmpname, 'q')
555        with self.assertRaisesRegex(ValueError, 'mode must be '):
556            tar = self.taropen(tmpname, 'rw')
557        with self.assertRaisesRegex(ValueError, 'mode must be '):
558            tar = self.taropen(tmpname, '')
559
560    def test_fileobj_with_offset(self):
561        # Skip the first member and store values from the second member
562        # of the testtar.
563        tar = tarfile.open(self.tarname, mode=self.mode)
564        try:
565            tar.next()
566            t = tar.next()
567            name = t.name
568            offset = t.offset
569            with tar.extractfile(t) as f:
570                data = f.read()
571        finally:
572            tar.close()
573
574        # Open the testtar and seek to the offset of the second member.
575        with self.open(self.tarname) as fobj:
576            fobj.seek(offset)
577
578            # Test if the tarfile starts with the second member.
579            with tar.open(self.tarname, mode="r:", fileobj=fobj) as tar:
580                t = tar.next()
581                self.assertEqual(t.name, name)
582                # Read to the end of fileobj and test if seeking back to the
583                # beginning works.
584                tar.getmembers()
585                self.assertEqual(tar.extractfile(t).read(), data,
586                        "seek back did not work")
587
588    def test_fail_comp(self):
589        # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
590        self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
591        with open(tarname, "rb") as fobj:
592            self.assertRaises(tarfile.ReadError, tarfile.open,
593                              fileobj=fobj, mode=self.mode)
594
595    def test_v7_dirtype(self):
596        # Test old style dirtype member (bug #1336623):
597        # Old V7 tars create directory members using an AREGTYPE
598        # header with a "/" appended to the filename field.
599        tarinfo = self.tar.getmember("misc/dirtype-old-v7")
600        self.assertEqual(tarinfo.type, tarfile.DIRTYPE,
601                "v7 dirtype failed")
602
603    def test_xstar_type(self):
604        # The xstar format stores extra atime and ctime fields inside the
605        # space reserved for the prefix field. The prefix field must be
606        # ignored in this case, otherwise it will mess up the name.
607        try:
608            self.tar.getmember("misc/regtype-xstar")
609        except KeyError:
610            self.fail("failed to find misc/regtype-xstar (mangled prefix?)")
611
612    def test_check_members(self):
613        for tarinfo in self.tar:
614            self.assertEqual(int(tarinfo.mtime), 0o7606136617,
615                    "wrong mtime for %s" % tarinfo.name)
616            if not tarinfo.name.startswith("ustar/"):
617                continue
618            self.assertEqual(tarinfo.uname, "tarfile",
619                    "wrong uname for %s" % tarinfo.name)
620
621    def test_find_members(self):
622        self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof",
623                "could not find all members")
624
625    @unittest.skipUnless(hasattr(os, "link"),
626                         "Missing hardlink implementation")
627    @os_helper.skip_unless_symlink
628    def test_extract_hardlink(self):
629        # Test hardlink extraction (e.g. bug #857297).
630        with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
631            tar.extract("ustar/regtype", TEMPDIR, filter='data')
632            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/regtype"))
633
634            tar.extract("ustar/lnktype", TEMPDIR, filter='data')
635            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/lnktype"))
636            with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
637                data = f.read()
638            self.assertEqual(sha256sum(data), sha256_regtype)
639
640            tar.extract("ustar/symtype", TEMPDIR, filter='data')
641            self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/symtype"))
642            with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
643                data = f.read()
644            self.assertEqual(sha256sum(data), sha256_regtype)
645
646    @os_helper.skip_unless_working_chmod
647    def test_extractall(self):
648        # Test if extractall() correctly restores directory permissions
649        # and times (see issue1735).
650        tar = tarfile.open(tarname, encoding="iso8859-1")
651        DIR = os.path.join(TEMPDIR, "extractall")
652        os.mkdir(DIR)
653        try:
654            directories = [t for t in tar if t.isdir()]
655            tar.extractall(DIR, directories, filter='fully_trusted')
656            for tarinfo in directories:
657                path = os.path.join(DIR, tarinfo.name)
658                if sys.platform != "win32":
659                    # Win32 has no support for fine grained permissions.
660                    self.assertEqual(tarinfo.mode & 0o777,
661                                     os.stat(path).st_mode & 0o777,
662                                     tarinfo.name)
663                def format_mtime(mtime):
664                    if isinstance(mtime, float):
665                        return "{} ({})".format(mtime, mtime.hex())
666                    else:
667                        return "{!r} (int)".format(mtime)
668                file_mtime = os.path.getmtime(path)
669                errmsg = "tar mtime {0} != file time {1} of path {2!a}".format(
670                    format_mtime(tarinfo.mtime),
671                    format_mtime(file_mtime),
672                    path)
673                self.assertEqual(tarinfo.mtime, file_mtime, errmsg)
674        finally:
675            tar.close()
676            os_helper.rmtree(DIR)
677
678    @os_helper.skip_unless_working_chmod
679    def test_extract_directory(self):
680        dirtype = "ustar/dirtype"
681        DIR = os.path.join(TEMPDIR, "extractdir")
682        os.mkdir(DIR)
683        try:
684            with tarfile.open(tarname, encoding="iso8859-1") as tar:
685                tarinfo = tar.getmember(dirtype)
686                tar.extract(tarinfo, path=DIR, filter='fully_trusted')
687                extracted = os.path.join(DIR, dirtype)
688                self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
689                if sys.platform != "win32":
690                    self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755)
691        finally:
692            os_helper.rmtree(DIR)
693
694    def test_extractall_pathlike_name(self):
695        DIR = pathlib.Path(TEMPDIR) / "extractall"
696        with os_helper.temp_dir(DIR), \
697             tarfile.open(tarname, encoding="iso8859-1") as tar:
698            directories = [t for t in tar if t.isdir()]
699            tar.extractall(DIR, directories, filter='fully_trusted')
700            for tarinfo in directories:
701                path = DIR / tarinfo.name
702                self.assertEqual(os.path.getmtime(path), tarinfo.mtime)
703
704    def test_extract_pathlike_name(self):
705        dirtype = "ustar/dirtype"
706        DIR = pathlib.Path(TEMPDIR) / "extractall"
707        with os_helper.temp_dir(DIR), \
708             tarfile.open(tarname, encoding="iso8859-1") as tar:
709            tarinfo = tar.getmember(dirtype)
710            tar.extract(tarinfo, path=DIR, filter='fully_trusted')
711            extracted = DIR / dirtype
712            self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime)
713
714    def test_init_close_fobj(self):
715        # Issue #7341: Close the internal file object in the TarFile
716        # constructor in case of an error. For the test we rely on
717        # the fact that opening an empty file raises a ReadError.
718        empty = os.path.join(TEMPDIR, "empty")
719        with open(empty, "wb") as fobj:
720            fobj.write(b"")
721
722        try:
723            tar = object.__new__(tarfile.TarFile)
724            try:
725                tar.__init__(empty)
726            except tarfile.ReadError:
727                self.assertTrue(tar.fileobj.closed)
728            else:
729                self.fail("ReadError not raised")
730        finally:
731            os_helper.unlink(empty)
732
733    def test_parallel_iteration(self):
734        # Issue #16601: Restarting iteration over tarfile continued
735        # from where it left off.
736        with tarfile.open(self.tarname) as tar:
737            for m1, m2 in zip(tar, tar):
738                self.assertEqual(m1.offset, m2.offset)
739                self.assertEqual(m1.get_info(), m2.get_info())
740
741    @unittest.skipIf(zlib is None, "requires zlib")
742    def test_zlib_error_does_not_leak(self):
743        # bpo-39039: tarfile.open allowed zlib exceptions to bubble up when
744        # parsing certain types of invalid data
745        with unittest.mock.patch("tarfile.TarInfo.fromtarfile") as mock:
746            mock.side_effect = zlib.error
747            with self.assertRaises(tarfile.ReadError):
748                tarfile.open(self.tarname)
749
750    def test_next_on_empty_tarfile(self):
751        fd = io.BytesIO()
752        tf = tarfile.open(fileobj=fd, mode="w")
753        tf.close()
754
755        fd.seek(0)
756        with tarfile.open(fileobj=fd, mode="r|") as tf:
757            self.assertEqual(tf.next(), None)
758
759        fd.seek(0)
760        with tarfile.open(fileobj=fd, mode="r") as tf:
761            self.assertEqual(tf.next(), None)
762
class MiscReadTest(MiscReadTestBase, unittest.TestCase):
    """MiscReadTestBase cases run with the inherited archive settings."""

    # Replace the inherited test_fail_comp with None; its own comment
    # describes it as a check for the Gzip and Bz2 tests.
    test_fail_comp = None
765
class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase tests with the GzipTest mixin's settings.
    pass
768
class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase tests with the Bz2Test mixin's settings.
    def requires_name_attribute(self):
        # Skip the calling test: BZ2File objects have no name attribute.
        self.skipTest("BZ2File have no name attribute")
772
class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase):
    # Runs the MiscReadTestBase tests with the LzmaTest mixin's settings.
    def requires_name_attribute(self):
        # Skip the calling test: LZMAFile objects have no name attribute.
        self.skipTest("LZMAFile have no name attribute")
776
777
class StreamReadTest(CommonReadTest, unittest.TestCase):
    # Read tests for archives opened with the stream prefix "r|".

    prefix = "r|"

    def test_read_through(self):
        # Issue #11224: a poorly designed _FileInFile.read() method caused
        # seeking errors with stream tar files.  Reading every regular
        # member in 512 byte chunks up to EOF must not raise StreamError.
        for member in self.tar:
            if member.isreg():
                with self.tar.extractfile(member) as stream:
                    more = True
                    while more:
                        try:
                            chunk = stream.read(512)
                        except tarfile.StreamError:
                            self.fail("simple read-through using "
                                      "TarFile.extractfile() failed")
                        more = bool(chunk)

    def test_fileobj_regular_file(self):
        # getmember() can't be used here, so fetch the first member
        # ("regtype") with next().
        member = self.tar.next()
        with self.tar.extractfile(member) as stream:
            payload = stream.read()
        failure = "regular file extraction failed"
        self.assertEqual(len(payload), member.size, failure)
        self.assertEqual(sha256sum(payload), sha256_regtype, failure)

    def test_provoke_stream_error(self):
        # Reading the first member after getmembers() has been called
        # must raise StreamError.
        members = self.tar.getmembers()
        with self.tar.extractfile(members[0]) as stream:
            with self.assertRaises(tarfile.StreamError):
                stream.read()

    def test_compare_members(self):
        # Step through a reference archive (opened from the plain test
        # tar) and the stream archive in lockstep and compare the data of
        # corresponding members.
        with tarfile.open(tarname, encoding="iso8859-1") as reference:
            streamed = self.tar

            while True:
                ref_member = reference.next()
                stream_member = streamed.next()
                if ref_member is None:
                    break
                self.assertIsNotNone(stream_member, "stream.next() failed.")

                if stream_member.islnk() or stream_member.issym():
                    # Extracting a link member from the stream is expected
                    # to fail with StreamError.
                    with self.assertRaises(tarfile.StreamError):
                        streamed.extractfile(stream_member)
                    continue

                ref_file = reference.extractfile(ref_member)
                stream_file = streamed.extractfile(stream_member)
                if ref_file is None:
                    continue
                self.assertIsNotNone(stream_file,
                                     "stream.extractfile() failed")
                self.assertEqual(ref_file.read(), stream_file.read(),
                                 "stream extraction failed")
838
class GzipStreamReadTest(GzipTest, StreamReadTest):
    # Runs the StreamReadTest tests with the GzipTest mixin's settings.
    pass
841
class Bz2StreamReadTest(Bz2Test, StreamReadTest):
    # Runs the StreamReadTest tests with the Bz2Test mixin's settings.
    pass
844
class LzmaStreamReadTest(LzmaTest, StreamReadTest):
    # Runs the StreamReadTest tests with the LzmaTest mixin's settings.
    pass
847
848
class DetectReadTest(TarTest, unittest.TestCase):
    # Checks which read modes accept and which reject the archive named by
    # self.tarname, both when opened by name and through a file object.

    def _testfunc_file(self, name, mode):
        # Open the archive `name` by file name with `mode`; a ReadError
        # fails the test with a message naming the offending combination.
        try:
            tar = tarfile.open(name, mode)
        except tarfile.ReadError as exc:
            self.fail(f"tarfile.open({name!r}, {mode!r}) raised "
                      f"ReadError: {exc}")
        else:
            tar.close()

    def _testfunc_fileobj(self, name, mode):
        # Same check as _testfunc_file(), but tarfile.open() is handed an
        # already opened binary file object.
        with open(name, "rb") as f:
            try:
                tar = tarfile.open(name, mode, fileobj=f)
            except tarfile.ReadError as exc:
                self.fail(f"tarfile.open({name!r}, {mode!r}, fileobj=...) "
                          f"raised ReadError: {exc}")
            else:
                # Close the archive while its underlying file is still
                # open, not after the with block has closed it.
                tar.close()

    def _test_modes(self, testfunc):
        # `testfunc` is one of the two helpers above.
        if self.suffix:
            # With a compression suffix set, an explicit compressed mode
            # must reject the plain test tar, and an explicit uncompressed
            # mode must reject self.tarname.
            mismatches = (
                (tarname, "r:" + self.suffix),
                (tarname, "r|" + self.suffix),
                (self.tarname, "r:"),
                (self.tarname, "r|"),
            )
            for name, mode in mismatches:
                with self.assertRaises(tarfile.ReadError):
                    tarfile.open(name, mode=mode)

        # The matching explicit modes and the autodetecting modes must all
        # be able to open the archive.
        for mode in ("r", "r:" + self.suffix, "r:*",
                     "r|" + self.suffix, "r|*"):
            testfunc(self.tarname, mode)

    def test_detect_file(self):
        self._test_modes(self._testfunc_file)

    def test_detect_fileobj(self):
        self._test_modes(self._testfunc_fileobj)
888
class GzipDetectReadTest(GzipTest, DetectReadTest):
    # Runs the DetectReadTest tests with the GzipTest mixin's settings.
    pass
891
class Bz2DetectReadTest(Bz2Test, DetectReadTest):
    def test_detect_stream_bz2(self):
        # tarfile's stream detection originally looked for the string
        # "BZh91" at the start of the file.  That is wrong: the '9' only
        # encodes the blocksize (900,000 bytes), so autodetection failed
        # for files compressed with any other blocksize.
        with open(tarname, "rb") as plain:
            raw = plain.read()

        # compresslevel=1 means a blocksize of 100,000 bytes, which makes
        # the file start with "BZh11".
        with bz2.BZ2File(tmpname, "wb", compresslevel=1) as compressed:
            compressed.write(raw)

        self._testfunc_file(tmpname, "r|*")
906
class LzmaDetectReadTest(LzmaTest, DetectReadTest):
    # Runs the DetectReadTest tests with the LzmaTest mixin's settings.
    pass
909
910
class MemberReadTest(ReadTest, unittest.TestCase):
    # Looks up individual members of self.tar by name and verifies their
    # header fields and, where a checksum is given, their contents.

    def _test_member(self, tarinfo, chksum=None, **kwargs):
        # If chksum is given, compare it with the SHA-256 of the member's
        # data.  Afterwards compare every keyword argument with the
        # attribute of the same name on tarinfo.
        if chksum is not None:
            with self.tar.extractfile(tarinfo) as stream:
                digest = sha256sum(stream.read())
            self.assertEqual(digest, chksum,
                    "wrong sha256sum for %s" % tarinfo.name)

        # These fields are checked for every member.
        kwargs.update(mtime=0o7606136617, uid=1000, gid=100)
        if "old-v7" not in tarinfo.name:
            # V7 tar can't handle alphabetic owners.
            kwargs.update(uname="tarfile", gname="tarfile")
        for field, expected in kwargs.items():
            self.assertEqual(getattr(tarinfo, field), expected,
                    "wrong value in %s field of %s" % (field, tarinfo.name))

    def test_find_regtype(self):
        self._test_member(self.tar.getmember("ustar/regtype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_conttype(self):
        self._test_member(self.tar.getmember("ustar/conttype"),
                          size=7011, chksum=sha256_regtype)

    def test_find_dirtype(self):
        self._test_member(self.tar.getmember("ustar/dirtype"), size=0)

    def test_find_dirtype_with_size(self):
        self._test_member(self.tar.getmember("ustar/dirtype-with-size"),
                          size=255)

    def test_find_lnktype(self):
        self._test_member(self.tar.getmember("ustar/lnktype"),
                          size=0, linkname="ustar/regtype")

    def test_find_symtype(self):
        self._test_member(self.tar.getmember("ustar/symtype"),
                          size=0, linkname="regtype")

    def test_find_blktype(self):
        self._test_member(self.tar.getmember("ustar/blktype"),
                          size=0, devmajor=3, devminor=0)

    def test_find_chrtype(self):
        self._test_member(self.tar.getmember("ustar/chrtype"),
                          size=0, devmajor=1, devminor=3)

    def test_find_fifotype(self):
        self._test_member(self.tar.getmember("ustar/fifotype"), size=0)

    def test_find_sparse(self):
        self._test_member(self.tar.getmember("ustar/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse(self):
        self._test_member(self.tar.getmember("gnu/sparse"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_00(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_01(self):
        self._test_member(self.tar.getmember("gnu/sparse-0.1"),
                          size=86016, chksum=sha256_sparse)

    def test_find_gnusparse_10(self):
        self._test_member(self.tar.getmember("gnu/sparse-1.0"),
                          size=86016, chksum=sha256_sparse)

    def test_find_umlauts(self):
        member_name = "ustar/umlauts-" "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        self._test_member(self.tar.getmember(member_name),
                          size=7011, chksum=sha256_regtype)

    def test_find_ustar_longname(self):
        expected = "ustar/" + "12345/" * 39 + "1234567/longname"
        self.assertIn(expected, self.tar.getnames())

    def test_find_regtype_oldv7(self):
        self._test_member(self.tar.getmember("misc/regtype-old-v7"),
                          size=7011, chksum=sha256_regtype)

    def test_find_pax_umlauts(self):
        # Reopen the archive with an explicit iso8859-1 encoding before
        # looking up the pax member.
        self.tar.close()
        self.tar = tarfile.open(self.tarname, mode=self.mode,
                                encoding="iso8859-1")
        member_name = "pax/umlauts-" "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        self._test_member(self.tar.getmember(member_name),
                          size=7011, chksum=sha256_regtype)
1006
1007
class LongnameTest:
    # Mixin with tests for members that have very long names.  The tests
    # read `subdir`, `longnametype` and `format` from the class they are
    # mixed into.

    def _long_path(self, leaf):
        # "<subdir>/" followed by 125 "123/" components and `leaf`.
        return self.subdir + "/" + "123/" * 125 + leaf

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        try:
            member = self.tar.getmember(self._long_path("longname"))
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(member.type, tarfile.DIRTYPE,
                "read longname as dirtype")

    def test_read_longlink(self):
        target = self._long_path("longname")
        try:
            member = self.tar.getmember(self._long_path("longlink"))
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(member.linkname, target, "linkname wrong")

    def test_truncated_longname(self):
        # Copy only the first three 512 byte blocks starting at the
        # member's offset; opening that fragment must raise ReadError.
        member = self.tar.getmember(self._long_path("longname"))
        raw = self.tar.fileobj
        raw.seek(member.offset)
        fragment = io.BytesIO(raw.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=fragment)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        start = self.tar.getmember(self._long_path("longname")).offset
        with open(tarname, "rb") as fobj:
            fobj.seek(start)
            header = tarfile.TarInfo.frombuf(fobj.read(512),
                                             "iso8859-1", "strict")
            self.assertEqual(header.type, self.longnametype)

    def test_longname_directory(self):
        # Test reading a longlink directory. Issue #47231.
        dirname = ('a' * 101) + '/'
        with os_helper.temp_cwd():
            with tarfile.open(tmpname, 'w') as archive:
                archive.format = self.format
                try:
                    os.mkdir(dirname)
                    archive.add(dirname)
                finally:
                    os.rmdir(dirname.rstrip("/"))
            with tarfile.open(tmpname) as archive:
                # The member has to be found with and without the
                # trailing slash.
                self.assertIsNotNone(archive.getmember(dirname))
                self.assertIsNotNone(
                    archive.getmember(dirname.removesuffix('/')))
1063
class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase):
    # Read tests for the members stored below "gnu/" in the test archive.

    subdir = "gnu"
    longnametype = tarfile.GNUTYPE_LONGNAME
    format = tarfile.GNU_FORMAT

    # Since 3.2 tarfile is supposed to restore sparse members accurately,
    # i.e. to produce files with holes, and that is what should really be
    # tested here.  Unfortunately not every platform/filesystem supports
    # sparse files, and even where they are supported it is non-trivial to
    # make reliable assertions about holes in files.  So the content check
    # below runs on all platforms, while the block-count check only runs
    # on platforms/filesystems that prove to support sparse files.
    def _test_sparse_file(self, name):
        self.tar.extract(name, TEMPDIR, filter='data')
        extracted = os.path.join(TEMPDIR, name)
        with open(extracted, "rb") as fobj:
            digest = sha256sum(fobj.read())
        self.assertEqual(digest, sha256_sparse,
                "wrong sha256sum for %s" % name)

        if self._fs_supports_holes():
            # The blocks allocated on disk must add up to less than the
            # apparent file size.
            info = os.stat(extracted)
            self.assertLess(info.st_blocks * 512, info.st_size)

    def test_sparse_file_old(self):
        self._test_sparse_file("gnu/sparse")

    def test_sparse_file_00(self):
        self._test_sparse_file("gnu/sparse-0.0")

    def test_sparse_file_01(self):
        self._test_sparse_file("gnu/sparse-0.1")

    def test_sparse_file_10(self):
        self._test_sparse_file("gnu/sparse-1.0")

    @staticmethod
    def _fs_supports_holes():
        # Return True if the platform knows the st_blocks stat attribute
        # and uses st_blocks units of 512 bytes, and if the filesystem is
        # able to store holes of 4 KiB in files.
        #
        # The function returns False if page size is larger than 4 KiB.
        # For example, ppc64 uses pages of 64 KiB.
        if not sys.platform.startswith("linux"):
            return False

        # Linux evidently has 512 byte st_blocks units.
        probe = os.path.join(TEMPDIR, "sparse-test")
        with open(probe, "wb") as fobj:
            # Seek past the first 4 KiB to "punch a hole" of that size.
            fobj.seek(4096)
            fobj.write(b'x' * 4096)
            fobj.truncate()
        info = os.stat(probe)
        os_helper.unlink(probe)
        return info.st_blocks * 512 < info.st_size
1122
1123
class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase):
    # Read tests for the members stored below "pax/" in the test archive.

    subdir = "pax"
    longnametype = tarfile.XHDTYPE
    format = tarfile.PAX_FORMAT

    def test_pax_global_headers(self):
        # Check the owner names of three members; every one of them must
        # also carry the same "VENDOR.umlauts" pax header.
        umlauts = "\xc4\xd6\xdc\xe4\xf6\xfc\xdf"
        expected = (
            # (member name, uname, gname)
            ("pax/regtype1", "foo", "bar"),
            ("pax/regtype2", "", "bar"),
            ("pax/regtype3", "tarfile", "tarfile"),
        )
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            for name, uname, gname in expected:
                tarinfo = tar.getmember(name)
                # The member name is passed as the failure message so a
                # failing assertion says which member it was about.
                self.assertEqual(tarinfo.uname, uname, name)
                self.assertEqual(tarinfo.gname, gname, name)
                self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"),
                                 umlauts, name)

    def test_pax_number_fields(self):
        # All following number fields are read from the pax header.
        with tarfile.open(tarname, encoding="iso8859-1") as tar:
            tarinfo = tar.getmember("pax/regtype4")
            self.assertEqual(tarinfo.size, 7011)
            self.assertEqual(tarinfo.uid, 123)
            self.assertEqual(tarinfo.gid, 123)
            self.assertEqual(tarinfo.mtime, 1041808783.0)
            self.assertEqual(type(tarinfo.mtime), float)
            self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
            self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)

    def test_pax_header_bad_formats(self):
        # Write an archive whose pax header contains the record
        # b"11 foo=bar\n", swap that record for a malformed one, and check
        # that opening the damaged archive raises ReadError.
        pax_header_replacements = (
            b" foo=bar\n",
            b"0 \n",
            b"1 \n",
            b"2 \n",
            b"3 =\n",
            b"4 =a\n",
            b"1000000 foo=bar\n",
            b"0 foo=bar\n",
            b"-12 foo=bar\n",
            b"000000000000000000000000036 foo=bar\n",
        )
        pax_headers = {"foo": "bar"}

        for replacement in pax_header_replacements:
            with self.subTest(header=replacement):
                with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                                  encoding="iso8859-1") as tar:
                    t = tarfile.TarInfo()
                    t.name = "pax"
                    t.uid = 1
                    t.pax_headers = pax_headers
                    tar.addfile(t)

                with open(tmpname, "rb") as f:
                    data = f.read()
                    self.assertIn(b"11 foo=bar\n", data)
                    data = data.replace(b"11 foo=bar\n", replacement)

                # Mode "wb" already truncates the file, so no explicit
                # truncate() call is needed before writing.
                with open(tmpname, "wb") as f:
                    f.write(data)

                with self.assertRaisesRegex(tarfile.ReadError, r"method tar: ReadError\('invalid header'\)"):
                    tarfile.open(tmpname, encoding="iso8859-1")
1209
1210
class WriteTestBase(TarTest):
    # Collects the write tests that are supposed to be run in all
    # possible mode combinations.

    def test_fileobj_no_close(self):
        sink = io.BytesIO()
        with tarfile.open(fileobj=sink, mode=self.mode) as archive:
            archive.addfile(tarfile.TarInfo("foo"))
        self.assertFalse(sink.closed, "external fileobjs must never closed")
        # Issue #20238: Incomplete gzip output with mode="w:gz".
        # Dropping the TarFile and collecting garbage must neither close
        # the external file object nor change what was written to it.
        snapshot = sink.getvalue()
        del archive
        support.gc_collect()
        self.assertFalse(sink.closed)
        self.assertEqual(snapshot, sink.getvalue())

    def test_eof_marker(self):
        # Make sure an end of archive marker is written (two zero blocks).
        # tarfile insists on aligning archives to a 20 * 512 byte
        # recordsize.  The archive built here is exactly 10240 bytes long
        # without the marker, so it has to be 20480 bytes long once the
        # marker has been written.
        member = tarfile.TarInfo("foo")
        member.size = tarfile.RECORDSIZE - tarfile.BLOCKSIZE
        with tarfile.open(tmpname, self.mode) as archive:
            archive.addfile(member, io.BytesIO(b"a" * member.size))

        with self.open(tmpname, "rb") as fobj:
            written = fobj.read()
        self.assertEqual(len(written), tarfile.RECORDSIZE * 2)
1239
1240
class WriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for archives opened with the "w:" prefix.

    prefix = "w:"

    def test_100_char_name(self):
        # The name field in a tar header stores strings of at most 100 chars.
        # If a string is shorter than 100 chars it has to be padded with '\0',
        # which implies that a string of exactly 100 chars is stored without
        # a trailing '\0'.
        name = "0123456789" * 10
        with tarfile.open(tmpname, self.mode) as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname) as tar:
            self.assertEqual(tar.getnames()[0], name,
                    "failed to store 100 char filename")

    def test_tar_size(self):
        # Test for bug #1013882.
        path = os.path.join(TEMPDIR, "file")
        with open(path, "wb") as fobj:
            fobj.write(b"aaa")
        with tarfile.open(tmpname, self.mode) as tar:
            tar.add(path)
        self.assertGreater(os.path.getsize(tmpname), 0,
                "tarfile is empty")

    # The test_*_size tests test for bug #1167128.
    def test_file_size(self):
        # gettarinfo() must report the current size of the file, first
        # for an empty file and then for one holding three bytes.
        path = os.path.join(TEMPDIR, "file")
        with tarfile.open(tmpname, self.mode) as tar:
            for payload in (b"", b"aaa"):
                with open(path, "wb") as fobj:
                    fobj.write(payload)
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, len(payload))

    def test_directory_size(self):
        path = os.path.join(TEMPDIR, "directory")
        os.mkdir(path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
        finally:
            os_helper.rmdir(path)

    # mock the following:
    #  os.listdir: so we know that files are in the wrong order
    def test_ordered_recursion(self):
        path = os.path.join(TEMPDIR, "directory")
        entries = [os.path.join(path, name) for name in ("1", "2")]
        os.mkdir(path)
        try:
            # The files are created inside the try block so that they are
            # removed again even if creating one of them fails.
            for entry in entries:
                os_helper.create_empty_file(entry)
            with tarfile.open(tmpname, self.mode) as tar:
                with unittest.mock.patch('os.listdir') as mock_listdir:
                    mock_listdir.return_value = ["2", "1"]
                    tar.add(path)
                paths = [os.path.split(m.name)[-1] for m in tar.getmembers()]
                self.assertEqual(paths, ["directory", "1", "2"])
        finally:
            for entry in entries:
                os_helper.unlink(entry)
            os_helper.rmdir(path)

    def test_gettarinfo_pathlike_name(self):
        # gettarinfo() must accept a path-like object and produce the
        # same str name as for the equivalent string path.
        with tarfile.open(tmpname, self.mode) as tar:
            path = pathlib.Path(TEMPDIR) / "file"
            with open(path, "wb") as fobj:
                fobj.write(b"aaa")
            tarinfo = tar.gettarinfo(path)
            tarinfo2 = tar.gettarinfo(os.fspath(path))
            self.assertIsInstance(tarinfo.name, str)
            self.assertEqual(tarinfo.name, tarinfo2.name)
            self.assertEqual(tarinfo.size, 3)

    @unittest.skipUnless(hasattr(os, "link"),
                         "Missing hardlink implementation")
    def test_link_size(self):
        link = os.path.join(TEMPDIR, "link")
        target = os.path.join(TEMPDIR, "link_target")
        # Everything that creates files lives inside the try block, so
        # the target is also removed when os.link() is not permitted and
        # the test gets skipped.
        try:
            with open(target, "wb") as fobj:
                fobj.write(b"aaa")
            try:
                os.link(target, link)
            except PermissionError as e:
                self.skipTest('os.link(): %s' % e)
            with tarfile.open(tmpname, self.mode) as tar:
                # Record the link target in the inodes list.
                tar.gettarinfo(target)
                tarinfo = tar.gettarinfo(link)
                self.assertEqual(tarinfo.size, 0)
        finally:
            os_helper.unlink(target)
            os_helper.unlink(link)

    @os_helper.skip_unless_symlink
    def test_symlink_size(self):
        path = os.path.join(TEMPDIR, "symlink")
        os.symlink("link_target", path)
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tarinfo = tar.gettarinfo(path)
                self.assertEqual(tarinfo.size, 0)
        finally:
            os_helper.unlink(path)

    def test_add_self(self):
        # Test for #1257255.
        dstname = os.path.abspath(tmpname)
        with tarfile.open(tmpname, self.mode) as tar:
            self.assertEqual(tar.name, dstname,
                    "archive name must be absolute")
            tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

            # Repeat the check with TEMPDIR as the working directory.
            with os_helper.change_cwd(TEMPDIR):
                tar.add(dstname)
            self.assertEqual(tar.getnames(), [],
                    "added the archive to itself")

    def test_filter(self):
        tempdir = os.path.join(TEMPDIR, "filter")
        os.mkdir(tempdir)
        try:
            for name in ("foo", "bar", "baz"):
                os_helper.create_empty_file(os.path.join(tempdir, name))

            def member_filter(tarinfo):
                # Leave out "bar"; give every other member a fixed owner.
                if os.path.basename(tarinfo.name) == "bar":
                    return None
                tarinfo.uid = 123
                tarinfo.uname = "foo"
                return tarinfo

            with tarfile.open(tmpname, self.mode,
                              encoding="iso8859-1") as tar:
                tar.add(tempdir, arcname="empty_dir", filter=member_filter)

            # Verify that filter is a keyword-only argument
            with self.assertRaises(TypeError):
                tar.add(tempdir, "empty_dir", True, None, member_filter)

            with tarfile.open(tmpname, "r") as tar:
                for tarinfo in tar:
                    self.assertEqual(tarinfo.uid, 123)
                    self.assertEqual(tarinfo.uname, "foo")
                self.assertEqual(len(tar.getmembers()), 3)
        finally:
            os_helper.rmtree(tempdir)

    # Guarantee that stored pathnames are not modified. Don't
    # remove ./ or ../ or double slashes. Still make absolute
    # pathnames relative.
    # For details see bug #6054.
    def _test_pathname(self, path, cmp_path=None, dir=False):
        # Create a tarfile with an empty member named path
        # and compare the stored name with the original.
        # `cmp_path` overrides the expected stored name; `dir` selects a
        # directory instead of a regular file as the member.
        foo = os.path.join(TEMPDIR, "foo")
        if dir:
            os.mkdir(foo)
        else:
            os_helper.create_empty_file(foo)

        # Remove foo even if writing or reading the archive fails, since
        # a leftover would break the next os.mkdir(foo).
        try:
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(foo, arcname=path)

            with tarfile.open(tmpname, "r") as tar:
                t = tar.next()
        finally:
            if dir:
                os_helper.rmdir(foo)
            else:
                os_helper.unlink(foo)

        self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))


    @os_helper.skip_unless_symlink
    def test_extractall_symlinks(self):
        # Test if extractall works properly when tarfile contains symlinks
        tempdir = os.path.join(TEMPDIR, "testsymlinks")
        temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
        os.mkdir(tempdir)
        try:
            source_file = os.path.join(tempdir, 'source')
            target_file = os.path.join(tempdir, 'symlink')
            with open(source_file, 'w') as f:
                f.write('something\n')
            os.symlink(source_file, target_file)
            with tarfile.open(temparchive, 'w') as tar:
                tar.add(source_file, arcname="source")
                tar.add(target_file, arcname="symlink")
            # Let's extract it to the location which contains the symlink
            with tarfile.open(temparchive, errorlevel=2) as tar:
                # this should not raise OSError: [Errno 17] File exists
                try:
                    tar.extractall(path=tempdir,
                                   filter='fully_trusted')
                except OSError:
                    self.fail("extractall failed with symlinked files")
        finally:
            os_helper.unlink(temparchive)
            os_helper.rmtree(tempdir)

    def test_pathnames(self):
        # Each tuple is joined with os.path.join() and must be stored
        # unchanged.
        cases = (
            ("foo",),
            ("foo", ".", "bar"),
            ("foo", "..", "bar"),
            (".", "foo"),
            (".", "foo", "."),
            (".", "foo", ".", "bar"),
            (".", "foo", "..", "bar"),
            ("..", "foo"),
            ("..", "foo", ".."),
            ("..", "foo", ".", "bar"),
            ("..", "foo", "..", "bar"),
        )
        for parts in cases:
            self._test_pathname(os.path.join(*parts))

        self._test_pathname("foo" + os.sep + os.sep + "bar")
        self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)

    def test_abs_pathnames(self):
        # Absolute names must be stored as relative ones.
        if sys.platform == "win32":
            self._test_pathname("C:\\foo", "foo")
        else:
            self._test_pathname("/foo", "foo")
            self._test_pathname("///foo", "foo")

    def test_cwd(self):
        # Test adding the current working directory.
        with os_helper.change_cwd(TEMPDIR):
            with tarfile.open(tmpname, self.mode) as tar:
                tar.add(".")

            with tarfile.open(tmpname, "r") as tar:
                for t in tar:
                    if t.name != ".":
                        self.assertTrue(t.name.startswith("./"), t.name)

    def test_open_nonwritable_fileobj(self):
        # An exception raised by the file object's write() must propagate
        # out of tarfile.open() and must leave the file object open.
        for exctype in OSError, EOFError, RuntimeError:
            class BadFile(io.BytesIO):
                # Raises exctype on the first write() call only.
                first = True
                def write(self, data):
                    if self.first:
                        self.first = False
                        raise exctype

            f = BadFile()
            with self.assertRaises(exctype):
                tarfile.open(tmpname, self.mode, fileobj=f,
                             format=tarfile.PAX_FORMAT,
                             pax_headers={'non': 'empty'})
            self.assertFalse(f.closed)
1551
1552
class GzipWriteTest(GzipTest, WriteTest):
    """Run the inherited WriteTest cases with the GzipTest settings."""
1555
1556
class Bz2WriteTest(Bz2Test, WriteTest):
    """Run the inherited WriteTest cases with the Bz2Test settings."""
1559
1560
class LzmaWriteTest(LzmaTest, WriteTest):
    """Run the inherited WriteTest cases with the LzmaTest settings."""
1563
1564
class StreamWriteTest(WriteTestBase, unittest.TestCase):
    # Write tests for stream mode: self.mode is "w|" plus the suffix of
    # the compression mixin, if any.

    prefix = "w|"
    # Callable returning a decompressor object for the archive's
    # compression; when None the archive is read back through self.open().
    decompressor = None

    def test_stream_padding(self):
        # Test for bug #1543303.
        #
        # An archive without members must contain exactly
        # tarfile.RECORDSIZE NUL bytes once decompressed.
        tarfile.open(tmpname, self.mode).close()

        if not self.decompressor:
            with self.open(tmpname) as fobj:
                data = fobj.read()
        else:
            dec = self.decompressor()
            with open(tmpname, "rb") as fobj:
                raw = fobj.read()
            data = dec.decompress(raw)
            self.assertFalse(dec.unused_data, "found trailing data")

        self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE,
                        "incorrect zero padding")

    @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"),
                         "Missing umask implementation")
    @unittest.skipIf(
        support.is_emscripten or support.is_wasi,
        "Emscripten's/WASI's umask is a stub."
    )
    def test_file_mode(self):
        # Test for issue #8464: Create files with correct
        # permissions.
        if os.path.exists(tmpname):
            os_helper.unlink(tmpname)

        # With a umask of 0o022 a newly created archive must end up with
        # mode 0o644; the previous umask is restored afterwards.
        saved_umask = os.umask(0o022)
        try:
            tarfile.open(tmpname, self.mode).close()
            permissions = os.stat(tmpname).st_mode & 0o777
            self.assertEqual(permissions, 0o644, "wrong file permissions")
        finally:
            os.umask(saved_umask)
1606
1607
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    def test_source_directory_not_leaked(self):
        """
        Ensure the source directory is not included in the tar header
        per bpo-41316.
        """
        tarfile.open(tmpname, self.mode).close()
        # latin-1 maps every byte to a character, so the raw archive bytes
        # can be searched as text without decoding errors.
        payload = pathlib.Path(tmpname).read_text(encoding='latin-1')
        # A unittest assertion is used instead of a bare assert statement:
        # assert statements are removed under "python -O", which would
        # silently turn this test into a no-op.
        self.assertNotIn(os.path.dirname(tmpname), payload)
1617
1618
class Bz2StreamWriteTest(Bz2Test, StreamWriteTest):
    # Stream write tests for bz2.  The decompressor stays None when the
    # bz2 module could not be imported.
    decompressor = None if bz2 is None else bz2.BZ2Decompressor
1621
class LzmaStreamWriteTest(LzmaTest, StreamWriteTest):
    # Stream write tests for lzma.  The decompressor stays None when the
    # lzma module could not be imported.
    decompressor = None if lzma is None else lzma.LZMADecompressor
1624
1625
class GNUWriteTest(unittest.TestCase):
    # This testcase checks for correct creation of GNU Longname
    # and Longlink extended headers (cp. bug #812325).

    def _length(self, s):
        # Number of bytes occupied by the 512-byte blocks that store s.
        # The "+ 1" always rounds up past len(s) (presumably leaving room
        # for a terminating NUL -- confirm against tarfile's long header
        # code).
        blocks = len(s) // 512 + 1
        return blocks * 512

    def _calc_size(self, name, link=None):
        # Expected archive offset after adding one member: the regular
        # 512-byte header plus, for every over-long field, one extended
        # header block and the blocks that hold the long value.
        count = 512

        if len(name) > tarfile.LENGTH_NAME:
            # GNU longname extended header + longname
            count += 512 + self._length(name)
        if link is not None and len(link) > tarfile.LENGTH_LINK:
            # GNU longlink extended header + longlink
            count += 512 + self._length(link)
        return count

    def _test(self, name, link=None):
        # Write one member called name (a hardlink to link, if given) in
        # GNU format, check the resulting archive offset against
        # _calc_size(), then read the member back and compare its name
        # and linkname.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        # Context managers replace the manual try/finally close() pairs,
        # matching the style used elsewhere in this file.
        with tarfile.open(tmpname, "w") as tar:
            tar.format = tarfile.GNU_FORMAT
            tar.addfile(tarinfo)

            # The offset is compared before the archive is closed, i.e.
            # before anything further is appended to it.
            v1 = self._calc_size(name, link)
            v2 = tar.offset
            self.assertEqual(v1, v2, "GNU longname/longlink creation failed")

        with tarfile.open(tmpname) as tar:
            member = tar.next()
            self.assertIsNotNone(member,
                    "unable to read longname member")
            self.assertEqual(tarinfo.name, member.name,
                    "unable to read longname member")
            self.assertEqual(tarinfo.linkname, member.linkname,
                    "unable to read longname member")

    # The numeric suffix of each test is the length of the name and/or
    # link name it uses: 127 eight-character components plus a final
    # component of 7, 8 or 9 characters.

    def test_longname_1023(self):
        self._test(("longnam/" * 127) + "longnam")

    def test_longname_1024(self):
        self._test(("longnam/" * 127) + "longname")

    def test_longname_1025(self):
        self._test(("longnam/" * 127) + "longname_")

    def test_longlink_1023(self):
        self._test("name", ("longlnk/" * 127) + "longlnk")

    def test_longlink_1024(self):
        self._test("name", ("longlnk/" * 127) + "longlink")

    def test_longlink_1025(self):
        self._test("name", ("longlnk/" * 127) + "longlink_")

    def test_longnamelink_1023(self):
        self._test(("longnam/" * 127) + "longnam",
                   ("longlnk/" * 127) + "longlnk")

    def test_longnamelink_1024(self):
        self._test(("longnam/" * 127) + "longname",
                   ("longlnk/" * 127) + "longlink")

    def test_longnamelink_1025(self):
        self._test(("longnam/" * 127) + "longname_",
                   ("longlnk/" * 127) + "longlink_")
1706
1707
class DeviceHeaderTest(WriteTestBase, unittest.TestCase):
    # Checks which members get devmajor/devminor header fields written.

    prefix = "w:"

    def test_headers_written_only_for_device_files(self):
        # Regression test for bpo-18819.
        # NOTE(review): tempdir is created and removed again, but nothing
        # in this test reads or writes inside it -- confirm whether it is
        # still needed.
        tempdir = os.path.join(TEMPDIR, "device_header_test")
        os.mkdir(tempdir)
        try:
            # Context managers replace the manual try/finally close()
            # pairs, matching the style used elsewhere in this file.
            with tarfile.open(tmpname, self.mode) as tar:
                input_blk = tarfile.TarInfo(name="my_block_device")
                input_reg = tarfile.TarInfo(name="my_regular_file")
                input_blk.type = tarfile.BLKTYPE
                input_reg.type = tarfile.REGTYPE
                tar.addfile(input_blk)
                tar.addfile(input_reg)

            # devmajor and devminor should be *interpreted* as 0 in both...
            with tarfile.open(tmpname, "r") as tar:
                output_blk = tar.getmember("my_block_device")
                output_reg = tar.getmember("my_regular_file")
            self.assertEqual(output_blk.devmajor, 0)
            self.assertEqual(output_blk.devminor, 0)
            self.assertEqual(output_reg.devmajor, 0)
            self.assertEqual(output_reg.devminor, 0)

            # ...but the fields should not actually be set on regular files:
            with open(tmpname, "rb") as infile:
                buf = infile.read()
            # Slice each member's raw header out of the archive bytes.
            buf_blk = buf[output_blk.offset:output_blk.offset_data]
            buf_reg = buf[output_reg.offset:output_reg.offset_data]
            # See `struct posixheader` in GNU docs for byte offsets:
            # <https://www.gnu.org/software/tar/manual/html_node/Standard.html>
            device_headers = slice(329, 329 + 16)
            self.assertEqual(buf_blk[device_headers], b"0000000\0" * 2)
            self.assertEqual(buf_reg[device_headers], b"\0" * 16)
        finally:
            os_helper.rmtree(tempdir)
1752
1753
class CreateTest(WriteTestBase, unittest.TestCase):
    # Tests for exclusive-creation mode: self.mode is "x:" plus the
    # suffix of the compression mixin, if any.

    prefix = "x:"

    # Small file that every test adds to the archive.
    file_path = os.path.join(TEMPDIR, "spameggs42")

    @classmethod
    def setUpClass(cls):
        with open(cls.file_path, "wb") as payload:
            payload.write(b"aaa")

    @classmethod
    def tearDownClass(cls):
        os_helper.unlink(cls.file_path)

    def setUp(self):
        # Each test starts without an archive at tmpname.
        os_helper.unlink(tmpname)

    def _assert_single_spameggs_member(self, names):
        # names must list exactly one member, whose name contains the
        # test file's base name.
        self.assertEqual(len(names), 1)
        self.assertIn('spameggs42', names[0])

    def _assert_archive_holds_spameggs(self):
        # Reopen the archive with taropen() and check its member list.
        with self.taropen(tmpname) as archive:
            names = archive.getnames()
        self._assert_single_spameggs_member(names)

    def test_create(self):
        with tarfile.open(tmpname, self.mode) as archive:
            archive.add(self.file_path)

        self._assert_archive_holds_spameggs()

    def test_create_existing(self):
        with tarfile.open(tmpname, self.mode) as archive:
            archive.add(self.file_path)

        # A second exclusive creation must fail ...
        with self.assertRaises(FileExistsError):
            tarfile.open(tmpname, self.mode)

        # ... and must leave the existing archive readable.
        self._assert_archive_holds_spameggs()

    def test_create_taropen(self):
        with self.taropen(tmpname, "x") as archive:
            archive.add(self.file_path)

        self._assert_archive_holds_spameggs()

    def test_create_existing_taropen(self):
        with self.taropen(tmpname, "x") as archive:
            archive.add(self.file_path)

        with self.assertRaises(FileExistsError):
            with self.taropen(tmpname, "x"):
                pass

        self._assert_archive_holds_spameggs()

    def test_create_pathlike_name(self):
        # Both the archive name and the added file are given as
        # pathlib.Path objects; TarFile.name must still be a str.
        with tarfile.open(pathlib.Path(tmpname), self.mode) as archive:
            self.assertIsInstance(archive.name, str)
            self.assertEqual(archive.name, os.path.abspath(tmpname))
            archive.add(pathlib.Path(self.file_path))
            names = archive.getnames()
        self._assert_single_spameggs_member(names)

        self._assert_archive_holds_spameggs()

    def test_create_taropen_pathlike_name(self):
        # Same as test_create_pathlike_name, but through taropen().
        with self.taropen(pathlib.Path(tmpname), "x") as archive:
            self.assertIsInstance(archive.name, str)
            self.assertEqual(archive.name, os.path.abspath(tmpname))
            archive.add(pathlib.Path(self.file_path))
            names = archive.getnames()
        self._assert_single_spameggs_member(names)

        self._assert_archive_holds_spameggs()
1842
1843
class GzipCreateTest(GzipTest, CreateTest):

    def test_create_with_compresslevel(self):
        # Create the archive with an explicit compresslevel, then check
        # that it can be opened in 'r:gz' mode with the same keyword.
        with tarfile.open(tmpname, self.mode, compresslevel=1) as archive:
            archive.add(self.file_path)
        tarfile.open(tmpname, 'r:gz', compresslevel=1).close()
1851
1852
class Bz2CreateTest(Bz2Test, CreateTest):

    def test_create_with_compresslevel(self):
        # Create the archive with an explicit compresslevel, then check
        # that it can be opened in 'r:bz2' mode with the same keyword.
        with tarfile.open(tmpname, self.mode, compresslevel=1) as archive:
            archive.add(self.file_path)
        tarfile.open(tmpname, 'r:bz2', compresslevel=1).close()
1860
1861
class LzmaCreateTest(LzmaTest, CreateTest):

    # Unlike gz and bz2, xz uses the preset keyword instead of compresslevel.
    # It does not allow for preset to be specified when reading.
    def test_create_with_preset(self):
        archive = tarfile.open(tmpname, self.mode, preset=1)
        with archive:
            archive.add(self.file_path)
1869
1870
class CreateWithXModeTest(CreateTest):
    # Reruns the CreateTest cases with the mode prefix "x" instead of "x:".

    prefix = "x"

    # These inherited tests pass a literal "x" to taropen() and never
    # look at self.mode; setting them to None removes them from this
    # class.
    test_create_taropen = test_create_existing_taropen = None
1877
1878
@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation")
class HardlinkTest(unittest.TestCase):
    # Test the creation of LNKTYPE (hardlink) members in an archive.

    def setUp(self):
        # Create file "foo", hardlink it as "bar" and open an archive that
        # already contains "foo".
        #
        # Each resource is registered with addCleanup() as soon as it
        # exists.  Unlike tearDown(), cleanups also run when setUp() is
        # left early (the skipTest() below, or a failing open()/add()), so
        # no file or open archive is left behind in that case.
        self.foo = os.path.join(TEMPDIR, "foo")
        self.bar = os.path.join(TEMPDIR, "bar")

        with open(self.foo, "wb") as fobj:
            fobj.write(b"foo")
        self.addCleanup(os_helper.unlink, self.foo)

        try:
            os.link(self.foo, self.bar)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)
        self.addCleanup(os_helper.unlink, self.bar)

        self.tar = tarfile.open(tmpname, "w")
        # Registered last, so the archive is closed before the files are
        # removed (cleanups run in reverse order).
        self.addCleanup(self.tar.close)
        self.tar.add(self.foo)

    def test_add_twice(self):
        # The same name will be added as a REGTYPE every
        # time regardless of st_nlink.
        tarinfo = self.tar.gettarinfo(self.foo)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "add file as regular failed")

    def test_add_hardlink(self):
        # "bar" is a second link to the already archived "foo", so it
        # must be reported as a hardlink member.
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.LNKTYPE,
                "add file as hardlink failed")

    def test_dereference_hardlink(self):
        # With dereference enabled the same file must be reported as a
        # regular member instead.
        self.tar.dereference = True
        tarinfo = self.tar.gettarinfo(self.bar)
        self.assertEqual(tarinfo.type, tarfile.REGTYPE,
                "dereferencing hardlink failed")
1920
1921
class PaxWriteTest(GNUWriteTest):
    # Reruns the inherited long name / long link tests with the pax
    # format (through the overridden _test()) and adds pax header tests.

    def _test(self, name, link=None):
        # See GNUWriteTest.
        #
        # Writes one member in pax format and reads it back.  When a link
        # is given only the link name is compared, otherwise the name.
        tarinfo = tarfile.TarInfo(name)
        if link:
            tarinfo.linkname = link
            tarinfo.type = tarfile.LNKTYPE

        # Context managers replace the manual try/finally close() pairs,
        # matching the style used elsewhere in this file.
        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT) as tar:
            tar.addfile(tarinfo)

        with tarfile.open(tmpname) as tar:
            if link:
                linkname = tar.getmembers()[0].linkname
                self.assertEqual(link, linkname, "PAX longlink creation failed")
            else:
                n = tar.getmembers()[0].name
                self.assertEqual(name, n, "PAX longname creation failed")

    def test_pax_global_header(self):
        # Global pax headers passed to open() must be readable again, both
        # from the TarFile and from its first member.
        pax_headers = {
                "foo": "bar",
                "uid": "0",
                "mtime": "1.23",
                "test": "\xe4\xf6\xfc",
                "\xe4\xf6\xfc": "test"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          pax_headers=pax_headers) as tar:
            tar.addfile(tarfile.TarInfo("test"))

        # Test if the global header was written correctly.
        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            self.assertEqual(tar.pax_headers, pax_headers)
            self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
            # Test if all the fields are strings.
            for key, val in tar.pax_headers.items():
                self.assertIsNot(type(key), bytes)
                self.assertIsNot(type(val), bytes)
                if key in tarfile.PAX_NUMBER_FIELDS:
                    # Number fields must be convertible by their
                    # registered converter.
                    try:
                        tarfile.PAX_NUMBER_FIELDS[key](val)
                    except (TypeError, ValueError):
                        self.fail("unable to convert pax header field")

    def test_pax_extended_header(self):
        # The fields from the pax header have priority over the
        # TarInfo.
        pax_headers = {"path": "foo", "uid": "123"}

        with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          encoding="iso8859-1") as tar:
            t = tarfile.TarInfo()
            t.name = "\xe4\xf6\xfc" # non-ASCII
            t.uid = 8**8 # too large
            t.pax_headers = pax_headers
            tar.addfile(t)

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmembers()[0]
            self.assertEqual(t.pax_headers, pax_headers)
            self.assertEqual(t.name, "foo")
            self.assertEqual(t.uid, 123)

    def test_create_pax_header(self):
        # The ustar header should contain values that can be
        # represented reasonably, even if a better (e.g. higher
        # precision) version is set in the pax header.
        # Issue #45863

        # values that should be kept
        t = tarfile.TarInfo()
        t.name = "foo"
        t.mtime = 1000.1
        t.size = 100
        t.uid = 123
        t.gid = 124
        info = t.get_info()
        # create_pax_header() is expected to update info in place; the
        # assertions below inspect it after the call.
        header = t.create_pax_header(info, encoding="iso8859-1")
        self.assertEqual(info['name'], "foo")
        # mtime should be rounded to nearest second
        self.assertIsInstance(info['mtime'], int)
        self.assertEqual(info['mtime'], 1000)
        self.assertEqual(info['size'], 100)
        self.assertEqual(info['uid'], 123)
        self.assertEqual(info['gid'], 124)
        self.assertEqual(header,
            b'././@PaxHeader' + bytes(86)
            + b'0000000\x000000000\x000000000\x0000000000020\x0000000000000\x00010205\x00 x'
            + bytes(100) + b'ustar\x0000' + bytes(247)
            + b'16 mtime=1000.1\n' + bytes(496) + b'foo' + bytes(97)
            + b'0000644\x000000173\x000000174\x0000000000144\x0000000001750\x00006516\x00 0'
            + bytes(100) + b'ustar\x0000' + bytes(247))

        # values that should be changed
        t = tarfile.TarInfo()
        t.name = "foo\u3374" # can't be represented in ascii
        t.mtime = 10**10 # too big
        t.size = 10**10 # too big
        t.uid = 8**8 # too big
        t.gid = 8**8+1 # too big
        info = t.get_info()
        header = t.create_pax_header(info, encoding="iso8859-1")
        # name is kept as-is in info but should be added to pax header
        self.assertEqual(info['name'], "foo\u3374")
        self.assertEqual(info['mtime'], 0)
        self.assertEqual(info['size'], 0)
        self.assertEqual(info['uid'], 0)
        self.assertEqual(info['gid'], 0)
        self.assertEqual(header,
            b'././@PaxHeader' + bytes(86)
            + b'0000000\x000000000\x000000000\x0000000000130\x0000000000000\x00010207\x00 x'
            + bytes(100) + b'ustar\x0000' + bytes(247)
            + b'15 path=foo\xe3\x8d\xb4\n16 uid=16777216\n'
            + b'16 gid=16777217\n20 size=10000000000\n'
            + b'21 mtime=10000000000\n' + bytes(424) + b'foo?' + bytes(96)
            + b'0000644\x000000000\x000000000\x0000000000000\x0000000000000\x00006540\x00 0'
            + bytes(100) + b'ustar\x0000' + bytes(247))
2059
2060
class UnicodeTest:
    # Mixin with tests for non-ASCII names.  Concrete subclasses set the
    # "format" attribute to the tarfile format constant to test.

    def test_iso8859_1_filename(self):
        self._test_unicode_filename("iso8859-1")

    def test_utf7_filename(self):
        self._test_unicode_filename("utf7")

    def test_utf8_filename(self):
        self._test_unicode_filename("utf-8")

    def _test_unicode_filename(self, encoding):
        # Write a member with a non-ASCII name using the given encoding
        # and check that the name survives reading it back.
        name = "\xe4\xf6\xfc"

        # Context managers replace the manual try/finally close() pairs,
        # matching the style used elsewhere in this file.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding=encoding, errors="strict") as tar:
            tar.addfile(tarfile.TarInfo(name))

        with tarfile.open(tmpname, encoding=encoding) as tar:
            self.assertEqual(tar.getmembers()[0].name, name)

    def test_unicode_filename_error(self):
        # With encoding="ascii" and errors="strict", a non-ASCII name or
        # user name must make addfile() raise UnicodeError.
        with tarfile.open(tmpname, "w", format=self.format,
                          encoding="ascii", errors="strict") as tar:
            tarinfo = tarfile.TarInfo()

            tarinfo.name = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

            tarinfo.name = "foo"
            tarinfo.uname = "\xe4\xf6\xfc"
            self.assertRaises(UnicodeError, tar.addfile, tarinfo)

    def test_unicode_argument(self):
        # All textual member attributes must be exactly str.
        with tarfile.open(tarname, "r",
                          encoding="iso8859-1", errors="strict") as tar:
            for t in tar:
                self.assertIs(type(t.name), str)
                self.assertIs(type(t.linkname), str)
                self.assertIs(type(t.uname), str)
                self.assertIs(type(t.gname), str)

    def test_uname_unicode(self):
        # Non-ASCII user and group names must round-trip with the
        # encoding they were written with.
        t = tarfile.TarInfo("foo")
        t.uname = "\xe4\xf6\xfc"
        t.gname = "\xe4\xf6\xfc"

        with tarfile.open(tmpname, mode="w", format=self.format,
                          encoding="iso8859-1") as tar:
            tar.addfile(t)

        with tarfile.open(tmpname, encoding="iso8859-1") as tar:
            t = tar.getmember("foo")
            self.assertEqual(t.uname, "\xe4\xf6\xfc")
            self.assertEqual(t.gname, "\xe4\xf6\xfc")

        if self.format != tarfile.PAX_FORMAT:
            # For the non-pax formats, reading the same archive as ASCII
            # must yield the bytes as lone surrogates.
            with tarfile.open(tmpname, encoding="ascii") as tar:
                t = tar.getmember("foo")
                self.assertEqual(t.uname, "\udce4\udcf6\udcfc")
                self.assertEqual(t.gname, "\udce4\udcf6\udcfc")
2140
2141
class UstarUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.USTAR_FORMAT

    # In the case tables below each entry is (name, expected exception);
    # None means that the name must be stored and read back unchanged.

    # Test whether the utf-8 encoded version of a filename exceeds the 100
    # bytes name field limit (every occurrence of '\xff' will be expanded to 2
    # bytes).
    def test_unicode_name1(self):
        digits = "0123456789"
        cases = (
            (digits * 10, None),
            (digits * 10 + "0", ValueError),
            (digits * 9 + "01234567\xff", None),
            (digits * 9 + "012345678\xff", ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    def test_unicode_name2(self):
        digits = "0123456789"
        cases = (
            (digits * 9 + "012345\xff\xff", None),
            (digits * 9 + "0123456\xff\xff", ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    # Test whether the utf-8 encoded version of a filename exceeds the 155
    # bytes prefix + '/' + 100 bytes name limit.
    def test_unicode_longname1(self):
        digits = "0123456789"
        cases = (
            (digits * 15 + "01234/" + digits * 10, None),
            (digits * 15 + "0123/4" + digits * 10, ValueError),
            (digits * 15 + "012\xff/" + digits * 10, None),
            (digits * 15 + "0123\xff/" + digits * 10, ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    def test_unicode_longname2(self):
        digits = "0123456789"
        cases = (
            (digits * 15 + "01\xff/2" + digits * 10, ValueError),
            (digits * 15 + "01\xff\xff/" + digits * 10, ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    def test_unicode_longname3(self):
        digits = "0123456789"
        cases = (
            (digits * 15 + "01\xff\xff/2" + digits * 10, ValueError),
            (digits * 15 + "01234/" + digits * 9 + "01234567\xff", None),
            (digits * 15 + "01234/" + digits * 9 + "012345678\xff",
             ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    def test_unicode_longname4(self):
        digits = "0123456789"
        cases = (
            (digits * 15 + "01234/" + digits * 9 + "012345\xff\xff", None),
            (digits * 15 + "01234/" + digits * 9 + "0123456\xff\xff",
             ValueError),
        )
        for name, exc in cases:
            self._test_ustar_name(name, exc)

    def _test_ustar_name(self, name, exc=None):
        # Add a member called name.  If exc is given, addfile() must raise
        # it; otherwise the first member read back must carry the name.
        with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar:
            tarinfo = tarfile.TarInfo(name)
            if exc is not None:
                self.assertRaises(exc, tar.addfile, tarinfo)
            else:
                tar.addfile(tarinfo)

        if exc is not None:
            return

        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            for member in tar:
                self.assertEqual(name, member.name)
                break

    # Test the same as above for the 100 bytes link field.
    def test_unicode_link1(self):
        digits = "0123456789"
        cases = (
            (digits * 10, None),
            (digits * 10 + "0", ValueError),
            (digits * 9 + "01234567\xff", None),
            (digits * 9 + "012345678\xff", ValueError),
        )
        for linkname, exc in cases:
            self._test_ustar_link(linkname, exc)

    def test_unicode_link2(self):
        digits = "0123456789"
        cases = (
            (digits * 9 + "012345\xff\xff", None),
            (digits * 9 + "0123456\xff\xff", ValueError),
        )
        for linkname, exc in cases:
            self._test_ustar_link(linkname, exc)

    def _test_ustar_link(self, name, exc=None):
        # Like _test_ustar_name(), but name is used as the link name of a
        # member called "foo".
        with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar:
            tarinfo = tarfile.TarInfo("foo")
            tarinfo.linkname = name
            if exc is not None:
                self.assertRaises(exc, tar.addfile, tarinfo)
            else:
                tar.addfile(tarinfo)

        if exc is not None:
            return

        with tarfile.open(tmpname, "r", encoding="utf-8") as tar:
            for member in tar:
                self.assertEqual(name, member.linkname)
                break
2219
2220
class GNUUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.GNU_FORMAT

    def test_bad_pax_header(self):
        # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields
        # without a hdrcharset=BINARY header.
        #
        # The member must be found under the name that each encoding
        # produces with the surrogateescape error handler.
        cases = (
            ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),
        )
        for encoding, name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                try:
                    tar.getmember(name)
                except KeyError:
                    self.fail("unable to read bad GNU tar pax header")
2237
2238
class PAXUnicodeTest(UnicodeTest, unittest.TestCase):

    format = tarfile.PAX_FORMAT

    # PAX_FORMAT ignores encoding in write mode.
    test_unicode_filename_error = None

    def test_binary_header(self):
        # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field.
        #
        # The member must be found under the name that each encoding
        # produces with the surrogateescape error handler.
        cases = (
            ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"),
            ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),
        )
        for encoding, name in cases:
            with tarfile.open(tarname, encoding=encoding,
                              errors="surrogateescape") as tar:
                try:
                    tar.getmember(name)
                except KeyError:
                    self.fail("unable to read POSIX.1-2008 binary header")
2257
2258
class AppendTestBase:
    # Test append mode (cp. patch #1652681).

    def setUp(self):
        # Every test starts without an archive at self.tarname.
        self.tarname = tmpname
        if os.path.exists(self.tarname):
            os_helper.unlink(self.tarname)

    def _create_testtar(self, mode="w:"):
        # Create self.tarname in the given mode with a single member
        # "foo", copied from "ustar/regtype" of the reference archive.
        with tarfile.open(tarname, encoding="iso8859-1") as src:
            member = src.getmember("ustar/regtype")
            member.name = "foo"
            with src.extractfile(member) as data, \
                 tarfile.open(self.tarname, mode) as dst:
                dst.addfile(member, data)

    def test_append_compressed(self):
        # Opening a compressed archive in append mode must raise
        # ReadError.
        self._create_testtar("w:" + self.suffix)
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(tmpname, "a")
2278
class AppendTest(AppendTestBase, unittest.TestCase):
    # Append-mode tests for uncompressed archives.

    # The inherited compressed-archive test does not apply here.
    test_append_compressed = None

    def _add_testfile(self, fileobj=None):
        # Append an empty member "bar" to the archive (or to fileobj).
        with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
            tar.addfile(tarfile.TarInfo("bar"))

    def _test(self, names=None, fileobj=None):
        # Check that the archive (or fileobj) lists exactly the given
        # member names; the default is ["bar"].
        #
        # None is used as the default instead of a list literal so that
        # no mutable object is shared between calls.
        if names is None:
            names = ["bar"]
        with tarfile.open(self.tarname, fileobj=fileobj) as tar:
            self.assertEqual(tar.getnames(), names)

    def test_non_existing(self):
        # Appending to a missing file creates the archive.
        self._add_testfile()
        self._test()

    def test_empty(self):
        tarfile.open(self.tarname, "w:").close()
        self._add_testfile()
        self._test()

    def test_empty_fileobj(self):
        # 1024 NUL bytes stand in for an archive without members.
        fobj = io.BytesIO(b"\0" * 1024)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(fileobj=fobj)

    def test_fileobj(self):
        self._create_testtar()
        with open(self.tarname, "rb") as fobj:
            data = fobj.read()
        fobj = io.BytesIO(data)
        self._add_testfile(fobj)
        fobj.seek(0)
        self._test(names=["foo", "bar"], fileobj=fobj)

    def test_existing(self):
        self._create_testtar()
        self._add_testfile()
        self._test(names=["foo", "bar"])

    # Append mode is supposed to fail if the tarfile to append to
    # does not end with a zero block.
    def _test_error(self, data):
        # Write data as the archive file and expect appending to it to
        # raise ReadError.
        with open(self.tarname, "wb") as fobj:
            fobj.write(data)
        self.assertRaises(tarfile.ReadError, self._add_testfile)

    def test_null(self):
        self._test_error(b"")

    def test_incomplete(self):
        self._test_error(b"\0" * 13)

    def test_premature_eof(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data)

    def test_trailing_garbage(self):
        data = tarfile.TarInfo("foo").tobuf()
        self._test_error(data + b"\0" * 13)

    def test_invalid(self):
        self._test_error(b"a" * 512)
2342
class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase):
    """Run the AppendTestBase cases with the GzipTest settings."""
2345
class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase):
    """Run the AppendTestBase cases with the Bz2Test settings."""
2348
class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase):
    """Run the AppendTestBase cases with the LzmaTest settings."""
2351
2352
class LimitsTest(unittest.TestCase):
    # Checks which name, link name and uid values TarInfo.tobuf() accepts
    # for each header format.

    def test_ustar_limits(self):
        fmt = tarfile.USTAR_FORMAT

        # 100 char name
        tarfile.TarInfo("0123456789" * 10).tobuf(fmt)

        # 101 char name that cannot be stored
        info = tarfile.TarInfo("0123456789" * 10 + "0")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 256 char name with a slash at pos 156
        tarfile.TarInfo("123/" * 62 + "longname").tobuf(fmt)

        # 256 char name that cannot be stored
        info = tarfile.TarInfo("1234567/" * 31 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 512 char name
        info = tarfile.TarInfo("123/" * 126 + "longname")
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # 512 char linkname
        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

        # uid > 8 digits
        info = tarfile.TarInfo("name")
        info.uid = 0o10000000
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

    def test_gnu_limits(self):
        fmt = tarfile.GNU_FORMAT

        # A 512 char name and a 512 char linkname are both accepted.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(fmt)

        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(fmt)

        # uid >= 256 ** 7
        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        with self.assertRaises(ValueError):
            info.tobuf(fmt)

    def test_pax_limits(self):
        fmt = tarfile.PAX_FORMAT

        # None of the values rejected above is a problem in pax format.
        tarfile.TarInfo("123/" * 126 + "longname").tobuf(fmt)

        info = tarfile.TarInfo("longlink")
        info.linkname = "123/" * 126 + "longname"
        info.tobuf(fmt)

        info = tarfile.TarInfo("name")
        info.uid = 0o4000000000000000000
        info.tobuf(fmt)
2410
2411
class MiscTest(unittest.TestCase):
    # Tests for the low-level field conversion helpers (stn/nts/nti/itn),
    # for the module's __all__, and for the error message produced when
    # no open method can handle a file.

    def test_char_fields(self):
        # stn() pads with NULs or truncates to the requested length;
        # nts() stops at the first NUL.
        self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"),
                         b"foo\0\0\0\0\0")
        self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"),
                         b"foo")
        self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"),
                         "foo")
        self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"),
                         "foo")

    def test_read_number_fields(self):
        # Issue 13158: Test if GNU tar specific base-256 number fields
        # are decoded correctly.
        self.assertEqual(tarfile.nti(b"0000001\x00"), 1)
        self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"),
                         0o10000000)
        self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"),
                         0xffffffff)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"),
                         -1)
        self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"),
                         -100)
        self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"),
                         -0x100000000000000)

        # Issue 24514: Test if empty number fields are converted to zero.
        self.assertEqual(tarfile.nti(b"\0"), 0)
        self.assertEqual(tarfile.nti(b"       \0"), 0)

    def test_write_number_fields(self):
        # Mirror image of test_read_number_fields: the same values must be
        # encoded to the same byte strings.
        self.assertEqual(tarfile.itn(1), b"0000001\x00")
        self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00")
        self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x00\x00\x20\x00\x00")
        self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x00\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\xff")
        self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
        self.assertEqual(tarfile.itn(-0x100000000000000,
                                     format=tarfile.GNU_FORMAT),
                         b"\xff\x00\x00\x00\x00\x00\x00\x00")

        # Issue 32713: Test if itn() supports float values outside the
        # non-GNU format range
        self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT),
                         b"\xff\xff\xff\xff\xff\xff\xff\x9c")
        self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT),
                         b"\x80\x00\x00\x10\x00\x00\x00\x00")
        self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0)

    def test_number_field_limits(self):
        # Values that do not fit the requested field width/format must be
        # rejected with ValueError.
        with self.assertRaises(ValueError):
            tarfile.itn(-1, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT)
        with self.assertRaises(ValueError):
            tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT)

    def test__all__(self):
        # Names that exist in the module but are deliberately left out of
        # tarfile.__all__.
        not_exported = {
            'version', 'grp', 'pwd', 'symlink_exception', 'NUL', 'BLOCKSIZE',
            'RECORDSIZE', 'GNU_MAGIC', 'POSIX_MAGIC', 'LENGTH_NAME',
            'LENGTH_LINK', 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE',
            'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 'CONTTYPE',
            'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 'GNUTYPE_SPARSE',
            'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 'SUPPORTED_TYPES',
            'REGULAR_TYPES', 'GNU_TYPES', 'PAX_FIELDS', 'PAX_NAME_FIELDS',
            'PAX_NUMBER_FIELDS', 'stn', 'nts', 'nti', 'itn', 'calc_chksums',
            'copyfileobj', 'filemode', 'EmptyHeaderError',
            'TruncatedHeaderError', 'EOFHeaderError', 'InvalidHeaderError',
            'SubsequentHeaderError', 'ExFileObject', 'main',
            "fully_trusted_filter", "data_filter",
            "tar_filter", "FilterError", "AbsoluteLinkError",
            "OutsideDestinationError", "SpecialFileError", "AbsolutePathError",
            "LinkOutsideDestinationError",
            }
        support.check__all__(self, tarfile, not_exported=not_exported)

    def test_useful_error_message_when_modules_missing(self):
        # With xzopen patched to fail as if lzma were unavailable, the
        # ReadError raised by tarfile.open() must mention that failure.
        fname = os.path.join(os.path.dirname(__file__), 'testtar.tar.xz')
        # A single exception instance (not a 1-tuple): the mock then raises
        # it on every call instead of exhausting an iterable after one call.
        error = tarfile.CompressionError('lzma module is not available')
        with self.assertRaises(tarfile.ReadError) as excinfo:
            with unittest.mock.patch.object(tarfile.TarFile, 'xzopen', side_effect=error):
                tarfile.open(fname)

        self.assertIn(
            "\n- method xz: CompressionError('lzma module is not available')\n",
            str(excinfo.exception),
        )
2508
2509
class CommandLineTest(unittest.TestCase):
    # Tests that run "python -m tarfile ..." through script_helper and
    # check its exit status and output.

    def tarfilecmd(self, *args, **kwargs):
        """Run ``-m tarfile`` with *args*, expecting success.

        Keyword arguments are forwarded to ``assert_python_ok``.  Returns
        the captured stdout with ``os.linesep`` replaced by a newline byte.
        """
        _, out, _ = script_helper.assert_python_ok('-m', 'tarfile', *args,
                                                   **kwargs)
        return out.replace(os.linesep.encode(), b'\n')

    def tarfilecmd_failure(self, *args):
        """Run ``-m tarfile`` with *args*, expecting failure.

        Returns whatever ``assert_python_failure`` returns; callers unpack
        it as ``(rc, out, err)``.
        """
        return script_helper.assert_python_failure('-m', 'tarfile', *args)

    def make_simple_tarfile(self, tar_name):
        """Create an archive *tar_name* containing two test data files.

        Members are stored under their base names.  The archive is
        scheduled for removal when the test finishes.
        """
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        self.addCleanup(os_helper.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as tf:
            for tardata in files:
                tf.add(tardata, arcname=os.path.basename(tardata))

    def make_evil_tarfile(self, tar_name):
        """Create an archive *tar_name* with a member named ``../evil``.

        The archive holds two empty members, ``benign`` and ``../evil``,
        and is scheduled for removal when the test finishes.
        """
        self.addCleanup(os_helper.unlink, tar_name)
        with tarfile.open(tar_name, 'w') as tf:
            benign = tarfile.TarInfo('benign')
            tf.addfile(benign, fileobj=io.BytesIO(b''))
            evil = tarfile.TarInfo('../evil')
            tf.addfile(evil, fileobj=io.BytesIO(b''))

    def test_bad_use(self):
        # No arguments at all: usage/error text on stderr, nothing on stdout.
        rc, out, err = self.tarfilecmd_failure()
        self.assertEqual(out, b'')
        self.assertIn(b'usage', err.lower())
        self.assertIn(b'error', err.lower())
        self.assertIn(b'required', err.lower())
        # An empty archive name must fail with some message on stderr.
        rc, out, err = self.tarfilecmd_failure('-l', '')
        self.assertEqual(out, b'')
        self.assertNotEqual(err.strip(), b'')

    def test_test_command(self):
        for tar_name in testtarnames:
            for opt in '-t', '--test':
                out = self.tarfilecmd(opt, tar_name)
                self.assertEqual(out, b'')

    def test_test_command_verbose(self):
        for tar_name in testtarnames:
            for opt in '-v', '--verbose':
                out = self.tarfilecmd(opt, '-t', tar_name,
                                      PYTHONIOENCODING='utf-8')
                self.assertIn(b'is a tar archive.\n', out)

    def test_test_command_invalid_file(self):
        # A zip file is not a tar archive.
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-t', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

        # Neither is the first 511 bytes of any of the test archives.
        for tar_name in testtarnames:
            with self.subTest(tar_name=tar_name):
                with open(tar_name, 'rb') as f:
                    data = f.read()
                try:
                    with open(tmpname, 'wb') as f:
                        f.write(data[:511])
                    rc, out, err = self.tarfilecmd_failure('-t', tmpname)
                    self.assertEqual(out, b'')
                    self.assertEqual(rc, 1)
                finally:
                    os_helper.unlink(tmpname)

    def test_list_command(self):
        # The command's output must match what TarFile.list() prints.
        for tar_name in testtarnames:
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=False)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-l', '--list':
                out = self.tarfilecmd(opt, tar_name,
                                      PYTHONIOENCODING='ascii')
                self.assertEqual(out, expected)

    def test_list_command_verbose(self):
        for tar_name in testtarnames:
            with support.captured_stdout() as t:
                with tarfile.open(tar_name, 'r') as tf:
                    tf.list(verbose=True)
            expected = t.getvalue().encode('ascii', 'backslashreplace')
            for opt in '-v', '--verbose':
                out = self.tarfilecmd(opt, '-l', tar_name,
                                      PYTHONIOENCODING='ascii')
                self.assertEqual(out, expected)

    def test_list_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        rc, out, err = self.tarfilecmd_failure('-l', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)

    def test_create_command(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for opt in '-c', '--create':
            try:
                out = self.tarfilecmd(opt, tmpname, *files)
                self.assertEqual(out, b'')
                # The result must be readable as a tar archive.
                with tarfile.open(tmpname) as tar:
                    tar.getmembers()
            finally:
                os_helper.unlink(tmpname)

    def test_create_command_verbose(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for opt in '-v', '--verbose':
            try:
                out = self.tarfilecmd(opt, '-c', tmpname, *files,
                                      PYTHONIOENCODING='utf-8')
                self.assertIn(b' file created.', out)
                with tarfile.open(tmpname) as tar:
                    tar.getmembers()
            finally:
                os_helper.unlink(tmpname)

    def test_create_command_dotless_filename(self):
        # Archive name without any extension.
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', dotlessname, *files)
            self.assertEqual(out, b'')
            with tarfile.open(dotlessname) as tar:
                tar.getmembers()
        finally:
            os_helper.unlink(dotlessname)

    def test_create_command_dot_started_filename(self):
        # Archive name that starts with a dot and has no other extension.
        tar_name = os.path.join(TEMPDIR, ".testtar")
        files = [support.findfile('tokenize_tests.txt')]
        try:
            out = self.tarfilecmd('-c', tar_name, *files)
            self.assertEqual(out, b'')
            with tarfile.open(tar_name) as tar:
                tar.getmembers()
        finally:
            os_helper.unlink(tar_name)

    def test_create_command_compressed(self):
        files = [support.findfile('tokenize_tests.txt'),
                 support.findfile('tokenize_tests-no-coding-cookie-'
                                  'and-utf8-bom-sig-only.txt')]
        for filetype in (GzipTest, Bz2Test, LzmaTest):
            # ``open`` is falsy when the compression module is missing.
            if not filetype.open:
                continue
            # Bind the name before entering ``try`` so that the cleanup in
            # ``finally`` can never run into an unbound variable.
            tar_name = tmpname + '.' + filetype.suffix
            try:
                self.tarfilecmd('-c', tar_name, *files)
                # Must be readable with the matching compressed opener.
                with filetype.taropen(tar_name) as tar:
                    tar.getmembers()
            finally:
                os_helper.unlink(tar_name)

    def test_extract_command(self):
        self.make_simple_tarfile(tmpname)
        for opt in '-e', '--extract':
            try:
                with os_helper.temp_cwd(tarextdir):
                    out = self.tarfilecmd(opt, tmpname)
                self.assertEqual(out, b'')
            finally:
                os_helper.rmtree(tarextdir)

    def test_extract_command_verbose(self):
        self.make_simple_tarfile(tmpname)
        for opt in '-v', '--verbose':
            try:
                with os_helper.temp_cwd(tarextdir):
                    out = self.tarfilecmd(opt, '-e', tmpname,
                                          PYTHONIOENCODING='utf-8')
                self.assertIn(b' file is extracted.', out)
            finally:
                os_helper.rmtree(tarextdir)

    def test_extract_command_filter(self):
        self.make_evil_tarfile(tmpname)
        # Make an inner directory, so the member named '../evil'
        # is still extracted into `tarextdir`
        destdir = os.path.join(tarextdir, 'dest')
        os.mkdir(tarextdir)
        try:
            with os_helper.temp_cwd(destdir):
                # The 'data' filter must make the extraction fail ...
                self.tarfilecmd_failure('-e', tmpname,
                                        '-v',
                                        '--filter', 'data')
                # ... while 'fully_trusted' lets it succeed.
                out = self.tarfilecmd('-e', tmpname,
                                      '-v',
                                      '--filter', 'fully_trusted',
                                      PYTHONIOENCODING='utf-8')
                self.assertIn(b' file is extracted.', out)
        finally:
            os_helper.rmtree(tarextdir)

    def test_extract_command_different_directory(self):
        self.make_simple_tarfile(tmpname)
        try:
            with os_helper.temp_cwd(tarextdir):
                out = self.tarfilecmd('-e', tmpname, 'spamdir')
            self.assertEqual(out, b'')
        finally:
            os_helper.rmtree(tarextdir)

    def test_extract_command_invalid_file(self):
        zipname = support.findfile('zipdir.zip')
        with os_helper.temp_cwd(tarextdir):
            rc, out, err = self.tarfilecmd_failure('-e', zipname)
        self.assertIn(b' is not a tar archive.', err)
        self.assertEqual(out, b'')
        self.assertEqual(rc, 1)
2729
2730
class ContextManagerTest(unittest.TestCase):
    # Tests for using a TarFile object in a "with" statement.

    def test_basic(self):
        # Open inside the block, closed once the block is left.
        with tarfile.open(tarname) as tar:
            self.assertFalse(tar.closed, "closed inside runtime context")
        self.assertTrue(tar.closed, "context manager failed")

    def test_closed(self):
        # The __enter__() method is supposed to raise OSError
        # if the TarFile object is already closed.
        tar = tarfile.open(tarname)
        tar.close()
        with self.assertRaises(OSError):
            with tar:
                pass

    def test_exception(self):
        # Test if the OSError exception is passed through properly.
        with self.assertRaises(Exception) as exc:
            with tarfile.open(tarname) as tar:
                raise OSError
        self.assertIsInstance(exc.exception, OSError,
                              "wrong exception raised in context manager")
        self.assertTrue(tar.closed, "context manager failed")

    def test_no_eof(self):
        # __exit__() must not write end-of-archive blocks if an
        # exception was raised.
        try:
            with tarfile.open(tmpname, "w") as tar:
                raise Exception
        except Exception:
            # Only the exception raised above is expected here; do not
            # swallow KeyboardInterrupt or SystemExit.
            pass
        self.assertEqual(os.path.getsize(tmpname), 0,
                "context manager wrote an end-of-archive block")
        self.assertTrue(tar.closed, "context manager failed")

    def test_eof(self):
        # __exit__() must write end-of-archive blocks, i.e. call
        # TarFile.close() if there was no error.
        with tarfile.open(tmpname, "w"):
            pass
        self.assertNotEqual(os.path.getsize(tmpname), 0,
                "context manager wrote no end-of-archive block")

    def test_fileobj(self):
        # Test that __exit__() did not close the external file
        # object.
        with open(tmpname, "wb") as fobj:
            try:
                with tarfile.open(fileobj=fobj, mode="w") as tar:
                    raise Exception
            except Exception:
                # Only the exception raised above is expected here.
                pass
            self.assertFalse(fobj.closed, "external file object was closed")
            self.assertTrue(tar.closed, "context manager failed")
2787
2788
@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing")
class LinkEmulationTest(ReadTest, unittest.TestCase):

    # Test for issue #8741 regression. On platforms that do not support
    # symbolic or hard links tarfile tries to extract these types of members
    # as the regular files they point to.
    def _test_link_extraction(self, name):
        """Extract member *name* into TEMPDIR and compare its content hash
        with ``sha256_regtype``."""
        self.tar.extract(name, TEMPDIR, filter='fully_trusted')
        extracted_path = os.path.join(TEMPDIR, name)
        with open(extracted_path, "rb") as extracted:
            digest = sha256sum(extracted.read())
        self.assertEqual(digest, sha256_regtype)

    # See issues #1578269, #8879, and #17689 for some history on these skips
    @unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link",
    )
    def test_hardlink_extraction1(self):
        self._test_link_extraction("ustar/lnktype")

    @unittest.skipIf(
        hasattr(os.path, "islink"),
        "Skip emulation - has os.path.islink but not os.link",
    )
    def test_hardlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/lnktype")

    @unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists",
    )
    def test_symlink_extraction1(self):
        self._test_link_extraction("ustar/symtype")

    @unittest.skipIf(
        hasattr(os, "symlink"),
        "Skip emulation if symlink exists",
    )
    def test_symlink_extraction2(self):
        self._test_link_extraction("./ustar/linktest2/symtype")
2821
2822
class Bz2PartialReadTest(Bz2Test, unittest.TestCase):
    # Issue5068: The _BZ2Proxy.read() method loops forever
    # on an empty or partial bzipped file.

    def _test_partial_input(self, mode):
        """Open every prefix of a small bz2-compressed header with *mode*.

        The file object used raises AssertionError when it is read again
        after a read that started at end-of-data, unless a seek happened
        in between.  ReadError from tarfile.open() is ignored.
        """
        class EofGuardBytesIO(io.BytesIO):
            hit_eof = False

            def read(self, n):
                if self.hit_eof:
                    raise AssertionError(
                        "infinite loop detected in tarfile.open()")
                # Remember whether this read starts at the end of the data.
                at_end = self.tell() == len(self.getvalue())
                self.hit_eof = at_end
                return super().read(n)

            def seek(self, *args):
                self.hit_eof = False
                return super().seek(*args)

        compressed = bz2.compress(tarfile.TarInfo("foo").tobuf())
        for cut in range(len(compressed) + 1):
            partial = EofGuardBytesIO(compressed[:cut])
            try:
                tarfile.open(fileobj=partial, mode=mode)
            except tarfile.ReadError:
                pass # we have no interest in ReadErrors

    def test_partial_input(self):
        self._test_partial_input("r")

    def test_partial_input_bz2(self):
        self._test_partial_input("r:bz2")
2852
2853
def root_is_uid_gid_0():
    """Return True if uid 0 and gid 0 are both named 'root'.

    Returns False when the ``pwd`` or ``grp`` module cannot be imported,
    when either database has no entry for id 0, or when an entry has a
    different name.
    """
    try:
        import pwd
        import grp
    except ImportError:
        return False
    try:
        user_name = pwd.getpwuid(0)[0]
        group_name = grp.getgrgid(0)[0]
    except KeyError:
        # The lookups raise KeyError when id 0 has no entry.  This function
        # is used in a decorator evaluated at import time, so an unhandled
        # error here would prevent the whole module from loading.
        return False
    return user_name == 'root' and group_name == 'root'
2864
2865
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # The tests patch the following:
    #  os.chown: to record which ownership changes are requested
    #  os.chmod: so the modes are not actually changed. if they were, we
    #             might be unable to delete the files/directories
    #  os.geteuid: so we can pretend to be root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        """Write a three-member archive to ``tmpname`` and return that path.

        The members are a regular file, a directory and a second regular
        file, each with its own uid/gid pair; uname and gname are 'root'
        for all of them.
        """
        payload = io.BytesIO(b"content")

        # (name, uid, gid, type, file object) for every member
        member_specs = (
            (filename_1, 99, 98, tarfile.REGTYPE, payload),
            (dirname_1, 77, 76, tarfile.DIRTYPE, None),
            (filename_2, 88, 87, tarfile.REGTYPE, payload),
        )
        with tarfile.open(tmpname, 'w') as archive:
            for member_name, uid, gid, member_type, fileobj in member_specs:
                info = tarfile.TarInfo(member_name)
                info.uid = uid
                info.gid = gid
                info.uname = 'root'
                info.gname = 'root'
                info.type = member_type
                archive.addfile(info, fileobj)

        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        """Build the test archive and yield it opened for reading.

        Yields ``(tarfile, filename_1, dirname_1, filename_2)`` where the
        names are those stored in the archive.  *mock_geteuid* is set up
        to return 0.
        """
        mock_geteuid.return_value = 0  # pretend to be root

        # the names stored in the archive
        dirname_1 = 'dir'
        filename_1 = 'numeric-owner-testfile'
        filename_2 = os.path.join(dirname_1, filename_1)

        archive_path = NumericOwnerTest._make_test_archive(
            filename_1, dirname_1, filename_2)

        with tarfile.open(archive_path) as archive:
            yield archive, filename_1, dirname_1, filename_2

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        with self._setup_test(mock_geteuid) as (archive, file_1, _, file_2):
            archive.extract(file_1, TEMPDIR, numeric_owner=True,
                            filter='fully_trusted')
            archive.extract(file_2, TEMPDIR, numeric_owner=True,
                            filter='fully_trusted')

        # chown is expected on the filesystem paths with the stored ids
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, file_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, file_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (archive, file_1, dir_1,
                                                file_2):
            archive.extractall(TEMPDIR, numeric_owner=True,
                               filter='fully_trusted')

        # chown is expected on the filesystem paths with the stored ids
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, file_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, dir_1), 77, 76),
            unittest.mock.call(os.path.join(TEMPDIR, file_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    # this test requires that uid=0 and gid=0 really be named 'root'. that's
    #  because the uname and gname in the test file are 'root', and extract()
    #  will look them up using pwd and grp to find their uid and gid, which we
    #  test here to be 0.
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (archive, file_1, _, _):
            archive.extract(file_1, TEMPDIR, numeric_owner=False,
                            filter='fully_trusted')

        extracted_path = os.path.join(TEMPDIR, file_1)
        mock_chown.assert_called_with(extracted_path, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        # Two extra positional arguments after the path must be rejected.
        with self._setup_test(mock_geteuid) as (archive, file_1, _, _):
            with self.assertRaises(TypeError):
                archive.extract(file_1, TEMPDIR, False, True)
2988
2989
class ReplaceTests(ReadTest, unittest.TestCase):
    # Tests for TarInfo.replace() on members of the test archive.

    def test_replace_name(self):
        # The copy gets the new name; the original and the archive's own
        # member keep theirs.
        original = self.tar.getmember('ustar/regtype')
        renamed = original.replace(name='misc/other')
        self.assertEqual(renamed.name, 'misc/other')
        self.assertEqual(original.name, 'ustar/regtype')
        lookup = self.tar.getmember('ustar/regtype')
        self.assertEqual(lookup.name, 'ustar/regtype')

    def test_replace_deep(self):
        # By default, changing the copy's pax_headers leaves the original
        # untouched.
        original = self.tar.getmember('pax/regtype1')
        copy = original.replace()
        copy.pax_headers['gname'] = 'not-bar'
        self.assertEqual(original.pax_headers['gname'], 'bar')
        lookup = self.tar.getmember('pax/regtype1')
        self.assertEqual(lookup.pax_headers['gname'], 'bar')

    def test_replace_shallow(self):
        # With deep=False, the change made through the copy is visible on
        # the original too.
        original = self.tar.getmember('pax/regtype1')
        copy = original.replace(deep=False)
        copy.pax_headers['gname'] = 'not-bar'
        self.assertEqual(original.pax_headers['gname'], 'not-bar')
        lookup = self.tar.getmember('pax/regtype1')
        self.assertEqual(lookup.pax_headers['gname'], 'not-bar')

    def test_replace_all(self):
        # Each of these attributes can be replaced with None on the copy
        # without affecting the original.
        original = self.tar.getmember('ustar/regtype')
        replaceable = ('name', 'mtime', 'mode', 'linkname',
                       'uid', 'gid', 'uname', 'gname')
        for attr_name in replaceable:
            with self.subTest(attr_name=attr_name):
                cleared = original.replace(**{attr_name: None})
                self.assertEqual(getattr(cleared, attr_name), None)
                self.assertNotEqual(getattr(original, attr_name), None)

    def test_replace_internal(self):
        # 'offset' is not an accepted keyword.
        original = self.tar.getmember('ustar/regtype')
        with self.assertRaises(TypeError):
            original.replace(offset=123456789)
3027            member.replace(offset=123456789)
3028
3029
class NoneInfoExtractTests(ReadTest):
    # These mainly check that all kinds of members are extracted successfully
    # if some metadata is None.
    # Some of the methods do additional spot checks.

    # We also test that the default filters can deal with None.

    # Filter used for the control extraction in setUpClass(); concrete
    # subclasses override it.
    extraction_filter = None

    @classmethod
    def setUpClass(cls):
        # Extract the unmodified archive once.  The resulting set of relative
        # paths is what every None-metadata extraction is compared against.
        cls.control_dir = pathlib.Path(TEMPDIR) / "extractall_ctrl"
        # The context manager closes the archive even if extractall() raises.
        with tarfile.open(tarname, mode='r', encoding="iso8859-1") as tar:
            tar.errorlevel = 0
            tar.extractall(cls.control_dir, filter=cls.extraction_filter)
        cls.control_paths = {
            p.relative_to(cls.control_dir)
            for p in cls.control_dir.glob('**/*')}

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.control_dir)

    def check_files_present(self, directory):
        # The paths below `directory` must match the control extraction.
        got_paths = {
            p.relative_to(directory)
            for p in pathlib.Path(directory).glob('**/*')}
        self.assertEqual(self.control_paths, got_paths)

    @contextmanager
    def extract_with_none(self, *attr_names):
        """Extract self.tar with the given attributes of all members set to None.

        Yields the temporary extraction directory after checking that it
        contains the same paths as the control extraction.
        """
        DIR = pathlib.Path(TEMPDIR) / "extractall_none"
        self.tar.errorlevel = 0
        for member in self.tar.getmembers():
            for attr_name in attr_names:
                setattr(member, attr_name, None)
        with os_helper.temp_dir(DIR):
            self.tar.extractall(DIR, filter='fully_trusted')
            self.check_files_present(DIR)
            yield DIR

    def test_extractall_none_mtime(self):
        # mtimes of extracted files should be later than 'now' -- the mtime
        # of a previously created directory.
        now = pathlib.Path(TEMPDIR).stat().st_mtime
        with self.extract_with_none('mtime') as DIR:
            for path in pathlib.Path(DIR).glob('**/*'):
                with self.subTest(path=path):
                    try:
                        mtime = path.stat().st_mtime
                    except OSError:
                        # Some systems can't stat symlinks, ignore those
                        if not path.is_symlink():
                            raise
                    else:
                        # Compare the value obtained inside the guarded
                        # stat() call instead of stat()-ing a second time.
                        self.assertGreaterEqual(mtime, now)

    def test_extractall_none_mode(self):
        # modes of directories and regular files should match the mode
        # of a "normally" created directory or regular file
        dir_mode = pathlib.Path(TEMPDIR).stat().st_mode
        regular_file = pathlib.Path(TEMPDIR) / 'regular_file'
        regular_file.write_text('')
        regular_file_mode = regular_file.stat().st_mode
        with self.extract_with_none('mode') as DIR:
            for path in pathlib.Path(DIR).glob('**/*'):
                with self.subTest(path=path):
                    if path.is_dir():
                        self.assertEqual(path.stat().st_mode, dir_mode)
                    elif path.is_file():
                        self.assertEqual(path.stat().st_mode,
                                         regular_file_mode)

    def test_extractall_none_uid(self):
        with self.extract_with_none('uid'):
            pass

    def test_extractall_none_gid(self):
        with self.extract_with_none('gid'):
            pass

    def test_extractall_none_uname(self):
        with self.extract_with_none('uname'):
            pass

    def test_extractall_none_gname(self):
        with self.extract_with_none('gname'):
            pass

    def test_extractall_none_ownership(self):
        with self.extract_with_none('uid', 'gid', 'uname', 'gname'):
            pass
3123
class NoneInfoExtractTests_Data(NoneInfoExtractTests, unittest.TestCase):
    """NoneInfoExtractTests with `extraction_filter` set to 'data'."""

    extraction_filter = 'data'
3126
class NoneInfoExtractTests_FullyTrusted(
        NoneInfoExtractTests, unittest.TestCase):
    """NoneInfoExtractTests with `extraction_filter` set to 'fully_trusted'."""

    extraction_filter = 'fully_trusted'
3130
class NoneInfoExtractTests_Tar(NoneInfoExtractTests, unittest.TestCase):
    """NoneInfoExtractTests with `extraction_filter` set to 'tar'."""

    extraction_filter = 'tar'
3133
class NoneInfoExtractTests_Default(
        NoneInfoExtractTests, unittest.TestCase):
    """NoneInfoExtractTests with `extraction_filter` left as None."""

    extraction_filter = None
3137
class NoneInfoTests_Misc(unittest.TestCase):
    def test_add(self):
        # When addfile() encounters None metadata, it raises a ValueError
        bio = io.BytesIO()
        for tarformat in (tarfile.USTAR_FORMAT, tarfile.GNU_FORMAT,
                          tarfile.PAX_FORMAT):
            # Open the archive as a context manager so that it is always
            # closed, whatever happens inside the subtest.
            with (self.subTest(tarformat=tarformat),
                  tarfile.open(fileobj=bio, mode='w',
                               format=tarformat) as tar):
                tarinfo = tar.gettarinfo(tarname)
                try:
                    tar.addfile(tarinfo)
                except Exception:
                    if tarformat == tarfile.USTAR_FORMAT:
                        # In the old, limited format, adding might fail for
                        # reasons like the UID being too large
                        pass
                    else:
                        raise
                else:
                    for attr_name in ('mtime', 'mode', 'uid', 'gid',
                                      'uname', 'gname'):
                        with self.subTest(attr_name=attr_name):
                            replaced = tarinfo.replace(**{attr_name: None})
                            # The error message must name the attribute.
                            with self.assertRaisesRegex(ValueError,
                                                        attr_name):
                                tar.addfile(replaced)

    def test_list(self):
        # Change some metadata to None, then compare list() output
        # word-for-word. We want list() to not raise, and to only change
        # printout for the affected piece of metadata.
        # (n.b.: some contents of the test archive are hardcoded.)
        for attr_names in ({'mtime'}, {'mode'}, {'uid'}, {'gid'},
                           {'uname'}, {'gname'},
                           {'uid', 'uname'}, {'gid', 'gname'}):
            with (self.subTest(attr_names=attr_names),
                  tarfile.open(tarname, encoding="iso8859-1") as tar):
                # Listing of the unmodified archive.
                tio_prev = io.TextIOWrapper(io.BytesIO(), 'ascii',
                                            newline='\n')
                with support.swap_attr(sys, 'stdout', tio_prev):
                    tar.list()
                for member in tar.getmembers():
                    for attr_name in attr_names:
                        setattr(member, attr_name, None)
                # Listing after the selected attributes were set to None.
                tio_new = io.TextIOWrapper(io.BytesIO(), 'ascii',
                                           newline='\n')
                with support.swap_attr(sys, 'stdout', tio_new):
                    tar.list()
                for expected, got in zip(tio_prev.detach().getvalue().split(),
                                         tio_new.detach().getvalue().split()):
                    if attr_names == {'mtime'} and re.match(
                            rb'2003-01-\d\d', expected):
                        self.assertEqual(got, b'????-??-??')
                    elif attr_names == {'mtime'} and re.match(
                            rb'\d\d:\d\d:\d\d', expected):
                        self.assertEqual(got, b'??:??:??')
                    elif attr_names == {'mode'} and re.match(
                            rb'.([r-][w-][x-]){3}', expected):
                        self.assertEqual(got, b'??????????')
                    elif attr_names == {'uname'} and expected.startswith(
                            (b'tarfile/', b'lars/', b'foo/')):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_group, exp_group)
                        self.assertRegex(got_user, b'[0-9]+')
                    elif attr_names == {'gname'} and expected.endswith(
                            (b'/tarfile', b'/users', b'/bar')):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_user, exp_user)
                        self.assertRegex(got_group, b'[0-9]+')
                    elif attr_names == {'uid'} and expected.startswith(
                            b'1000/'):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_group, exp_group)
                        self.assertEqual(got_user, b'None')
                    elif attr_names == {'gid'} and expected.endswith(
                            b'/100'):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_user, exp_user)
                        self.assertEqual(got_group, b'None')
                    elif attr_names == {'uid', 'uname'} and expected.startswith(
                            (b'tarfile/', b'lars/', b'foo/', b'1000/')):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_group, exp_group)
                        self.assertEqual(got_user, b'None')
                    elif attr_names == {'gname', 'gid'} and expected.endswith(
                            (b'/tarfile', b'/users', b'/bar', b'/100')):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_user, exp_user)
                        self.assertEqual(got_group, b'None')
                    else:
                        # In other cases the output should be the same
                        self.assertEqual(expected, got)
3231
3232def _filemode_to_int(mode):
3233    """Inverse of `stat.filemode` (for permission bits)
3234
3235    Using mode strings rather than numbers makes the later tests more readable.
3236    """
3237    str_mode = mode[1:]
3238    result = (
3239          {'r': stat.S_IRUSR, '-': 0}[str_mode[0]]
3240        | {'w': stat.S_IWUSR, '-': 0}[str_mode[1]]
3241        | {'x': stat.S_IXUSR, '-': 0,
3242           's': stat.S_IXUSR | stat.S_ISUID,
3243           'S': stat.S_ISUID}[str_mode[2]]
3244        | {'r': stat.S_IRGRP, '-': 0}[str_mode[3]]
3245        | {'w': stat.S_IWGRP, '-': 0}[str_mode[4]]
3246        | {'x': stat.S_IXGRP, '-': 0,
3247           's': stat.S_IXGRP | stat.S_ISGID,
3248           'S': stat.S_ISGID}[str_mode[5]]
3249        | {'r': stat.S_IROTH, '-': 0}[str_mode[6]]
3250        | {'w': stat.S_IWOTH, '-': 0}[str_mode[7]]
3251        | {'x': stat.S_IXOTH, '-': 0,
3252           't': stat.S_IXOTH | stat.S_ISVTX,
3253           'T': stat.S_ISVTX}[str_mode[8]]
3254        )
3255    # check we did this right
3256    assert stat.filemode(result)[1:] == mode[1:]
3257
3258    return result
3259
3260class ArchiveMaker:
3261    """Helper to create a tar file with specific contents
3262
3263    Usage:
3264
3265        with ArchiveMaker() as t:
3266            t.add('filename', ...)
3267
3268        with t.open() as tar:
3269            ... # `tar` is now a TarFile with 'filename' in it!
3270    """
3271    def __init__(self):
3272        self.bio = io.BytesIO()
3273
3274    def __enter__(self):
3275        self.tar_w = tarfile.TarFile(mode='w', fileobj=self.bio)
3276        return self
3277
3278    def __exit__(self, *exc):
3279        self.tar_w.close()
3280        self.contents = self.bio.getvalue()
3281        self.bio = None
3282
3283    def add(self, name, *, type=None, symlink_to=None, hardlink_to=None,
3284            mode=None, **kwargs):
3285        """Add a member to the test archive. Call within `with`."""
3286        name = str(name)
3287        tarinfo = tarfile.TarInfo(name).replace(**kwargs)
3288        if mode:
3289            tarinfo.mode = _filemode_to_int(mode)
3290        if symlink_to is not None:
3291            type = tarfile.SYMTYPE
3292            tarinfo.linkname = str(symlink_to)
3293        if hardlink_to is not None:
3294            type = tarfile.LNKTYPE
3295            tarinfo.linkname = str(hardlink_to)
3296        if name.endswith('/') and type is None:
3297            type = tarfile.DIRTYPE
3298        if type is not None:
3299            tarinfo.type = type
3300        if tarinfo.isreg():
3301            fileobj = io.BytesIO(bytes(tarinfo.size))
3302        else:
3303            fileobj = None
3304        self.tar_w.addfile(tarinfo, fileobj)
3305
3306    def open(self, **kwargs):
3307        """Open the resulting archive as TarFile. Call after `with`."""
3308        bio = io.BytesIO(self.contents)
3309        return tarfile.open(fileobj=bio, **kwargs)
3310
# Under WASI, `os_helper.can_symlink` is False to make
# `skip_unless_symlink` skip symlink tests.
# But in the following tests we use can_symlink to *determine* which
# behavior is expected.
# Like other symlink tests, skip these on WASI for now.
def symlink_test(f):
    """Decorator for symlink tests: skip on WASI, otherwise return `f` as is."""
    if support.is_wasi:
        return unittest.skip("WASI: Skip symlink test for now")(f)
    return f
3322
3323
3324class TestExtractionFilters(unittest.TestCase):
3325
3326    # A temporary directory for the extraction results.
3327    # All files that "escape" the destination path should still end
3328    # up in this directory.
3329    outerdir = pathlib.Path(TEMPDIR) / 'outerdir'
3330
3331    # The destination for the extraction, within `outerdir`
3332    destdir = outerdir / 'dest'
3333
3334    @contextmanager
3335    def check_context(self, tar, filter):
3336        """Extracts `tar` to `self.destdir` and allows checking the result
3337
3338        If an error occurs, it must be checked using `expect_exception`
3339
3340        Otherwise, all resulting files must be checked using `expect_file`,
3341        except the destination directory itself and parent directories of
3342        other files.
3343        When checking directories, do so before their contents.
3344        """
3345        with os_helper.temp_dir(self.outerdir):
3346            try:
3347                tar.extractall(self.destdir, filter=filter)
3348            except Exception as exc:
3349                self.raised_exception = exc
3350                self.expected_paths = set()
3351            else:
3352                self.raised_exception = None
3353                self.expected_paths = set(self.outerdir.glob('**/*'))
3354                self.expected_paths.discard(self.destdir)
3355            try:
3356                yield
3357            finally:
3358                tar.close()
3359            if self.raised_exception:
3360                raise self.raised_exception
3361            self.assertEqual(self.expected_paths, set())
3362
3363    def expect_file(self, name, type=None, symlink_to=None, mode=None):
3364        """Check a single file. See check_context."""
3365        if self.raised_exception:
3366            raise self.raised_exception
3367        # use normpath() rather than resolve() so we don't follow symlinks
3368        path = pathlib.Path(os.path.normpath(self.destdir / name))
3369        self.assertIn(path, self.expected_paths)
3370        self.expected_paths.remove(path)
3371        if mode is not None and os_helper.can_chmod():
3372            got = stat.filemode(stat.S_IMODE(path.stat().st_mode))
3373            self.assertEqual(got, mode)
3374        if type is None and isinstance(name, str) and name.endswith('/'):
3375            type = tarfile.DIRTYPE
3376        if symlink_to is not None:
3377            got = (self.destdir / name).readlink()
3378            expected = pathlib.Path(symlink_to)
3379            # The symlink might be the same (textually) as what we expect,
3380            # but some systems change the link to an equivalent path, so
3381            # we fall back to samefile().
3382            if expected != got:
3383                self.assertTrue(got.samefile(expected))
3384        elif type == tarfile.REGTYPE or type is None:
3385            self.assertTrue(path.is_file())
3386        elif type == tarfile.DIRTYPE:
3387            self.assertTrue(path.is_dir())
3388        elif type == tarfile.FIFOTYPE:
3389            self.assertTrue(path.is_fifo())
3390        else:
3391            raise NotImplementedError(type)
3392        for parent in path.parents:
3393            self.expected_paths.discard(parent)
3394
3395    def expect_exception(self, exc_type, message_re='.'):
3396        with self.assertRaisesRegex(exc_type, message_re):
3397            if self.raised_exception is not None:
3398                raise self.raised_exception
3399        self.raised_exception = None
3400
3401    def test_benign_file(self):
3402        with ArchiveMaker() as arc:
3403            arc.add('benign.txt')
3404        for filter in 'fully_trusted', 'tar', 'data':
3405            with self.check_context(arc.open(), filter):
3406                self.expect_file('benign.txt')
3407
3408    def test_absolute(self):
3409        # Test handling a member with an absolute path
3410        # Inspired by 'absolute1' in https://github.com/jwilk/traversal-archives
3411        with ArchiveMaker() as arc:
3412            arc.add(self.outerdir / 'escaped.evil')
3413
3414        with self.check_context(arc.open(), 'fully_trusted'):
3415            self.expect_file('../escaped.evil')
3416
3417        for filter in 'tar', 'data':
3418            with self.check_context(arc.open(), filter):
3419                if str(self.outerdir).startswith('/'):
3420                    # We strip leading slashes, as e.g. GNU tar does
3421                    # (without --absolute-filenames).
3422                    outerdir_stripped = str(self.outerdir).lstrip('/')
3423                    self.expect_file(f'{outerdir_stripped}/escaped.evil')
3424                else:
3425                    # On this system, absolute paths don't have leading
3426                    # slashes.
3427                    # So, there's nothing to strip. We refuse to unpack
3428                    # to an absolute path, nonetheless.
3429                    self.expect_exception(
3430                        tarfile.AbsolutePathError,
3431                        """['"].*escaped.evil['"] has an absolute path""")
3432
3433    @symlink_test
3434    def test_parent_symlink(self):
3435        # Test interplaying symlinks
3436        # Inspired by 'dirsymlink2a' in jwilk/traversal-archives
3437        with ArchiveMaker() as arc:
3438            arc.add('current', symlink_to='.')
3439            arc.add('parent', symlink_to='current/..')
3440            arc.add('parent/evil')
3441
3442        if os_helper.can_symlink():
3443            with self.check_context(arc.open(), 'fully_trusted'):
3444                if self.raised_exception is not None:
3445                    # Windows will refuse to create a file that's a symlink to itself
3446                    # (and tarfile doesn't swallow that exception)
3447                    self.expect_exception(FileExistsError)
3448                    # The other cases will fail with this error too.
3449                    # Skip the rest of this test.
3450                    return
3451                else:
3452                    self.expect_file('current', symlink_to='.')
3453                    self.expect_file('parent', symlink_to='current/..')
3454                    self.expect_file('../evil')
3455
3456            with self.check_context(arc.open(), 'tar'):
3457                self.expect_exception(
3458                    tarfile.OutsideDestinationError,
3459                    """'parent/evil' would be extracted to ['"].*evil['"], """
3460                    + "which is outside the destination")
3461
3462            with self.check_context(arc.open(), 'data'):
3463                self.expect_exception(
3464                    tarfile.LinkOutsideDestinationError,
3465                    """'parent' would link to ['"].*outerdir['"], """
3466                    + "which is outside the destination")
3467
3468        else:
3469            # No symlink support. The symlinks are ignored.
3470            with self.check_context(arc.open(), 'fully_trusted'):
3471                self.expect_file('parent/evil')
3472            with self.check_context(arc.open(), 'tar'):
3473                self.expect_file('parent/evil')
3474            with self.check_context(arc.open(), 'data'):
3475                self.expect_file('parent/evil')
3476
3477    @symlink_test
3478    def test_parent_symlink2(self):
3479        # Test interplaying symlinks
3480        # Inspired by 'dirsymlink2b' in jwilk/traversal-archives
3481        with ArchiveMaker() as arc:
3482            arc.add('current', symlink_to='.')
3483            arc.add('current/parent', symlink_to='..')
3484            arc.add('parent/evil')
3485
3486        with self.check_context(arc.open(), 'fully_trusted'):
3487            if os_helper.can_symlink():
3488                self.expect_file('current', symlink_to='.')
3489                self.expect_file('parent', symlink_to='..')
3490                self.expect_file('../evil')
3491            else:
3492                self.expect_file('current/')
3493                self.expect_file('parent/evil')
3494
3495        with self.check_context(arc.open(), 'tar'):
3496            if os_helper.can_symlink():
3497                self.expect_exception(
3498                        tarfile.OutsideDestinationError,
3499                        "'parent/evil' would be extracted to "
3500                        + """['"].*evil['"], which is outside """
3501                        + "the destination")
3502            else:
3503                self.expect_file('current/')
3504                self.expect_file('parent/evil')
3505
3506        with self.check_context(arc.open(), 'data'):
3507            self.expect_exception(
3508                    tarfile.LinkOutsideDestinationError,
3509                    """'current/parent' would link to ['"].*['"], """
3510                    + "which is outside the destination")
3511
3512    @symlink_test
3513    def test_absolute_symlink(self):
3514        # Test symlink to an absolute path
3515        # Inspired by 'dirsymlink' in jwilk/traversal-archives
3516        with ArchiveMaker() as arc:
3517            arc.add('parent', symlink_to=self.outerdir)
3518            arc.add('parent/evil')
3519
3520        with self.check_context(arc.open(), 'fully_trusted'):
3521            if os_helper.can_symlink():
3522                self.expect_file('parent', symlink_to=self.outerdir)
3523                self.expect_file('../evil')
3524            else:
3525                self.expect_file('parent/evil')
3526
3527        with self.check_context(arc.open(), 'tar'):
3528            if os_helper.can_symlink():
3529                self.expect_exception(
3530                        tarfile.OutsideDestinationError,
3531                        "'parent/evil' would be extracted to "
3532                        + """['"].*evil['"], which is outside """
3533                        + "the destination")
3534            else:
3535                self.expect_file('parent/evil')
3536
3537        with self.check_context(arc.open(), 'data'):
3538            self.expect_exception(
3539                tarfile.AbsoluteLinkError,
3540                "'parent' is a symlink to an absolute path")
3541
3542    @symlink_test
3543    def test_sly_relative0(self):
3544        # Inspired by 'relative0' in jwilk/traversal-archives
3545        with ArchiveMaker() as arc:
3546            arc.add('../moo', symlink_to='..//tmp/moo')
3547
3548        try:
3549            with self.check_context(arc.open(), filter='fully_trusted'):
3550                if os_helper.can_symlink():
3551                    if isinstance(self.raised_exception, FileExistsError):
3552                        # XXX TarFile happens to fail creating a parent
3553                        # directory.
3554                        # This might be a bug, but fixing it would hurt
3555                        # security.
3556                        # Note that e.g. GNU `tar` rejects '..' components,
3557                        # so you could argue this is an invalid archive and we
3558                        # just raise an bad type of exception.
3559                        self.expect_exception(FileExistsError)
3560                    else:
3561                        self.expect_file('../moo', symlink_to='..//tmp/moo')
3562                else:
3563                    # The symlink can't be extracted and is ignored
3564                    pass
3565        except FileExistsError:
3566            pass
3567
3568        for filter in 'tar', 'data':
3569            with self.check_context(arc.open(), filter):
3570                self.expect_exception(
3571                        tarfile.OutsideDestinationError,
3572                        "'../moo' would be extracted to "
3573                        + "'.*moo', which is outside "
3574                        + "the destination")
3575
3576    @symlink_test
3577    def test_sly_relative2(self):
3578        # Inspired by 'relative2' in jwilk/traversal-archives
3579        with ArchiveMaker() as arc:
3580            arc.add('tmp/')
3581            arc.add('tmp/../../moo', symlink_to='tmp/../..//tmp/moo')
3582
3583        with self.check_context(arc.open(), 'fully_trusted'):
3584            self.expect_file('tmp', type=tarfile.DIRTYPE)
3585            if os_helper.can_symlink():
3586                self.expect_file('../moo', symlink_to='tmp/../../tmp/moo')
3587
3588        for filter in 'tar', 'data':
3589            with self.check_context(arc.open(), filter):
3590                self.expect_exception(
3591                    tarfile.OutsideDestinationError,
3592                    "'tmp/../../moo' would be extracted to "
3593                    + """['"].*moo['"], which is outside the """
3594                    + "destination")
3595
3596    def test_modes(self):
3597        # Test how file modes are extracted
3598        # (Note that the modes are ignored on platforms without working chmod)
3599        with ArchiveMaker() as arc:
3600            arc.add('all_bits', mode='?rwsrwsrwt')
3601            arc.add('perm_bits', mode='?rwxrwxrwx')
3602            arc.add('exec_group_other', mode='?rw-rwxrwx')
3603            arc.add('read_group_only', mode='?---r-----')
3604            arc.add('no_bits', mode='?---------')
3605            arc.add('dir/', mode='?---rwsrwt')
3606
3607        # On some systems, setting the sticky bit is a no-op.
3608        # Check if that's the case.
3609        tmp_filename = os.path.join(TEMPDIR, "tmp.file")
3610        with open(tmp_filename, 'w'):
3611            pass
3612        os.chmod(tmp_filename, os.stat(tmp_filename).st_mode | stat.S_ISVTX)
3613        have_sticky_files = (os.stat(tmp_filename).st_mode & stat.S_ISVTX)
3614        os.unlink(tmp_filename)
3615
3616        os.mkdir(tmp_filename)
3617        os.chmod(tmp_filename, os.stat(tmp_filename).st_mode | stat.S_ISVTX)
3618        have_sticky_dirs = (os.stat(tmp_filename).st_mode & stat.S_ISVTX)
3619        os.rmdir(tmp_filename)
3620
3621        with self.check_context(arc.open(), 'fully_trusted'):
3622            if have_sticky_files:
3623                self.expect_file('all_bits', mode='?rwsrwsrwt')
3624            else:
3625                self.expect_file('all_bits', mode='?rwsrwsrwx')
3626            self.expect_file('perm_bits', mode='?rwxrwxrwx')
3627            self.expect_file('exec_group_other', mode='?rw-rwxrwx')
3628            self.expect_file('read_group_only', mode='?---r-----')
3629            self.expect_file('no_bits', mode='?---------')
3630            if have_sticky_dirs:
3631                self.expect_file('dir/', mode='?---rwsrwt')
3632            else:
3633                self.expect_file('dir/', mode='?---rwsrwx')
3634
3635        with self.check_context(arc.open(), 'tar'):
3636            self.expect_file('all_bits', mode='?rwxr-xr-x')
3637            self.expect_file('perm_bits', mode='?rwxr-xr-x')
3638            self.expect_file('exec_group_other', mode='?rw-r-xr-x')
3639            self.expect_file('read_group_only', mode='?---r-----')
3640            self.expect_file('no_bits', mode='?---------')
3641            self.expect_file('dir/', mode='?---r-xr-x')
3642
3643        with self.check_context(arc.open(), 'data'):
3644            normal_dir_mode = stat.filemode(stat.S_IMODE(
3645                self.outerdir.stat().st_mode))
3646            self.expect_file('all_bits', mode='?rwxr-xr-x')
3647            self.expect_file('perm_bits', mode='?rwxr-xr-x')
3648            self.expect_file('exec_group_other', mode='?rw-r--r--')
3649            self.expect_file('read_group_only', mode='?rw-r-----')
3650            self.expect_file('no_bits', mode='?rw-------')
3651            self.expect_file('dir/', mode=normal_dir_mode)
3652
3653    def test_pipe(self):
3654        # Test handling of a special file
3655        with ArchiveMaker() as arc:
3656            arc.add('foo', type=tarfile.FIFOTYPE)
3657
3658        for filter in 'fully_trusted', 'tar':
3659            with self.check_context(arc.open(), filter):
3660                if hasattr(os, 'mkfifo'):
3661                    self.expect_file('foo', type=tarfile.FIFOTYPE)
3662                else:
3663                    # The pipe can't be extracted and is skipped.
3664                    pass
3665
3666        with self.check_context(arc.open(), 'data'):
3667            self.expect_exception(
3668                tarfile.SpecialFileError,
3669                "'foo' is a special file")
3670
3671    def test_special_files(self):
3672        # Creating device files is tricky. Instead of attempting that let's
3673        # only check the filter result.
3674        for special_type in tarfile.FIFOTYPE, tarfile.CHRTYPE, tarfile.BLKTYPE:
3675            tarinfo = tarfile.TarInfo('foo')
3676            tarinfo.type = special_type
3677            trusted = tarfile.fully_trusted_filter(tarinfo, '')
3678            self.assertIs(trusted, tarinfo)
3679            tar = tarfile.tar_filter(tarinfo, '')
3680            self.assertEqual(tar.type, special_type)
3681            with self.assertRaises(tarfile.SpecialFileError) as cm:
3682                tarfile.data_filter(tarinfo, '')
3683            self.assertIsInstance(cm.exception.tarinfo, tarfile.TarInfo)
3684            self.assertEqual(cm.exception.tarinfo.name, 'foo')
3685
3686    def test_fully_trusted_filter(self):
3687        # The 'fully_trusted' filter returns the original TarInfo objects.
3688        with tarfile.TarFile.open(tarname) as tar:
3689            for tarinfo in tar.getmembers():
3690                filtered = tarfile.fully_trusted_filter(tarinfo, '')
3691                self.assertIs(filtered, tarinfo)
3692
3693    def test_tar_filter(self):
3694        # The 'tar' filter returns TarInfo objects with the same name/type.
3695        # (It can also fail for particularly "evil" input, but we don't have
3696        # that in the test archive.)
3697        with tarfile.TarFile.open(tarname) as tar:
3698            for tarinfo in tar.getmembers():
3699                filtered = tarfile.tar_filter(tarinfo, '')
3700                self.assertIs(filtered.name, tarinfo.name)
3701                self.assertIs(filtered.type, tarinfo.type)
3702
3703    def test_data_filter(self):
3704        # The 'data' filter either raises, or returns TarInfo with the same
3705        # name/type.
3706        with tarfile.TarFile.open(tarname) as tar:
3707            for tarinfo in tar.getmembers():
3708                try:
3709                    filtered = tarfile.data_filter(tarinfo, '')
3710                except tarfile.FilterError:
3711                    continue
3712                self.assertIs(filtered.name, tarinfo.name)
3713                self.assertIs(filtered.type, tarinfo.type)
3714
3715    def test_default_filter_warns_not(self):
3716        """Ensure the default filter does not warn (like in 3.12)"""
3717        with ArchiveMaker() as arc:
3718            arc.add('foo')
3719        with warnings_helper.check_no_warnings(self):
3720            with self.check_context(arc.open(), None):
3721                self.expect_file('foo')
3722
3723    def test_change_default_filter_on_instance(self):
3724        tar = tarfile.TarFile(tarname, 'r')
3725        def strict_filter(tarinfo, path):
3726            if tarinfo.name == 'ustar/regtype':
3727                return tarinfo
3728            else:
3729                return None
3730        tar.extraction_filter = strict_filter
3731        with self.check_context(tar, None):
3732            self.expect_file('ustar/regtype')
3733
3734    def test_change_default_filter_on_class(self):
3735        def strict_filter(tarinfo, path):
3736            if tarinfo.name == 'ustar/regtype':
3737                return tarinfo
3738            else:
3739                return None
3740        tar = tarfile.TarFile(tarname, 'r')
3741        with support.swap_attr(tarfile.TarFile, 'extraction_filter',
3742                               staticmethod(strict_filter)):
3743            with self.check_context(tar, None):
3744                self.expect_file('ustar/regtype')
3745
3746    def test_change_default_filter_on_subclass(self):
3747        class TarSubclass(tarfile.TarFile):
3748            def extraction_filter(self, tarinfo, path):
3749                if tarinfo.name == 'ustar/regtype':
3750                    return tarinfo
3751                else:
3752                    return None
3753
3754        tar = TarSubclass(tarname, 'r')
3755        with self.check_context(tar, None):
3756            self.expect_file('ustar/regtype')
3757
3758    def test_change_default_filter_to_string(self):
3759        tar = tarfile.TarFile(tarname, 'r')
3760        tar.extraction_filter = 'data'
3761        with self.check_context(tar, None):
3762            self.expect_exception(TypeError)
3763
3764    def test_custom_filter(self):
3765        def custom_filter(tarinfo, path):
3766            self.assertIs(path, self.destdir)
3767            if tarinfo.name == 'move_this':
3768                return tarinfo.replace(name='moved')
3769            if tarinfo.name == 'ignore_this':
3770                return None
3771            return tarinfo
3772
3773        with ArchiveMaker() as arc:
3774            arc.add('move_this')
3775            arc.add('ignore_this')
3776            arc.add('keep')
3777        with self.check_context(arc.open(), custom_filter):
3778            self.expect_file('moved')
3779            self.expect_file('keep')
3780
3781    def test_bad_filter_name(self):
3782        with ArchiveMaker() as arc:
3783            arc.add('foo')
3784        with self.check_context(arc.open(), 'bad filter name'):
3785            self.expect_exception(ValueError)
3786
3787    def test_stateful_filter(self):
3788        # Stateful filters should be possible.
3789        # (This doesn't really test tarfile. Rather, it demonstrates
3790        # that third parties can implement a stateful filter.)
3791        class StatefulFilter:
3792            def __enter__(self):
3793                self.num_files_processed = 0
3794                return self
3795
3796            def __call__(self, tarinfo, path):
3797                try:
3798                    tarinfo = tarfile.data_filter(tarinfo, path)
3799                except tarfile.FilterError:
3800                    return None
3801                self.num_files_processed += 1
3802                return tarinfo
3803
3804            def __exit__(self, *exc_info):
3805                self.done = True
3806
3807        with ArchiveMaker() as arc:
3808            arc.add('good')
3809            arc.add('bad', symlink_to='/')
3810            arc.add('good')
3811        with StatefulFilter() as custom_filter:
3812            with self.check_context(arc.open(), custom_filter):
3813                self.expect_file('good')
3814        self.assertEqual(custom_filter.num_files_processed, 2)
3815        self.assertEqual(custom_filter.done, True)
3816
    def test_errorlevel(self):
        # Check how the archive's errorlevel (0, 1 or 2) decides whether an
        # exception raised by an extraction filter is ignored or propagated.
        # Each helper below is a filter that always raises one specific
        # exception type.
        def extracterror_filter(tarinfo, path):
            raise tarfile.ExtractError('failed with ExtractError')
        def filtererror_filter(tarinfo, path):
            raise tarfile.FilterError('failed with FilterError')
        def oserror_filter(tarinfo, path):
            raise OSError('failed with OSError')
        def tarerror_filter(tarinfo, path):
            raise tarfile.TarError('failed with base TarError')
        def valueerror_filter(tarinfo, path):
            raise ValueError('failed with ValueError')

        # A single-member archive is enough: every case below either finds
        # 'file' extracted or sees the filter's exception.
        with ArchiveMaker() as arc:
            arc.add('file')

        # If errorlevel is 0, errors affected by errorlevel are ignored
        # (ExtractError, FilterError and OSError: the file is still expected
        # to be extracted).

        with self.check_context(arc.open(errorlevel=0), extracterror_filter):
            self.expect_file('file')

        with self.check_context(arc.open(errorlevel=0), filtererror_filter):
            self.expect_file('file')

        with self.check_context(arc.open(errorlevel=0), oserror_filter):
            self.expect_file('file')

        # The TarError base class and unrelated exceptions such as
        # ValueError are expected to propagate even at errorlevel 0.
        with self.check_context(arc.open(errorlevel=0), tarerror_filter):
            self.expect_exception(tarfile.TarError)

        with self.check_context(arc.open(errorlevel=0), valueerror_filter):
            self.expect_exception(ValueError)

        # If 1, all fatal errors are raised
        # (FilterError and OSError propagate; ExtractError is still ignored).

        with self.check_context(arc.open(errorlevel=1), extracterror_filter):
            self.expect_file('file')

        with self.check_context(arc.open(errorlevel=1), filtererror_filter):
            self.expect_exception(tarfile.FilterError)

        with self.check_context(arc.open(errorlevel=1), oserror_filter):
            self.expect_exception(OSError)

        with self.check_context(arc.open(errorlevel=1), tarerror_filter):
            self.expect_exception(tarfile.TarError)

        with self.check_context(arc.open(errorlevel=1), valueerror_filter):
            self.expect_exception(ValueError)

        # If 2, all non-fatal errors are raised as well
        # (so ExtractError now propagates too).

        with self.check_context(arc.open(errorlevel=2), extracterror_filter):
            self.expect_exception(tarfile.ExtractError)

        with self.check_context(arc.open(errorlevel=2), filtererror_filter):
            self.expect_exception(tarfile.FilterError)

        with self.check_context(arc.open(errorlevel=2), oserror_filter):
            self.expect_exception(OSError)

        with self.check_context(arc.open(errorlevel=2), tarerror_filter):
            self.expect_exception(tarfile.TarError)

        with self.check_context(arc.open(errorlevel=2), valueerror_filter):
            self.expect_exception(ValueError)

        # We only handle ExtractError, FilterError & OSError specially.
        # With a non-integer errorlevel, handling a FilterError is expected
        # to fail with TypeError instead.

        with self.check_context(arc.open(errorlevel='boo!'), filtererror_filter):
            self.expect_exception(TypeError)  # errorlevel is not int
3887
3888
def setUpModule():
    """Create TEMPDIR and populate it with compressed copies of the test tar.

    Also (re)builds the module-level ``testtarnames`` list: the plain test
    archive first, followed by one entry per available compression format.
    """
    global testtarnames

    os_helper.unlink(TEMPDIR)
    os.makedirs(TEMPDIR)

    testtarnames = [tarname]
    with open(tarname, "rb") as source:
        payload = source.read()

    # Create compressed tarfiles.
    for compressed in (GzipTest, Bz2Test, LzmaTest):
        # ``open`` is falsy when the compression module is unavailable.
        if not compressed.open:
            continue
        target = compressed.tarname
        os_helper.unlink(target)
        testtarnames.append(target)
        with compressed.open(target, "wb") as out:
            out.write(payload)
3905
def tearDownModule():
    """Remove the TEMPDIR tree, if it exists."""
    if not os.path.exists(TEMPDIR):
        return
    os_helper.rmtree(TEMPDIR)
3909
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
3912