1import sys 2import os 3import io 4from hashlib import sha256 5from contextlib import contextmanager 6from random import Random 7import pathlib 8import shutil 9import re 10import warnings 11import stat 12 13import unittest 14import unittest.mock 15import tarfile 16 17from test import support 18from test.support import os_helper 19from test.support import script_helper 20from test.support import warnings_helper 21 22# Check for our compression modules. 23try: 24 import gzip 25except ImportError: 26 gzip = None 27try: 28 import zlib 29except ImportError: 30 zlib = None 31try: 32 import bz2 33except ImportError: 34 bz2 = None 35try: 36 import lzma 37except ImportError: 38 lzma = None 39 40def sha256sum(data): 41 return sha256(data).hexdigest() 42 43TEMPDIR = os.path.abspath(os_helper.TESTFN) + "-tardir" 44tarextdir = TEMPDIR + '-extract-test' 45tarname = support.findfile("testtar.tar") 46gzipname = os.path.join(TEMPDIR, "testtar.tar.gz") 47bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2") 48xzname = os.path.join(TEMPDIR, "testtar.tar.xz") 49tmpname = os.path.join(TEMPDIR, "tmp.tar") 50dotlessname = os.path.join(TEMPDIR, "testtar") 51 52sha256_regtype = ( 53 "e09e4bc8b3c9d9177e77256353b36c159f5f040531bbd4b024a8f9b9196c71ce" 54) 55sha256_sparse = ( 56 "4f05a776071146756345ceee937b33fc5644f5a96b9780d1c7d6a32cdf164d7b" 57) 58 59 60class TarTest: 61 tarname = tarname 62 suffix = '' 63 open = io.FileIO 64 taropen = tarfile.TarFile.taropen 65 66 @property 67 def mode(self): 68 return self.prefix + self.suffix 69 70@support.requires_gzip() 71class GzipTest: 72 tarname = gzipname 73 suffix = 'gz' 74 open = gzip.GzipFile if gzip else None 75 taropen = tarfile.TarFile.gzopen 76 77@support.requires_bz2() 78class Bz2Test: 79 tarname = bz2name 80 suffix = 'bz2' 81 open = bz2.BZ2File if bz2 else None 82 taropen = tarfile.TarFile.bz2open 83 84@support.requires_lzma() 85class LzmaTest: 86 tarname = xzname 87 suffix = 'xz' 88 open = lzma.LZMAFile if lzma else None 89 taropen = 
tarfile.TarFile.xzopen 90 91 92class ReadTest(TarTest): 93 94 prefix = "r:" 95 96 def setUp(self): 97 self.tar = tarfile.open(self.tarname, mode=self.mode, 98 encoding="iso8859-1") 99 100 def tearDown(self): 101 self.tar.close() 102 103 104class UstarReadTest(ReadTest, unittest.TestCase): 105 106 def test_fileobj_regular_file(self): 107 tarinfo = self.tar.getmember("ustar/regtype") 108 with self.tar.extractfile(tarinfo) as fobj: 109 data = fobj.read() 110 self.assertEqual(len(data), tarinfo.size, 111 "regular file extraction failed") 112 self.assertEqual(sha256sum(data), sha256_regtype, 113 "regular file extraction failed") 114 115 def test_fileobj_readlines(self): 116 self.tar.extract("ustar/regtype", TEMPDIR, filter='data') 117 tarinfo = self.tar.getmember("ustar/regtype") 118 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 119 lines1 = fobj1.readlines() 120 121 with self.tar.extractfile(tarinfo) as fobj: 122 fobj2 = io.TextIOWrapper(fobj) 123 lines2 = fobj2.readlines() 124 self.assertEqual(lines1, lines2, 125 "fileobj.readlines() failed") 126 self.assertEqual(len(lines2), 114, 127 "fileobj.readlines() failed") 128 self.assertEqual(lines2[83], 129 "I will gladly admit that Python is not the fastest " 130 "running scripting language.\n", 131 "fileobj.readlines() failed") 132 133 def test_fileobj_iter(self): 134 self.tar.extract("ustar/regtype", TEMPDIR, filter='data') 135 tarinfo = self.tar.getmember("ustar/regtype") 136 with open(os.path.join(TEMPDIR, "ustar/regtype"), "r") as fobj1: 137 lines1 = fobj1.readlines() 138 with self.tar.extractfile(tarinfo) as fobj2: 139 lines2 = list(io.TextIOWrapper(fobj2)) 140 self.assertEqual(lines1, lines2, 141 "fileobj.__iter__() failed") 142 143 def test_fileobj_seek(self): 144 self.tar.extract("ustar/regtype", TEMPDIR, 145 filter='data') 146 with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj: 147 data = fobj.read() 148 149 tarinfo = self.tar.getmember("ustar/regtype") 150 with 
self.tar.extractfile(tarinfo) as fobj: 151 text = fobj.read() 152 fobj.seek(0) 153 self.assertEqual(0, fobj.tell(), 154 "seek() to file's start failed") 155 fobj.seek(2048, 0) 156 self.assertEqual(2048, fobj.tell(), 157 "seek() to absolute position failed") 158 fobj.seek(-1024, 1) 159 self.assertEqual(1024, fobj.tell(), 160 "seek() to negative relative position failed") 161 fobj.seek(1024, 1) 162 self.assertEqual(2048, fobj.tell(), 163 "seek() to positive relative position failed") 164 s = fobj.read(10) 165 self.assertEqual(s, data[2048:2058], 166 "read() after seek failed") 167 fobj.seek(0, 2) 168 self.assertEqual(tarinfo.size, fobj.tell(), 169 "seek() to file's end failed") 170 self.assertEqual(fobj.read(), b"", 171 "read() at file's end did not return empty string") 172 fobj.seek(-tarinfo.size, 2) 173 self.assertEqual(0, fobj.tell(), 174 "relative seek() to file's end failed") 175 fobj.seek(512) 176 s1 = fobj.readlines() 177 fobj.seek(512) 178 s2 = fobj.readlines() 179 self.assertEqual(s1, s2, 180 "readlines() after seek failed") 181 fobj.seek(0) 182 self.assertEqual(len(fobj.readline()), fobj.tell(), 183 "tell() after readline() failed") 184 fobj.seek(512) 185 self.assertEqual(len(fobj.readline()) + 512, fobj.tell(), 186 "tell() after seek() and readline() failed") 187 fobj.seek(0) 188 line = fobj.readline() 189 self.assertEqual(fobj.read(), data[len(line):], 190 "read() after readline() failed") 191 192 def test_fileobj_text(self): 193 with self.tar.extractfile("ustar/regtype") as fobj: 194 fobj = io.TextIOWrapper(fobj) 195 data = fobj.read().encode("iso8859-1") 196 self.assertEqual(sha256sum(data), sha256_regtype) 197 try: 198 fobj.seek(100) 199 except AttributeError: 200 # Issue #13815: seek() complained about a missing 201 # flush() method. 202 self.fail("seeking failed in text mode") 203 204 # Test if symbolic and hard links are resolved by extractfile(). 
The 205 # test link members each point to a regular member whose data is 206 # supposed to be exported. 207 def _test_fileobj_link(self, lnktype, regtype): 208 with self.tar.extractfile(lnktype) as a, \ 209 self.tar.extractfile(regtype) as b: 210 self.assertEqual(a.name, b.name) 211 212 def test_fileobj_link1(self): 213 self._test_fileobj_link("ustar/lnktype", "ustar/regtype") 214 215 def test_fileobj_link2(self): 216 self._test_fileobj_link("./ustar/linktest2/lnktype", 217 "ustar/linktest1/regtype") 218 219 def test_fileobj_symlink1(self): 220 self._test_fileobj_link("ustar/symtype", "ustar/regtype") 221 222 def test_fileobj_symlink2(self): 223 self._test_fileobj_link("./ustar/linktest2/symtype", 224 "ustar/linktest1/regtype") 225 226 def test_issue14160(self): 227 self._test_fileobj_link("symtype2", "ustar/regtype") 228 229 def test_add_dir_getmember(self): 230 # bpo-21987 231 self.add_dir_and_getmember('bar') 232 self.add_dir_and_getmember('a'*101) 233 234 @unittest.skipUnless(hasattr(os, "getuid") and hasattr(os, "getgid"), 235 "Missing getuid or getgid implementation") 236 def add_dir_and_getmember(self, name): 237 def filter(tarinfo): 238 tarinfo.uid = tarinfo.gid = 100 239 return tarinfo 240 241 with os_helper.temp_cwd(): 242 with tarfile.open(tmpname, 'w') as tar: 243 tar.format = tarfile.USTAR_FORMAT 244 try: 245 os.mkdir(name) 246 tar.add(name, filter=filter) 247 finally: 248 os.rmdir(name) 249 with tarfile.open(tmpname) as tar: 250 self.assertEqual( 251 tar.getmember(name), 252 tar.getmember(name + '/') 253 ) 254 255class GzipUstarReadTest(GzipTest, UstarReadTest): 256 pass 257 258class Bz2UstarReadTest(Bz2Test, UstarReadTest): 259 pass 260 261class LzmaUstarReadTest(LzmaTest, UstarReadTest): 262 pass 263 264 265class ListTest(ReadTest, unittest.TestCase): 266 267 # Override setUp to use default encoding (UTF-8) 268 def setUp(self): 269 self.tar = tarfile.open(self.tarname, mode=self.mode) 270 271 def test_list(self): 272 tio = 
io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 273 with support.swap_attr(sys, 'stdout', tio): 274 self.tar.list(verbose=False) 275 out = tio.detach().getvalue() 276 self.assertIn(b'ustar/conttype', out) 277 self.assertIn(b'ustar/regtype', out) 278 self.assertIn(b'ustar/lnktype', out) 279 self.assertIn(b'ustar' + (b'/12345' * 40) + b'67/longname', out) 280 self.assertIn(b'./ustar/linktest2/symtype', out) 281 self.assertIn(b'./ustar/linktest2/lnktype', out) 282 # Make sure it puts trailing slash for directory 283 self.assertIn(b'ustar/dirtype/', out) 284 self.assertIn(b'ustar/dirtype-with-size/', out) 285 # Make sure it is able to print unencodable characters 286 def conv(b): 287 s = b.decode(self.tar.encoding, 'surrogateescape') 288 return s.encode('ascii', 'backslashreplace') 289 self.assertIn(conv(b'ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 290 self.assertIn(conv(b'misc/regtype-hpux-signed-chksum-' 291 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 292 self.assertIn(conv(b'misc/regtype-old-v7-signed-chksum-' 293 b'\xc4\xd6\xdc\xe4\xf6\xfc\xdf'), out) 294 self.assertIn(conv(b'pax/bad-pax-\xe4\xf6\xfc'), out) 295 self.assertIn(conv(b'pax/hdrcharset-\xe4\xf6\xfc'), out) 296 # Make sure it prints files separated by one newline without any 297 # 'ls -l'-like accessories if verbose flag is not being used 298 # ... 299 # ustar/conttype 300 # ustar/regtype 301 # ... 302 self.assertRegex(out, br'ustar/conttype ?\r?\n' 303 br'ustar/regtype ?\r?\n') 304 # Make sure it does not print the source of link without verbose flag 305 self.assertNotIn(b'link to', out) 306 self.assertNotIn(b'->', out) 307 308 def test_list_verbose(self): 309 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 310 with support.swap_attr(sys, 'stdout', tio): 311 self.tar.list(verbose=True) 312 out = tio.detach().getvalue() 313 # Make sure it prints files separated by one newline with 'ls -l'-like 314 # accessories if verbose flag is being used 315 # ... 
316 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/conttype 317 # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/regtype 318 # ... 319 self.assertRegex(out, (br'\?rw-r--r-- tarfile/tarfile\s+7011 ' 320 br'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d ' 321 br'ustar/\w+type ?\r?\n') * 2) 322 # Make sure it prints the source of link with verbose flag 323 self.assertIn(b'ustar/symtype -> regtype', out) 324 self.assertIn(b'./ustar/linktest2/symtype -> ../linktest1/regtype', out) 325 self.assertIn(b'./ustar/linktest2/lnktype link to ' 326 b'./ustar/linktest1/regtype', out) 327 self.assertIn(b'gnu' + (b'/123' * 125) + b'/longlink link to gnu' + 328 (b'/123' * 125) + b'/longname', out) 329 self.assertIn(b'pax' + (b'/123' * 125) + b'/longlink link to pax' + 330 (b'/123' * 125) + b'/longname', out) 331 332 def test_list_members(self): 333 tio = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n') 334 def members(tar): 335 for tarinfo in tar.getmembers(): 336 if 'reg' in tarinfo.name: 337 yield tarinfo 338 with support.swap_attr(sys, 'stdout', tio): 339 self.tar.list(verbose=False, members=members(self.tar)) 340 out = tio.detach().getvalue() 341 self.assertIn(b'ustar/regtype', out) 342 self.assertNotIn(b'ustar/conttype', out) 343 344 345class GzipListTest(GzipTest, ListTest): 346 pass 347 348 349class Bz2ListTest(Bz2Test, ListTest): 350 pass 351 352 353class LzmaListTest(LzmaTest, ListTest): 354 pass 355 356 357class CommonReadTest(ReadTest): 358 359 def test_is_tarfile_erroneous(self): 360 with open(tmpname, "wb"): 361 pass 362 363 # is_tarfile works on filenames 364 self.assertFalse(tarfile.is_tarfile(tmpname)) 365 366 # is_tarfile works on path-like objects 367 self.assertFalse(tarfile.is_tarfile(pathlib.Path(tmpname))) 368 369 # is_tarfile works on file objects 370 with open(tmpname, "rb") as fobj: 371 self.assertFalse(tarfile.is_tarfile(fobj)) 372 373 # is_tarfile works on file-like objects 374 self.assertFalse(tarfile.is_tarfile(io.BytesIO(b"invalid"))) 375 
376 def test_is_tarfile_valid(self): 377 # is_tarfile works on filenames 378 self.assertTrue(tarfile.is_tarfile(self.tarname)) 379 380 # is_tarfile works on path-like objects 381 self.assertTrue(tarfile.is_tarfile(pathlib.Path(self.tarname))) 382 383 # is_tarfile works on file objects 384 with open(self.tarname, "rb") as fobj: 385 self.assertTrue(tarfile.is_tarfile(fobj)) 386 387 # is_tarfile works on file-like objects 388 with open(self.tarname, "rb") as fobj: 389 self.assertTrue(tarfile.is_tarfile(io.BytesIO(fobj.read()))) 390 391 def test_is_tarfile_keeps_position(self): 392 # Test for issue44289: tarfile.is_tarfile() modifies 393 # file object's current position 394 with open(self.tarname, "rb") as fobj: 395 tarfile.is_tarfile(fobj) 396 self.assertEqual(fobj.tell(), 0) 397 398 with open(self.tarname, "rb") as fobj: 399 file_like = io.BytesIO(fobj.read()) 400 tarfile.is_tarfile(file_like) 401 self.assertEqual(file_like.tell(), 0) 402 403 def test_empty_tarfile(self): 404 # Test for issue6123: Allow opening empty archives. 405 # This test checks if tarfile.open() is able to open an empty tar 406 # archive successfully. Note that an empty tar archive is not the 407 # same as an empty file! 408 with tarfile.open(tmpname, self.mode.replace("r", "w")): 409 pass 410 try: 411 tar = tarfile.open(tmpname, self.mode) 412 tar.getnames() 413 except tarfile.ReadError: 414 self.fail("tarfile.open() failed on empty archive") 415 else: 416 self.assertListEqual(tar.getmembers(), []) 417 finally: 418 tar.close() 419 420 def test_non_existent_tarfile(self): 421 # Test for issue11513: prevent non-existent gzipped tarfiles raising 422 # multiple exceptions. 423 with self.assertRaisesRegex(FileNotFoundError, "xxx"): 424 tarfile.open("xxx", self.mode) 425 426 def test_null_tarfile(self): 427 # Test for issue6123: Allow opening empty archives. 428 # This test guarantees that tarfile.open() does not treat an empty 429 # file as an empty tar archive. 
430 with open(tmpname, "wb"): 431 pass 432 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode) 433 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname) 434 435 def test_ignore_zeros(self): 436 # Test TarFile's ignore_zeros option. 437 # generate 512 pseudorandom bytes 438 data = Random(0).randbytes(512) 439 for char in (b'\0', b'a'): 440 # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') 441 # are ignored correctly. 442 with self.open(tmpname, "w") as fobj: 443 fobj.write(char * 1024) 444 tarinfo = tarfile.TarInfo("foo") 445 tarinfo.size = len(data) 446 fobj.write(tarinfo.tobuf()) 447 fobj.write(data) 448 449 tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) 450 try: 451 self.assertListEqual(tar.getnames(), ["foo"], 452 "ignore_zeros=True should have skipped the %r-blocks" % 453 char) 454 finally: 455 tar.close() 456 457 def test_premature_end_of_archive(self): 458 for size in (512, 600, 1024, 1200): 459 with tarfile.open(tmpname, "w:") as tar: 460 t = tarfile.TarInfo("foo") 461 t.size = 1024 462 tar.addfile(t, io.BytesIO(b"a" * 1024)) 463 464 with open(tmpname, "r+b") as fobj: 465 fobj.truncate(size) 466 467 with tarfile.open(tmpname) as tar: 468 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 469 for t in tar: 470 pass 471 472 with tarfile.open(tmpname) as tar: 473 t = tar.next() 474 475 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 476 tar.extract(t, TEMPDIR, filter='data') 477 478 with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"): 479 tar.extractfile(t).read() 480 481 def test_length_zero_header(self): 482 # bpo-39017 (CVE-2019-20907): reading a zero-length header should fail 483 # with an exception 484 with self.assertRaisesRegex(tarfile.ReadError, "file could not be opened successfully"): 485 with tarfile.open(support.findfile('recursion.tar')) as tar: 486 pass 487 488class MiscReadTestBase(CommonReadTest): 489 def 
requires_name_attribute(self): 490 pass 491 492 def test_no_name_argument(self): 493 self.requires_name_attribute() 494 with open(self.tarname, "rb") as fobj: 495 self.assertIsInstance(fobj.name, str) 496 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 497 self.assertIsInstance(tar.name, str) 498 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 499 500 def test_no_name_attribute(self): 501 with open(self.tarname, "rb") as fobj: 502 data = fobj.read() 503 fobj = io.BytesIO(data) 504 self.assertRaises(AttributeError, getattr, fobj, "name") 505 tar = tarfile.open(fileobj=fobj, mode=self.mode) 506 self.assertIsNone(tar.name) 507 508 def test_empty_name_attribute(self): 509 with open(self.tarname, "rb") as fobj: 510 data = fobj.read() 511 fobj = io.BytesIO(data) 512 fobj.name = "" 513 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 514 self.assertIsNone(tar.name) 515 516 def test_int_name_attribute(self): 517 # Issue 21044: tarfile.open() should handle fileobj with an integer 518 # 'name' attribute. 
519 fd = os.open(self.tarname, os.O_RDONLY) 520 with open(fd, 'rb') as fobj: 521 self.assertIsInstance(fobj.name, int) 522 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 523 self.assertIsNone(tar.name) 524 525 def test_bytes_name_attribute(self): 526 self.requires_name_attribute() 527 tarname = os.fsencode(self.tarname) 528 with open(tarname, 'rb') as fobj: 529 self.assertIsInstance(fobj.name, bytes) 530 with tarfile.open(fileobj=fobj, mode=self.mode) as tar: 531 self.assertIsInstance(tar.name, bytes) 532 self.assertEqual(tar.name, os.path.abspath(fobj.name)) 533 534 def test_pathlike_name(self): 535 tarname = pathlib.Path(self.tarname) 536 with tarfile.open(tarname, mode=self.mode) as tar: 537 self.assertIsInstance(tar.name, str) 538 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 539 with self.taropen(tarname) as tar: 540 self.assertIsInstance(tar.name, str) 541 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 542 with tarfile.TarFile.open(tarname, mode=self.mode) as tar: 543 self.assertIsInstance(tar.name, str) 544 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 545 if self.suffix == '': 546 with tarfile.TarFile(tarname, mode='r') as tar: 547 self.assertIsInstance(tar.name, str) 548 self.assertEqual(tar.name, os.path.abspath(os.fspath(tarname))) 549 550 def test_illegal_mode_arg(self): 551 with open(tmpname, 'wb'): 552 pass 553 with self.assertRaisesRegex(ValueError, 'mode must be '): 554 tar = self.taropen(tmpname, 'q') 555 with self.assertRaisesRegex(ValueError, 'mode must be '): 556 tar = self.taropen(tmpname, 'rw') 557 with self.assertRaisesRegex(ValueError, 'mode must be '): 558 tar = self.taropen(tmpname, '') 559 560 def test_fileobj_with_offset(self): 561 # Skip the first member and store values from the second member 562 # of the testtar. 
563 tar = tarfile.open(self.tarname, mode=self.mode) 564 try: 565 tar.next() 566 t = tar.next() 567 name = t.name 568 offset = t.offset 569 with tar.extractfile(t) as f: 570 data = f.read() 571 finally: 572 tar.close() 573 574 # Open the testtar and seek to the offset of the second member. 575 with self.open(self.tarname) as fobj: 576 fobj.seek(offset) 577 578 # Test if the tarfile starts with the second member. 579 with tar.open(self.tarname, mode="r:", fileobj=fobj) as tar: 580 t = tar.next() 581 self.assertEqual(t.name, name) 582 # Read to the end of fileobj and test if seeking back to the 583 # beginning works. 584 tar.getmembers() 585 self.assertEqual(tar.extractfile(t).read(), data, 586 "seek back did not work") 587 588 def test_fail_comp(self): 589 # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. 590 self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) 591 with open(tarname, "rb") as fobj: 592 self.assertRaises(tarfile.ReadError, tarfile.open, 593 fileobj=fobj, mode=self.mode) 594 595 def test_v7_dirtype(self): 596 # Test old style dirtype member (bug #1336623): 597 # Old V7 tars create directory members using an AREGTYPE 598 # header with a "/" appended to the filename field. 599 tarinfo = self.tar.getmember("misc/dirtype-old-v7") 600 self.assertEqual(tarinfo.type, tarfile.DIRTYPE, 601 "v7 dirtype failed") 602 603 def test_xstar_type(self): 604 # The xstar format stores extra atime and ctime fields inside the 605 # space reserved for the prefix field. The prefix field must be 606 # ignored in this case, otherwise it will mess up the name. 
607 try: 608 self.tar.getmember("misc/regtype-xstar") 609 except KeyError: 610 self.fail("failed to find misc/regtype-xstar (mangled prefix?)") 611 612 def test_check_members(self): 613 for tarinfo in self.tar: 614 self.assertEqual(int(tarinfo.mtime), 0o7606136617, 615 "wrong mtime for %s" % tarinfo.name) 616 if not tarinfo.name.startswith("ustar/"): 617 continue 618 self.assertEqual(tarinfo.uname, "tarfile", 619 "wrong uname for %s" % tarinfo.name) 620 621 def test_find_members(self): 622 self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", 623 "could not find all members") 624 625 @unittest.skipUnless(hasattr(os, "link"), 626 "Missing hardlink implementation") 627 @os_helper.skip_unless_symlink 628 def test_extract_hardlink(self): 629 # Test hardlink extraction (e.g. bug #857297). 630 with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar: 631 tar.extract("ustar/regtype", TEMPDIR, filter='data') 632 self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/regtype")) 633 634 tar.extract("ustar/lnktype", TEMPDIR, filter='data') 635 self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/lnktype")) 636 with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f: 637 data = f.read() 638 self.assertEqual(sha256sum(data), sha256_regtype) 639 640 tar.extract("ustar/symtype", TEMPDIR, filter='data') 641 self.addCleanup(os_helper.unlink, os.path.join(TEMPDIR, "ustar/symtype")) 642 with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f: 643 data = f.read() 644 self.assertEqual(sha256sum(data), sha256_regtype) 645 646 @os_helper.skip_unless_working_chmod 647 def test_extractall(self): 648 # Test if extractall() correctly restores directory permissions 649 # and times (see issue1735). 
650 tar = tarfile.open(tarname, encoding="iso8859-1") 651 DIR = os.path.join(TEMPDIR, "extractall") 652 os.mkdir(DIR) 653 try: 654 directories = [t for t in tar if t.isdir()] 655 tar.extractall(DIR, directories, filter='fully_trusted') 656 for tarinfo in directories: 657 path = os.path.join(DIR, tarinfo.name) 658 if sys.platform != "win32": 659 # Win32 has no support for fine grained permissions. 660 self.assertEqual(tarinfo.mode & 0o777, 661 os.stat(path).st_mode & 0o777, 662 tarinfo.name) 663 def format_mtime(mtime): 664 if isinstance(mtime, float): 665 return "{} ({})".format(mtime, mtime.hex()) 666 else: 667 return "{!r} (int)".format(mtime) 668 file_mtime = os.path.getmtime(path) 669 errmsg = "tar mtime {0} != file time {1} of path {2!a}".format( 670 format_mtime(tarinfo.mtime), 671 format_mtime(file_mtime), 672 path) 673 self.assertEqual(tarinfo.mtime, file_mtime, errmsg) 674 finally: 675 tar.close() 676 os_helper.rmtree(DIR) 677 678 @os_helper.skip_unless_working_chmod 679 def test_extract_directory(self): 680 dirtype = "ustar/dirtype" 681 DIR = os.path.join(TEMPDIR, "extractdir") 682 os.mkdir(DIR) 683 try: 684 with tarfile.open(tarname, encoding="iso8859-1") as tar: 685 tarinfo = tar.getmember(dirtype) 686 tar.extract(tarinfo, path=DIR, filter='fully_trusted') 687 extracted = os.path.join(DIR, dirtype) 688 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 689 if sys.platform != "win32": 690 self.assertEqual(os.stat(extracted).st_mode & 0o777, 0o755) 691 finally: 692 os_helper.rmtree(DIR) 693 694 def test_extractall_pathlike_name(self): 695 DIR = pathlib.Path(TEMPDIR) / "extractall" 696 with os_helper.temp_dir(DIR), \ 697 tarfile.open(tarname, encoding="iso8859-1") as tar: 698 directories = [t for t in tar if t.isdir()] 699 tar.extractall(DIR, directories, filter='fully_trusted') 700 for tarinfo in directories: 701 path = DIR / tarinfo.name 702 self.assertEqual(os.path.getmtime(path), tarinfo.mtime) 703 704 def test_extract_pathlike_name(self): 
705 dirtype = "ustar/dirtype" 706 DIR = pathlib.Path(TEMPDIR) / "extractall" 707 with os_helper.temp_dir(DIR), \ 708 tarfile.open(tarname, encoding="iso8859-1") as tar: 709 tarinfo = tar.getmember(dirtype) 710 tar.extract(tarinfo, path=DIR, filter='fully_trusted') 711 extracted = DIR / dirtype 712 self.assertEqual(os.path.getmtime(extracted), tarinfo.mtime) 713 714 def test_init_close_fobj(self): 715 # Issue #7341: Close the internal file object in the TarFile 716 # constructor in case of an error. For the test we rely on 717 # the fact that opening an empty file raises a ReadError. 718 empty = os.path.join(TEMPDIR, "empty") 719 with open(empty, "wb") as fobj: 720 fobj.write(b"") 721 722 try: 723 tar = object.__new__(tarfile.TarFile) 724 try: 725 tar.__init__(empty) 726 except tarfile.ReadError: 727 self.assertTrue(tar.fileobj.closed) 728 else: 729 self.fail("ReadError not raised") 730 finally: 731 os_helper.unlink(empty) 732 733 def test_parallel_iteration(self): 734 # Issue #16601: Restarting iteration over tarfile continued 735 # from where it left off. 
736 with tarfile.open(self.tarname) as tar: 737 for m1, m2 in zip(tar, tar): 738 self.assertEqual(m1.offset, m2.offset) 739 self.assertEqual(m1.get_info(), m2.get_info()) 740 741 @unittest.skipIf(zlib is None, "requires zlib") 742 def test_zlib_error_does_not_leak(self): 743 # bpo-39039: tarfile.open allowed zlib exceptions to bubble up when 744 # parsing certain types of invalid data 745 with unittest.mock.patch("tarfile.TarInfo.fromtarfile") as mock: 746 mock.side_effect = zlib.error 747 with self.assertRaises(tarfile.ReadError): 748 tarfile.open(self.tarname) 749 750 def test_next_on_empty_tarfile(self): 751 fd = io.BytesIO() 752 tf = tarfile.open(fileobj=fd, mode="w") 753 tf.close() 754 755 fd.seek(0) 756 with tarfile.open(fileobj=fd, mode="r|") as tf: 757 self.assertEqual(tf.next(), None) 758 759 fd.seek(0) 760 with tarfile.open(fileobj=fd, mode="r") as tf: 761 self.assertEqual(tf.next(), None) 762 763class MiscReadTest(MiscReadTestBase, unittest.TestCase): 764 test_fail_comp = None 765 766class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): 767 pass 768 769class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): 770 def requires_name_attribute(self): 771 self.skipTest("BZ2File have no name attribute") 772 773class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): 774 def requires_name_attribute(self): 775 self.skipTest("LZMAFile have no name attribute") 776 777 778class StreamReadTest(CommonReadTest, unittest.TestCase): 779 780 prefix="r|" 781 782 def test_read_through(self): 783 # Issue #11224: A poorly designed _FileInFile.read() method 784 # caused seeking errors with stream tar files. 
785 for tarinfo in self.tar: 786 if not tarinfo.isreg(): 787 continue 788 with self.tar.extractfile(tarinfo) as fobj: 789 while True: 790 try: 791 buf = fobj.read(512) 792 except tarfile.StreamError: 793 self.fail("simple read-through using " 794 "TarFile.extractfile() failed") 795 if not buf: 796 break 797 798 def test_fileobj_regular_file(self): 799 tarinfo = self.tar.next() # get "regtype" (can't use getmember) 800 with self.tar.extractfile(tarinfo) as fobj: 801 data = fobj.read() 802 self.assertEqual(len(data), tarinfo.size, 803 "regular file extraction failed") 804 self.assertEqual(sha256sum(data), sha256_regtype, 805 "regular file extraction failed") 806 807 def test_provoke_stream_error(self): 808 tarinfos = self.tar.getmembers() 809 with self.tar.extractfile(tarinfos[0]) as f: # read the first member 810 self.assertRaises(tarfile.StreamError, f.read) 811 812 def test_compare_members(self): 813 tar1 = tarfile.open(tarname, encoding="iso8859-1") 814 try: 815 tar2 = self.tar 816 817 while True: 818 t1 = tar1.next() 819 t2 = tar2.next() 820 if t1 is None: 821 break 822 self.assertIsNotNone(t2, "stream.next() failed.") 823 824 if t2.islnk() or t2.issym(): 825 with self.assertRaises(tarfile.StreamError): 826 tar2.extractfile(t2) 827 continue 828 829 v1 = tar1.extractfile(t1) 830 v2 = tar2.extractfile(t2) 831 if v1 is None: 832 continue 833 self.assertIsNotNone(v2, "stream.extractfile() failed") 834 self.assertEqual(v1.read(), v2.read(), 835 "stream extraction failed") 836 finally: 837 tar1.close() 838 839class GzipStreamReadTest(GzipTest, StreamReadTest): 840 pass 841 842class Bz2StreamReadTest(Bz2Test, StreamReadTest): 843 pass 844 845class LzmaStreamReadTest(LzmaTest, StreamReadTest): 846 pass 847 848 849class DetectReadTest(TarTest, unittest.TestCase): 850 def _testfunc_file(self, name, mode): 851 try: 852 tar = tarfile.open(name, mode) 853 except tarfile.ReadError as e: 854 self.fail() 855 else: 856 tar.close() 857 858 def _testfunc_fileobj(self, name, mode): 
859 try: 860 with open(name, "rb") as f: 861 tar = tarfile.open(name, mode, fileobj=f) 862 except tarfile.ReadError as e: 863 self.fail() 864 else: 865 tar.close() 866 867 def _test_modes(self, testfunc): 868 if self.suffix: 869 with self.assertRaises(tarfile.ReadError): 870 tarfile.open(tarname, mode="r:" + self.suffix) 871 with self.assertRaises(tarfile.ReadError): 872 tarfile.open(tarname, mode="r|" + self.suffix) 873 with self.assertRaises(tarfile.ReadError): 874 tarfile.open(self.tarname, mode="r:") 875 with self.assertRaises(tarfile.ReadError): 876 tarfile.open(self.tarname, mode="r|") 877 testfunc(self.tarname, "r") 878 testfunc(self.tarname, "r:" + self.suffix) 879 testfunc(self.tarname, "r:*") 880 testfunc(self.tarname, "r|" + self.suffix) 881 testfunc(self.tarname, "r|*") 882 883 def test_detect_file(self): 884 self._test_modes(self._testfunc_file) 885 886 def test_detect_fileobj(self): 887 self._test_modes(self._testfunc_fileobj) 888 889class GzipDetectReadTest(GzipTest, DetectReadTest): 890 pass 891 892class Bz2DetectReadTest(Bz2Test, DetectReadTest): 893 def test_detect_stream_bz2(self): 894 # Originally, tarfile's stream detection looked for the string 895 # "BZh91" at the start of the file. This is incorrect because 896 # the '9' represents the blocksize (900,000 bytes). If the file was 897 # compressed using another blocksize autodetection fails. 898 with open(tarname, "rb") as fobj: 899 data = fobj.read() 900 901 # Compress with blocksize 100,000 bytes, the file starts with "BZh11". 
902 with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj: 903 fobj.write(data) 904 905 self._testfunc_file(tmpname, "r|*") 906 907class LzmaDetectReadTest(LzmaTest, DetectReadTest): 908 pass 909 910 911class MemberReadTest(ReadTest, unittest.TestCase): 912 913 def _test_member(self, tarinfo, chksum=None, **kwargs): 914 if chksum is not None: 915 with self.tar.extractfile(tarinfo) as f: 916 self.assertEqual(sha256sum(f.read()), chksum, 917 "wrong sha256sum for %s" % tarinfo.name) 918 919 kwargs["mtime"] = 0o7606136617 920 kwargs["uid"] = 1000 921 kwargs["gid"] = 100 922 if "old-v7" not in tarinfo.name: 923 # V7 tar can't handle alphabetic owners. 924 kwargs["uname"] = "tarfile" 925 kwargs["gname"] = "tarfile" 926 for k, v in kwargs.items(): 927 self.assertEqual(getattr(tarinfo, k), v, 928 "wrong value in %s field of %s" % (k, tarinfo.name)) 929 930 def test_find_regtype(self): 931 tarinfo = self.tar.getmember("ustar/regtype") 932 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 933 934 def test_find_conttype(self): 935 tarinfo = self.tar.getmember("ustar/conttype") 936 self._test_member(tarinfo, size=7011, chksum=sha256_regtype) 937 938 def test_find_dirtype(self): 939 tarinfo = self.tar.getmember("ustar/dirtype") 940 self._test_member(tarinfo, size=0) 941 942 def test_find_dirtype_with_size(self): 943 tarinfo = self.tar.getmember("ustar/dirtype-with-size") 944 self._test_member(tarinfo, size=255) 945 946 def test_find_lnktype(self): 947 tarinfo = self.tar.getmember("ustar/lnktype") 948 self._test_member(tarinfo, size=0, linkname="ustar/regtype") 949 950 def test_find_symtype(self): 951 tarinfo = self.tar.getmember("ustar/symtype") 952 self._test_member(tarinfo, size=0, linkname="regtype") 953 954 def test_find_blktype(self): 955 tarinfo = self.tar.getmember("ustar/blktype") 956 self._test_member(tarinfo, size=0, devmajor=3, devminor=0) 957 958 def test_find_chrtype(self): 959 tarinfo = self.tar.getmember("ustar/chrtype") 960 
class LongnameTest:
    # Mixin with tests for member names and link names too long for the
    # plain header fields.  The concrete test case is expected to provide
    # the `subdir`, `longnametype` and `format` attributes and to open
    # the archive as `self.tar`.

    def _deep_path(self, leaf):
        # Path of *leaf* below 125 nested "123" directories in self.subdir.
        return self.subdir + "/" + "123/" * 125 + leaf

    def test_read_longname(self):
        # Test reading of longname (bug #1471427).
        try:
            member = self.tar.getmember(self._deep_path("longname"))
        except KeyError:
            self.fail("longname not found")
        self.assertNotEqual(member.type, tarfile.DIRTYPE,
                            "read longname as dirtype")

    def test_read_longlink(self):
        # The long link must point at the long name.
        expected_target = self._deep_path("longname")
        try:
            member = self.tar.getmember(self._deep_path("longlink"))
        except KeyError:
            self.fail("longlink not found")
        self.assertEqual(member.linkname, expected_target, "linkname wrong")

    def test_truncated_longname(self):
        # Keep only the 3 * 512 bytes starting at the member's offset and
        # check that opening this fragment raises ReadError.
        start = self.tar.getmember(self._deep_path("longname")).offset
        raw = self.tar.fileobj
        raw.seek(start)
        fragment = io.BytesIO(raw.read(3 * 512))
        with self.assertRaises(tarfile.ReadError):
            tarfile.open(name="foo.tar", fileobj=fragment)

    def test_header_offset(self):
        # Test if the start offset of the TarInfo object includes
        # the preceding extended header.
        start = self.tar.getmember(self._deep_path("longname")).offset
        with open(tarname, "rb") as archive:
            archive.seek(start)
            block = archive.read(512)
        header = tarfile.TarInfo.frombuf(block, "iso8859-1", "strict")
        self.assertEqual(header.type, self.longnametype)

    def test_longname_directory(self):
        # Test reading a longlink directory. Issue #47231.
        dirname = ('a' * 101) + '/'
        with os_helper.temp_cwd():
            with tarfile.open(tmpname, 'w') as writer:
                writer.format = self.format
                try:
                    os.mkdir(dirname)
                    writer.add(dirname)
                finally:
                    os.rmdir(dirname.rstrip("/"))
            with tarfile.open(tmpname) as reader:
                # The member must be found with and without the
                # trailing slash.
                self.assertIsNotNone(reader.getmember(dirname))
                self.assertIsNotNone(
                    reader.getmember(dirname.removesuffix('/')))
@staticmethod
def _fs_supports_holes():
    # Return True if the platform knows the st_blocks stat attribute and
    # uses st_blocks units of 512 bytes, and if the filesystem is able to
    # store holes of 4 KiB in files.
    #
    # The function returns False if page size is larger than 4 KiB.
    # For example, ppc64 uses pages of 64 KiB.
    if not sys.platform.startswith("linux"):
        return False

    # Linux evidently has 512 byte st_blocks units.
    probe = os.path.join(TEMPDIR, "sparse-test")
    with open(probe, "wb") as fobj:
        # Seek to "punch a hole" of 4 KiB
        fobj.seek(4096)
        fobj.write(b'x' * 4096)
        fobj.truncate()
    info = os.stat(probe)
    os_helper.unlink(probe)
    # Fewer allocated bytes than the apparent size means the hole was kept.
    allocated = info.st_blocks * 512
    return allocated < info.st_size
def test_pax_header_bad_formats(self):
    # Write an archive whose pax extended header holds the single valid
    # record b"11 foo=bar\n", overwrite that record with each replacement
    # below, and check that opening the modified archive is rejected
    # with an 'invalid header' ReadError.
    pax_header_replacements = (
        b" foo=bar\n",
        b"0 \n",
        b"1 \n",
        b"2 \n",
        b"3 =\n",
        b"4 =a\n",
        b"1000000 foo=bar\n",
        b"0 foo=bar\n",
        b"-12 foo=bar\n",
        b"000000000000000000000000036 foo=bar\n",
    )
    pax_headers = {"foo": "bar"}

    for replacement in pax_header_replacements:
        with self.subTest(header=replacement):
            with tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                              encoding="iso8859-1") as tar:
                t = tarfile.TarInfo()
                t.name = "pax"
                t.uid = 1
                t.pax_headers = pax_headers
                tar.addfile(t)

            with open(tmpname, "rb") as f:
                data = f.read()
            # Make sure the record to be replaced is really there.
            self.assertIn(b"11 foo=bar\n", data)
            data = data.replace(b"11 foo=bar\n", replacement)

            # Mode "wb" already truncates the file before writing.
            with open(tmpname, "wb") as f:
                f.write(data)

            with self.assertRaisesRegex(tarfile.ReadError, r"method tar: ReadError\('invalid header'\)"):
                tarfile.open(tmpname, encoding="iso8859-1")
def test_100_char_name(self):
    # The name field in a tar header stores strings of at most 100 chars.
    # If a string is shorter than 100 chars it has to be padded with '\0',
    # which implies that a string of exactly 100 chars is stored without
    # a trailing '\0'.
    name = "0123456789" * 10

    # Write an archive holding a single member with the 100 char name.
    with tarfile.open(tmpname, self.mode) as tar:
        tar.addfile(tarfile.TarInfo(name))

    # Read it back: the name must come out unchanged.
    with tarfile.open(tmpname) as tar:
        self.assertEqual(tar.getnames()[0], name,
                         "failed to store 100 char filename")
def test_ordered_recursion(self):
    # os.listdir is mocked to report the directory entries in reverse
    # order; the members added by tar.add() must still come out sorted.
    path = os.path.join(TEMPDIR, "directory")
    os.mkdir(path)
    try:
        # Create the fixture files inside the try block so that they are
        # removed even if setting them up fails half-way.
        for entry in ("1", "2"):
            os_helper.create_empty_file(os.path.join(path, entry))

        with tarfile.open(tmpname, self.mode) as tar:
            with unittest.mock.patch('os.listdir') as mock_listdir:
                mock_listdir.return_value = ["2", "1"]
                tar.add(path)
            paths = [os.path.split(m.name)[-1] for m in tar.getmembers()]
            self.assertEqual(paths, ["directory", "1", "2"])
    finally:
        os_helper.unlink(os.path.join(path, "1"))
        os_helper.unlink(os.path.join(path, "2"))
        os_helper.rmdir(path)
@unittest.skipUnless(hasattr(os, "link"),
                     "Missing hardlink implementation")
def test_link_size(self):
    # A hard link to a file already seen by the TarFile must be reported
    # with size 0.
    link = os.path.join(TEMPDIR, "link")
    target = os.path.join(TEMPDIR, "link_target")
    # The cleanup covers the whole test, including the path where
    # os.link() is not permitted and the test is skipped; otherwise the
    # target file would be left behind.
    try:
        with open(target, "wb") as fobj:
            fobj.write(b"aaa")
        try:
            os.link(target, link)
        except PermissionError as e:
            self.skipTest('os.link(): %s' % e)

        with tarfile.open(tmpname, self.mode) as tar:
            # Record the link target in the inodes list.
            tar.gettarinfo(target)
            tarinfo = tar.gettarinfo(link)
            self.assertEqual(tarinfo.size, 0)
    finally:
        os_helper.unlink(target)
        os_helper.unlink(link)
def test_filter(self):
    # Add a directory with three files through a filter callback that
    # drops "bar" and rewrites uid/uname on everything else, then check
    # the resulting archive.
    tempdir = os.path.join(TEMPDIR, "filter")
    os.mkdir(tempdir)
    try:
        for leaf in ("foo", "bar", "baz"):
            os_helper.create_empty_file(os.path.join(tempdir, leaf))

        def drop_bar(member):
            # Returning None leaves the member out of the archive.
            if os.path.basename(member.name) == "bar":
                return None
            member.uid = 123
            member.uname = "foo"
            return member

        writer = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
        try:
            writer.add(tempdir, arcname="empty_dir", filter=drop_bar)
        finally:
            writer.close()

        # Verify that filter is a keyword-only argument
        self.assertRaises(TypeError, writer.add,
                          tempdir, "empty_dir", True, None, drop_bar)

        reader = tarfile.open(tmpname, "r")
        try:
            for member in reader:
                self.assertEqual(member.uid, 123)
                self.assertEqual(member.uname, "foo")
            self.assertEqual(len(reader.getmembers()), 3)
        finally:
            reader.close()
    finally:
        os_helper.rmtree(tempdir)
@os_helper.skip_unless_symlink
def test_extractall_symlinks(self):
    # Test if extractall works properly when tarfile contains symlinks
    workdir = os.path.join(TEMPDIR, "testsymlinks")
    archive = os.path.join(TEMPDIR, "testsymlinks.tar")
    os.mkdir(workdir)
    try:
        source = os.path.join(workdir, 'source')
        symlink = os.path.join(workdir, 'symlink')
        with open(source, 'w') as f:
            f.write('something\n')
        os.symlink(source, symlink)

        members = ((source, "source"), (symlink, "symlink"))
        with tarfile.open(archive, 'w') as tar:
            for path, arcname in members:
                tar.add(path, arcname=arcname)

        # Let's extract it to the location which contains the symlink
        with tarfile.open(archive, errorlevel=2) as tar:
            # this should not raise OSError: [Errno 17] File exists
            try:
                tar.extractall(path=workdir, filter='fully_trusted')
            except OSError:
                self.fail("extractall failed with symlinked files")
    finally:
        os_helper.unlink(archive)
        os_helper.rmtree(workdir)
def test_open_nonwritable_fileobj(self):
    # When the first write to the caller's file object fails,
    # tarfile.open() must let that exception through and must not close
    # the file object.
    for exctype in OSError, EOFError, RuntimeError:
        # subTest makes a failure report which exception type was used.
        with self.subTest(exctype=exctype):
            class BadFile(io.BytesIO):
                first = True
                def write(self, data):
                    # Only the very first write fails.
                    if self.first:
                        self.first = False
                        raise exctype

            f = BadFile()
            with self.assertRaises(exctype):
                tarfile.open(tmpname, self.mode, fileobj=f,
                             format=tarfile.PAX_FORMAT,
                             pax_headers={'non': 'empty'})
            self.assertFalse(f.closed)
class GzipStreamWriteTest(GzipTest, StreamWriteTest):
    def test_source_directory_not_leaked(self):
        """
        Ensure the source directory is not included in the tar header
        per bpo-41316.
        """
        tarfile.open(tmpname, self.mode).close()
        # latin-1 maps every byte to a character, so the raw compressed
        # file can be searched as text.
        payload = pathlib.Path(tmpname).read_text(encoding='latin-1')
        # A bare ``assert`` would be stripped when running with -O and
        # reports nothing useful on failure; use the TestCase method.
        self.assertNotIn(os.path.dirname(tmpname), payload)
1629 1630 def _length(self, s): 1631 blocks = len(s) // 512 + 1 1632 return blocks * 512 1633 1634 def _calc_size(self, name, link=None): 1635 # Initial tar header 1636 count = 512 1637 1638 if len(name) > tarfile.LENGTH_NAME: 1639 # GNU longname extended header + longname 1640 count += 512 1641 count += self._length(name) 1642 if link is not None and len(link) > tarfile.LENGTH_LINK: 1643 # GNU longlink extended header + longlink 1644 count += 512 1645 count += self._length(link) 1646 return count 1647 1648 def _test(self, name, link=None): 1649 tarinfo = tarfile.TarInfo(name) 1650 if link: 1651 tarinfo.linkname = link 1652 tarinfo.type = tarfile.LNKTYPE 1653 1654 tar = tarfile.open(tmpname, "w") 1655 try: 1656 tar.format = tarfile.GNU_FORMAT 1657 tar.addfile(tarinfo) 1658 1659 v1 = self._calc_size(name, link) 1660 v2 = tar.offset 1661 self.assertEqual(v1, v2, "GNU longname/longlink creation failed") 1662 finally: 1663 tar.close() 1664 1665 tar = tarfile.open(tmpname) 1666 try: 1667 member = tar.next() 1668 self.assertIsNotNone(member, 1669 "unable to read longname member") 1670 self.assertEqual(tarinfo.name, member.name, 1671 "unable to read longname member") 1672 self.assertEqual(tarinfo.linkname, member.linkname, 1673 "unable to read longname member") 1674 finally: 1675 tar.close() 1676 1677 def test_longname_1023(self): 1678 self._test(("longnam/" * 127) + "longnam") 1679 1680 def test_longname_1024(self): 1681 self._test(("longnam/" * 127) + "longname") 1682 1683 def test_longname_1025(self): 1684 self._test(("longnam/" * 127) + "longname_") 1685 1686 def test_longlink_1023(self): 1687 self._test("name", ("longlnk/" * 127) + "longlnk") 1688 1689 def test_longlink_1024(self): 1690 self._test("name", ("longlnk/" * 127) + "longlink") 1691 1692 def test_longlink_1025(self): 1693 self._test("name", ("longlnk/" * 127) + "longlink_") 1694 1695 def test_longnamelink_1023(self): 1696 self._test(("longnam/" * 127) + "longnam", 1697 ("longlnk/" * 127) + "longlnk") 1698 
class DeviceHeaderTest(WriteTestBase, unittest.TestCase):

    prefix = "w:"

    def test_headers_written_only_for_device_files(self):
        # Regression test for bpo-18819.
        # Write one block device member and one regular file member.
        with tarfile.open(tmpname, self.mode) as tar:
            input_blk = tarfile.TarInfo(name="my_block_device")
            input_reg = tarfile.TarInfo(name="my_regular_file")
            input_blk.type = tarfile.BLKTYPE
            input_reg.type = tarfile.REGTYPE
            tar.addfile(input_blk)
            tar.addfile(input_reg)

        # devmajor and devminor should be *interpreted* as 0 in both...
        with tarfile.open(tmpname, "r") as tar:
            output_blk = tar.getmember("my_block_device")
            output_reg = tar.getmember("my_regular_file")
        self.assertEqual(output_blk.devmajor, 0)
        self.assertEqual(output_blk.devminor, 0)
        self.assertEqual(output_reg.devmajor, 0)
        self.assertEqual(output_reg.devminor, 0)

        # ...but the fields should not actually be set on regular files:
        with open(tmpname, "rb") as infile:
            buf = infile.read()
        # Raw header bytes of each member, from its offset up to its data.
        buf_blk = buf[output_blk.offset:output_blk.offset_data]
        buf_reg = buf[output_reg.offset:output_reg.offset_data]
        # See `struct posixheader` in GNU docs for byte offsets:
        # <https://www.gnu.org/software/tar/manual/html_node/Standard.html>
        device_headers = slice(329, 329 + 16)
        self.assertEqual(buf_blk[device_headers], b"0000000\0" * 2)
        self.assertEqual(buf_reg[device_headers], b"\0" * 16)
def test_create_existing(self):
    # Opening an already existing archive again in self.mode must raise
    # FileExistsError and leave the first archive readable.
    with tarfile.open(tmpname, self.mode) as archive:
        archive.add(self.file_path)

    self.assertRaises(FileExistsError, tarfile.open, tmpname, self.mode)

    with self.taropen(tmpname) as archive:
        members = archive.getnames()
    self.assertEqual(len(members), 1)
    self.assertIn('spameggs42', members[0])
class GzipCreateTest(GzipTest, CreateTest):

    def test_create_with_compresslevel(self):
        # compresslevel must be accepted when creating the archive...
        writer = tarfile.open(tmpname, self.mode, compresslevel=1)
        with writer:
            writer.add(self.file_path)
        # ...and when opening it for reading.
        reader = tarfile.open(tmpname, 'r:gz', compresslevel=1)
        with reader:
            pass
def setUp(self):
    # Create a file "foo", a hard link "bar" to it, and an archive that
    # already contains "foo".
    self.foo = os.path.join(TEMPDIR, "foo")
    self.bar = os.path.join(TEMPDIR, "bar")

    with open(self.foo, "wb") as fobj:
        fobj.write(b"foo")

    try:
        os.link(self.foo, self.bar)
    except PermissionError as e:
        # tearDown() is not run when setUp() skips the test, so the
        # file created above has to be removed here.
        os_helper.unlink(self.foo)
        self.skipTest('os.link(): %s' % e)

    self.tar = tarfile.open(tmpname, "w")
    self.tar.add(self.foo)
def test_pax_global_header(self):
    # Write an archive with global pax headers and check that they are
    # read back unchanged, both on the TarFile and on its first member.
    pax_headers = dict([
        ("foo", "bar"),
        ("uid", "0"),
        ("mtime", "1.23"),
        ("test", "\xe4\xf6\xfc"),
        ("\xe4\xf6\xfc", "test"),
    ])

    writer = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
                          pax_headers=pax_headers)
    try:
        writer.addfile(tarfile.TarInfo("test"))
    finally:
        writer.close()

    # Test if the global header was written correctly.
    reader = tarfile.open(tmpname, encoding="iso8859-1")
    try:
        self.assertEqual(reader.pax_headers, pax_headers)
        first = reader.getmembers()[0]
        self.assertEqual(first.pax_headers, pax_headers)
        # Test if all the fields are strings.
        for key, val in reader.pax_headers.items():
            self.assertIsNot(type(key), bytes)
            self.assertIsNot(type(val), bytes)
            if key not in tarfile.PAX_NUMBER_FIELDS:
                continue
            # Number fields must be convertible by their converter.
            convert = tarfile.PAX_NUMBER_FIELDS[key]
            try:
                convert(val)
            except (TypeError, ValueError):
                self.fail("unable to convert pax header field")
    finally:
        reader.close()
1983 pax_headers = {"path": "foo", "uid": "123"} 1984 1985 tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, 1986 encoding="iso8859-1") 1987 try: 1988 t = tarfile.TarInfo() 1989 t.name = "\xe4\xf6\xfc" # non-ASCII 1990 t.uid = 8**8 # too large 1991 t.pax_headers = pax_headers 1992 tar.addfile(t) 1993 finally: 1994 tar.close() 1995 1996 tar = tarfile.open(tmpname, encoding="iso8859-1") 1997 try: 1998 t = tar.getmembers()[0] 1999 self.assertEqual(t.pax_headers, pax_headers) 2000 self.assertEqual(t.name, "foo") 2001 self.assertEqual(t.uid, 123) 2002 finally: 2003 tar.close() 2004 2005 def test_create_pax_header(self): 2006 # The ustar header should contain values that can be 2007 # represented reasonably, even if a better (e.g. higher 2008 # precision) version is set in the pax header. 2009 # Issue #45863 2010 2011 # values that should be kept 2012 t = tarfile.TarInfo() 2013 t.name = "foo" 2014 t.mtime = 1000.1 2015 t.size = 100 2016 t.uid = 123 2017 t.gid = 124 2018 info = t.get_info() 2019 header = t.create_pax_header(info, encoding="iso8859-1") 2020 self.assertEqual(info['name'], "foo") 2021 # mtime should be rounded to nearest second 2022 self.assertIsInstance(info['mtime'], int) 2023 self.assertEqual(info['mtime'], 1000) 2024 self.assertEqual(info['size'], 100) 2025 self.assertEqual(info['uid'], 123) 2026 self.assertEqual(info['gid'], 124) 2027 self.assertEqual(header, 2028 b'././@PaxHeader' + bytes(86) \ 2029 + b'0000000\x000000000\x000000000\x0000000000020\x0000000000000\x00010205\x00 x' \ 2030 + bytes(100) + b'ustar\x0000'+ bytes(247) \ 2031 + b'16 mtime=1000.1\n' + bytes(496) + b'foo' + bytes(97) \ 2032 + b'0000644\x000000173\x000000174\x0000000000144\x0000000001750\x00006516\x00 0' \ 2033 + bytes(100) + b'ustar\x0000' + bytes(247)) 2034 2035 # values that should be changed 2036 t = tarfile.TarInfo() 2037 t.name = "foo\u3374" # can't be represented in ascii 2038 t.mtime = 10**10 # too big 2039 t.size = 10**10 # too big 2040 t.uid = 8**8 # too big 
2041 t.gid = 8**8+1 # too big 2042 info = t.get_info() 2043 header = t.create_pax_header(info, encoding="iso8859-1") 2044 # name is kept as-is in info but should be added to pax header 2045 self.assertEqual(info['name'], "foo\u3374") 2046 self.assertEqual(info['mtime'], 0) 2047 self.assertEqual(info['size'], 0) 2048 self.assertEqual(info['uid'], 0) 2049 self.assertEqual(info['gid'], 0) 2050 self.assertEqual(header, 2051 b'././@PaxHeader' + bytes(86) \ 2052 + b'0000000\x000000000\x000000000\x0000000000130\x0000000000000\x00010207\x00 x' \ 2053 + bytes(100) + b'ustar\x0000' + bytes(247) \ 2054 + b'15 path=foo\xe3\x8d\xb4\n16 uid=16777216\n' \ 2055 + b'16 gid=16777217\n20 size=10000000000\n' \ 2056 + b'21 mtime=10000000000\n'+ bytes(424) + b'foo?' + bytes(96) \ 2057 + b'0000644\x000000000\x000000000\x0000000000000\x0000000000000\x00006540\x00 0' \ 2058 + bytes(100) + b'ustar\x0000' + bytes(247)) 2059 2060 2061class UnicodeTest: 2062 2063 def test_iso8859_1_filename(self): 2064 self._test_unicode_filename("iso8859-1") 2065 2066 def test_utf7_filename(self): 2067 self._test_unicode_filename("utf7") 2068 2069 def test_utf8_filename(self): 2070 self._test_unicode_filename("utf-8") 2071 2072 def _test_unicode_filename(self, encoding): 2073 tar = tarfile.open(tmpname, "w", format=self.format, 2074 encoding=encoding, errors="strict") 2075 try: 2076 name = "\xe4\xf6\xfc" 2077 tar.addfile(tarfile.TarInfo(name)) 2078 finally: 2079 tar.close() 2080 2081 tar = tarfile.open(tmpname, encoding=encoding) 2082 try: 2083 self.assertEqual(tar.getmembers()[0].name, name) 2084 finally: 2085 tar.close() 2086 2087 def test_unicode_filename_error(self): 2088 tar = tarfile.open(tmpname, "w", format=self.format, 2089 encoding="ascii", errors="strict") 2090 try: 2091 tarinfo = tarfile.TarInfo() 2092 2093 tarinfo.name = "\xe4\xf6\xfc" 2094 self.assertRaises(UnicodeError, tar.addfile, tarinfo) 2095 2096 tarinfo.name = "foo" 2097 tarinfo.uname = "\xe4\xf6\xfc" 2098 self.assertRaises(UnicodeError, 
tar.addfile, tarinfo) 2099 finally: 2100 tar.close() 2101 2102 def test_unicode_argument(self): 2103 tar = tarfile.open(tarname, "r", 2104 encoding="iso8859-1", errors="strict") 2105 try: 2106 for t in tar: 2107 self.assertIs(type(t.name), str) 2108 self.assertIs(type(t.linkname), str) 2109 self.assertIs(type(t.uname), str) 2110 self.assertIs(type(t.gname), str) 2111 finally: 2112 tar.close() 2113 2114 def test_uname_unicode(self): 2115 t = tarfile.TarInfo("foo") 2116 t.uname = "\xe4\xf6\xfc" 2117 t.gname = "\xe4\xf6\xfc" 2118 2119 tar = tarfile.open(tmpname, mode="w", format=self.format, 2120 encoding="iso8859-1") 2121 try: 2122 tar.addfile(t) 2123 finally: 2124 tar.close() 2125 2126 tar = tarfile.open(tmpname, encoding="iso8859-1") 2127 try: 2128 t = tar.getmember("foo") 2129 self.assertEqual(t.uname, "\xe4\xf6\xfc") 2130 self.assertEqual(t.gname, "\xe4\xf6\xfc") 2131 2132 if self.format != tarfile.PAX_FORMAT: 2133 tar.close() 2134 tar = tarfile.open(tmpname, encoding="ascii") 2135 t = tar.getmember("foo") 2136 self.assertEqual(t.uname, "\udce4\udcf6\udcfc") 2137 self.assertEqual(t.gname, "\udce4\udcf6\udcfc") 2138 finally: 2139 tar.close() 2140 2141 2142class UstarUnicodeTest(UnicodeTest, unittest.TestCase): 2143 2144 format = tarfile.USTAR_FORMAT 2145 2146 # Test whether the utf-8 encoded version of a filename exceeds the 100 2147 # bytes name field limit (every occurrence of '\xff' will be expanded to 2 2148 # bytes). 
2149 def test_unicode_name1(self): 2150 self._test_ustar_name("0123456789" * 10) 2151 self._test_ustar_name("0123456789" * 10 + "0", ValueError) 2152 self._test_ustar_name("0123456789" * 9 + "01234567\xff") 2153 self._test_ustar_name("0123456789" * 9 + "012345678\xff", ValueError) 2154 2155 def test_unicode_name2(self): 2156 self._test_ustar_name("0123456789" * 9 + "012345\xff\xff") 2157 self._test_ustar_name("0123456789" * 9 + "0123456\xff\xff", ValueError) 2158 2159 # Test whether the utf-8 encoded version of a filename exceeds the 155 2160 # bytes prefix + '/' + 100 bytes name limit. 2161 def test_unicode_longname1(self): 2162 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 10) 2163 self._test_ustar_name("0123456789" * 15 + "0123/4" + "0123456789" * 10, ValueError) 2164 self._test_ustar_name("0123456789" * 15 + "012\xff/" + "0123456789" * 10) 2165 self._test_ustar_name("0123456789" * 15 + "0123\xff/" + "0123456789" * 10, ValueError) 2166 2167 def test_unicode_longname2(self): 2168 self._test_ustar_name("0123456789" * 15 + "01\xff/2" + "0123456789" * 10, ValueError) 2169 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/" + "0123456789" * 10, ValueError) 2170 2171 def test_unicode_longname3(self): 2172 self._test_ustar_name("0123456789" * 15 + "01\xff\xff/2" + "0123456789" * 10, ValueError) 2173 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "01234567\xff") 2174 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345678\xff", ValueError) 2175 2176 def test_unicode_longname4(self): 2177 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "012345\xff\xff") 2178 self._test_ustar_name("0123456789" * 15 + "01234/" + "0123456789" * 9 + "0123456\xff\xff", ValueError) 2179 2180 def _test_ustar_name(self, name, exc=None): 2181 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 2182 t = tarfile.TarInfo(name) 2183 if exc is None: 2184 tar.addfile(t) 2185 
else: 2186 self.assertRaises(exc, tar.addfile, t) 2187 2188 if exc is None: 2189 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 2190 for t in tar: 2191 self.assertEqual(name, t.name) 2192 break 2193 2194 # Test the same as above for the 100 bytes link field. 2195 def test_unicode_link1(self): 2196 self._test_ustar_link("0123456789" * 10) 2197 self._test_ustar_link("0123456789" * 10 + "0", ValueError) 2198 self._test_ustar_link("0123456789" * 9 + "01234567\xff") 2199 self._test_ustar_link("0123456789" * 9 + "012345678\xff", ValueError) 2200 2201 def test_unicode_link2(self): 2202 self._test_ustar_link("0123456789" * 9 + "012345\xff\xff") 2203 self._test_ustar_link("0123456789" * 9 + "0123456\xff\xff", ValueError) 2204 2205 def _test_ustar_link(self, name, exc=None): 2206 with tarfile.open(tmpname, "w", format=self.format, encoding="utf-8") as tar: 2207 t = tarfile.TarInfo("foo") 2208 t.linkname = name 2209 if exc is None: 2210 tar.addfile(t) 2211 else: 2212 self.assertRaises(exc, tar.addfile, t) 2213 2214 if exc is None: 2215 with tarfile.open(tmpname, "r", encoding="utf-8") as tar: 2216 for t in tar: 2217 self.assertEqual(name, t.linkname) 2218 break 2219 2220 2221class GNUUnicodeTest(UnicodeTest, unittest.TestCase): 2222 2223 format = tarfile.GNU_FORMAT 2224 2225 def test_bad_pax_header(self): 2226 # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields 2227 # without a hdrcharset=BINARY header. 2228 for encoding, name in ( 2229 ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), 2230 ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): 2231 with tarfile.open(tarname, encoding=encoding, 2232 errors="surrogateescape") as tar: 2233 try: 2234 t = tar.getmember(name) 2235 except KeyError: 2236 self.fail("unable to read bad GNU tar pax header") 2237 2238 2239class PAXUnicodeTest(UnicodeTest, unittest.TestCase): 2240 2241 format = tarfile.PAX_FORMAT 2242 2243 # PAX_FORMAT ignores encoding in write mode. 
2244 test_unicode_filename_error = None 2245 2246 def test_binary_header(self): 2247 # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. 2248 for encoding, name in ( 2249 ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), 2250 ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): 2251 with tarfile.open(tarname, encoding=encoding, 2252 errors="surrogateescape") as tar: 2253 try: 2254 t = tar.getmember(name) 2255 except KeyError: 2256 self.fail("unable to read POSIX.1-2008 binary header") 2257 2258 2259class AppendTestBase: 2260 # Test append mode (cp. patch #1652681). 2261 2262 def setUp(self): 2263 self.tarname = tmpname 2264 if os.path.exists(self.tarname): 2265 os_helper.unlink(self.tarname) 2266 2267 def _create_testtar(self, mode="w:"): 2268 with tarfile.open(tarname, encoding="iso8859-1") as src: 2269 t = src.getmember("ustar/regtype") 2270 t.name = "foo" 2271 with src.extractfile(t) as f: 2272 with tarfile.open(self.tarname, mode) as tar: 2273 tar.addfile(t, f) 2274 2275 def test_append_compressed(self): 2276 self._create_testtar("w:" + self.suffix) 2277 self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") 2278 2279class AppendTest(AppendTestBase, unittest.TestCase): 2280 test_append_compressed = None 2281 2282 def _add_testfile(self, fileobj=None): 2283 with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: 2284 tar.addfile(tarfile.TarInfo("bar")) 2285 2286 def _test(self, names=["bar"], fileobj=None): 2287 with tarfile.open(self.tarname, fileobj=fileobj) as tar: 2288 self.assertEqual(tar.getnames(), names) 2289 2290 def test_non_existing(self): 2291 self._add_testfile() 2292 self._test() 2293 2294 def test_empty(self): 2295 tarfile.open(self.tarname, "w:").close() 2296 self._add_testfile() 2297 self._test() 2298 2299 def test_empty_fileobj(self): 2300 fobj = io.BytesIO(b"\0" * 1024) 2301 self._add_testfile(fobj) 2302 fobj.seek(0) 2303 self._test(fileobj=fobj) 2304 2305 def test_fileobj(self): 2306 self._create_testtar() 2307 
with open(self.tarname, "rb") as fobj: 2308 data = fobj.read() 2309 fobj = io.BytesIO(data) 2310 self._add_testfile(fobj) 2311 fobj.seek(0) 2312 self._test(names=["foo", "bar"], fileobj=fobj) 2313 2314 def test_existing(self): 2315 self._create_testtar() 2316 self._add_testfile() 2317 self._test(names=["foo", "bar"]) 2318 2319 # Append mode is supposed to fail if the tarfile to append to 2320 # does not end with a zero block. 2321 def _test_error(self, data): 2322 with open(self.tarname, "wb") as fobj: 2323 fobj.write(data) 2324 self.assertRaises(tarfile.ReadError, self._add_testfile) 2325 2326 def test_null(self): 2327 self._test_error(b"") 2328 2329 def test_incomplete(self): 2330 self._test_error(b"\0" * 13) 2331 2332 def test_premature_eof(self): 2333 data = tarfile.TarInfo("foo").tobuf() 2334 self._test_error(data) 2335 2336 def test_trailing_garbage(self): 2337 data = tarfile.TarInfo("foo").tobuf() 2338 self._test_error(data + b"\0" * 13) 2339 2340 def test_invalid(self): 2341 self._test_error(b"a" * 512) 2342 2343class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): 2344 pass 2345 2346class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): 2347 pass 2348 2349class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): 2350 pass 2351 2352 2353class LimitsTest(unittest.TestCase): 2354 2355 def test_ustar_limits(self): 2356 # 100 char name 2357 tarinfo = tarfile.TarInfo("0123456789" * 10) 2358 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2359 2360 # 101 char name that cannot be stored 2361 tarinfo = tarfile.TarInfo("0123456789" * 10 + "0") 2362 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2363 2364 # 256 char name with a slash at pos 156 2365 tarinfo = tarfile.TarInfo("123/" * 62 + "longname") 2366 tarinfo.tobuf(tarfile.USTAR_FORMAT) 2367 2368 # 256 char name that cannot be stored 2369 tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname") 2370 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2371 2372 
# 512 char name 2373 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2374 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2375 2376 # 512 char linkname 2377 tarinfo = tarfile.TarInfo("longlink") 2378 tarinfo.linkname = "123/" * 126 + "longname" 2379 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2380 2381 # uid > 8 digits 2382 tarinfo = tarfile.TarInfo("name") 2383 tarinfo.uid = 0o10000000 2384 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT) 2385 2386 def test_gnu_limits(self): 2387 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2388 tarinfo.tobuf(tarfile.GNU_FORMAT) 2389 2390 tarinfo = tarfile.TarInfo("longlink") 2391 tarinfo.linkname = "123/" * 126 + "longname" 2392 tarinfo.tobuf(tarfile.GNU_FORMAT) 2393 2394 # uid >= 256 ** 7 2395 tarinfo = tarfile.TarInfo("name") 2396 tarinfo.uid = 0o4000000000000000000 2397 self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT) 2398 2399 def test_pax_limits(self): 2400 tarinfo = tarfile.TarInfo("123/" * 126 + "longname") 2401 tarinfo.tobuf(tarfile.PAX_FORMAT) 2402 2403 tarinfo = tarfile.TarInfo("longlink") 2404 tarinfo.linkname = "123/" * 126 + "longname" 2405 tarinfo.tobuf(tarfile.PAX_FORMAT) 2406 2407 tarinfo = tarfile.TarInfo("name") 2408 tarinfo.uid = 0o4000000000000000000 2409 tarinfo.tobuf(tarfile.PAX_FORMAT) 2410 2411 2412class MiscTest(unittest.TestCase): 2413 2414 def test_char_fields(self): 2415 self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), 2416 b"foo\0\0\0\0\0") 2417 self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), 2418 b"foo") 2419 self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), 2420 "foo") 2421 self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), 2422 "foo") 2423 2424 def test_read_number_fields(self): 2425 # Issue 13158: Test if GNU tar specific base-256 number fields 2426 # are decoded correctly. 
2427 self.assertEqual(tarfile.nti(b"0000001\x00"), 1) 2428 self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) 2429 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 2430 0o10000000) 2431 self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 2432 0xffffffff) 2433 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), 2434 -1) 2435 self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), 2436 -100) 2437 self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), 2438 -0x100000000000000) 2439 2440 # Issue 24514: Test if empty number fields are converted to zero. 2441 self.assertEqual(tarfile.nti(b"\0"), 0) 2442 self.assertEqual(tarfile.nti(b" \0"), 0) 2443 2444 def test_write_number_fields(self): 2445 self.assertEqual(tarfile.itn(1), b"0000001\x00") 2446 self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") 2447 self.assertEqual(tarfile.itn(0o10000000, format=tarfile.GNU_FORMAT), 2448 b"\x80\x00\x00\x00\x00\x20\x00\x00") 2449 self.assertEqual(tarfile.itn(0xffffffff, format=tarfile.GNU_FORMAT), 2450 b"\x80\x00\x00\x00\xff\xff\xff\xff") 2451 self.assertEqual(tarfile.itn(-1, format=tarfile.GNU_FORMAT), 2452 b"\xff\xff\xff\xff\xff\xff\xff\xff") 2453 self.assertEqual(tarfile.itn(-100, format=tarfile.GNU_FORMAT), 2454 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2455 self.assertEqual(tarfile.itn(-0x100000000000000, 2456 format=tarfile.GNU_FORMAT), 2457 b"\xff\x00\x00\x00\x00\x00\x00\x00") 2458 2459 # Issue 32713: Test if itn() supports float values outside the 2460 # non-GNU format range 2461 self.assertEqual(tarfile.itn(-100.0, format=tarfile.GNU_FORMAT), 2462 b"\xff\xff\xff\xff\xff\xff\xff\x9c") 2463 self.assertEqual(tarfile.itn(8 ** 12 + 0.0, format=tarfile.GNU_FORMAT), 2464 b"\x80\x00\x00\x10\x00\x00\x00\x00") 2465 self.assertEqual(tarfile.nti(tarfile.itn(-0.1, format=tarfile.GNU_FORMAT)), 0) 2466 2467 def test_number_field_limits(self): 2468 with self.assertRaises(ValueError): 2469 tarfile.itn(-1, 8, 
tarfile.USTAR_FORMAT) 2470 with self.assertRaises(ValueError): 2471 tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) 2472 with self.assertRaises(ValueError): 2473 tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) 2474 with self.assertRaises(ValueError): 2475 tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) 2476 2477 def test__all__(self): 2478 not_exported = { 2479 'version', 'grp', 'pwd', 'symlink_exception', 'NUL', 'BLOCKSIZE', 2480 'RECORDSIZE', 'GNU_MAGIC', 'POSIX_MAGIC', 'LENGTH_NAME', 2481 'LENGTH_LINK', 'LENGTH_PREFIX', 'REGTYPE', 'AREGTYPE', 'LNKTYPE', 2482 'SYMTYPE', 'CHRTYPE', 'BLKTYPE', 'DIRTYPE', 'FIFOTYPE', 'CONTTYPE', 2483 'GNUTYPE_LONGNAME', 'GNUTYPE_LONGLINK', 'GNUTYPE_SPARSE', 2484 'XHDTYPE', 'XGLTYPE', 'SOLARIS_XHDTYPE', 'SUPPORTED_TYPES', 2485 'REGULAR_TYPES', 'GNU_TYPES', 'PAX_FIELDS', 'PAX_NAME_FIELDS', 2486 'PAX_NUMBER_FIELDS', 'stn', 'nts', 'nti', 'itn', 'calc_chksums', 2487 'copyfileobj', 'filemode', 'EmptyHeaderError', 2488 'TruncatedHeaderError', 'EOFHeaderError', 'InvalidHeaderError', 2489 'SubsequentHeaderError', 'ExFileObject', 'main', 2490 "fully_trusted_filter", "data_filter", 2491 "tar_filter", "FilterError", "AbsoluteLinkError", 2492 "OutsideDestinationError", "SpecialFileError", "AbsolutePathError", 2493 "LinkOutsideDestinationError", 2494 } 2495 support.check__all__(self, tarfile, not_exported=not_exported) 2496 2497 def test_useful_error_message_when_modules_missing(self): 2498 fname = os.path.join(os.path.dirname(__file__), 'testtar.tar.xz') 2499 with self.assertRaises(tarfile.ReadError) as excinfo: 2500 error = tarfile.CompressionError('lzma module is not available'), 2501 with unittest.mock.patch.object(tarfile.TarFile, 'xzopen', side_effect=error): 2502 tarfile.open(fname) 2503 2504 self.assertIn( 2505 "\n- method xz: CompressionError('lzma module is not available')\n", 2506 str(excinfo.exception), 2507 ) 2508 2509 2510class CommandLineTest(unittest.TestCase): 2511 2512 def tarfilecmd(self, *args, **kwargs): 2513 rc, out, 
err = script_helper.assert_python_ok('-m', 'tarfile', *args, 2514 **kwargs) 2515 return out.replace(os.linesep.encode(), b'\n') 2516 2517 def tarfilecmd_failure(self, *args): 2518 return script_helper.assert_python_failure('-m', 'tarfile', *args) 2519 2520 def make_simple_tarfile(self, tar_name): 2521 files = [support.findfile('tokenize_tests.txt'), 2522 support.findfile('tokenize_tests-no-coding-cookie-' 2523 'and-utf8-bom-sig-only.txt')] 2524 self.addCleanup(os_helper.unlink, tar_name) 2525 with tarfile.open(tar_name, 'w') as tf: 2526 for tardata in files: 2527 tf.add(tardata, arcname=os.path.basename(tardata)) 2528 2529 def make_evil_tarfile(self, tar_name): 2530 files = [support.findfile('tokenize_tests.txt')] 2531 self.addCleanup(os_helper.unlink, tar_name) 2532 with tarfile.open(tar_name, 'w') as tf: 2533 benign = tarfile.TarInfo('benign') 2534 tf.addfile(benign, fileobj=io.BytesIO(b'')) 2535 evil = tarfile.TarInfo('../evil') 2536 tf.addfile(evil, fileobj=io.BytesIO(b'')) 2537 2538 def test_bad_use(self): 2539 rc, out, err = self.tarfilecmd_failure() 2540 self.assertEqual(out, b'') 2541 self.assertIn(b'usage', err.lower()) 2542 self.assertIn(b'error', err.lower()) 2543 self.assertIn(b'required', err.lower()) 2544 rc, out, err = self.tarfilecmd_failure('-l', '') 2545 self.assertEqual(out, b'') 2546 self.assertNotEqual(err.strip(), b'') 2547 2548 def test_test_command(self): 2549 for tar_name in testtarnames: 2550 for opt in '-t', '--test': 2551 out = self.tarfilecmd(opt, tar_name) 2552 self.assertEqual(out, b'') 2553 2554 def test_test_command_verbose(self): 2555 for tar_name in testtarnames: 2556 for opt in '-v', '--verbose': 2557 out = self.tarfilecmd(opt, '-t', tar_name, 2558 PYTHONIOENCODING='utf-8') 2559 self.assertIn(b'is a tar archive.\n', out) 2560 2561 def test_test_command_invalid_file(self): 2562 zipname = support.findfile('zipdir.zip') 2563 rc, out, err = self.tarfilecmd_failure('-t', zipname) 2564 self.assertIn(b' is not a tar archive.', err) 2565 
self.assertEqual(out, b'') 2566 self.assertEqual(rc, 1) 2567 2568 for tar_name in testtarnames: 2569 with self.subTest(tar_name=tar_name): 2570 with open(tar_name, 'rb') as f: 2571 data = f.read() 2572 try: 2573 with open(tmpname, 'wb') as f: 2574 f.write(data[:511]) 2575 rc, out, err = self.tarfilecmd_failure('-t', tmpname) 2576 self.assertEqual(out, b'') 2577 self.assertEqual(rc, 1) 2578 finally: 2579 os_helper.unlink(tmpname) 2580 2581 def test_list_command(self): 2582 for tar_name in testtarnames: 2583 with support.captured_stdout() as t: 2584 with tarfile.open(tar_name, 'r') as tf: 2585 tf.list(verbose=False) 2586 expected = t.getvalue().encode('ascii', 'backslashreplace') 2587 for opt in '-l', '--list': 2588 out = self.tarfilecmd(opt, tar_name, 2589 PYTHONIOENCODING='ascii') 2590 self.assertEqual(out, expected) 2591 2592 def test_list_command_verbose(self): 2593 for tar_name in testtarnames: 2594 with support.captured_stdout() as t: 2595 with tarfile.open(tar_name, 'r') as tf: 2596 tf.list(verbose=True) 2597 expected = t.getvalue().encode('ascii', 'backslashreplace') 2598 for opt in '-v', '--verbose': 2599 out = self.tarfilecmd(opt, '-l', tar_name, 2600 PYTHONIOENCODING='ascii') 2601 self.assertEqual(out, expected) 2602 2603 def test_list_command_invalid_file(self): 2604 zipname = support.findfile('zipdir.zip') 2605 rc, out, err = self.tarfilecmd_failure('-l', zipname) 2606 self.assertIn(b' is not a tar archive.', err) 2607 self.assertEqual(out, b'') 2608 self.assertEqual(rc, 1) 2609 2610 def test_create_command(self): 2611 files = [support.findfile('tokenize_tests.txt'), 2612 support.findfile('tokenize_tests-no-coding-cookie-' 2613 'and-utf8-bom-sig-only.txt')] 2614 for opt in '-c', '--create': 2615 try: 2616 out = self.tarfilecmd(opt, tmpname, *files) 2617 self.assertEqual(out, b'') 2618 with tarfile.open(tmpname) as tar: 2619 tar.getmembers() 2620 finally: 2621 os_helper.unlink(tmpname) 2622 2623 def test_create_command_verbose(self): 2624 files = 
[support.findfile('tokenize_tests.txt'), 2625 support.findfile('tokenize_tests-no-coding-cookie-' 2626 'and-utf8-bom-sig-only.txt')] 2627 for opt in '-v', '--verbose': 2628 try: 2629 out = self.tarfilecmd(opt, '-c', tmpname, *files, 2630 PYTHONIOENCODING='utf-8') 2631 self.assertIn(b' file created.', out) 2632 with tarfile.open(tmpname) as tar: 2633 tar.getmembers() 2634 finally: 2635 os_helper.unlink(tmpname) 2636 2637 def test_create_command_dotless_filename(self): 2638 files = [support.findfile('tokenize_tests.txt')] 2639 try: 2640 out = self.tarfilecmd('-c', dotlessname, *files) 2641 self.assertEqual(out, b'') 2642 with tarfile.open(dotlessname) as tar: 2643 tar.getmembers() 2644 finally: 2645 os_helper.unlink(dotlessname) 2646 2647 def test_create_command_dot_started_filename(self): 2648 tar_name = os.path.join(TEMPDIR, ".testtar") 2649 files = [support.findfile('tokenize_tests.txt')] 2650 try: 2651 out = self.tarfilecmd('-c', tar_name, *files) 2652 self.assertEqual(out, b'') 2653 with tarfile.open(tar_name) as tar: 2654 tar.getmembers() 2655 finally: 2656 os_helper.unlink(tar_name) 2657 2658 def test_create_command_compressed(self): 2659 files = [support.findfile('tokenize_tests.txt'), 2660 support.findfile('tokenize_tests-no-coding-cookie-' 2661 'and-utf8-bom-sig-only.txt')] 2662 for filetype in (GzipTest, Bz2Test, LzmaTest): 2663 if not filetype.open: 2664 continue 2665 try: 2666 tar_name = tmpname + '.' 
+ filetype.suffix 2667 out = self.tarfilecmd('-c', tar_name, *files) 2668 with filetype.taropen(tar_name) as tar: 2669 tar.getmembers() 2670 finally: 2671 os_helper.unlink(tar_name) 2672 2673 def test_extract_command(self): 2674 self.make_simple_tarfile(tmpname) 2675 for opt in '-e', '--extract': 2676 try: 2677 with os_helper.temp_cwd(tarextdir): 2678 out = self.tarfilecmd(opt, tmpname) 2679 self.assertEqual(out, b'') 2680 finally: 2681 os_helper.rmtree(tarextdir) 2682 2683 def test_extract_command_verbose(self): 2684 self.make_simple_tarfile(tmpname) 2685 for opt in '-v', '--verbose': 2686 try: 2687 with os_helper.temp_cwd(tarextdir): 2688 out = self.tarfilecmd(opt, '-e', tmpname, 2689 PYTHONIOENCODING='utf-8') 2690 self.assertIn(b' file is extracted.', out) 2691 finally: 2692 os_helper.rmtree(tarextdir) 2693 2694 def test_extract_command_filter(self): 2695 self.make_evil_tarfile(tmpname) 2696 # Make an inner directory, so the member named '../evil' 2697 # is still extracted into `tarextdir` 2698 destdir = os.path.join(tarextdir, 'dest') 2699 os.mkdir(tarextdir) 2700 try: 2701 with os_helper.temp_cwd(destdir): 2702 self.tarfilecmd_failure('-e', tmpname, 2703 '-v', 2704 '--filter', 'data') 2705 out = self.tarfilecmd('-e', tmpname, 2706 '-v', 2707 '--filter', 'fully_trusted', 2708 PYTHONIOENCODING='utf-8') 2709 self.assertIn(b' file is extracted.', out) 2710 finally: 2711 os_helper.rmtree(tarextdir) 2712 2713 def test_extract_command_different_directory(self): 2714 self.make_simple_tarfile(tmpname) 2715 try: 2716 with os_helper.temp_cwd(tarextdir): 2717 out = self.tarfilecmd('-e', tmpname, 'spamdir') 2718 self.assertEqual(out, b'') 2719 finally: 2720 os_helper.rmtree(tarextdir) 2721 2722 def test_extract_command_invalid_file(self): 2723 zipname = support.findfile('zipdir.zip') 2724 with os_helper.temp_cwd(tarextdir): 2725 rc, out, err = self.tarfilecmd_failure('-e', zipname) 2726 self.assertIn(b' is not a tar archive.', err) 2727 self.assertEqual(out, b'') 2728 
self.assertEqual(rc, 1) 2729 2730 2731class ContextManagerTest(unittest.TestCase): 2732 2733 def test_basic(self): 2734 with tarfile.open(tarname) as tar: 2735 self.assertFalse(tar.closed, "closed inside runtime context") 2736 self.assertTrue(tar.closed, "context manager failed") 2737 2738 def test_closed(self): 2739 # The __enter__() method is supposed to raise OSError 2740 # if the TarFile object is already closed. 2741 tar = tarfile.open(tarname) 2742 tar.close() 2743 with self.assertRaises(OSError): 2744 with tar: 2745 pass 2746 2747 def test_exception(self): 2748 # Test if the OSError exception is passed through properly. 2749 with self.assertRaises(Exception) as exc: 2750 with tarfile.open(tarname) as tar: 2751 raise OSError 2752 self.assertIsInstance(exc.exception, OSError, 2753 "wrong exception raised in context manager") 2754 self.assertTrue(tar.closed, "context manager failed") 2755 2756 def test_no_eof(self): 2757 # __exit__() must not write end-of-archive blocks if an 2758 # exception was raised. 2759 try: 2760 with tarfile.open(tmpname, "w") as tar: 2761 raise Exception 2762 except: 2763 pass 2764 self.assertEqual(os.path.getsize(tmpname), 0, 2765 "context manager wrote an end-of-archive block") 2766 self.assertTrue(tar.closed, "context manager failed") 2767 2768 def test_eof(self): 2769 # __exit__() must write end-of-archive blocks, i.e. call 2770 # TarFile.close() if there was no error. 2771 with tarfile.open(tmpname, "w"): 2772 pass 2773 self.assertNotEqual(os.path.getsize(tmpname), 0, 2774 "context manager wrote no end-of-archive block") 2775 2776 def test_fileobj(self): 2777 # Test that __exit__() did not close the external file 2778 # object. 
2779 with open(tmpname, "wb") as fobj: 2780 try: 2781 with tarfile.open(fileobj=fobj, mode="w") as tar: 2782 raise Exception 2783 except: 2784 pass 2785 self.assertFalse(fobj.closed, "external file object was closed") 2786 self.assertTrue(tar.closed, "context manager failed") 2787 2788 2789@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") 2790class LinkEmulationTest(ReadTest, unittest.TestCase): 2791 2792 # Test for issue #8741 regression. On platforms that do not support 2793 # symbolic or hard links tarfile tries to extract these types of members 2794 # as the regular files they point to. 2795 def _test_link_extraction(self, name): 2796 self.tar.extract(name, TEMPDIR, filter='fully_trusted') 2797 with open(os.path.join(TEMPDIR, name), "rb") as f: 2798 data = f.read() 2799 self.assertEqual(sha256sum(data), sha256_regtype) 2800 2801 # See issues #1578269, #8879, and #17689 for some history on these skips 2802 @unittest.skipIf(hasattr(os.path, "islink"), 2803 "Skip emulation - has os.path.islink but not os.link") 2804 def test_hardlink_extraction1(self): 2805 self._test_link_extraction("ustar/lnktype") 2806 2807 @unittest.skipIf(hasattr(os.path, "islink"), 2808 "Skip emulation - has os.path.islink but not os.link") 2809 def test_hardlink_extraction2(self): 2810 self._test_link_extraction("./ustar/linktest2/lnktype") 2811 2812 @unittest.skipIf(hasattr(os, "symlink"), 2813 "Skip emulation if symlink exists") 2814 def test_symlink_extraction1(self): 2815 self._test_link_extraction("ustar/symtype") 2816 2817 @unittest.skipIf(hasattr(os, "symlink"), 2818 "Skip emulation if symlink exists") 2819 def test_symlink_extraction2(self): 2820 self._test_link_extraction("./ustar/linktest2/symtype") 2821 2822 2823class Bz2PartialReadTest(Bz2Test, unittest.TestCase): 2824 # Issue5068: The _BZ2Proxy.read() method loops forever 2825 # on an empty or partial bzipped file. 
2826 2827 def _test_partial_input(self, mode): 2828 class MyBytesIO(io.BytesIO): 2829 hit_eof = False 2830 def read(self, n): 2831 if self.hit_eof: 2832 raise AssertionError("infinite loop detected in " 2833 "tarfile.open()") 2834 self.hit_eof = self.tell() == len(self.getvalue()) 2835 return super(MyBytesIO, self).read(n) 2836 def seek(self, *args): 2837 self.hit_eof = False 2838 return super(MyBytesIO, self).seek(*args) 2839 2840 data = bz2.compress(tarfile.TarInfo("foo").tobuf()) 2841 for x in range(len(data) + 1): 2842 try: 2843 tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode) 2844 except tarfile.ReadError: 2845 pass # we have no interest in ReadErrors 2846 2847 def test_partial_input(self): 2848 self._test_partial_input("r") 2849 2850 def test_partial_input_bz2(self): 2851 self._test_partial_input("r:bz2") 2852 2853 2854def root_is_uid_gid_0(): 2855 try: 2856 import pwd, grp 2857 except ImportError: 2858 return False 2859 if pwd.getpwuid(0)[0] != 'root': 2860 return False 2861 if grp.getgrgid(0)[0] != 'root': 2862 return False 2863 return True 2864 2865 2866@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown") 2867@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid") 2868class NumericOwnerTest(unittest.TestCase): 2869 # mock the following: 2870 # os.chown: so we can test what's being called 2871 # os.chmod: so the modes are not actually changed. if they are, we can't 2872 # delete the files/directories 2873 # os.geteuid: so we can lie and say we're root (uid = 0) 2874 2875 @staticmethod 2876 def _make_test_archive(filename_1, dirname_1, filename_2): 2877 # the file contents to write 2878 fobj = io.BytesIO(b"content") 2879 2880 # create a tar file with a file, a directory, and a file within that 2881 # directory. 
@unittest.skipUnless(hasattr(os, 'chown'), "missing os.chown")
@unittest.skipUnless(hasattr(os, 'geteuid'), "missing os.geteuid")
class NumericOwnerTest(unittest.TestCase):
    # The tests below patch:
    #   os.chown   -- to record which ownership tarfile tries to apply
    #   os.chmod   -- so modes stay untouched and the extracted
    #                 files/directories can still be deleted afterwards
    #   os.geteuid -- so we can claim to be root (uid = 0)

    @staticmethod
    def _make_test_archive(filename_1, dirname_1, filename_2):
        # One payload object, handed to both regular-file members.
        payload = io.BytesIO(b"content")

        # (name, uid, gid, type, data): a file, a directory and a file inside
        # that directory, each carrying its own uid/gid pair.
        members = (
            (filename_1, 99, 98, tarfile.REGTYPE, payload),
            (dirname_1, 77, 76, tarfile.DIRTYPE, None),
            (filename_2, 88, 87, tarfile.REGTYPE, payload),
        )
        with tarfile.open(tmpname, 'w') as archive:
            for member_name, uid, gid, member_type, data in members:
                info = tarfile.TarInfo(member_name)
                info.uid, info.gid = uid, gid
                info.uname = info.gname = 'root'
                info.type = member_type
                archive.addfile(info, data)

        # Callers get the full path of the archive that was written.
        return tmpname

    @staticmethod
    @contextmanager
    def _setup_test(mock_geteuid):
        mock_geteuid.return_value = 0  # lie and say we're root

        # Member names exactly as they are stored in the archive.
        filename_1 = 'numeric-owner-testfile'
        dirname_1 = 'dir'
        filename_2 = os.path.join(dirname_1, filename_1)

        tar_filename = NumericOwnerTest._make_test_archive(
            filename_1, dirname_1, filename_2)

        # Hand out the opened archive together with the stored names; the
        # archive is closed again when the caller's `with` block ends.
        with tarfile.open(tar_filename) as tarfl:
            yield tarfl, filename_1, dirname_1, filename_2

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                        mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _,
                                                filename_2):
            for member_name in (filename_1, filename_2):
                tarfl.extract(member_name, TEMPDIR, numeric_owner=True,
                              filter='fully_trusted')

        # chown must have been called with the numeric ids from the archive.
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extractall_with_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, dirname_1,
                                                filename_2):
            tarfl.extractall(TEMPDIR, numeric_owner=True,
                             filter='fully_trusted')

        # Every member, the directory included, keeps its numeric owner.
        expected_calls = [
            unittest.mock.call(os.path.join(TEMPDIR, filename_1), 99, 98),
            unittest.mock.call(os.path.join(TEMPDIR, dirname_1), 77, 76),
            unittest.mock.call(os.path.join(TEMPDIR, filename_2), 88, 87),
        ]
        mock_chown.assert_has_calls(expected_calls, any_order=True)

    # This test needs uid=0 and gid=0 to really be named 'root': the uname
    # and gname stored in the test archive are 'root', and extract() resolves
    # them through pwd and grp to a uid and gid, which we expect to be 0.
    @unittest.skipUnless(root_is_uid_gid_0(),
                         'uid=0,gid=0 must be named "root"')
    @unittest.mock.patch('os.chown')
    @unittest.mock.patch('os.chmod')
    @unittest.mock.patch('os.geteuid')
    def test_extract_without_numeric_owner(self, mock_geteuid, mock_chmod,
                                           mock_chown):
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            tarfl.extract(filename_1, TEMPDIR, numeric_owner=False,
                          filter='fully_trusted')

        extracted_path = os.path.join(TEMPDIR, filename_1)
        mock_chown.assert_called_with(extracted_path, 0, 0)

    @unittest.mock.patch('os.geteuid')
    def test_keyword_only(self, mock_geteuid):
        # Arguments after `path` may not be passed positionally.
        with self._setup_test(mock_geteuid) as (tarfl, filename_1, _, _):
            with self.assertRaises(TypeError):
                tarfl.extract(filename_1, TEMPDIR, False, True)
class ReplaceTests(ReadTest, unittest.TestCase):
    # TarInfo.replace() returns a modified copy; these tests check what the
    # copy looks like and what happens to the member it was made from.

    def test_replace_name(self):
        original = self.tar.getmember('ustar/regtype')
        renamed = original.replace(name='misc/other')
        self.assertEqual(renamed.name, 'misc/other')
        # Neither the object we hold nor a fresh lookup may be renamed.
        self.assertEqual(original.name, 'ustar/regtype')
        looked_up = self.tar.getmember('ustar/regtype')
        self.assertEqual(looked_up.name, 'ustar/regtype')

    def test_replace_deep(self):
        original = self.tar.getmember('pax/regtype1')
        duplicate = original.replace()
        duplicate.pax_headers['gname'] = 'not-bar'
        # The default (deep) copy owns its pax_headers mapping.
        self.assertEqual(original.pax_headers['gname'], 'bar')
        looked_up = self.tar.getmember('pax/regtype1')
        self.assertEqual(looked_up.pax_headers['gname'], 'bar')

    def test_replace_shallow(self):
        original = self.tar.getmember('pax/regtype1')
        duplicate = original.replace(deep=False)
        duplicate.pax_headers['gname'] = 'not-bar'
        # A shallow copy shares pax_headers, so the edit is visible here too.
        self.assertEqual(original.pax_headers['gname'], 'not-bar')
        looked_up = self.tar.getmember('pax/regtype1')
        self.assertEqual(looked_up.pax_headers['gname'], 'not-bar')

    def test_replace_all(self):
        original = self.tar.getmember('ustar/regtype')
        replaceable = ('name', 'mtime', 'mode', 'linkname',
                       'uid', 'gid', 'uname', 'gname')
        for attr_name in replaceable:
            with self.subTest(attr_name=attr_name):
                blanked = original.replace(**{attr_name: None})
                self.assertEqual(getattr(blanked, attr_name), None)
                self.assertNotEqual(getattr(original, attr_name), None)

    def test_replace_internal(self):
        original = self.tar.getmember('ustar/regtype')
        # `offset` is not an accepted keyword of replace().
        with self.assertRaises(TypeError):
            original.replace(offset=123456789)
class NoneInfoExtractTests(ReadTest):
    # These mainly check that all kinds of members are extracted successfully
    # if some metadata is None.
    # Some of the methods do additional spot checks.

    # We also test that the default filters can deal with None.

    # Filter used for the control extraction in setUpClass(); concrete
    # subclasses override it.
    extraction_filter = None

    @classmethod
    def setUpClass(cls):
        # Extract the unmodified archive once; its set of relative paths is
        # the reference that every later extraction is compared against.
        cls.control_dir = pathlib.Path(TEMPDIR) / "extractall_ctrl"
        # The `with` block closes the archive even if extractall() raises.
        with tarfile.open(tarname, mode='r', encoding="iso8859-1") as tar:
            tar.errorlevel = 0
            tar.extractall(cls.control_dir, filter=cls.extraction_filter)
        cls.control_paths = {
            p.relative_to(cls.control_dir)
            for p in cls.control_dir.glob('**/*')}

    @classmethod
    def tearDownClass(cls):
        shutil.rmtree(cls.control_dir)

    def check_files_present(self, directory):
        # Assert that `directory` holds exactly the paths of the control
        # extraction (compared relative to their respective roots).
        got_paths = {
            p.relative_to(directory)
            for p in pathlib.Path(directory).glob('**/*')}
        self.assertEqual(self.control_paths, got_paths)

    @contextmanager
    def extract_with_none(self, *attr_names):
        # Set each attribute in `attr_names` to None on every member of
        # self.tar, extract everything into a temporary directory, verify the
        # set of extracted paths and yield the directory for further checks.
        DIR = pathlib.Path(TEMPDIR) / "extractall_none"
        self.tar.errorlevel = 0
        for member in self.tar.getmembers():
            for attr_name in attr_names:
                setattr(member, attr_name, None)
        with os_helper.temp_dir(DIR):
            self.tar.extractall(DIR, filter='fully_trusted')
            self.check_files_present(DIR)
            yield DIR

    def test_extractall_none_mtime(self):
        # mtimes of extracted files should be later than 'now' -- the mtime
        # of a previously created directory.
        now = pathlib.Path(TEMPDIR).stat().st_mtime
        with self.extract_with_none('mtime') as DIR:
            for path in pathlib.Path(DIR).glob('**/*'):
                with self.subTest(path=path):
                    try:
                        mtime = path.stat().st_mtime
                    except OSError:
                        # Some systems can't stat symlinks, ignore those
                        if not path.is_symlink():
                            raise
                    else:
                        # Reuse the value read above; a second stat() here
                        # would run outside the symlink guard.
                        self.assertGreaterEqual(mtime, now)

    def test_extractall_none_mode(self):
        # modes of directories and regular files should match the mode
        # of a "normally" created directory or regular file
        dir_mode = pathlib.Path(TEMPDIR).stat().st_mode
        regular_file = pathlib.Path(TEMPDIR) / 'regular_file'
        regular_file.write_text('')
        regular_file_mode = regular_file.stat().st_mode
        with self.extract_with_none('mode') as DIR:
            for path in pathlib.Path(DIR).glob('**/*'):
                with self.subTest(path=path):
                    if path.is_dir():
                        self.assertEqual(path.stat().st_mode, dir_mode)
                    elif path.is_file():
                        self.assertEqual(path.stat().st_mode,
                                         regular_file_mode)

    # The remaining tests only require extraction to succeed and to produce
    # the control set of paths; extract_with_none() performs that check.

    def test_extractall_none_uid(self):
        with self.extract_with_none('uid'):
            pass

    def test_extractall_none_gid(self):
        with self.extract_with_none('gid'):
            pass

    def test_extractall_none_uname(self):
        with self.extract_with_none('uname'):
            pass

    def test_extractall_none_gname(self):
        with self.extract_with_none('gname'):
            pass

    def test_extractall_none_ownership(self):
        with self.extract_with_none('uid', 'gid', 'uname', 'gname'):
            pass
# Concrete variants of NoneInfoExtractTests: each one selects the filter used
# for the control extraction performed in setUpClass().
class NoneInfoExtractTests_Data(NoneInfoExtractTests, unittest.TestCase):
    extraction_filter = 'data'

class NoneInfoExtractTests_FullyTrusted(NoneInfoExtractTests,
                                        unittest.TestCase):
    extraction_filter = 'fully_trusted'

class NoneInfoExtractTests_Tar(NoneInfoExtractTests, unittest.TestCase):
    extraction_filter = 'tar'

class NoneInfoExtractTests_Default(NoneInfoExtractTests,
                                   unittest.TestCase):
    extraction_filter = None

class NoneInfoTests_Misc(unittest.TestCase):
    def test_add(self):
        # When addfile() encounters None metadata, it raises a ValueError
        bio = io.BytesIO()
        for tarformat in (tarfile.USTAR_FORMAT, tarfile.GNU_FORMAT,
                          tarfile.PAX_FORMAT):
            with self.subTest(tarformat=tarformat):
                tar = tarfile.open(fileobj=bio, mode='w', format=tarformat)
                tarinfo = tar.gettarinfo(tarname)
                # First make sure the unmodified tarinfo can be added at all.
                try:
                    tar.addfile(tarinfo)
                except Exception:
                    if tarformat == tarfile.USTAR_FORMAT:
                        # In the old, limited format, adding might fail for
                        # reasons like the UID being too large
                        pass
                    else:
                        raise
                else:
                    # The error message is expected to mention the name of
                    # the attribute that was set to None.
                    for attr_name in ('mtime', 'mode', 'uid', 'gid',
                                        'uname', 'gname'):
                        with self.subTest(attr_name=attr_name):
                            replaced = tarinfo.replace(**{attr_name: None})
                            with self.assertRaisesRegex(ValueError,
                                                        f"{attr_name}"):
                                tar.addfile(replaced)

    def test_list(self):
        # Change some metadata to None, then compare list() output
        # word-for-word. We want list() to not raise, and to only change
        # printout for the affected piece of metadata.
        # (n.b.: some contents of the test archive are hardcoded.)
        for attr_names in ({'mtime'}, {'mode'}, {'uid'}, {'gid'},
                           {'uname'}, {'gname'},
                           {'uid', 'uname'}, {'gid', 'gname'}):
            with (self.subTest(attr_names=attr_names),
                  tarfile.open(tarname, encoding="iso8859-1") as tar):
                # Capture the listing of the untouched archive ...
                tio_prev = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
                with support.swap_attr(sys, 'stdout', tio_prev):
                    tar.list()
                for member in tar.getmembers():
                    for attr_name in attr_names:
                        setattr(member, attr_name, None)
                # ... and again after the selected attributes were blanked.
                tio_new = io.TextIOWrapper(io.BytesIO(), 'ascii', newline='\n')
                with support.swap_attr(sys, 'stdout', tio_new):
                    tar.list()
                # Both captures are bytes; split() yields whitespace-separated
                # words that are compared pairwise.  "user/group" is a single
                # word and is taken apart at the slash below.
                for expected, got in zip(tio_prev.detach().getvalue().split(),
                                         tio_new.detach().getvalue().split()):
                    if attr_names == {'mtime'} and re.match(rb'2003-01-\d\d', expected):
                        self.assertEqual(got, b'????-??-??')
                    elif attr_names == {'mtime'} and re.match(rb'\d\d:\d\d:\d\d', expected):
                        self.assertEqual(got, b'??:??:??')
                    elif attr_names == {'mode'} and re.match(
                            rb'.([r-][w-][x-]){3}', expected):
                        self.assertEqual(got, b'??????????')
                    elif attr_names == {'uname'} and expected.startswith(
                            (b'tarfile/', b'lars/', b'foo/')):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_group, exp_group)
                        self.assertRegex(got_user, b'[0-9]+')
                    elif attr_names == {'gname'} and expected.endswith(
                            (b'/tarfile', b'/users', b'/bar')):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_user, exp_user)
                        self.assertRegex(got_group, b'[0-9]+')
                    # NOTE: (b'1000/') and (b'/100') below are plain bytes
                    # objects, not 1-tuples; startswith()/endswith() accept
                    # either form.
                    elif attr_names == {'uid'} and expected.startswith(
                            (b'1000/')):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_group, exp_group)
                        self.assertEqual(got_user, b'None')
                    elif attr_names == {'gid'} and expected.endswith((b'/100')):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_user, exp_user)
                        self.assertEqual(got_group, b'None')
                    elif attr_names == {'uid', 'uname'} and expected.startswith(
                            (b'tarfile/', b'lars/', b'foo/', b'1000/')):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_group, exp_group)
                        self.assertEqual(got_user, b'None')
                    elif attr_names == {'gname', 'gid'} and expected.endswith(
                            (b'/tarfile', b'/users', b'/bar', b'/100')):
                        exp_user, exp_group = expected.split(b'/')
                        got_user, got_group = got.split(b'/')
                        self.assertEqual(got_user, exp_user)
                        self.assertEqual(got_group, b'None')
                    else:
                        # In other cases the output should be the same
                        self.assertEqual(expected, got)
def _filemode_to_int(mode):
    """Translate a `stat.filemode`-style string into permission bits.

    The first character (the file type) is ignored; the nine characters
    after it are decoded one position at a time.  Writing modes as strings
    keeps the tests that use this helper readable.
    """
    perm_chars = mode[1:]
    # One table per position; the execute slots also encode the
    # setuid/setgid/sticky bits ('s'/'t' with execute, 'S'/'T' without).
    position_tables = (
        {'r': stat.S_IRUSR, '-': 0},
        {'w': stat.S_IWUSR, '-': 0},
        {'x': stat.S_IXUSR, '-': 0,
         's': stat.S_IXUSR | stat.S_ISUID,
         'S': stat.S_ISUID},
        {'r': stat.S_IRGRP, '-': 0},
        {'w': stat.S_IWGRP, '-': 0},
        {'x': stat.S_IXGRP, '-': 0,
         's': stat.S_IXGRP | stat.S_ISGID,
         'S': stat.S_ISGID},
        {'r': stat.S_IROTH, '-': 0},
        {'w': stat.S_IWOTH, '-': 0},
        {'x': stat.S_IXOTH, '-': 0,
         't': stat.S_IXOTH | stat.S_ISVTX,
         'T': stat.S_ISVTX},
    )
    bits = 0
    for position, table in enumerate(position_tables):
        bits |= table[perm_chars[position]]

    # Round-trip through stat.filemode() as a sanity check.
    assert stat.filemode(bits)[1:] == mode[1:]

    return bits
# `tar` is now a TarFile with 'filename' in it! 3270 """ 3271 def __init__(self): 3272 self.bio = io.BytesIO() 3273 3274 def __enter__(self): 3275 self.tar_w = tarfile.TarFile(mode='w', fileobj=self.bio) 3276 return self 3277 3278 def __exit__(self, *exc): 3279 self.tar_w.close() 3280 self.contents = self.bio.getvalue() 3281 self.bio = None 3282 3283 def add(self, name, *, type=None, symlink_to=None, hardlink_to=None, 3284 mode=None, **kwargs): 3285 """Add a member to the test archive. Call within `with`.""" 3286 name = str(name) 3287 tarinfo = tarfile.TarInfo(name).replace(**kwargs) 3288 if mode: 3289 tarinfo.mode = _filemode_to_int(mode) 3290 if symlink_to is not None: 3291 type = tarfile.SYMTYPE 3292 tarinfo.linkname = str(symlink_to) 3293 if hardlink_to is not None: 3294 type = tarfile.LNKTYPE 3295 tarinfo.linkname = str(hardlink_to) 3296 if name.endswith('/') and type is None: 3297 type = tarfile.DIRTYPE 3298 if type is not None: 3299 tarinfo.type = type 3300 if tarinfo.isreg(): 3301 fileobj = io.BytesIO(bytes(tarinfo.size)) 3302 else: 3303 fileobj = None 3304 self.tar_w.addfile(tarinfo, fileobj) 3305 3306 def open(self, **kwargs): 3307 """Open the resulting archive as TarFile. Call after `with`.""" 3308 bio = io.BytesIO(self.contents) 3309 return tarfile.open(fileobj=bio, **kwargs) 3310 3311# Under WASI, `os_helper.can_symlink` is False to make 3312# `skip_unless_symlink` skip symlink tests. " 3313# But in the following tests we use can_symlink to *determine* which 3314# behavior is expected. 3315# Like other symlink tests, skip these on WASI for now. 3316if support.is_wasi: 3317 def symlink_test(f): 3318 return unittest.skip("WASI: Skip symlink test for now")(f) 3319else: 3320 def symlink_test(f): 3321 return f 3322 3323 3324class TestExtractionFilters(unittest.TestCase): 3325 3326 # A temporary directory for the extraction results. 3327 # All files that "escape" the destination path should still end 3328 # up in this directory. 
# Under WASI, `os_helper.can_symlink` is False to make
# `skip_unless_symlink` skip symlink tests.
# But in the following tests we use can_symlink to *determine* which
# behavior is expected.
# Like other symlink tests, skip these on WASI for now.
if support.is_wasi:
    def symlink_test(f):
        return unittest.skip("WASI: Skip symlink test for now")(f)
else:
    def symlink_test(f):
        return f


class TestExtractionFilters(unittest.TestCase):

    # A temporary directory for the extraction results.
    # All files that "escape" the destination path should still end
    # up in this directory.
    outerdir = pathlib.Path(TEMPDIR) / 'outerdir'

    # The destination for the extraction, within `outerdir`
    destdir = outerdir / 'dest'

    @contextmanager
    def check_context(self, tar, filter):
        """Extracts `tar` to `self.destdir` and allows checking the result

        If an error occurs, it must be checked using `expect_exception`

        Otherwise, all resulting files must be checked using `expect_file`,
        except the destination directory itself and parent directories of
        other files.
        When checking directories, do so before their contents.
        """
        with os_helper.temp_dir(self.outerdir):
            try:
                tar.extractall(self.destdir, filter=filter)
            except Exception as exc:
                # Keep the error for expect_exception(); there are no files
                # to account for in this case.
                self.raised_exception = exc
                self.expected_paths = set()
            else:
                # Everything found below outerdir must be accounted for by
                # the caller, apart from the destination directory itself.
                self.raised_exception = None
                self.expected_paths = set(self.outerdir.glob('**/*'))
                self.expected_paths.discard(self.destdir)
            try:
                yield
            finally:
                tar.close()
            # An exception the caller did not consume via expect_exception()
            # is re-raised; otherwise no path may be left unchecked.
            if self.raised_exception:
                raise self.raised_exception
            self.assertEqual(self.expected_paths, set())
    def expect_file(self, name, type=None, symlink_to=None, mode=None):
        """Check a single file. See check_context."""
        # An extraction error that was not expected takes precedence.
        if self.raised_exception:
            raise self.raised_exception
        # use normpath() rather than resolve() so we don't follow symlinks
        path = pathlib.Path(os.path.normpath(self.destdir / name))
        self.assertIn(path, self.expected_paths)
        # Mark the path as accounted for.
        self.expected_paths.remove(path)
        if mode is not None and os_helper.can_chmod():
            # `mode` is compared in stat.filemode() notation, computed from
            # the permission bits only.
            got = stat.filemode(stat.S_IMODE(path.stat().st_mode))
            self.assertEqual(got, mode)
        # A string name with a trailing slash implies a directory.
        if type is None and isinstance(name, str) and name.endswith('/'):
            type = tarfile.DIRTYPE
        if symlink_to is not None:
            got = (self.destdir / name).readlink()
            expected = pathlib.Path(symlink_to)
            # The symlink might be the same (textually) as what we expect,
            # but some systems change the link to an equivalent path, so
            # we fall back to samefile().
            if expected != got:
                self.assertTrue(got.samefile(expected))
        elif type == tarfile.REGTYPE or type is None:
            self.assertTrue(path.is_file())
        elif type == tarfile.DIRTYPE:
            self.assertTrue(path.is_dir())
        elif type == tarfile.FIFOTYPE:
            self.assertTrue(path.is_fifo())
        else:
            raise NotImplementedError(type)
        # Parent directories of a checked path need no check of their own.
        for parent in path.parents:
            self.expected_paths.discard(parent)

    def expect_exception(self, exc_type, message_re='.'):
        # Assert that extraction failed with `exc_type` and a message
        # matching `message_re`, then clear the stored exception so that
        # check_context() does not re-raise it on exit.
        with self.assertRaisesRegex(exc_type, message_re):
            if self.raised_exception is not None:
                raise self.raised_exception
        self.raised_exception = None

    def test_benign_file(self):
        # A plain relative file name is extracted unchanged by every
        # named filter.
        with ArchiveMaker() as arc:
            arc.add('benign.txt')
        for filter in 'fully_trusted', 'tar', 'data':
            with self.check_context(arc.open(), filter):
                self.expect_file('benign.txt')
    def test_absolute(self):
        # Test handling a member with an absolute path
        # Inspired by 'absolute1' in https://github.com/jwilk/traversal-archives
        with ArchiveMaker() as arc:
            arc.add(self.outerdir / 'escaped.evil')

        # Fully trusted: the file lands at its absolute path, i.e. directly
        # in outerdir, one level above the destination.
        with self.check_context(arc.open(), 'fully_trusted'):
            self.expect_file('../escaped.evil')

        for filter in 'tar', 'data':
            with self.check_context(arc.open(), filter):
                if str(self.outerdir).startswith('/'):
                    # We strip leading slashes, as e.g. GNU tar does
                    # (without --absolute-filenames).
                    outerdir_stripped = str(self.outerdir).lstrip('/')
                    self.expect_file(f'{outerdir_stripped}/escaped.evil')
                else:
                    # On this system, absolute paths don't have leading
                    # slashes.
                    # So, there's nothing to strip. We refuse to unpack
                    # to an absolute path, nonetheless.
                    self.expect_exception(
                        tarfile.AbsolutePathError,
                        """['"].*escaped.evil['"] has an absolute path""")

    @symlink_test
    def test_parent_symlink(self):
        # Test interplaying symlinks
        # Inspired by 'dirsymlink2a' in jwilk/traversal-archives
        with ArchiveMaker() as arc:
            arc.add('current', symlink_to='.')
            arc.add('parent', symlink_to='current/..')
            arc.add('parent/evil')

        if os_helper.can_symlink():
            with self.check_context(arc.open(), 'fully_trusted'):
                if self.raised_exception is not None:
                    # Windows will refuse to create a file that's a symlink to itself
                    # (and tarfile doesn't swallow that exception)
                    self.expect_exception(FileExistsError)
                    # The other cases will fail with this error too.
                    # Skip the rest of this test.
                    return
                else:
                    self.expect_file('current', symlink_to='.')
                    self.expect_file('parent', symlink_to='current/..')
                    self.expect_file('../evil')

            with self.check_context(arc.open(), 'tar'):
                self.expect_exception(
                    tarfile.OutsideDestinationError,
                    """'parent/evil' would be extracted to ['"].*evil['"], """
                    + "which is outside the destination")

            with self.check_context(arc.open(), 'data'):
                self.expect_exception(
                    tarfile.LinkOutsideDestinationError,
                    """'parent' would link to ['"].*outerdir['"], """
                    + "which is outside the destination")

        else:
            # No symlink support. The symlinks are ignored.
            with self.check_context(arc.open(), 'fully_trusted'):
                self.expect_file('parent/evil')
            with self.check_context(arc.open(), 'tar'):
                self.expect_file('parent/evil')
            with self.check_context(arc.open(), 'data'):
                self.expect_file('parent/evil')
    @symlink_test
    def test_parent_symlink2(self):
        # Test interplaying symlinks
        # Inspired by 'dirsymlink2b' in jwilk/traversal-archives
        with ArchiveMaker() as arc:
            arc.add('current', symlink_to='.')
            arc.add('current/parent', symlink_to='..')
            arc.add('parent/evil')

        with self.check_context(arc.open(), 'fully_trusted'):
            if os_helper.can_symlink():
                # 'current' points at the destination itself, so the member
                # 'current/parent' shows up as 'parent'.
                self.expect_file('current', symlink_to='.')
                self.expect_file('parent', symlink_to='..')
                self.expect_file('../evil')
            else:
                self.expect_file('current/')
                self.expect_file('parent/evil')

        with self.check_context(arc.open(), 'tar'):
            if os_helper.can_symlink():
                self.expect_exception(
                    tarfile.OutsideDestinationError,
                    "'parent/evil' would be extracted to "
                    + """['"].*evil['"], which is outside """
                    + "the destination")
            else:
                self.expect_file('current/')
                self.expect_file('parent/evil')

        # The 'data' expectation does not depend on can_symlink().
        with self.check_context(arc.open(), 'data'):
            self.expect_exception(
                tarfile.LinkOutsideDestinationError,
                """'current/parent' would link to ['"].*['"], """
                + "which is outside the destination")
    @symlink_test
    def test_absolute_symlink(self):
        # Test symlink to an absolute path
        # Inspired by 'dirsymlink' in jwilk/traversal-archives
        with ArchiveMaker() as arc:
            arc.add('parent', symlink_to=self.outerdir)
            arc.add('parent/evil')

        with self.check_context(arc.open(), 'fully_trusted'):
            if os_helper.can_symlink():
                # Writing through the link puts 'evil' into outerdir.
                self.expect_file('parent', symlink_to=self.outerdir)
                self.expect_file('../evil')
            else:
                self.expect_file('parent/evil')

        with self.check_context(arc.open(), 'tar'):
            if os_helper.can_symlink():
                self.expect_exception(
                    tarfile.OutsideDestinationError,
                    "'parent/evil' would be extracted to "
                    + """['"].*evil['"], which is outside """
                    + "the destination")
            else:
                self.expect_file('parent/evil')

        # The 'data' expectation does not depend on can_symlink().
        with self.check_context(arc.open(), 'data'):
            self.expect_exception(
                tarfile.AbsoluteLinkError,
                "'parent' is a symlink to an absolute path")
    @symlink_test
    def test_sly_relative0(self):
        # Inspired by 'relative0' in jwilk/traversal-archives
        with ArchiveMaker() as arc:
            arc.add('../moo', symlink_to='..//tmp/moo')

        # The outer try tolerates a FileExistsError escaping check_context().
        try:
            with self.check_context(arc.open(), filter='fully_trusted'):
                if os_helper.can_symlink():
                    if isinstance(self.raised_exception, FileExistsError):
                        # XXX TarFile happens to fail creating a parent
                        # directory.
                        # This might be a bug, but fixing it would hurt
                        # security.
                        # Note that e.g. GNU `tar` rejects '..' components,
                        # so you could argue this is an invalid archive and we
                        # just raise a bad type of exception.
                        self.expect_exception(FileExistsError)
                    else:
                        self.expect_file('../moo', symlink_to='..//tmp/moo')
                else:
                    # The symlink can't be extracted and is ignored
                    pass
        except FileExistsError:
            pass

        for filter in 'tar', 'data':
            with self.check_context(arc.open(), filter):
                self.expect_exception(
                    tarfile.OutsideDestinationError,
                    "'../moo' would be extracted to "
                    + "'.*moo', which is outside "
                    + "the destination")

    @symlink_test
    def test_sly_relative2(self):
        # Inspired by 'relative2' in jwilk/traversal-archives
        with ArchiveMaker() as arc:
            arc.add('tmp/')
            arc.add('tmp/../../moo', symlink_to='tmp/../..//tmp/moo')

        with self.check_context(arc.open(), 'fully_trusted'):
            # The directory is checked before anything else, as
            # check_context() requires.
            self.expect_file('tmp', type=tarfile.DIRTYPE)
            if os_helper.can_symlink():
                self.expect_file('../moo', symlink_to='tmp/../../tmp/moo')

        for filter in 'tar', 'data':
            with self.check_context(arc.open(), filter):
                self.expect_exception(
                    tarfile.OutsideDestinationError,
                    "'tmp/../../moo' would be extracted to "
                    + """['"].*moo['"], which is outside the """
                    + "destination")
    def test_modes(self):
        # Test how file modes are extracted
        # (Note that the modes are ignored on platforms without working chmod)
        with ArchiveMaker() as arc:
            arc.add('all_bits', mode='?rwsrwsrwt')
            arc.add('perm_bits', mode='?rwxrwxrwx')
            arc.add('exec_group_other', mode='?rw-rwxrwx')
            arc.add('read_group_only', mode='?---r-----')
            arc.add('no_bits', mode='?---------')
            arc.add('dir/', mode='?---rwsrwt')

        # On some systems, setting the sticky bit is a no-op.
        # Check if that's the case.
        tmp_filename = os.path.join(TEMPDIR, "tmp.file")
        with open(tmp_filename, 'w'):
            pass
        os.chmod(tmp_filename, os.stat(tmp_filename).st_mode | stat.S_ISVTX)
        have_sticky_files = (os.stat(tmp_filename).st_mode & stat.S_ISVTX)
        os.unlink(tmp_filename)

        # Same probe for directories, reusing the (now free) path.
        os.mkdir(tmp_filename)
        os.chmod(tmp_filename, os.stat(tmp_filename).st_mode | stat.S_ISVTX)
        have_sticky_dirs = (os.stat(tmp_filename).st_mode & stat.S_ISVTX)
        os.rmdir(tmp_filename)

        # Fully trusted: modes are applied as stored, apart from a sticky bit
        # the platform does not keep.
        with self.check_context(arc.open(), 'fully_trusted'):
            if have_sticky_files:
                self.expect_file('all_bits', mode='?rwsrwsrwt')
            else:
                self.expect_file('all_bits', mode='?rwsrwsrwx')
            self.expect_file('perm_bits', mode='?rwxrwxrwx')
            self.expect_file('exec_group_other', mode='?rw-rwxrwx')
            self.expect_file('read_group_only', mode='?---r-----')
            self.expect_file('no_bits', mode='?---------')
            if have_sticky_dirs:
                self.expect_file('dir/', mode='?---rwsrwt')
            else:
                self.expect_file('dir/', mode='?---rwsrwx')

        # 'tar' filter: expected modes have no setuid/setgid/sticky bits and
        # no group/other write bits.
        with self.check_context(arc.open(), 'tar'):
            self.expect_file('all_bits', mode='?rwxr-xr-x')
            self.expect_file('perm_bits', mode='?rwxr-xr-x')
            self.expect_file('exec_group_other', mode='?rw-r-xr-x')
            self.expect_file('read_group_only', mode='?---r-----')
            self.expect_file('no_bits', mode='?---------')
            self.expect_file('dir/', mode='?---r-xr-x')

        # 'data' filter: the directory is expected to end up with the same
        # mode as the freshly created outerdir.
        with self.check_context(arc.open(), 'data'):
            normal_dir_mode = stat.filemode(stat.S_IMODE(
                self.outerdir.stat().st_mode))
            self.expect_file('all_bits', mode='?rwxr-xr-x')
            self.expect_file('perm_bits', mode='?rwxr-xr-x')
            self.expect_file('exec_group_other', mode='?rw-r--r--')
            self.expect_file('read_group_only', mode='?rw-r-----')
            self.expect_file('no_bits', mode='?rw-------')
            self.expect_file('dir/', mode=normal_dir_mode)
    def test_pipe(self):
        # Test handling of a special file
        with ArchiveMaker() as arc:
            arc.add('foo', type=tarfile.FIFOTYPE)

        for filter in 'fully_trusted', 'tar':
            with self.check_context(arc.open(), filter):
                if hasattr(os, 'mkfifo'):
                    self.expect_file('foo', type=tarfile.FIFOTYPE)
                else:
                    # The pipe can't be extracted and is skipped.
                    pass

        with self.check_context(arc.open(), 'data'):
            self.expect_exception(
                tarfile.SpecialFileError,
                "'foo' is a special file")

    def test_special_files(self):
        # Creating device files is tricky. Instead of attempting that let's
        # only check the filter result.
        for special_type in tarfile.FIFOTYPE, tarfile.CHRTYPE, tarfile.BLKTYPE:
            tarinfo = tarfile.TarInfo('foo')
            tarinfo.type = special_type
            trusted = tarfile.fully_trusted_filter(tarinfo, '')
            self.assertIs(trusted, tarinfo)
            tar = tarfile.tar_filter(tarinfo, '')
            self.assertEqual(tar.type, special_type)
            # The error raised by the 'data' filter must carry the offending
            # member in its `tarinfo` attribute.
            with self.assertRaises(tarfile.SpecialFileError) as cm:
                tarfile.data_filter(tarinfo, '')
            self.assertIsInstance(cm.exception.tarinfo, tarfile.TarInfo)
            self.assertEqual(cm.exception.tarinfo.name, 'foo')

    def test_fully_trusted_filter(self):
        # The 'fully_trusted' filter returns the original TarInfo objects.
        with tarfile.TarFile.open(tarname) as tar:
            for tarinfo in tar.getmembers():
                filtered = tarfile.fully_trusted_filter(tarinfo, '')
                self.assertIs(filtered, tarinfo)
    def test_tar_filter(self):
        # The 'tar' filter returns TarInfo objects with the same name/type.
        # (It can also fail for particularly "evil" input, but we don't have
        # that in the test archive.)
        with tarfile.TarFile.open(tarname) as tar:
            for tarinfo in tar.getmembers():
                filtered = tarfile.tar_filter(tarinfo, '')
                # assertIs: the very same name/type objects are expected.
                self.assertIs(filtered.name, tarinfo.name)
                self.assertIs(filtered.type, tarinfo.type)

    def test_data_filter(self):
        # The 'data' filter either raises, or returns TarInfo with the same
        # name/type.
        with tarfile.TarFile.open(tarname) as tar:
            for tarinfo in tar.getmembers():
                try:
                    filtered = tarfile.data_filter(tarinfo, '')
                except tarfile.FilterError:
                    continue
                self.assertIs(filtered.name, tarinfo.name)
                self.assertIs(filtered.type, tarinfo.type)

    def test_default_filter_warns_not(self):
        """Ensure the default filter does not warn (like in 3.12)"""
        with ArchiveMaker() as arc:
            arc.add('foo')
        with warnings_helper.check_no_warnings(self):
            with self.check_context(arc.open(), None):
                self.expect_file('foo')

    def test_change_default_filter_on_instance(self):
        # A callable stored on the instance as `extraction_filter` is used
        # when extractall() is called with filter=None.
        tar = tarfile.TarFile(tarname, 'r')
        def strict_filter(tarinfo, path):
            if tarinfo.name == 'ustar/regtype':
                return tarinfo
            else:
                return None
        tar.extraction_filter = strict_filter
        with self.check_context(tar, None):
            self.expect_file('ustar/regtype')

    def test_change_default_filter_on_class(self):
        # Same as above, with the filter swapped in on the TarFile class.
        def strict_filter(tarinfo, path):
            if tarinfo.name == 'ustar/regtype':
                return tarinfo
            else:
                return None
        tar = tarfile.TarFile(tarname, 'r')
        # staticmethod() keeps the plain function from being bound as a
        # method when it is looked up through the instance.
        with support.swap_attr(tarfile.TarFile, 'extraction_filter',
                               staticmethod(strict_filter)):
            with self.check_context(tar, None):
                self.expect_file('ustar/regtype')
def test_custom_filter(self):
    # A user-supplied callable can rename a member (by returning a
    # replaced TarInfo), drop a member (by returning None), or pass it
    # through unchanged.
    def custom_filter(tarinfo, path):
        # The filter must be called with the extraction destination itself.
        self.assertIs(path, self.destdir)
        member_name = tarinfo.name
        if member_name == 'ignore_this':
            return None
        if member_name == 'move_this':
            return tarinfo.replace(name='moved')
        return tarinfo

    with ArchiveMaker() as arc:
        for member_name in ('move_this', 'ignore_this', 'keep'):
            arc.add(member_name)
    with self.check_context(arc.open(), custom_filter):
        for extracted_name in ('moved', 'keep'):
            self.expect_file(extracted_name)
def test_errorlevel(self):
    # Check how the archive's errorlevel setting interacts with exceptions
    # raised from inside an extraction filter.  Each helper below is a
    # filter that unconditionally raises one specific exception type.
    def extracterror_filter(tarinfo, path):
        raise tarfile.ExtractError('failed with ExtractError')
    def filtererror_filter(tarinfo, path):
        raise tarfile.FilterError('failed with FilterError')
    def oserror_filter(tarinfo, path):
        raise OSError('failed with OSError')
    def tarerror_filter(tarinfo, path):
        raise tarfile.TarError('failed with base TarError')
    def valueerror_filter(tarinfo, path):
        raise ValueError('failed with ValueError')

    # Single-member archive, reopened below once per (errorlevel, filter)
    # combination.
    with ArchiveMaker() as arc:
        arc.add('file')

    # If errorlevel is 0, errors affected by errorlevel are ignored:
    # ExtractError, FilterError and OSError from the filter do not
    # propagate and 'file' is still expected (presumably extracted as
    # if unfiltered -- expect_file() is defined outside this block).
    # TarError and ValueError are not affected by errorlevel and propagate.

    with self.check_context(arc.open(errorlevel=0), extracterror_filter):
        self.expect_file('file')

    with self.check_context(arc.open(errorlevel=0), filtererror_filter):
        self.expect_file('file')

    with self.check_context(arc.open(errorlevel=0), oserror_filter):
        self.expect_file('file')

    with self.check_context(arc.open(errorlevel=0), tarerror_filter):
        self.expect_exception(tarfile.TarError)

    with self.check_context(arc.open(errorlevel=0), valueerror_filter):
        self.expect_exception(ValueError)

    # If 1, all fatal errors are raised: FilterError and OSError now
    # propagate, while ExtractError is still ignored.

    with self.check_context(arc.open(errorlevel=1), extracterror_filter):
        self.expect_file('file')

    with self.check_context(arc.open(errorlevel=1), filtererror_filter):
        self.expect_exception(tarfile.FilterError)

    with self.check_context(arc.open(errorlevel=1), oserror_filter):
        self.expect_exception(OSError)

    with self.check_context(arc.open(errorlevel=1), tarerror_filter):
        self.expect_exception(tarfile.TarError)

    with self.check_context(arc.open(errorlevel=1), valueerror_filter):
        self.expect_exception(ValueError)

    # If 2, all non-fatal errors are raised as well: ExtractError now
    # propagates too, so every filter above results in an exception.

    with self.check_context(arc.open(errorlevel=2), extracterror_filter):
        self.expect_exception(tarfile.ExtractError)

    with self.check_context(arc.open(errorlevel=2), filtererror_filter):
        self.expect_exception(tarfile.FilterError)

    with self.check_context(arc.open(errorlevel=2), oserror_filter):
        self.expect_exception(OSError)

    with self.check_context(arc.open(errorlevel=2), tarerror_filter):
        self.expect_exception(tarfile.TarError)

    with self.check_context(arc.open(errorlevel=2), valueerror_filter):
        self.expect_exception(ValueError)

    # We only handle ExtractError, FilterError & OSError specially; the
    # base TarError and unrelated exceptions such as ValueError propagate
    # at every errorlevel, as the cases above show.

    # A non-integer errorlevel is expected to surface as TypeError rather
    # than as the FilterError raised by the filter.
    with self.check_context(arc.open(errorlevel='boo!'), filtererror_filter):
        self.expect_exception(TypeError)  # errorlevel is not int
def tearDownModule():
    # Remove the module's scratch directory, if it is still present.
    if not os.path.exists(TEMPDIR):
        return
    os_helper.rmtree(TEMPDIR)