1import array 2import contextlib 3import importlib.util 4import io 5import itertools 6import os 7import pathlib 8import posixpath 9import string 10import struct 11import subprocess 12import sys 13from test.support.script_helper import assert_python_ok 14import time 15import unittest 16import unittest.mock as mock 17import zipfile 18import functools 19 20 21from tempfile import TemporaryFile 22from random import randint, random, randbytes 23 24from test.support import script_helper 25from test.support import ( 26 findfile, requires_zlib, requires_bz2, requires_lzma, 27 captured_stdout, captured_stderr, requires_subprocess 28) 29from test.support.os_helper import ( 30 TESTFN, unlink, rmtree, temp_dir, temp_cwd, fd_count 31) 32 33 34TESTFN2 = TESTFN + "2" 35TESTFNDIR = TESTFN + "d" 36FIXEDTEST_SIZE = 1000 37DATAFILES_DIR = 'zipfile_datafiles' 38 39SMALL_TEST_DATA = [('_ziptest1', '1q2w3e4r5t'), 40 ('ziptest2dir/_ziptest2', 'qawsedrftg'), 41 ('ziptest2dir/ziptest3dir/_ziptest3', 'azsxdcfvgb'), 42 ('ziptest2dir/ziptest3dir/ziptest4dir/_ziptest3', '6y7u8i9o0p')] 43 44def get_files(test): 45 yield TESTFN2 46 with TemporaryFile() as f: 47 yield f 48 test.assertFalse(f.closed) 49 with io.BytesIO() as f: 50 yield f 51 test.assertFalse(f.closed) 52 53class AbstractTestsWithSourceFile: 54 @classmethod 55 def setUpClass(cls): 56 cls.line_gen = [bytes("Zipfile test line %d. 
random float: %f\n" % 57 (i, random()), "ascii") 58 for i in range(FIXEDTEST_SIZE)] 59 cls.data = b''.join(cls.line_gen) 60 61 def setUp(self): 62 # Make a source file with some lines 63 with open(TESTFN, "wb") as fp: 64 fp.write(self.data) 65 66 def make_test_archive(self, f, compression, compresslevel=None): 67 kwargs = {'compression': compression, 'compresslevel': compresslevel} 68 # Create the ZIP archive 69 with zipfile.ZipFile(f, "w", **kwargs) as zipfp: 70 zipfp.write(TESTFN, "another.name") 71 zipfp.write(TESTFN, TESTFN) 72 zipfp.writestr("strfile", self.data) 73 with zipfp.open('written-open-w', mode='w') as f: 74 for line in self.line_gen: 75 f.write(line) 76 77 def zip_test(self, f, compression, compresslevel=None): 78 self.make_test_archive(f, compression, compresslevel) 79 80 # Read the ZIP archive 81 with zipfile.ZipFile(f, "r", compression) as zipfp: 82 self.assertEqual(zipfp.read(TESTFN), self.data) 83 self.assertEqual(zipfp.read("another.name"), self.data) 84 self.assertEqual(zipfp.read("strfile"), self.data) 85 86 # Print the ZIP directory 87 fp = io.StringIO() 88 zipfp.printdir(file=fp) 89 directory = fp.getvalue() 90 lines = directory.splitlines() 91 self.assertEqual(len(lines), 5) # Number of files + header 92 93 self.assertIn('File Name', lines[0]) 94 self.assertIn('Modified', lines[0]) 95 self.assertIn('Size', lines[0]) 96 97 fn, date, time_, size = lines[1].split() 98 self.assertEqual(fn, 'another.name') 99 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 100 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 101 self.assertEqual(size, str(len(self.data))) 102 103 # Check the namelist 104 names = zipfp.namelist() 105 self.assertEqual(len(names), 4) 106 self.assertIn(TESTFN, names) 107 self.assertIn("another.name", names) 108 self.assertIn("strfile", names) 109 self.assertIn("written-open-w", names) 110 111 # Check infolist 112 infos = zipfp.infolist() 113 names = [i.filename for i in infos] 114 self.assertEqual(len(names), 4) 115 
self.assertIn(TESTFN, names) 116 self.assertIn("another.name", names) 117 self.assertIn("strfile", names) 118 self.assertIn("written-open-w", names) 119 for i in infos: 120 self.assertEqual(i.file_size, len(self.data)) 121 122 # check getinfo 123 for nm in (TESTFN, "another.name", "strfile", "written-open-w"): 124 info = zipfp.getinfo(nm) 125 self.assertEqual(info.filename, nm) 126 self.assertEqual(info.file_size, len(self.data)) 127 128 # Check that testzip doesn't raise an exception 129 zipfp.testzip() 130 131 def test_basic(self): 132 for f in get_files(self): 133 self.zip_test(f, self.compression) 134 135 def zip_open_test(self, f, compression): 136 self.make_test_archive(f, compression) 137 138 # Read the ZIP archive 139 with zipfile.ZipFile(f, "r", compression) as zipfp: 140 zipdata1 = [] 141 with zipfp.open(TESTFN) as zipopen1: 142 while True: 143 read_data = zipopen1.read(256) 144 if not read_data: 145 break 146 zipdata1.append(read_data) 147 148 zipdata2 = [] 149 with zipfp.open("another.name") as zipopen2: 150 while True: 151 read_data = zipopen2.read(256) 152 if not read_data: 153 break 154 zipdata2.append(read_data) 155 156 self.assertEqual(b''.join(zipdata1), self.data) 157 self.assertEqual(b''.join(zipdata2), self.data) 158 159 def test_open(self): 160 for f in get_files(self): 161 self.zip_open_test(f, self.compression) 162 163 def test_open_with_pathlike(self): 164 path = pathlib.Path(TESTFN2) 165 self.zip_open_test(path, self.compression) 166 with zipfile.ZipFile(path, "r", self.compression) as zipfp: 167 self.assertIsInstance(zipfp.filename, str) 168 169 def zip_random_open_test(self, f, compression): 170 self.make_test_archive(f, compression) 171 172 # Read the ZIP archive 173 with zipfile.ZipFile(f, "r", compression) as zipfp: 174 zipdata1 = [] 175 with zipfp.open(TESTFN) as zipopen1: 176 while True: 177 read_data = zipopen1.read(randint(1, 1024)) 178 if not read_data: 179 break 180 zipdata1.append(read_data) 181 182 
self.assertEqual(b''.join(zipdata1), self.data) 183 184 def test_random_open(self): 185 for f in get_files(self): 186 self.zip_random_open_test(f, self.compression) 187 188 def zip_read1_test(self, f, compression): 189 self.make_test_archive(f, compression) 190 191 # Read the ZIP archive 192 with zipfile.ZipFile(f, "r") as zipfp, \ 193 zipfp.open(TESTFN) as zipopen: 194 zipdata = [] 195 while True: 196 read_data = zipopen.read1(-1) 197 if not read_data: 198 break 199 zipdata.append(read_data) 200 201 self.assertEqual(b''.join(zipdata), self.data) 202 203 def test_read1(self): 204 for f in get_files(self): 205 self.zip_read1_test(f, self.compression) 206 207 def zip_read1_10_test(self, f, compression): 208 self.make_test_archive(f, compression) 209 210 # Read the ZIP archive 211 with zipfile.ZipFile(f, "r") as zipfp, \ 212 zipfp.open(TESTFN) as zipopen: 213 zipdata = [] 214 while True: 215 read_data = zipopen.read1(10) 216 self.assertLessEqual(len(read_data), 10) 217 if not read_data: 218 break 219 zipdata.append(read_data) 220 221 self.assertEqual(b''.join(zipdata), self.data) 222 223 def test_read1_10(self): 224 for f in get_files(self): 225 self.zip_read1_10_test(f, self.compression) 226 227 def zip_readline_read_test(self, f, compression): 228 self.make_test_archive(f, compression) 229 230 # Read the ZIP archive 231 with zipfile.ZipFile(f, "r") as zipfp, \ 232 zipfp.open(TESTFN) as zipopen: 233 data = b'' 234 while True: 235 read = zipopen.readline() 236 if not read: 237 break 238 data += read 239 240 read = zipopen.read(100) 241 if not read: 242 break 243 data += read 244 245 self.assertEqual(data, self.data) 246 247 def test_readline_read(self): 248 # Issue #7610: calls to readline() interleaved with calls to read(). 
249 for f in get_files(self): 250 self.zip_readline_read_test(f, self.compression) 251 252 def zip_readline_test(self, f, compression): 253 self.make_test_archive(f, compression) 254 255 # Read the ZIP archive 256 with zipfile.ZipFile(f, "r") as zipfp: 257 with zipfp.open(TESTFN) as zipopen: 258 for line in self.line_gen: 259 linedata = zipopen.readline() 260 self.assertEqual(linedata, line) 261 262 def test_readline(self): 263 for f in get_files(self): 264 self.zip_readline_test(f, self.compression) 265 266 def zip_readlines_test(self, f, compression): 267 self.make_test_archive(f, compression) 268 269 # Read the ZIP archive 270 with zipfile.ZipFile(f, "r") as zipfp: 271 with zipfp.open(TESTFN) as zipopen: 272 ziplines = zipopen.readlines() 273 for line, zipline in zip(self.line_gen, ziplines): 274 self.assertEqual(zipline, line) 275 276 def test_readlines(self): 277 for f in get_files(self): 278 self.zip_readlines_test(f, self.compression) 279 280 def zip_iterlines_test(self, f, compression): 281 self.make_test_archive(f, compression) 282 283 # Read the ZIP archive 284 with zipfile.ZipFile(f, "r") as zipfp: 285 with zipfp.open(TESTFN) as zipopen: 286 for line, zipline in zip(self.line_gen, zipopen): 287 self.assertEqual(zipline, line) 288 289 def test_iterlines(self): 290 for f in get_files(self): 291 self.zip_iterlines_test(f, self.compression) 292 293 def test_low_compression(self): 294 """Check for cases where compressed data is larger than original.""" 295 # Create the ZIP archive 296 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipfp: 297 zipfp.writestr("strfile", '12') 298 299 # Get an open object for strfile 300 with zipfile.ZipFile(TESTFN2, "r", self.compression) as zipfp: 301 with zipfp.open("strfile") as openobj: 302 self.assertEqual(openobj.read(1), b'1') 303 self.assertEqual(openobj.read(1), b'2') 304 305 def test_writestr_compression(self): 306 zipfp = zipfile.ZipFile(TESTFN2, "w") 307 zipfp.writestr("b.txt", "hello world", 
compress_type=self.compression) 308 info = zipfp.getinfo('b.txt') 309 self.assertEqual(info.compress_type, self.compression) 310 311 def test_writestr_compresslevel(self): 312 zipfp = zipfile.ZipFile(TESTFN2, "w", compresslevel=1) 313 zipfp.writestr("a.txt", "hello world", compress_type=self.compression) 314 zipfp.writestr("b.txt", "hello world", compress_type=self.compression, 315 compresslevel=2) 316 317 # Compression level follows the constructor. 318 a_info = zipfp.getinfo('a.txt') 319 self.assertEqual(a_info.compress_type, self.compression) 320 self.assertEqual(a_info._compresslevel, 1) 321 322 # Compression level is overridden. 323 b_info = zipfp.getinfo('b.txt') 324 self.assertEqual(b_info.compress_type, self.compression) 325 self.assertEqual(b_info._compresslevel, 2) 326 327 def test_read_return_size(self): 328 # Issue #9837: ZipExtFile.read() shouldn't return more bytes 329 # than requested. 330 for test_size in (1, 4095, 4096, 4097, 16384): 331 file_size = test_size + 1 332 junk = randbytes(file_size) 333 with zipfile.ZipFile(io.BytesIO(), "w", self.compression) as zipf: 334 zipf.writestr('foo', junk) 335 with zipf.open('foo', 'r') as fp: 336 buf = fp.read(test_size) 337 self.assertEqual(len(buf), test_size) 338 339 def test_truncated_zipfile(self): 340 fp = io.BytesIO() 341 with zipfile.ZipFile(fp, mode='w') as zipf: 342 zipf.writestr('strfile', self.data, compress_type=self.compression) 343 end_offset = fp.tell() 344 zipfiledata = fp.getvalue() 345 346 fp = io.BytesIO(zipfiledata) 347 with zipfile.ZipFile(fp) as zipf: 348 with zipf.open('strfile') as zipopen: 349 fp.truncate(end_offset - 20) 350 with self.assertRaises(EOFError): 351 zipopen.read() 352 353 fp = io.BytesIO(zipfiledata) 354 with zipfile.ZipFile(fp) as zipf: 355 with zipf.open('strfile') as zipopen: 356 fp.truncate(end_offset - 20) 357 with self.assertRaises(EOFError): 358 while zipopen.read(100): 359 pass 360 361 fp = io.BytesIO(zipfiledata) 362 with zipfile.ZipFile(fp) as zipf: 363 with 
zipf.open('strfile') as zipopen: 364 fp.truncate(end_offset - 20) 365 with self.assertRaises(EOFError): 366 while zipopen.read1(100): 367 pass 368 369 def test_repr(self): 370 fname = 'file.name' 371 for f in get_files(self): 372 with zipfile.ZipFile(f, 'w', self.compression) as zipfp: 373 zipfp.write(TESTFN, fname) 374 r = repr(zipfp) 375 self.assertIn("mode='w'", r) 376 377 with zipfile.ZipFile(f, 'r') as zipfp: 378 r = repr(zipfp) 379 if isinstance(f, str): 380 self.assertIn('filename=%r' % f, r) 381 else: 382 self.assertIn('file=%r' % f, r) 383 self.assertIn("mode='r'", r) 384 r = repr(zipfp.getinfo(fname)) 385 self.assertIn('filename=%r' % fname, r) 386 self.assertIn('filemode=', r) 387 self.assertIn('file_size=', r) 388 if self.compression != zipfile.ZIP_STORED: 389 self.assertIn('compress_type=', r) 390 self.assertIn('compress_size=', r) 391 with zipfp.open(fname) as zipopen: 392 r = repr(zipopen) 393 self.assertIn('name=%r' % fname, r) 394 self.assertIn("mode='r'", r) 395 if self.compression != zipfile.ZIP_STORED: 396 self.assertIn('compress_type=', r) 397 self.assertIn('[closed]', repr(zipopen)) 398 self.assertIn('[closed]', repr(zipfp)) 399 400 def test_compresslevel_basic(self): 401 for f in get_files(self): 402 self.zip_test(f, self.compression, compresslevel=9) 403 404 def test_per_file_compresslevel(self): 405 """Check that files within a Zip archive can have different 406 compression levels.""" 407 with zipfile.ZipFile(TESTFN2, "w", compresslevel=1) as zipfp: 408 zipfp.write(TESTFN, 'compress_1') 409 zipfp.write(TESTFN, 'compress_9', compresslevel=9) 410 one_info = zipfp.getinfo('compress_1') 411 nine_info = zipfp.getinfo('compress_9') 412 self.assertEqual(one_info._compresslevel, 1) 413 self.assertEqual(nine_info._compresslevel, 9) 414 415 def test_writing_errors(self): 416 class BrokenFile(io.BytesIO): 417 def write(self, data): 418 nonlocal count 419 if count is not None: 420 if count == stop: 421 raise OSError 422 count += 1 423 
super().write(data) 424 425 stop = 0 426 while True: 427 testfile = BrokenFile() 428 count = None 429 with zipfile.ZipFile(testfile, 'w', self.compression) as zipfp: 430 with zipfp.open('file1', 'w') as f: 431 f.write(b'data1') 432 count = 0 433 try: 434 with zipfp.open('file2', 'w') as f: 435 f.write(b'data2') 436 except OSError: 437 stop += 1 438 else: 439 break 440 finally: 441 count = None 442 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 443 self.assertEqual(zipfp.namelist(), ['file1']) 444 self.assertEqual(zipfp.read('file1'), b'data1') 445 446 with zipfile.ZipFile(io.BytesIO(testfile.getvalue())) as zipfp: 447 self.assertEqual(zipfp.namelist(), ['file1', 'file2']) 448 self.assertEqual(zipfp.read('file1'), b'data1') 449 self.assertEqual(zipfp.read('file2'), b'data2') 450 451 452 def tearDown(self): 453 unlink(TESTFN) 454 unlink(TESTFN2) 455 456 457class StoredTestsWithSourceFile(AbstractTestsWithSourceFile, 458 unittest.TestCase): 459 compression = zipfile.ZIP_STORED 460 test_low_compression = None 461 462 def zip_test_writestr_permissions(self, f, compression): 463 # Make sure that writestr and open(... mode='w') create files with 464 # mode 0600, when they are passed a name rather than a ZipInfo 465 # instance. 
466 467 self.make_test_archive(f, compression) 468 with zipfile.ZipFile(f, "r") as zipfp: 469 zinfo = zipfp.getinfo('strfile') 470 self.assertEqual(zinfo.external_attr, 0o600 << 16) 471 472 zinfo2 = zipfp.getinfo('written-open-w') 473 self.assertEqual(zinfo2.external_attr, 0o600 << 16) 474 475 def test_writestr_permissions(self): 476 for f in get_files(self): 477 self.zip_test_writestr_permissions(f, zipfile.ZIP_STORED) 478 479 def test_absolute_arcnames(self): 480 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 481 zipfp.write(TESTFN, "/absolute") 482 483 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 484 self.assertEqual(zipfp.namelist(), ["absolute"]) 485 486 def test_append_to_zip_file(self): 487 """Test appending to an existing zipfile.""" 488 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 489 zipfp.write(TESTFN, TESTFN) 490 491 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 492 zipfp.writestr("strfile", self.data) 493 self.assertEqual(zipfp.namelist(), [TESTFN, "strfile"]) 494 495 def test_append_to_non_zip_file(self): 496 """Test appending to an existing file that is not a zipfile.""" 497 # NOTE: this test fails if len(d) < 22 because of the first 498 # line "fpin.seek(-22, 2)" in _EndRecData 499 data = b'I am not a ZipFile!'*10 500 with open(TESTFN2, 'wb') as f: 501 f.write(data) 502 503 with zipfile.ZipFile(TESTFN2, "a", zipfile.ZIP_STORED) as zipfp: 504 zipfp.write(TESTFN, TESTFN) 505 506 with open(TESTFN2, 'rb') as f: 507 f.seek(len(data)) 508 with zipfile.ZipFile(f, "r") as zipfp: 509 self.assertEqual(zipfp.namelist(), [TESTFN]) 510 self.assertEqual(zipfp.read(TESTFN), self.data) 511 with open(TESTFN2, 'rb') as f: 512 self.assertEqual(f.read(len(data)), data) 513 zipfiledata = f.read() 514 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 515 self.assertEqual(zipfp.namelist(), [TESTFN]) 516 self.assertEqual(zipfp.read(TESTFN), self.data) 517 518 def 
test_read_concatenated_zip_file(self): 519 with io.BytesIO() as bio: 520 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 521 zipfp.write(TESTFN, TESTFN) 522 zipfiledata = bio.getvalue() 523 data = b'I am not a ZipFile!'*10 524 with open(TESTFN2, 'wb') as f: 525 f.write(data) 526 f.write(zipfiledata) 527 528 with zipfile.ZipFile(TESTFN2) as zipfp: 529 self.assertEqual(zipfp.namelist(), [TESTFN]) 530 self.assertEqual(zipfp.read(TESTFN), self.data) 531 532 def test_append_to_concatenated_zip_file(self): 533 with io.BytesIO() as bio: 534 with zipfile.ZipFile(bio, 'w', zipfile.ZIP_STORED) as zipfp: 535 zipfp.write(TESTFN, TESTFN) 536 zipfiledata = bio.getvalue() 537 data = b'I am not a ZipFile!'*1000000 538 with open(TESTFN2, 'wb') as f: 539 f.write(data) 540 f.write(zipfiledata) 541 542 with zipfile.ZipFile(TESTFN2, 'a') as zipfp: 543 self.assertEqual(zipfp.namelist(), [TESTFN]) 544 zipfp.writestr('strfile', self.data) 545 546 with open(TESTFN2, 'rb') as f: 547 self.assertEqual(f.read(len(data)), data) 548 zipfiledata = f.read() 549 with io.BytesIO(zipfiledata) as bio, zipfile.ZipFile(bio) as zipfp: 550 self.assertEqual(zipfp.namelist(), [TESTFN, 'strfile']) 551 self.assertEqual(zipfp.read(TESTFN), self.data) 552 self.assertEqual(zipfp.read('strfile'), self.data) 553 554 def test_ignores_newline_at_end(self): 555 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 556 zipfp.write(TESTFN, TESTFN) 557 with open(TESTFN2, 'a', encoding='utf-8') as f: 558 f.write("\r\n\00\00\00") 559 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 560 self.assertIsInstance(zipfp, zipfile.ZipFile) 561 562 def test_ignores_stuff_appended_past_comments(self): 563 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) as zipfp: 564 zipfp.comment = b"this is a comment" 565 zipfp.write(TESTFN, TESTFN) 566 with open(TESTFN2, 'a', encoding='utf-8') as f: 567 f.write("abcdef\r\n") 568 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 569 self.assertIsInstance(zipfp, 
zipfile.ZipFile) 570 self.assertEqual(zipfp.comment, b"this is a comment") 571 572 def test_write_default_name(self): 573 """Check that calling ZipFile.write without arcname specified 574 produces the expected result.""" 575 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 576 zipfp.write(TESTFN) 577 with open(TESTFN, "rb") as f: 578 self.assertEqual(zipfp.read(TESTFN), f.read()) 579 580 def test_io_on_closed_zipextfile(self): 581 fname = "somefile.txt" 582 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 583 zipfp.writestr(fname, "bogus") 584 585 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 586 with zipfp.open(fname) as fid: 587 fid.close() 588 self.assertRaises(ValueError, fid.read) 589 self.assertRaises(ValueError, fid.seek, 0) 590 self.assertRaises(ValueError, fid.tell) 591 self.assertRaises(ValueError, fid.readable) 592 self.assertRaises(ValueError, fid.seekable) 593 594 def test_write_to_readonly(self): 595 """Check that trying to call write() on a readonly ZipFile object 596 raises a ValueError.""" 597 with zipfile.ZipFile(TESTFN2, mode="w") as zipfp: 598 zipfp.writestr("somefile.txt", "bogus") 599 600 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 601 self.assertRaises(ValueError, zipfp.write, TESTFN) 602 603 with zipfile.ZipFile(TESTFN2, mode="r") as zipfp: 604 with self.assertRaises(ValueError): 605 zipfp.open(TESTFN, mode='w') 606 607 def test_add_file_before_1980(self): 608 # Set atime and mtime to 1970-01-01 609 os.utime(TESTFN, (0, 0)) 610 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 611 self.assertRaises(ValueError, zipfp.write, TESTFN) 612 613 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 614 zipfp.write(TESTFN) 615 zinfo = zipfp.getinfo(TESTFN) 616 self.assertEqual(zinfo.date_time, (1980, 1, 1, 0, 0, 0)) 617 618 def test_add_file_after_2107(self): 619 # Set atime and mtime to 2108-12-30 620 ts = 4386268800 621 try: 622 time.localtime(ts) 623 except OverflowError: 624 self.skipTest(f'time.localtime({ts}) raises 
OverflowError') 625 try: 626 os.utime(TESTFN, (ts, ts)) 627 except OverflowError: 628 self.skipTest('Host fs cannot set timestamp to required value.') 629 630 mtime_ns = os.stat(TESTFN).st_mtime_ns 631 if mtime_ns != (4386268800 * 10**9): 632 # XFS filesystem is limited to 32-bit timestamp, but the syscall 633 # didn't fail. Moreover, there is a VFS bug which returns 634 # a cached timestamp which is different than the value on disk. 635 # 636 # Test st_mtime_ns rather than st_mtime to avoid rounding issues. 637 # 638 # https://bugzilla.redhat.com/show_bug.cgi?id=1795576 639 # https://bugs.python.org/issue39460#msg360952 640 self.skipTest(f"Linux VFS/XFS kernel bug detected: {mtime_ns=}") 641 642 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 643 self.assertRaises(struct.error, zipfp.write, TESTFN) 644 645 with zipfile.ZipFile(TESTFN2, "w", strict_timestamps=False) as zipfp: 646 zipfp.write(TESTFN) 647 zinfo = zipfp.getinfo(TESTFN) 648 self.assertEqual(zinfo.date_time, (2107, 12, 31, 23, 59, 59)) 649 650 651@requires_zlib() 652class DeflateTestsWithSourceFile(AbstractTestsWithSourceFile, 653 unittest.TestCase): 654 compression = zipfile.ZIP_DEFLATED 655 656 def test_per_file_compression(self): 657 """Check that files within a Zip archive can have different 658 compression options.""" 659 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 660 zipfp.write(TESTFN, 'storeme', zipfile.ZIP_STORED) 661 zipfp.write(TESTFN, 'deflateme', zipfile.ZIP_DEFLATED) 662 sinfo = zipfp.getinfo('storeme') 663 dinfo = zipfp.getinfo('deflateme') 664 self.assertEqual(sinfo.compress_type, zipfile.ZIP_STORED) 665 self.assertEqual(dinfo.compress_type, zipfile.ZIP_DEFLATED) 666 667@requires_bz2() 668class Bzip2TestsWithSourceFile(AbstractTestsWithSourceFile, 669 unittest.TestCase): 670 compression = zipfile.ZIP_BZIP2 671 672@requires_lzma() 673class LzmaTestsWithSourceFile(AbstractTestsWithSourceFile, 674 unittest.TestCase): 675 compression = zipfile.ZIP_LZMA 676 677 678class 
AbstractTestZip64InSmallFiles: 679 # These tests test the ZIP64 functionality without using large files, 680 # see test_zipfile64 for proper tests. 681 682 @classmethod 683 def setUpClass(cls): 684 line_gen = (bytes("Test of zipfile line %d." % i, "ascii") 685 for i in range(0, FIXEDTEST_SIZE)) 686 cls.data = b'\n'.join(line_gen) 687 688 def setUp(self): 689 self._limit = zipfile.ZIP64_LIMIT 690 self._filecount_limit = zipfile.ZIP_FILECOUNT_LIMIT 691 zipfile.ZIP64_LIMIT = 1000 692 zipfile.ZIP_FILECOUNT_LIMIT = 9 693 694 # Make a source file with some lines 695 with open(TESTFN, "wb") as fp: 696 fp.write(self.data) 697 698 def zip_test(self, f, compression): 699 # Create the ZIP archive 700 with zipfile.ZipFile(f, "w", compression, allowZip64=True) as zipfp: 701 zipfp.write(TESTFN, "another.name") 702 zipfp.write(TESTFN, TESTFN) 703 zipfp.writestr("strfile", self.data) 704 705 # Read the ZIP archive 706 with zipfile.ZipFile(f, "r", compression) as zipfp: 707 self.assertEqual(zipfp.read(TESTFN), self.data) 708 self.assertEqual(zipfp.read("another.name"), self.data) 709 self.assertEqual(zipfp.read("strfile"), self.data) 710 711 # Print the ZIP directory 712 fp = io.StringIO() 713 zipfp.printdir(fp) 714 715 directory = fp.getvalue() 716 lines = directory.splitlines() 717 self.assertEqual(len(lines), 4) # Number of files + header 718 719 self.assertIn('File Name', lines[0]) 720 self.assertIn('Modified', lines[0]) 721 self.assertIn('Size', lines[0]) 722 723 fn, date, time_, size = lines[1].split() 724 self.assertEqual(fn, 'another.name') 725 self.assertTrue(time.strptime(date, '%Y-%m-%d')) 726 self.assertTrue(time.strptime(time_, '%H:%M:%S')) 727 self.assertEqual(size, str(len(self.data))) 728 729 # Check the namelist 730 names = zipfp.namelist() 731 self.assertEqual(len(names), 3) 732 self.assertIn(TESTFN, names) 733 self.assertIn("another.name", names) 734 self.assertIn("strfile", names) 735 736 # Check infolist 737 infos = zipfp.infolist() 738 names = [i.filename for 
i in infos] 739 self.assertEqual(len(names), 3) 740 self.assertIn(TESTFN, names) 741 self.assertIn("another.name", names) 742 self.assertIn("strfile", names) 743 for i in infos: 744 self.assertEqual(i.file_size, len(self.data)) 745 746 # check getinfo 747 for nm in (TESTFN, "another.name", "strfile"): 748 info = zipfp.getinfo(nm) 749 self.assertEqual(info.filename, nm) 750 self.assertEqual(info.file_size, len(self.data)) 751 752 # Check that testzip doesn't raise an exception 753 zipfp.testzip() 754 755 def test_basic(self): 756 for f in get_files(self): 757 self.zip_test(f, self.compression) 758 759 def test_too_many_files(self): 760 # This test checks that more than 64k files can be added to an archive, 761 # and that the resulting archive can be read properly by ZipFile 762 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 763 allowZip64=True) 764 zipf.debug = 100 765 numfiles = 15 766 for i in range(numfiles): 767 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 768 self.assertEqual(len(zipf.namelist()), numfiles) 769 zipf.close() 770 771 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 772 self.assertEqual(len(zipf2.namelist()), numfiles) 773 for i in range(numfiles): 774 content = zipf2.read("foo%08d" % i).decode('ascii') 775 self.assertEqual(content, "%d" % (i**3 % 57)) 776 zipf2.close() 777 778 def test_too_many_files_append(self): 779 zipf = zipfile.ZipFile(TESTFN, "w", self.compression, 780 allowZip64=False) 781 zipf.debug = 100 782 numfiles = 9 783 for i in range(numfiles): 784 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 785 self.assertEqual(len(zipf.namelist()), numfiles) 786 with self.assertRaises(zipfile.LargeZipFile): 787 zipf.writestr("foo%08d" % numfiles, b'') 788 self.assertEqual(len(zipf.namelist()), numfiles) 789 zipf.close() 790 791 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 792 allowZip64=False) 793 zipf.debug = 100 794 self.assertEqual(len(zipf.namelist()), numfiles) 795 with self.assertRaises(zipfile.LargeZipFile): 
796 zipf.writestr("foo%08d" % numfiles, b'') 797 self.assertEqual(len(zipf.namelist()), numfiles) 798 zipf.close() 799 800 zipf = zipfile.ZipFile(TESTFN, "a", self.compression, 801 allowZip64=True) 802 zipf.debug = 100 803 self.assertEqual(len(zipf.namelist()), numfiles) 804 numfiles2 = 15 805 for i in range(numfiles, numfiles2): 806 zipf.writestr("foo%08d" % i, "%d" % (i**3 % 57)) 807 self.assertEqual(len(zipf.namelist()), numfiles2) 808 zipf.close() 809 810 zipf2 = zipfile.ZipFile(TESTFN, "r", self.compression) 811 self.assertEqual(len(zipf2.namelist()), numfiles2) 812 for i in range(numfiles2): 813 content = zipf2.read("foo%08d" % i).decode('ascii') 814 self.assertEqual(content, "%d" % (i**3 % 57)) 815 zipf2.close() 816 817 def tearDown(self): 818 zipfile.ZIP64_LIMIT = self._limit 819 zipfile.ZIP_FILECOUNT_LIMIT = self._filecount_limit 820 unlink(TESTFN) 821 unlink(TESTFN2) 822 823 824class StoredTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 825 unittest.TestCase): 826 compression = zipfile.ZIP_STORED 827 828 def large_file_exception_test(self, f, compression): 829 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 830 self.assertRaises(zipfile.LargeZipFile, 831 zipfp.write, TESTFN, "another.name") 832 833 def large_file_exception_test2(self, f, compression): 834 with zipfile.ZipFile(f, "w", compression, allowZip64=False) as zipfp: 835 self.assertRaises(zipfile.LargeZipFile, 836 zipfp.writestr, "another.name", self.data) 837 838 def test_large_file_exception(self): 839 for f in get_files(self): 840 self.large_file_exception_test(f, zipfile.ZIP_STORED) 841 self.large_file_exception_test2(f, zipfile.ZIP_STORED) 842 843 def test_absolute_arcnames(self): 844 with zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED, 845 allowZip64=True) as zipfp: 846 zipfp.write(TESTFN, "/absolute") 847 848 with zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_STORED) as zipfp: 849 self.assertEqual(zipfp.namelist(), ["absolute"]) 850 851 def test_append(self): 852 # 
Test that appending to the Zip64 archive doesn't change 853 # extra fields of existing entries. 854 with zipfile.ZipFile(TESTFN2, "w", allowZip64=True) as zipfp: 855 zipfp.writestr("strfile", self.data) 856 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 857 zinfo = zipfp.getinfo("strfile") 858 extra = zinfo.extra 859 with zipfile.ZipFile(TESTFN2, "a", allowZip64=True) as zipfp: 860 zipfp.writestr("strfile2", self.data) 861 with zipfile.ZipFile(TESTFN2, "r", allowZip64=True) as zipfp: 862 zinfo = zipfp.getinfo("strfile") 863 self.assertEqual(zinfo.extra, extra) 864 865 def make_zip64_file( 866 self, file_size_64_set=False, file_size_extra=False, 867 compress_size_64_set=False, compress_size_extra=False, 868 header_offset_64_set=False, header_offset_extra=False, 869 ): 870 """Generate bytes sequence for a zip with (incomplete) zip64 data. 871 872 The actual values (not the zip 64 0xffffffff values) stored in the file 873 are: 874 file_size: 8 875 compress_size: 8 876 header_offset: 0 877 """ 878 actual_size = 8 879 actual_header_offset = 0 880 local_zip64_fields = [] 881 central_zip64_fields = [] 882 883 file_size = actual_size 884 if file_size_64_set: 885 file_size = 0xffffffff 886 if file_size_extra: 887 local_zip64_fields.append(actual_size) 888 central_zip64_fields.append(actual_size) 889 file_size = struct.pack("<L", file_size) 890 891 compress_size = actual_size 892 if compress_size_64_set: 893 compress_size = 0xffffffff 894 if compress_size_extra: 895 local_zip64_fields.append(actual_size) 896 central_zip64_fields.append(actual_size) 897 compress_size = struct.pack("<L", compress_size) 898 899 header_offset = actual_header_offset 900 if header_offset_64_set: 901 header_offset = 0xffffffff 902 if header_offset_extra: 903 central_zip64_fields.append(actual_header_offset) 904 header_offset = struct.pack("<L", header_offset) 905 906 local_extra = struct.pack( 907 '<HH' + 'Q'*len(local_zip64_fields), 908 0x0001, 909 8*len(local_zip64_fields), 910 
*local_zip64_fields 911 ) 912 913 central_extra = struct.pack( 914 '<HH' + 'Q'*len(central_zip64_fields), 915 0x0001, 916 8*len(central_zip64_fields), 917 *central_zip64_fields 918 ) 919 920 central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields)) 921 offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields)) 922 923 local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields)) 924 central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields)) 925 926 filename = b"test.txt" 927 content = b"test1234" 928 filename_length = struct.pack("<H", len(filename)) 929 zip64_contents = ( 930 # Local file header 931 b"PK\x03\x04\x14\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 932 + compress_size 933 + file_size 934 + filename_length 935 + local_extra_length 936 + filename 937 + local_extra 938 + content 939 # Central directory: 940 + b"PK\x01\x02-\x03-\x00\x00\x00\x00\x00\x00\x00!\x00\x9e%\xf5\xaf" 941 + compress_size 942 + file_size 943 + filename_length 944 + central_extra_length 945 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01" 946 + header_offset 947 + filename 948 + central_extra 949 # Zip64 end of central directory 950 + b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-" 951 + b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00" 952 + b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00" 953 + central_dir_size 954 + offset_to_central_dir 955 # Zip64 end of central directory locator 956 + b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01" 957 + b"\x00\x00\x00" 958 # end of central directory 959 + b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00" 960 + b"\x00\x00\x00\x00" 961 ) 962 return zip64_contents 963 964 def test_bad_zip64_extra(self): 965 """Missing zip64 extra records raises an exception. 966 967 There are 4 fields that the zip64 format handles (the disk number is 968 not used in this module and so is ignored here). 
According to the zip 969 spec: 970 The order of the fields in the zip64 extended 971 information record is fixed, but the fields MUST 972 only appear if the corresponding Local or Central 973 directory record field is set to 0xFFFF or 0xFFFFFFFF. 974 975 If the zip64 extra content doesn't contain enough entries for the 976 number of fields marked with 0xFFFF or 0xFFFFFFFF, we raise an error. 977 This test mismatches the length of the zip64 extra field and the number 978 of fields set to indicate the presence of zip64 data. 979 """ 980 # zip64 file size present, no fields in extra, expecting one, equals 981 # missing file size. 982 missing_file_size_extra = self.make_zip64_file( 983 file_size_64_set=True, 984 ) 985 with self.assertRaises(zipfile.BadZipFile) as e: 986 zipfile.ZipFile(io.BytesIO(missing_file_size_extra)) 987 self.assertIn('file size', str(e.exception).lower()) 988 989 # zip64 file size present, zip64 compress size present, one field in 990 # extra, expecting two, equals missing compress size. 991 missing_compress_size_extra = self.make_zip64_file( 992 file_size_64_set=True, 993 file_size_extra=True, 994 compress_size_64_set=True, 995 ) 996 with self.assertRaises(zipfile.BadZipFile) as e: 997 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 998 self.assertIn('compress size', str(e.exception).lower()) 999 1000 # zip64 compress size present, no fields in extra, expecting one, 1001 # equals missing compress size. 
1002 missing_compress_size_extra = self.make_zip64_file( 1003 compress_size_64_set=True, 1004 ) 1005 with self.assertRaises(zipfile.BadZipFile) as e: 1006 zipfile.ZipFile(io.BytesIO(missing_compress_size_extra)) 1007 self.assertIn('compress size', str(e.exception).lower()) 1008 1009 # zip64 file size present, zip64 compress size present, zip64 header 1010 # offset present, two fields in extra, expecting three, equals missing 1011 # header offset 1012 missing_header_offset_extra = self.make_zip64_file( 1013 file_size_64_set=True, 1014 file_size_extra=True, 1015 compress_size_64_set=True, 1016 compress_size_extra=True, 1017 header_offset_64_set=True, 1018 ) 1019 with self.assertRaises(zipfile.BadZipFile) as e: 1020 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1021 self.assertIn('header offset', str(e.exception).lower()) 1022 1023 # zip64 compress size present, zip64 header offset present, one field 1024 # in extra, expecting two, equals missing header offset 1025 missing_header_offset_extra = self.make_zip64_file( 1026 file_size_64_set=False, 1027 compress_size_64_set=True, 1028 compress_size_extra=True, 1029 header_offset_64_set=True, 1030 ) 1031 with self.assertRaises(zipfile.BadZipFile) as e: 1032 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1033 self.assertIn('header offset', str(e.exception).lower()) 1034 1035 # zip64 file size present, zip64 header offset present, one field in 1036 # extra, expecting two, equals missing header offset 1037 missing_header_offset_extra = self.make_zip64_file( 1038 file_size_64_set=True, 1039 file_size_extra=True, 1040 compress_size_64_set=False, 1041 header_offset_64_set=True, 1042 ) 1043 with self.assertRaises(zipfile.BadZipFile) as e: 1044 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1045 self.assertIn('header offset', str(e.exception).lower()) 1046 1047 # zip64 header offset present, no fields in extra, expecting one, 1048 # equals missing header offset 1049 missing_header_offset_extra = 
self.make_zip64_file( 1050 file_size_64_set=False, 1051 compress_size_64_set=False, 1052 header_offset_64_set=True, 1053 ) 1054 with self.assertRaises(zipfile.BadZipFile) as e: 1055 zipfile.ZipFile(io.BytesIO(missing_header_offset_extra)) 1056 self.assertIn('header offset', str(e.exception).lower()) 1057 1058 def test_generated_valid_zip64_extra(self): 1059 # These values are what is set in the make_zip64_file method. 1060 expected_file_size = 8 1061 expected_compress_size = 8 1062 expected_header_offset = 0 1063 expected_content = b"test1234" 1064 1065 # Loop through the various valid combinations of zip64 masks 1066 # present and extra fields present. 1067 params = ( 1068 {"file_size_64_set": True, "file_size_extra": True}, 1069 {"compress_size_64_set": True, "compress_size_extra": True}, 1070 {"header_offset_64_set": True, "header_offset_extra": True}, 1071 ) 1072 1073 for r in range(1, len(params) + 1): 1074 for combo in itertools.combinations(params, r): 1075 kwargs = {} 1076 for c in combo: 1077 kwargs.update(c) 1078 with zipfile.ZipFile(io.BytesIO(self.make_zip64_file(**kwargs))) as zf: 1079 zinfo = zf.infolist()[0] 1080 self.assertEqual(zinfo.file_size, expected_file_size) 1081 self.assertEqual(zinfo.compress_size, expected_compress_size) 1082 self.assertEqual(zinfo.header_offset, expected_header_offset) 1083 self.assertEqual(zf.read(zinfo), expected_content) 1084 1085 def test_force_zip64(self): 1086 """Test that forcing zip64 extensions correctly notes this in the zip file""" 1087 1088 # GH-103861 describes an issue where forcing a small file to use zip64 1089 # extensions would add a zip64 extra record, but not change the data 1090 # sizes to 0xFFFFFFFF to indicate to the extractor that the zip64 1091 # record should be read. Additionally, it would not set the required 1092 # version to indicate that zip64 extensions are required to extract it. 
1093 # This test replicates the situation and reads the raw data to specifically ensure: 1094 # - The required extract version is always >= ZIP64_VERSION 1095 # - The compressed and uncompressed size in the file headers are both 1096 # 0xFFFFFFFF (ie. point to zip64 record) 1097 # - The zip64 record is provided and has the correct sizes in it 1098 # Other aspects of the zip are checked as well, but verifying the above is the main goal. 1099 # Because this is hard to verify by parsing the data as a zip, the raw 1100 # bytes are checked to ensure that they line up with the zip spec. 1101 # The spec for this can be found at: https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT 1102 # The relevent sections for this test are: 1103 # - 4.3.7 for local file header 1104 # - 4.5.3 for zip64 extra field 1105 1106 data = io.BytesIO() 1107 with zipfile.ZipFile(data, mode="w", allowZip64=True) as zf: 1108 with zf.open("text.txt", mode="w", force_zip64=True) as zi: 1109 zi.write(b"_") 1110 1111 zipdata = data.getvalue() 1112 1113 # pull out and check zip information 1114 ( 1115 header, vers, os, flags, comp, csize, usize, fn_len, 1116 ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize, cd_sig 1117 ) = struct.unpack("<4sBBHH8xIIHH8shhQQx4s", zipdata[:63]) 1118 1119 self.assertEqual(header, b"PK\x03\x04") # local file header 1120 self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION) # requires zip64 to extract 1121 self.assertEqual(os, 0) # compatible with MS-DOS 1122 self.assertEqual(flags, 0) # no flags 1123 self.assertEqual(comp, 0) # compression method = stored 1124 self.assertEqual(csize, 0xFFFFFFFF) # sizes are in zip64 extra 1125 self.assertEqual(usize, 0xFFFFFFFF) 1126 self.assertEqual(fn_len, 8) # filename len 1127 self.assertEqual(ex_total_len, 20) # size of extra records 1128 self.assertEqual(ex_id, 1) # Zip64 extra record 1129 self.assertEqual(ex_len, 16) # 16 bytes of data 1130 self.assertEqual(ex_usize, 1) # uncompressed size 1131 
self.assertEqual(ex_csize, 1) # compressed size 1132 self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next 1133 1134 z = zipfile.ZipFile(io.BytesIO(zipdata)) 1135 zinfos = z.infolist() 1136 self.assertEqual(len(zinfos), 1) 1137 self.assertGreaterEqual(zinfos[0].extract_version, zipfile.ZIP64_VERSION) # requires zip64 to extract 1138 1139 def test_unseekable_zip_unknown_filesize(self): 1140 """Test that creating a zip with/without seeking will raise a RuntimeError if zip64 was required but not used""" 1141 1142 def make_zip(fp): 1143 with zipfile.ZipFile(fp, mode="w", allowZip64=True) as zf: 1144 with zf.open("text.txt", mode="w", force_zip64=False) as zi: 1145 zi.write(b"_" * (zipfile.ZIP64_LIMIT + 1)) 1146 1147 self.assertRaises(RuntimeError, make_zip, io.BytesIO()) 1148 self.assertRaises(RuntimeError, make_zip, Unseekable(io.BytesIO())) 1149 1150 def test_zip64_required_not_allowed_fail(self): 1151 """Test that trying to add a large file to a zip that doesn't allow zip64 extensions fails on add""" 1152 def make_zip(fp): 1153 with zipfile.ZipFile(fp, mode="w", allowZip64=False) as zf: 1154 # pretend zipfile.ZipInfo.from_file was used to get the name and filesize 1155 info = zipfile.ZipInfo("text.txt") 1156 info.file_size = zipfile.ZIP64_LIMIT + 1 1157 zf.open(info, mode="w") 1158 1159 self.assertRaises(zipfile.LargeZipFile, make_zip, io.BytesIO()) 1160 self.assertRaises(zipfile.LargeZipFile, make_zip, Unseekable(io.BytesIO())) 1161 1162 def test_unseekable_zip_known_filesize(self): 1163 """Test that creating a zip without seeking will use zip64 extensions if the file size is provided up-front""" 1164 1165 # This test ensures that the zip will use a zip64 data descriptor (same 1166 # as a regular data descriptor except the sizes are 8 bytes instead of 1167 # 4) record to communicate the size of a file if the zip is being 1168 # written to an unseekable stream. 
1169 # Because this sort of thing is hard to verify by parsing the data back 1170 # in as a zip, this test looks at the raw bytes created to ensure that 1171 # the correct data has been generated. 1172 # The spec for this can be found at: https://pkware.cachefly.net/webdocs/casestudies/APPNOTE.TXT 1173 # The relevent sections for this test are: 1174 # - 4.3.7 for local file header 1175 # - 4.3.9 for the data descriptor 1176 # - 4.5.3 for zip64 extra field 1177 1178 file_size = zipfile.ZIP64_LIMIT + 1 1179 1180 def make_zip(fp): 1181 with zipfile.ZipFile(fp, mode="w", allowZip64=True) as zf: 1182 # pretend zipfile.ZipInfo.from_file was used to get the name and filesize 1183 info = zipfile.ZipInfo("text.txt") 1184 info.file_size = file_size 1185 with zf.open(info, mode="w", force_zip64=False) as zi: 1186 zi.write(b"_" * file_size) 1187 return fp 1188 1189 # check seekable file information 1190 seekable_data = make_zip(io.BytesIO()).getvalue() 1191 ( 1192 header, vers, os, flags, comp, csize, usize, fn_len, 1193 ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize, 1194 cd_sig 1195 ) = struct.unpack("<4sBBHH8xIIHH8shhQQ{}x4s".format(file_size), seekable_data[:62 + file_size]) 1196 1197 self.assertEqual(header, b"PK\x03\x04") # local file header 1198 self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION) # requires zip64 to extract 1199 self.assertEqual(os, 0) # compatible with MS-DOS 1200 self.assertEqual(flags, 0) # no flags set 1201 self.assertEqual(comp, 0) # compression method = stored 1202 self.assertEqual(csize, 0xFFFFFFFF) # sizes are in zip64 extra 1203 self.assertEqual(usize, 0xFFFFFFFF) 1204 self.assertEqual(fn_len, 8) # filename len 1205 self.assertEqual(ex_total_len, 20) # size of extra records 1206 self.assertEqual(ex_id, 1) # Zip64 extra record 1207 self.assertEqual(ex_len, 16) # 16 bytes of data 1208 self.assertEqual(ex_usize, file_size) # uncompressed size 1209 self.assertEqual(ex_csize, file_size) # compressed size 1210 self.assertEqual(cd_sig, 
b"PK\x01\x02") # ensure the central directory header is next 1211 1212 # check unseekable file information 1213 unseekable_data = make_zip(Unseekable(io.BytesIO())).fp.getvalue() 1214 ( 1215 header, vers, os, flags, comp, csize, usize, fn_len, 1216 ex_total_len, filename, ex_id, ex_len, ex_usize, ex_csize, 1217 dd_header, dd_usize, dd_csize, cd_sig 1218 ) = struct.unpack("<4sBBHH8xIIHH8shhQQ{}x4s4xQQ4s".format(file_size), unseekable_data[:86 + file_size]) 1219 1220 self.assertEqual(header, b"PK\x03\x04") # local file header 1221 self.assertGreaterEqual(vers, zipfile.ZIP64_VERSION) # requires zip64 to extract 1222 self.assertEqual(os, 0) # compatible with MS-DOS 1223 self.assertEqual("{:b}".format(flags), "1000") # streaming flag set 1224 self.assertEqual(comp, 0) # compression method = stored 1225 self.assertEqual(csize, 0xFFFFFFFF) # sizes are in zip64 extra 1226 self.assertEqual(usize, 0xFFFFFFFF) 1227 self.assertEqual(fn_len, 8) # filename len 1228 self.assertEqual(ex_total_len, 20) # size of extra records 1229 self.assertEqual(ex_id, 1) # Zip64 extra record 1230 self.assertEqual(ex_len, 16) # 16 bytes of data 1231 self.assertEqual(ex_usize, 0) # uncompressed size - 0 to defer to data descriptor 1232 self.assertEqual(ex_csize, 0) # compressed size - 0 to defer to data descriptor 1233 self.assertEqual(dd_header, b"PK\07\x08") # data descriptor 1234 self.assertEqual(dd_usize, file_size) # file size (8 bytes because zip64) 1235 self.assertEqual(dd_csize, file_size) # compressed size (8 bytes because zip64) 1236 self.assertEqual(cd_sig, b"PK\x01\x02") # ensure the central directory header is next 1237 1238 1239@requires_zlib() 1240class DeflateTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1241 unittest.TestCase): 1242 compression = zipfile.ZIP_DEFLATED 1243 1244@requires_bz2() 1245class Bzip2TestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1246 unittest.TestCase): 1247 compression = zipfile.ZIP_BZIP2 1248 1249@requires_lzma() 1250class 
LzmaTestZip64InSmallFiles(AbstractTestZip64InSmallFiles, 1251 unittest.TestCase): 1252 compression = zipfile.ZIP_LZMA 1253 1254 1255class AbstractWriterTests: 1256 1257 def tearDown(self): 1258 unlink(TESTFN2) 1259 1260 def test_close_after_close(self): 1261 data = b'content' 1262 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf: 1263 w = zipf.open('test', 'w') 1264 w.write(data) 1265 w.close() 1266 self.assertTrue(w.closed) 1267 w.close() 1268 self.assertTrue(w.closed) 1269 self.assertEqual(zipf.read('test'), data) 1270 1271 def test_write_after_close(self): 1272 data = b'content' 1273 with zipfile.ZipFile(TESTFN2, "w", self.compression) as zipf: 1274 w = zipf.open('test', 'w') 1275 w.write(data) 1276 w.close() 1277 self.assertTrue(w.closed) 1278 self.assertRaises(ValueError, w.write, b'') 1279 self.assertEqual(zipf.read('test'), data) 1280 1281 def test_issue44439(self): 1282 q = array.array('Q', [1, 2, 3, 4, 5]) 1283 LENGTH = len(q) * q.itemsize 1284 with zipfile.ZipFile(io.BytesIO(), 'w', self.compression) as zip: 1285 with zip.open('data', 'w') as data: 1286 self.assertEqual(data.write(q), LENGTH) 1287 self.assertEqual(zip.getinfo('data').file_size, LENGTH) 1288 1289class StoredWriterTests(AbstractWriterTests, unittest.TestCase): 1290 compression = zipfile.ZIP_STORED 1291 1292@requires_zlib() 1293class DeflateWriterTests(AbstractWriterTests, unittest.TestCase): 1294 compression = zipfile.ZIP_DEFLATED 1295 1296@requires_bz2() 1297class Bzip2WriterTests(AbstractWriterTests, unittest.TestCase): 1298 compression = zipfile.ZIP_BZIP2 1299 1300@requires_lzma() 1301class LzmaWriterTests(AbstractWriterTests, unittest.TestCase): 1302 compression = zipfile.ZIP_LZMA 1303 1304 1305class PyZipFileTests(unittest.TestCase): 1306 def assertCompiledIn(self, name, namelist): 1307 if name + 'o' not in namelist: 1308 self.assertIn(name + 'c', namelist) 1309 1310 def requiresWriteAccess(self, path): 1311 # effective_ids unavailable on windows 1312 if not os.access(path, 
def test_write_filtered_python_package(self):
    """Check that writepy()'s filterfunc controls which files get compiled.

    Uses the installed ``test`` package as input; the checks rely on the
    text writepy() prints to stdout.
    """
    import test
    packagedir = os.path.dirname(test.__file__)
    self.requiresWriteAccess(packagedir)

    with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:

        # first make sure that the test folder gives error messages
        # (on the badsyntax_... files)
        with captured_stdout() as reportSIO:
            zipfp.writepy(packagedir)
        reportStr = reportSIO.getvalue()
        # assertIn/assertNotIn show the offending text on failure,
        # unlike assertTrue(... in ...).
        self.assertIn('SyntaxError', reportStr)

        # then check that the filter works on the whole package
        with captured_stdout() as reportSIO:
            zipfp.writepy(packagedir, filterfunc=lambda whatever: False)
        reportStr = reportSIO.getvalue()
        self.assertNotIn('SyntaxError', reportStr)

        # then check that the filter works on individual files
        def skip_bad_files(path):
            return not os.path.basename(path).startswith("bad")
        with captured_stdout() as reportSIO, self.assertWarns(UserWarning):
            zipfp.writepy(packagedir, filterfunc=skip_bad_files)
        reportStr = reportSIO.getvalue()
        if reportStr:
            # Surface unexpected output to help diagnose a failure below.
            print(reportStr)
        self.assertNotIn('SyntaxError', reportStr)
def test_write_non_pyfile(self):
    """writepy() must reject a file that is not a Python source file."""
    with open(TESTFN, 'w', encoding='utf-8') as f:
        f.write('most definitely not a python file')
    try:
        with TemporaryFile() as t, zipfile.PyZipFile(t, "w") as zipfp:
            self.assertRaises(RuntimeError, zipfp.writepy, TESTFN)
    finally:
        # Remove the scratch file even when the assertion fails, so a
        # failure here cannot leak TESTFN into later tests.
        unlink(TESTFN)
def test_extract(self):
    """Extract every sample member one by one and verify path and contents.

    Runs inside the temp_cwd() context, so members are extracted relative
    to the current working directory.
    """
    with temp_cwd():
        self.make_test_file()
        with zipfile.ZipFile(TESTFN2, "r") as archive:
            for member, text in SMALL_TEST_DATA:
                extracted = archive.extract(member)

                # extract() must return the normalized path below the cwd
                expected = os.path.normpath(
                    os.path.join(os.getcwd(), member))
                self.assertEqual(extracted, expected)

                # and the file on disk must hold the member's data
                with open(extracted, "rb") as stream:
                    self.assertEqual(text.encode(), stream.read())

                unlink(extracted)
def check_file(self, filename, content):
    """Assert that *filename* is an existing file containing exactly *content*.

    *content* is compared against the raw bytes read from the file.
    """
    exists_as_file = os.path.isfile(filename)
    self.assertTrue(exists_as_file)
    with open(filename, 'rb') as stream:
        actual = stream.read()
        self.assertEqual(actual, content)
1594 self.assertEqual(san(r',,?,C:,foo,bar/z', ','), r'_,C_,foo,bar/z') 1595 self.assertEqual(san(r'a\b,c<d>e|f"g?h*i', ','), r'a\b,c_d_e_f_g_h_i') 1596 self.assertEqual(san('../../foo../../ba..r', '/'), r'foo/ba..r') 1597 1598 def test_extract_hackers_arcnames_common_cases(self): 1599 common_hacknames = [ 1600 ('../foo/bar', 'foo/bar'), 1601 ('foo/../bar', 'foo/bar'), 1602 ('foo/../../bar', 'foo/bar'), 1603 ('foo/bar/..', 'foo/bar'), 1604 ('./../foo/bar', 'foo/bar'), 1605 ('/foo/bar', 'foo/bar'), 1606 ('/foo/../bar', 'foo/bar'), 1607 ('/foo/../../bar', 'foo/bar'), 1608 ] 1609 self._test_extract_hackers_arcnames(common_hacknames) 1610 1611 @unittest.skipIf(os.path.sep != '\\', 'Requires \\ as path separator.') 1612 def test_extract_hackers_arcnames_windows_only(self): 1613 """Test combination of path fixing and windows name sanitization.""" 1614 windows_hacknames = [ 1615 (r'..\foo\bar', 'foo/bar'), 1616 (r'..\/foo\/bar', 'foo/bar'), 1617 (r'foo/\..\/bar', 'foo/bar'), 1618 (r'foo\/../\bar', 'foo/bar'), 1619 (r'C:foo/bar', 'foo/bar'), 1620 (r'C:/foo/bar', 'foo/bar'), 1621 (r'C://foo/bar', 'foo/bar'), 1622 (r'C:\foo\bar', 'foo/bar'), 1623 (r'//conky/mountpoint/foo/bar', 'foo/bar'), 1624 (r'\\conky\mountpoint\foo\bar', 'foo/bar'), 1625 (r'///conky/mountpoint/foo/bar', 'mountpoint/foo/bar'), 1626 (r'\\\conky\mountpoint\foo\bar', 'mountpoint/foo/bar'), 1627 (r'//conky//mountpoint/foo/bar', 'mountpoint/foo/bar'), 1628 (r'\\conky\\mountpoint\foo\bar', 'mountpoint/foo/bar'), 1629 (r'//?/C:/foo/bar', 'foo/bar'), 1630 (r'\\?\C:\foo\bar', 'foo/bar'), 1631 (r'C:/../C:/foo/bar', 'C_/foo/bar'), 1632 (r'a:b\c<d>e|f"g?h*i', 'b/c_d_e_f_g_h_i'), 1633 ('../../foo../../ba..r', 'foo/ba..r'), 1634 ] 1635 self._test_extract_hackers_arcnames(windows_hacknames) 1636 1637 @unittest.skipIf(os.path.sep != '/', r'Requires / as path separator.') 1638 def test_extract_hackers_arcnames_posix_only(self): 1639 posix_hacknames = [ 1640 ('//foo/bar', 'foo/bar'), 1641 ('../../foo../../ba..r', 
def _test_extract_hackers_arcnames(self, hacknames):
    """Check that hostile member names are extracted to a sanitized path.

    *hacknames* is an iterable of ``(arcname, fixedname)`` pairs: the name
    stored in the archive and the '/'-separated relative path the member
    is expected to end up at.  Each pair is exercised four ways:
    extract() and extractall(), each with an explicit target directory
    and with the current working directory.
    """
    for arcname, fixedname in hacknames:
        # Content is unique per arcname so a stale file cannot satisfy
        # check_file() by accident.
        content = b'foobar' + arcname.encode()
        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipfp:
            zinfo = zipfile.ZipInfo()
            # preserve backslashes
            zinfo.filename = arcname
            # NOTE(review): presumably Unix mode 0o600 stored in the high
            # 16 bits of external_attr -- confirm against the zipfile docs.
            zinfo.external_attr = 0o600 << 16
            zipfp.writestr(zinfo, content)

        # Look the member up using '/' separators from here on.
        arcname = arcname.replace(os.sep, "/")
        targetpath = os.path.join('target', 'subdir', 'subsub')
        correctfile = os.path.join(targetpath, *fixedname.split('/'))

        # 1) extract() into an explicit target directory
        with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
            writtenfile = zipfp.extract(arcname, targetpath)
            self.assertEqual(writtenfile, correctfile,
                             msg='extract %r: %r != %r' %
                             (arcname, writtenfile, correctfile))
        self.check_file(correctfile, content)
        rmtree('target')

        # 2) extractall() into an explicit target directory
        with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
            zipfp.extractall(targetpath)
        self.check_file(correctfile, content)
        rmtree('target')

        # From here on the expected location is relative to the cwd.
        correctfile = os.path.join(os.getcwd(), *fixedname.split('/'))

        # 3) extract() into the current working directory
        with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
            writtenfile = zipfp.extract(arcname)
            self.assertEqual(writtenfile, correctfile,
                             msg="extract %r" % arcname)
        self.check_file(correctfile, content)
        # Remove the top-level directory the extraction created.
        rmtree(fixedname.split('/')[0])

        # 4) extractall() into the current working directory
        with zipfile.ZipFile(TESTFN2, 'r') as zipfp:
            zipfp.extractall()
        self.check_file(correctfile, content)
        rmtree(fixedname.split('/')[0])

        unlink(TESTFN2)
2) 1698 1699 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1700 infos = zipfp.infolist() 1701 data = b"" 1702 for info in infos: 1703 with zipfp.open(info) as zipopen: 1704 data += zipopen.read() 1705 self.assertIn(data, {b"foobar", b"barfoo"}) 1706 data = b"" 1707 for info in infos: 1708 data += zipfp.read(info) 1709 self.assertIn(data, {b"foobar", b"barfoo"}) 1710 1711 def test_writestr_extended_local_header_issue1202(self): 1712 with zipfile.ZipFile(TESTFN2, 'w') as orig_zip: 1713 for data in 'abcdefghijklmnop': 1714 zinfo = zipfile.ZipInfo(data) 1715 zinfo.flag_bits |= zipfile._MASK_USE_DATA_DESCRIPTOR # Include an extended local header. 1716 orig_zip.writestr(zinfo, data) 1717 1718 def test_close(self): 1719 """Check that the zipfile is closed after the 'with' block.""" 1720 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1721 for fpath, fdata in SMALL_TEST_DATA: 1722 zipfp.writestr(fpath, fdata) 1723 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1724 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1725 1726 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1727 self.assertIsNotNone(zipfp.fp, 'zipfp is not open') 1728 self.assertIsNone(zipfp.fp, 'zipfp is not closed') 1729 1730 def test_close_on_exception(self): 1731 """Check that the zipfile is closed if an exception is raised in the 1732 'with' block.""" 1733 with zipfile.ZipFile(TESTFN2, "w") as zipfp: 1734 for fpath, fdata in SMALL_TEST_DATA: 1735 zipfp.writestr(fpath, fdata) 1736 1737 try: 1738 with zipfile.ZipFile(TESTFN2, "r") as zipfp2: 1739 raise zipfile.BadZipFile() 1740 except zipfile.BadZipFile: 1741 self.assertIsNone(zipfp2.fp, 'zipfp is not closed') 1742 1743 def test_unsupported_version(self): 1744 # File has an extract_version of 120 1745 data = (b'PK\x03\x04x\x00\x00\x00\x00\x00!p\xa1@\x00\x00\x00\x00\x00\x00' 1746 b'\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00xPK\x01\x02x\x03x\x00\x00\x00\x00' 1747 b'\x00!p\xa1@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00' 1748 
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00\x00xPK\x05\x06' 1749 b'\x00\x00\x00\x00\x01\x00\x01\x00/\x00\x00\x00\x1f\x00\x00\x00\x00\x00') 1750 1751 self.assertRaises(NotImplementedError, zipfile.ZipFile, 1752 io.BytesIO(data), 'r') 1753 1754 @requires_zlib() 1755 def test_read_unicode_filenames(self): 1756 # bug #10801 1757 fname = findfile('zip_cp437_header.zip') 1758 with zipfile.ZipFile(fname) as zipfp: 1759 for name in zipfp.namelist(): 1760 zipfp.open(name).close() 1761 1762 def test_write_unicode_filenames(self): 1763 with zipfile.ZipFile(TESTFN, "w") as zf: 1764 zf.writestr("foo.txt", "Test for unicode filename") 1765 zf.writestr("\xf6.txt", "Test for unicode filename") 1766 self.assertIsInstance(zf.infolist()[0].filename, str) 1767 1768 with zipfile.ZipFile(TESTFN, "r") as zf: 1769 self.assertEqual(zf.filelist[0].filename, "foo.txt") 1770 self.assertEqual(zf.filelist[1].filename, "\xf6.txt") 1771 1772 def test_read_after_write_unicode_filenames(self): 1773 with zipfile.ZipFile(TESTFN2, 'w') as zipfp: 1774 zipfp.writestr('приклад', b'sample') 1775 self.assertEqual(zipfp.read('приклад'), b'sample') 1776 1777 def test_exclusive_create_zip_file(self): 1778 """Test exclusive creating a new zipfile.""" 1779 unlink(TESTFN2) 1780 filename = 'testfile.txt' 1781 content = b'hello, world. this is some content.' 1782 with zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) as zipfp: 1783 zipfp.writestr(filename, content) 1784 with self.assertRaises(FileExistsError): 1785 zipfile.ZipFile(TESTFN2, "x", zipfile.ZIP_STORED) 1786 with zipfile.ZipFile(TESTFN2, "r") as zipfp: 1787 self.assertEqual(zipfp.namelist(), [filename]) 1788 self.assertEqual(zipfp.read(filename), content) 1789 1790 def test_create_non_existent_file_for_append(self): 1791 if os.path.exists(TESTFN): 1792 os.unlink(TESTFN) 1793 1794 filename = 'testfile.txt' 1795 content = b'hello, world. this is some content.' 
def test_close_erroneous_file(self):
    """Check that a failed ZipFile() construction does not leak the file."""
    # This test checks that the ZipFile constructor closes the file object
    # it opens if there's an error in the file.  If it doesn't, the
    # traceback holds a reference to the ZipFile object and, indirectly,
    # the file object.
    # On Windows, this causes the os.unlink() call to fail because the
    # underlying file is still open.  This is SF bug #412214.
    #
    with open(TESTFN, "w", encoding="utf-8") as fp:
        fp.write("this is not a legal zip file\n")
    # Require the error instead of merely tolerating it: if the constructor
    # ever accepted this file, the test would otherwise pass silently and
    # leave an open, unreferenced ZipFile behind.
    with self.assertRaises(zipfile.BadZipFile):
        zipfile.ZipFile(TESTFN)
def test_is_zip_valid_file(self):
    """Confirm is_zipfile() accepts a real archive given in several forms."""
    with zipfile.ZipFile(TESTFN, mode="w") as archive:
        archive.writestr("foo.txt", b"O, for a Muse of Fire!")

    # Form 1: a filename.
    self.assertTrue(zipfile.is_zipfile(TESTFN))

    # Form 2: an open binary file object; also capture the raw bytes.
    with open(TESTFN, "rb") as disk_file:
        self.assertTrue(zipfile.is_zipfile(disk_file))
        disk_file.seek(0, 0)
        raw_archive = disk_file.read()

    # Form 3: an in-memory file-like object, probed first with the position
    # left at the end of the written data and then again after rewinding.
    memory_file = io.BytesIO()
    memory_file.write(raw_archive)
    for rewind_first in (False, True):
        if rewind_first:
            memory_file.seek(0, 0)
        self.assertTrue(zipfile.is_zipfile(memory_file))
def test_empty_file_raises_BadZipFile(self):
    """Opening an empty file, or a short non-zip file, raises BadZipFile."""
    # First an empty file, then one holding a few bytes of plain text.
    for text in ("", "short file"):
        with open(TESTFN, 'w', encoding='utf-8') as fp:
            fp.write(text)
        with self.assertRaises(zipfile.BadZipFile):
            zipfile.ZipFile(TESTFN)
def test_open_non_existent_item(self):
    """Check that attempting to call open() for an item that doesn't
    exist in the archive raises a KeyError."""
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        # The archive is still empty, so the member lookup itself fails.
        with self.assertRaises(KeyError):
            zipf.open("foo.txt", "r")
def test_comments(self):
    """Check that comments on the archive are handled properly."""

    # check default comment is empty
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        self.assertEqual(zipf.comment, b'')
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")

    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, b'')

    # check a simple short comment
    comment = b'Bravely taking to his feet, he beat a very brave retreat.'
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = comment
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        # Inspect the re-opened archive (zipfr), not the closed writer:
        # only the reader proves the comment was written to disk.
        self.assertEqual(zipfr.comment, comment)

    # check a comment of max length
    comment2 = ''.join(['%d' % (i**3 % 10) for i in range((1 << 16)-1)])
    comment2 = comment2.encode("ascii")
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = comment2
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")

    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, comment2)

    # check a comment that is too long is truncated
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        with self.assertWarns(UserWarning):
            zipf.comment = comment2 + b'oops'
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, comment2)

    # check that comments are correctly modified in append mode
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = b"original comment"
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    with zipfile.ZipFile(TESTFN, mode="a") as zipf:
        zipf.comment = b"an updated comment"
    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, b"an updated comment")

    # check that comments are correctly shortened in append mode
    # and the file is indeed truncated
    with zipfile.ZipFile(TESTFN, mode="w") as zipf:
        zipf.comment = b"original comment that's longer"
        zipf.writestr("foo.txt", "O, for a Muse of Fire!")
    original_zip_size = os.path.getsize(TESTFN)
    with zipfile.ZipFile(TESTFN, mode="a") as zipf:
        zipf.comment = b"shorter comment"
    self.assertGreater(original_zip_size, os.path.getsize(TESTFN))
    with zipfile.ZipFile(TESTFN, mode="r") as zipfr:
        self.assertEqual(zipfr.comment, b"shorter comment")
def test_empty_zipfile(self):
    """Check that an archive closed without any members is still valid.

    Opening TESTFN in 'w' and then in 'a' mode and closing it straight
    away must each time leave a file that can be re-opened for reading.
    """
    for mode in ("w", "a"):
        zipfile.ZipFile(TESTFN, mode=mode).close()
        try:
            # Open through a context manager so the read handle is closed
            # again; a lingering handle would block the later unlink of
            # TESTFN on platforms that refuse to delete open files.
            with zipfile.ZipFile(TESTFN, mode="r"):
                pass
        except zipfile.BadZipFile:
            # Only an invalid archive counts as this test's failure; any
            # other exception propagates as an error with its traceback.
            self.fail(f"Unable to create empty ZIP file in {mode!r} mode")
self.assertRaises(ValueError, 2098 zipfile.ZipInfo, 'seventies', (1979, 1, 1, 0, 0, 0)) 2099 2100 def test_create_empty_zipinfo_repr(self): 2101 """Before bpo-26185, repr() on empty ZipInfo object was failing.""" 2102 zi = zipfile.ZipInfo(filename="empty") 2103 self.assertEqual(repr(zi), "<ZipInfo filename='empty' file_size=0>") 2104 2105 def test_create_empty_zipinfo_default_attributes(self): 2106 """Ensure all required attributes are set.""" 2107 zi = zipfile.ZipInfo() 2108 self.assertEqual(zi.orig_filename, "NoName") 2109 self.assertEqual(zi.filename, "NoName") 2110 self.assertEqual(zi.date_time, (1980, 1, 1, 0, 0, 0)) 2111 self.assertEqual(zi.compress_type, zipfile.ZIP_STORED) 2112 self.assertEqual(zi.comment, b"") 2113 self.assertEqual(zi.extra, b"") 2114 self.assertIn(zi.create_system, (0, 3)) 2115 self.assertEqual(zi.create_version, zipfile.DEFAULT_VERSION) 2116 self.assertEqual(zi.extract_version, zipfile.DEFAULT_VERSION) 2117 self.assertEqual(zi.reserved, 0) 2118 self.assertEqual(zi.flag_bits, 0) 2119 self.assertEqual(zi.volume, 0) 2120 self.assertEqual(zi.internal_attr, 0) 2121 self.assertEqual(zi.external_attr, 0) 2122 2123 # Before bpo-26185, both were missing 2124 self.assertEqual(zi.file_size, 0) 2125 self.assertEqual(zi.compress_size, 0) 2126 2127 def test_zipfile_with_short_extra_field(self): 2128 """If an extra field in the header is less than 4 bytes, skip it.""" 2129 zipdata = ( 2130 b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x93\x9b\xad@\x8b\x9e' 2131 b'\xd9\xd3\x01\x00\x00\x00\x01\x00\x00\x00\x03\x00\x03\x00ab' 2132 b'c\x00\x00\x00APK\x01\x02\x14\x03\x14\x00\x00\x00\x00' 2133 b'\x00\x93\x9b\xad@\x8b\x9e\xd9\xd3\x01\x00\x00\x00\x01\x00\x00' 2134 b'\x00\x03\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00' 2135 b'\x00\x00\x00abc\x00\x00PK\x05\x06\x00\x00\x00\x00' 2136 b'\x01\x00\x01\x003\x00\x00\x00%\x00\x00\x00\x00\x00' 2137 ) 2138 with zipfile.ZipFile(io.BytesIO(zipdata), 'r') as zipf: 2139 # testzip returns the name of the first corrupt file, 
def test_open_conflicting_handles(self):
    """Check that an open write handle blocks other archive operations.

    Three members are written one after another through ZipFile.open().
    While the handle for 'bar' is open, every other operation tried on the
    archive must raise ValueError; afterwards all three members must read
    back in the order they were written.
    """
    # It's only possible to open one writable file handle at a time
    msg1 = b"It's fun to charter an accountant!"
    msg2 = b"And sail the wide accountant sea"
    msg3 = b"To find, explore the funds offshore"
    with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_STORED) as zipf:
        # First member: opened, written and closed with nothing competing.
        with zipf.open('foo', mode='w') as w2:
            w2.write(msg1)
        with zipf.open('bar', mode='w') as w1:
            # Each call below is made while w1 is still open for writing
            # and is expected to be rejected.
            with self.assertRaises(ValueError):
                zipf.open('handle', mode='w')
            with self.assertRaises(ValueError):
                zipf.open('foo', mode='r')
            with self.assertRaises(ValueError):
                zipf.writestr('str', 'abcde')
            with self.assertRaises(ValueError):
                zipf.write(__file__, 'file')
            with self.assertRaises(ValueError):
                zipf.close()
            # The rejected calls must not have disturbed the open handle.
            w1.write(msg2)
        # Once w1 is closed, a new write handle can be opened again.
        with zipf.open('baz', mode='w') as w2:
            w2.write(msg3)

    with zipfile.ZipFile(TESTFN2, 'r') as zipf:
        self.assertEqual(zipf.read('foo'), msg1)
        self.assertEqual(zipf.read('bar'), msg2)
        self.assertEqual(zipf.read('baz'), msg3)
        # None of the rejected names ('handle', 'str', 'file') was added.
        self.assertEqual(zipf.namelist(), ['foo', 'bar', 'baz'])
@requires_bz2()
def test_decompress_without_3rd_party_library(self):
    """Extracting a bzip2 member raises RuntimeError when zipfile.bz2 is None."""
    initial_bytes = b'PK\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
    buffer = io.BytesIO(initial_bytes)
    with zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_BZIP2) as writer:
        writer.writestr('a.txt', b'a')

    # Blank out the bz2 reference held by the zipfile module, then try to
    # extract the member that was written with ZIP_BZIP2.
    with mock.patch('zipfile.bz2', None), zipfile.ZipFile(buffer) as reader:
        with self.assertRaises(RuntimeError):
            reader.extract('a.txt')
2225 b'K\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0lH\x05\xe2' 2226 b'\x1e8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00\x00\x00\x00' 2227 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00aPK' 2228 b'\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0lH\x05\xe2\x1e' 2229 b'8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00\x00\x00\x00\x00' 2230 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00bPK\x05' 2231 b'\x06\x00\x00\x00\x00\x02\x00\x02\x00^\x00\x00\x00/\x00\x00' 2232 b'\x00\x00\x00' 2233 ) 2234 with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf: 2235 self.assertEqual(zipf.namelist(), ['a', 'b']) 2236 zi = zipf.getinfo('a') 2237 self.assertEqual(zi.header_offset, 0) 2238 self.assertEqual(zi.compress_size, 16) 2239 self.assertEqual(zi.file_size, 1033) 2240 zi = zipf.getinfo('b') 2241 self.assertEqual(zi.header_offset, 0) 2242 self.assertEqual(zi.compress_size, 16) 2243 self.assertEqual(zi.file_size, 1033) 2244 self.assertEqual(len(zipf.read('a')), 1033) 2245 with self.assertRaisesRegex(zipfile.BadZipFile, 'File name.*differ'): 2246 zipf.read('b') 2247 2248 @requires_zlib() 2249 def test_quoted_overlap(self): 2250 data = ( 2251 b'PK\x03\x04\x14\x00\x00\x00\x08\x00\xa0lH\x05Y\xfc' 2252 b'8\x044\x00\x00\x00(\x04\x00\x00\x01\x00\x00\x00a\x00' 2253 b'\x1f\x00\xe0\xffPK\x03\x04\x14\x00\x00\x00\x08\x00\xa0l' 2254 b'H\x05\xe2\x1e8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00' 2255 b'\x00\x00b\xed\xc0\x81\x08\x00\x00\x00\xc00\xd6\xfbK\\' 2256 b'd\x0b`PK\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0' 2257 b'lH\x05Y\xfc8\x044\x00\x00\x00(\x04\x00\x00\x01' 2258 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' 2259 b'\x00aPK\x01\x02\x14\x00\x14\x00\x00\x00\x08\x00\xa0l' 2260 b'H\x05\xe2\x1e8\xbb\x10\x00\x00\x00\t\x04\x00\x00\x01\x00' 2261 b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00$\x00\x00\x00' 2262 b'bPK\x05\x06\x00\x00\x00\x00\x02\x00\x02\x00^\x00\x00' 2263 b'\x00S\x00\x00\x00\x00\x00' 2264 ) 2265 with zipfile.ZipFile(io.BytesIO(data), 'r') as zipf: 2266 
class AbstractBadCrcTests:
    """Mixin of checks for an archive whose member 'afile' fails its CRC.

    Concrete test classes provide the raw archive bytes through the
    ``zip_with_bad_crc`` attribute.
    """

    def test_testzip_with_bad_crc(self):
        """Tests that files with bad CRCs return their name from testzip."""
        archive_bytes = io.BytesIO(self.zip_with_bad_crc)
        with zipfile.ZipFile(archive_bytes, mode="r") as archive:
            # testzip returns the name of the first corrupt file, or None
            first_corrupt = archive.testzip()
            self.assertEqual('afile', first_corrupt)

    def test_read_with_bad_crc(self):
        """Tests that files with bad CRCs raise a BadZipFile exception when read."""
        raw = self.zip_with_bad_crc

        # Using ZipFile.read()
        with zipfile.ZipFile(io.BytesIO(raw), mode="r") as archive:
            with self.assertRaises(zipfile.BadZipFile):
                archive.read('afile')

        # Using ZipExtFile.read()
        with zipfile.ZipFile(io.BytesIO(raw), mode="r") as archive, \
                archive.open('afile', 'r') as member:
            with self.assertRaises(zipfile.BadZipFile):
                member.read()

        # Same with small reads (in order to exercise the buffering logic)
        with zipfile.ZipFile(io.BytesIO(raw), mode="r") as archive, \
                archive.open('afile', 'r') as member:
            member.MIN_READ_SIZE = 2
            with self.assertRaises(zipfile.BadZipFile):
                while member.read(2):
                    pass
@requires_zlib()
class DeflateBadCrcTests(AbstractBadCrcTests, unittest.TestCase):
    """Run the AbstractBadCrcTests checks against a deflate-compressed fixture."""

    # Not read by the AbstractBadCrcTests methods visible in this file;
    # it records which compression method the fixture below was built with.
    compression = zipfile.ZIP_DEFLATED
    # Raw bytes of a one-member archive (member name b'afile').  The literal
    # bytes b'FAKE' appear where the member's CRC-32 is stored, in both the
    # local header and the central directory entry, so a CRC check of the
    # member is expected to fail -- NOTE(review): field positions read off
    # the ZIP header layout, confirm against the format spec.
    zip_with_bad_crc = (
        b'PK\x03\x04\x14\x00\x00\x00\x08\x00n}\x0c=FA'
        b'KE\x10\x00\x00\x00n\x00\x00\x00\x05\x00\x00\x00af'
        b'ile\xcbH\xcd\xc9\xc9W(\xcf/\xcaI\xc9\xa0'
        b'=\x13\x00PK\x01\x02\x14\x03\x14\x00\x00\x00\x08\x00n'
        b'}\x0c=FAKE\x10\x00\x00\x00n\x00\x00\x00\x05'
        b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x80\x01\x00\x00\x00'
        b'\x00afilePK\x05\x06\x00\x00\x00\x00\x01\x00'
        b'\x01\x003\x00\x00\x003\x00\x00\x00\x00\x00')
class DecryptionTests(unittest.TestCase):
    """Check that ZIP decryption works. Since the library does not
    support encryption at the moment, we use a pre-generated encrypted
    ZIP file."""

    # Encrypted archive with one member, 'test.txt'; the tests below read
    # it with password b"python" and expect `plain`.
    data = (
        b'PK\x03\x04\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00\x1a\x00'
        b'\x00\x00\x08\x00\x00\x00test.txt\xfa\x10\xa0gly|\xfa-\xc5\xc0=\xf9y'
        b'\x18\xe0\xa8r\xb3Z}Lg\xbc\xae\xf9|\x9b\x19\xe4\x8b\xba\xbb)\x8c\xb0\xdbl'
        b'PK\x01\x02\x14\x00\x14\x00\x01\x00\x00\x00n\x92i.#y\xef?&\x00\x00\x00'
        b'\x1a\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x01\x00 \x00\xb6\x81'
        b'\x00\x00\x00\x00test.txtPK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x006\x00'
        b'\x00\x00L\x00\x00\x00\x00\x00' )
    # Encrypted archive with one member, 'zero'; the tests below read it
    # with password b"12345" and expect `plain2`.
    data2 = (
        b'PK\x03\x04\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02'
        b'\x00\x00\x04\x00\x15\x00zeroUT\t\x00\x03\xd6\x8b\x92G\xda\x8b\x92GUx\x04'
        b'\x00\xe8\x03\xe8\x03\xc7<M\xb5a\xceX\xa3Y&\x8b{oE\xd7\x9d\x8c\x98\x02\xc0'
        b'PK\x07\x08xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00PK\x01\x02\x17\x03'
        b'\x14\x00\t\x00\x08\x00\xcf}38xu\xaa\xb2\x14\x00\x00\x00\x00\x02\x00\x00'
        b'\x04\x00\r\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa4\x81\x00\x00\x00\x00ze'
        b'roUT\x05\x00\x03\xd6\x8b\x92GUx\x00\x00PK\x05\x06\x00\x00\x00\x00\x01'
        b'\x00\x01\x00?\x00\x00\x00[\x00\x00\x00\x00\x00' )

    plain = b'zipfile.py encryption test'
    plain2 = b'\x00'*512

    def setUp(self):
        """Write both fixture archives to disk and open them for reading.

        Every resource is registered with addCleanup() as soon as it exists,
        so it is released even when a later step of setUp() fails (tearDown
        would not run in that case).  Cleanups run in reverse order, so each
        archive is closed before its file is removed.
        """
        with open(TESTFN, "wb") as fp:
            fp.write(self.data)
        self.addCleanup(unlink, TESTFN)
        self.zip = zipfile.ZipFile(TESTFN, "r")
        self.addCleanup(self.zip.close)

        with open(TESTFN2, "wb") as fp:
            fp.write(self.data2)
        self.addCleanup(unlink, TESTFN2)
        self.zip2 = zipfile.ZipFile(TESTFN2, "r")
        self.addCleanup(self.zip2.close)

    def test_no_password(self):
        # Reading the encrypted file without password
        # must raise a RuntimeError
        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
        self.assertRaises(RuntimeError, self.zip2.read, "zero")

    def test_bad_password(self):
        # A wrong password must be rejected with RuntimeError as well.
        self.zip.setpassword(b"perl")
        self.assertRaises(RuntimeError, self.zip.read, "test.txt")
        self.zip2.setpassword(b"perl")
        self.assertRaises(RuntimeError, self.zip2.read, "zero")

    @requires_zlib()
    def test_good_password(self):
        # With the right password each member decrypts to its plaintext.
        self.zip.setpassword(b"python")
        self.assertEqual(self.zip.read("test.txt"), self.plain)
        self.zip2.setpassword(b"12345")
        self.assertEqual(self.zip2.read("zero"), self.plain2)

    def test_unicode_password(self):
        # Passwords must be bytes; every entry point rejects str.
        expected_msg = "pwd: expected bytes, got str"

        with self.assertRaisesRegex(TypeError, expected_msg):
            self.zip.setpassword("unicode")

        with self.assertRaisesRegex(TypeError, expected_msg):
            self.zip.read("test.txt", "python")

        with self.assertRaisesRegex(TypeError, expected_msg):
            self.zip.open("test.txt", pwd="python")

        with self.assertRaisesRegex(TypeError, expected_msg):
            self.zip.extract("test.txt", pwd="python")

        with self.assertRaisesRegex(TypeError, expected_msg):
            self.zip.pwd = "python"
            self.zip.open("test.txt")

    def test_seek_tell(self):
        # Check seek()/tell() on a member that is decrypted while reading.
        self.zip.setpassword(b"python")
        txt = self.plain
        test_word = b'encryption'
        bloc = txt.find(test_word)
        bloc_len = len(test_word)
        with self.zip.open("test.txt", "r") as fp:
            fp.seek(bloc, os.SEEK_SET)
            self.assertEqual(fp.tell(), bloc)
            fp.seek(-bloc, os.SEEK_CUR)
            self.assertEqual(fp.tell(), 0)
            fp.seek(bloc, os.SEEK_CUR)
            self.assertEqual(fp.tell(), bloc)
            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])

            # Make sure that the second read after seeking back beyond
            # _readbuffer returns the same content (ie. rewind to the start of
            # the file to read forward to the required position).
            old_read_size = fp.MIN_READ_SIZE
            fp.MIN_READ_SIZE = 1
            fp._readbuffer = b''
            fp._offset = 0
            fp.seek(0, os.SEEK_SET)
            self.assertEqual(fp.tell(), 0)
            fp.seek(bloc, os.SEEK_CUR)
            self.assertEqual(fp.read(bloc_len), txt[bloc:bloc+bloc_len])
            fp.MIN_READ_SIZE = old_read_size

            fp.seek(0, os.SEEK_END)
            self.assertEqual(fp.tell(), len(txt))
            fp.seek(0, os.SEEK_SET)
            self.assertEqual(fp.tell(), 0)

            # Read the file completely to definitely call any eof integrity
            # checks (crc) and make sure they still pass.
            fp.read()
def zip_random_open_test(self, f, compression):
    """Read one member back in randomly sized chunks and compare the data.

    f is the target handed to make_test_archive() and then re-opened for
    reading; compression is passed through to both steps.
    """
    self.make_test_archive(f, compression)

    # Read the ZIP archive
    with zipfile.ZipFile(f, "r", compression) as archive:
        chunks = []
        with archive.open(TESTFN) as member:
            # Ask for between 1 and 1024 bytes each time until the member
            # is exhausted (an empty read ends the loop).
            while chunk := member.read(randint(1, 1024)):
                chunks.append(chunk)

        reassembled = b''.join(chunks)
        self.assertEqual(len(reassembled), len(self.data))
        self.assertEqual(reassembled, self.data)
compression = zipfile.ZIP_BZIP2 2591 2592@requires_lzma() 2593class LzmaTestsWithRandomBinaryFiles(AbstractTestsWithRandomBinaryFiles, 2594 unittest.TestCase): 2595 compression = zipfile.ZIP_LZMA 2596 2597 2598# Provide the tell() method but not seek() 2599class Tellable: 2600 def __init__(self, fp): 2601 self.fp = fp 2602 self.offset = 0 2603 2604 def write(self, data): 2605 n = self.fp.write(data) 2606 self.offset += n 2607 return n 2608 2609 def tell(self): 2610 return self.offset 2611 2612 def flush(self): 2613 self.fp.flush() 2614 2615class Unseekable: 2616 def __init__(self, fp): 2617 self.fp = fp 2618 2619 def write(self, data): 2620 return self.fp.write(data) 2621 2622 def flush(self): 2623 self.fp.flush() 2624 2625class UnseekableTests(unittest.TestCase): 2626 def test_writestr(self): 2627 for wrapper in (lambda f: f), Tellable, Unseekable: 2628 with self.subTest(wrapper=wrapper): 2629 f = io.BytesIO() 2630 f.write(b'abc') 2631 bf = io.BufferedWriter(f) 2632 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2633 zipfp.writestr('ones', b'111') 2634 zipfp.writestr('twos', b'222') 2635 self.assertEqual(f.getvalue()[:5], b'abcPK') 2636 with zipfile.ZipFile(f, mode='r') as zipf: 2637 with zipf.open('ones') as zopen: 2638 self.assertEqual(zopen.read(), b'111') 2639 with zipf.open('twos') as zopen: 2640 self.assertEqual(zopen.read(), b'222') 2641 2642 def test_write(self): 2643 for wrapper in (lambda f: f), Tellable, Unseekable: 2644 with self.subTest(wrapper=wrapper): 2645 f = io.BytesIO() 2646 f.write(b'abc') 2647 bf = io.BufferedWriter(f) 2648 with zipfile.ZipFile(wrapper(bf), 'w', zipfile.ZIP_STORED) as zipfp: 2649 self.addCleanup(unlink, TESTFN) 2650 with open(TESTFN, 'wb') as f2: 2651 f2.write(b'111') 2652 zipfp.write(TESTFN, 'ones') 2653 with open(TESTFN, 'wb') as f2: 2654 f2.write(b'222') 2655 zipfp.write(TESTFN, 'twos') 2656 self.assertEqual(f.getvalue()[:5], b'abcPK') 2657 with zipfile.ZipFile(f, mode='r') as zipf: 2658 with 
@requires_zlib()
class TestsWithMultipleOpens(unittest.TestCase):
    """Several member handles on one archive must not disturb each other."""

    @classmethod
    def setUpClass(cls):
        # A recognisable three-byte tag followed by 10000 random bytes.
        cls.data1 = b'111' + randbytes(10000)
        cls.data2 = b'222' + randbytes(10000)

    def tearDown(self):
        unlink(TESTFN2)

    def make_test_archive(self, f):
        """Write members 'ones' and 'twos' into a new deflated archive."""
        with zipfile.ZipFile(f, "w", zipfile.ZIP_DEFLATED) as archive:
            archive.writestr('ones', self.data1)
            archive.writestr('twos', self.data2)

    def test_same_file(self):
        # Two handles on the *same* member, read alternately, must each
        # deliver the complete member.
        for target in get_files(self):
            self.make_test_archive(target)
            with zipfile.ZipFile(target, mode="r") as archive:
                with archive.open('ones') as first, \
                     archive.open('ones') as second:
                    head_a = first.read(500)
                    head_b = second.read(500)
                    got_a = head_a + first.read()
                    got_b = head_b + second.read()
                self.assertEqual(got_a, got_b)
                self.assertEqual(got_a, self.data1)

    def test_different_file(self):
        # Two handles on *different* members, read alternately.
        for target in get_files(self):
            self.make_test_archive(target)
            with zipfile.ZipFile(target, mode="r") as archive:
                with archive.open('ones') as first, \
                     archive.open('twos') as second:
                    head_a = first.read(500)
                    head_b = second.read(500)
                    got_a = head_a + first.read()
                    got_b = head_b + second.read()
                self.assertEqual(got_a, self.data1)
                self.assertEqual(got_b, self.data2)

    def test_interleaved(self):
        # The second member is opened only after the first has been
        # partially read.
        for target in get_files(self):
            self.make_test_archive(target)
            with zipfile.ZipFile(target, mode="r") as archive:
                with archive.open('ones') as first:
                    got_a = first.read(500)
                    with archive.open('twos') as second:
                        got_b = second.read(500)
                        got_a += first.read()
                        got_b += second.read()
                self.assertEqual(got_a, self.data1)
                self.assertEqual(got_b, self.data2)

    def test_read_after_close(self):
        # Member handles are kept open by the ExitStack and read only after
        # the ZipFile's own context has exited.
        for target in get_files(self):
            self.make_test_archive(target)
            with contextlib.ExitStack() as handles:
                with zipfile.ZipFile(target, 'r') as archive:
                    first = handles.enter_context(archive.open('ones'))
                    second = handles.enter_context(archive.open('twos'))
                got_a = first.read(500)
                got_b = second.read(500)
                got_a += first.read()
                got_b += second.read()
            self.assertEqual(got_a, self.data1)
            self.assertEqual(got_b, self.data2)

    def test_read_after_write(self):
        # A member can be read back while the archive is still in 'w' mode.
        for target in get_files(self):
            with zipfile.ZipFile(target, 'w', zipfile.ZIP_DEFLATED) as archive:
                archive.writestr('ones', self.data1)
                archive.writestr('twos', self.data2)
                with archive.open('ones') as first:
                    head = first.read(500)
            self.assertEqual(head, self.data1[:500])
            with zipfile.ZipFile(target, 'r') as archive:
                got_a = archive.read('ones')
                got_b = archive.read('twos')
            self.assertEqual(got_a, self.data1)
            self.assertEqual(got_b, self.data2)

    def test_write_after_read(self):
        # Writing a further member after a partial read must leave both
        # members intact.
        for target in get_files(self):
            with zipfile.ZipFile(target, "w", zipfile.ZIP_DEFLATED) as archive:
                archive.writestr('ones', self.data1)
                with archive.open('ones') as first:
                    first.read(500)
                archive.writestr('twos', self.data2)
            with zipfile.ZipFile(target, 'r') as archive:
                got_a = archive.read('ones')
                got_b = archive.read('twos')
            self.assertEqual(got_a, self.data1)
            self.assertEqual(got_b, self.data2)

    def test_many_opens(self):
        # read() and open() must release their file descriptor promptly
        # rather than leaving that to the garbage collector.
        fds_before = fd_count()
        self.make_test_archive(TESTFN2)
        with zipfile.ZipFile(TESTFN2, mode="r") as archive:
            for _ in range(100):
                archive.read('ones')
                with archive.open('ones'):
                    pass
        self.assertEqual(fds_before, fd_count())

    def test_write_while_reading(self):
        # In append mode, a new member is written while a reader on an
        # existing member is only half way through.
        with zipfile.ZipFile(TESTFN2, 'w', zipfile.ZIP_DEFLATED) as archive:
            archive.writestr('ones', self.data1)
        with zipfile.ZipFile(TESTFN2, 'a', zipfile.ZIP_DEFLATED) as archive:
            with archive.open('ones', 'r') as reader:
                got_a = reader.read(500)
                with archive.open('twos', 'w') as writer:
                    writer.write(self.data2)
                got_a += reader.read()
        self.assertEqual(got_a, self.data1)
        with zipfile.ZipFile(TESTFN2) as archive:
            self.assertEqual(archive.read('twos'), self.data2)
class TestWithDirectory(unittest.TestCase):
    """Directory entries: extracting them and creating them.

    Each test works inside a fresh TESTFN2 directory (created in setUp,
    removed in tearDown) and may write an archive to TESTFN.
    """

    def setUp(self):
        os.mkdir(TESTFN2)

    def test_extract_dir(self):
        with zipfile.ZipFile(findfile("zipdir.zip")) as zipf:
            zipf.extractall(TESTFN2)
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a")))
        self.assertTrue(os.path.isdir(os.path.join(TESTFN2, "a", "b")))
        self.assertTrue(os.path.exists(os.path.join(TESTFN2, "a", "b", "c")))

    def test_bug_6050(self):
        # Extraction should succeed if directories already exist
        os.mkdir(os.path.join(TESTFN2, "a"))
        self.test_extract_dir()

    def test_write_dir(self):
        dirpath = os.path.join(TESTFN2, "x")
        os.mkdir(dirpath)
        # Only the low 16 bits are compared: the mode is checked after being
        # shifted into the upper half of external_attr.
        mode = os.stat(dirpath).st_mode & 0xFFFF
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.write(dirpath)
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            zipf.write(dirpath, "y")
            zinfo = zipf.filelist[1]
            # Was assertTrue(zinfo.filename, "y/"), which treats "y/" as the
            # failure message and passes for any non-empty name.
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("/x/"))
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            zinfo = zipf.filelist[1]
            self.assertEqual(zinfo.filename, "y/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "y")))
            self.assertEqual(len(os.listdir(target)), 2)

    def test_writestr_dir(self):
        os.mkdir(os.path.join(TESTFN2, "x"))
        with zipfile.ZipFile(TESTFN, "w") as zipf:
            zipf.writestr("x/", b'')
            zinfo = zipf.filelist[0]
            self.assertEqual(zinfo.filename, "x/")
            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
        with zipfile.ZipFile(TESTFN, "r") as zipf:
            zinfo = zipf.filelist[0]
            self.assertTrue(zinfo.filename.endswith("x/"))
            self.assertEqual(zinfo.external_attr, (0o40775 << 16) | 0x10)
            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zipf.extractall(target)
            self.assertTrue(os.path.isdir(os.path.join(target, "x")))
            self.assertEqual(os.listdir(target), ["x"])

    def test_mkdir(self):
        with zipfile.ZipFile(TESTFN, "w") as zf:
            # Name given without a trailing slash.
            zf.mkdir("directory")
            zinfo = zf.filelist[0]
            self.assertEqual(zinfo.filename, "directory/")
            self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10)

            # Name given with a trailing slash.
            zf.mkdir("directory2/")
            zinfo = zf.filelist[1]
            self.assertEqual(zinfo.filename, "directory2/")
            self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10)

            # Explicit mode argument.
            zf.mkdir("directory3", mode=0o777)
            zinfo = zf.filelist[2]
            self.assertEqual(zinfo.filename, "directory3/")
            self.assertEqual(zinfo.external_attr, (0o40777 << 16) | 0x10)

            # A ready-made ZipInfo instead of a name.
            old_zinfo = zipfile.ZipInfo("directory4/")
            old_zinfo.external_attr = (0o40777 << 16) | 0x10
            old_zinfo.CRC = 0
            old_zinfo.file_size = 0
            old_zinfo.compress_size = 0
            zf.mkdir(old_zinfo)
            new_zinfo = zf.filelist[3]
            # Check the entry that was stored, not the object passed in.
            self.assertEqual(new_zinfo.filename, "directory4/")
            self.assertEqual(old_zinfo.external_attr, new_zinfo.external_attr)

            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zf.extractall(target)
            self.assertEqual(set(os.listdir(target)),
                             {"directory", "directory2", "directory3", "directory4"})

    def test_create_directory_with_write(self):
        with zipfile.ZipFile(TESTFN, "w") as zf:
            zf.writestr(zipfile.ZipInfo('directory/'), '')

            zinfo = zf.filelist[0]
            self.assertEqual(zinfo.filename, "directory/")

            directory = os.path.join(TESTFN2, "directory2")
            os.mkdir(directory)
            # Masked to 16 bits like test_write_dir, so the expectation also
            # holds where st_mode carries bits above the low 16.
            mode = os.stat(directory).st_mode & 0xFFFF
            zf.write(directory, arcname="directory2/")
            zinfo = zf.filelist[1]
            self.assertEqual(zinfo.filename, "directory2/")
            self.assertEqual(zinfo.external_attr, (mode << 16) | 0x10)

            target = os.path.join(TESTFN2, "target")
            os.mkdir(target)
            zf.extractall(target)

            self.assertEqual(set(os.listdir(target)), {"directory", "directory2"})

    def tearDown(self):
        rmtree(TESTFN2)
        # Not every test writes an archive to TESTFN.
        if os.path.exists(TESTFN):
            unlink(TESTFN)
class ZipInfoTests(unittest.TestCase):
    """ZipInfo.from_file() with each kind of path argument it is given here.

    This module's own file (``__file__``) and its directory are the inputs.
    """

    def test_from_file(self):
        zi = zipfile.ZipInfo.from_file(__file__)
        # Take the expected name from __file__ instead of hard-coding it, so
        # the check survives this module being renamed or moved.
        self.assertEqual(posixpath.basename(zi.filename),
                         os.path.basename(__file__))
        self.assertFalse(zi.is_dir())
        self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_file_pathlike(self):
        zi = zipfile.ZipInfo.from_file(pathlib.Path(__file__))
        self.assertEqual(posixpath.basename(zi.filename),
                         os.path.basename(__file__))
        self.assertFalse(zi.is_dir())
        self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_file_bytes(self):
        # A bytes path needs an explicit archive name.
        zi = zipfile.ZipInfo.from_file(os.fsencode(__file__), 'test')
        self.assertEqual(posixpath.basename(zi.filename), 'test')
        self.assertFalse(zi.is_dir())
        self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_file_fileno(self):
        # An open file descriptor is passed in place of a path.
        with open(__file__, 'rb') as f:
            zi = zipfile.ZipInfo.from_file(f.fileno(), 'test')
        self.assertEqual(posixpath.basename(zi.filename), 'test')
        self.assertFalse(zi.is_dir())
        self.assertEqual(zi.file_size, os.path.getsize(__file__))

    def test_from_dir(self):
        dirpath = os.path.dirname(os.path.abspath(__file__))
        zi = zipfile.ZipInfo.from_file(dirpath, 'stdlib_tests')
        # Directory entries get a trailing slash, are stored and are empty.
        self.assertEqual(zi.filename, 'stdlib_tests/')
        self.assertTrue(zi.is_dir())
        self.assertEqual(zi.compress_type, zipfile.ZIP_STORED)
        self.assertEqual(zi.file_size, 0)
class CommandLineTest(unittest.TestCase):
    """Run ``python -m zipfile`` in a subprocess and check its behaviour."""

    def zipfilecmd(self, *args, **kwargs):
        """Run the CLI expecting success; return stdout with '\\n' newlines."""
        proc = script_helper.assert_python_ok('-m', 'zipfile', *args,
                                              **kwargs)
        native_newline = os.linesep.encode()
        return proc.out.replace(native_newline, b'\n')

    def zipfilecmd_failure(self, *args):
        """Run the CLI expecting failure; return the (rc, out, err) result."""
        return script_helper.assert_python_failure('-m', 'zipfile', *args)

    def test_bad_use(self):
        # No arguments at all: a usage error on stderr, nothing on stdout.
        _, stdout, stderr = self.zipfilecmd_failure()
        self.assertEqual(stdout, b'')
        lowered = stderr.lower()
        for word in (b'usage', b'error', b'required'):
            self.assertIn(word, lowered)

        # An empty archive name must fail with some diagnostic.
        _, stdout, stderr = self.zipfilecmd_failure('-l', '')
        self.assertEqual(stdout, b'')
        self.assertNotEqual(stderr.strip(), b'')

    def test_test_command(self):
        archive = findfile('zipdir.zip')
        for option in ('-t', '--test'):
            stdout = self.zipfilecmd(option, archive)
            self.assertEqual(stdout.rstrip(), b'Done testing')

        # A tar file is not a valid archive for the test command.
        not_a_zip = findfile('testtar.tar')
        _, stdout, _ = self.zipfilecmd_failure('-t', not_a_zip)
        self.assertEqual(stdout, b'')

    def test_list_command(self):
        archive = findfile('zipdir.zip')
        # Build the expected listing in-process with printdir().
        listing = io.StringIO()
        with zipfile.ZipFile(archive, 'r') as zf:
            zf.printdir(listing)
        expected = listing.getvalue().encode('ascii', 'backslashreplace')
        for option in ('-l', '--list'):
            stdout = self.zipfilecmd(option, archive,
                                     PYTHONIOENCODING='ascii:backslashreplace')
            self.assertEqual(stdout, expected)

    @requires_zlib()
    def test_create_command(self):
        # Inputs: one plain file and one directory holding a single file.
        self.addCleanup(unlink, TESTFN)
        with open(TESTFN, 'w', encoding='utf-8') as plain:
            plain.write('test 1')
        os.mkdir(TESTFNDIR)
        self.addCleanup(rmtree, TESTFNDIR)
        nested_path = os.path.join(TESTFNDIR, 'file.txt')
        with open(nested_path, 'w', encoding='utf-8') as nested:
            nested.write('test 2')

        sources = [TESTFN, TESTFNDIR]
        expected_names = [TESTFN, TESTFNDIR + '/', TESTFNDIR + '/file.txt']
        plain_name, _, nested_name = expected_names
        for option in ('-c', '--create'):
            try:
                stdout = self.zipfilecmd(option, TESTFN2, *sources)
                self.assertEqual(stdout, b'')
                with zipfile.ZipFile(TESTFN2) as zf:
                    self.assertEqual(zf.namelist(), expected_names)
                    self.assertEqual(zf.read(plain_name), b'test 1')
                    self.assertEqual(zf.read(nested_name), b'test 2')
            finally:
                # Remove the archive so the next option starts clean.
                unlink(TESTFN2)

    def test_extract_command(self):
        archive = findfile('zipdir.zip')
        for option in ('-e', '--extract'):
            with temp_dir() as extdir:
                stdout = self.zipfilecmd(option, archive, extdir)
                self.assertEqual(stdout, b'')
                with zipfile.ZipFile(archive) as zf:
                    for info in zf.infolist():
                        relative = info.filename.replace('/', os.sep)
                        extracted = os.path.join(extdir, relative)
                        if info.is_dir():
                            self.assertTrue(os.path.isdir(extracted))
                            continue
                        self.assertTrue(os.path.isfile(extracted))
                        with open(extracted, 'rb') as stream:
                            self.assertEqual(stream.read(), zf.read(info))
class TestExecutablePrependedZip(unittest.TestCase):
    """Test our ability to open zip files with an executable prepended."""

    def setUp(self):
        # Fixture files are looked up in the 'ziptestdata' subdirectory.
        self.exe_zip = findfile('exe_with_zip', subdir='ziptestdata')
        self.exe_zip64 = findfile('exe_with_z64', subdir='ziptestdata')

    def _test_zip_works(self, name):
        # bpo28494 sanity check: is_zipfile must recognise the file.
        recognised = zipfile.is_zipfile(name)
        self.assertTrue(recognised, f'is_zipfile failed on {name}')
        # Every member must be readable through ZipFile and carry the marker.
        with zipfile.ZipFile(name) as archive:
            for member in archive.namelist():
                self.assertIn(b'FAVORITE_NUMBER', archive.read(member))

    def _check_execution(self, executable_zip):
        # Run the fixture itself, handing it the interpreter path as its
        # only argument, and look for the line it is expected to print.
        command = [executable_zip, sys.executable]
        stdout = subprocess.check_output(command)
        self.assertIn(b'number in executable: 5', stdout)

    def test_read_zip_with_exe_prepended(self):
        self._test_zip_works(self.exe_zip)

    def test_read_zip64_with_exe_prepended(self):
        self._test_zip_works(self.exe_zip64)

    @unittest.skipUnless(sys.executable, 'sys.executable required.')
    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
                         'Test relies on #!/bin/bash working.')
    @requires_subprocess()
    def test_execute_zip2(self):
        self._check_execution(self.exe_zip)

    @unittest.skipUnless(sys.executable, 'sys.executable required.')
    @unittest.skipUnless(os.access('/bin/bash', os.X_OK),
                         'Test relies on #!/bin/bash working.')
    @requires_subprocess()
    def test_execute_zip64(self):
        self._check_execution(self.exe_zip64)
# Exhaust a (smallish) iterable by collecting it into a throwaway tuple.
consume = tuple


# from jaraco.itertools 5.0
class jaraco:
    class itertools:
        class Counter:
            """Iterator wrapper that counts the items it has yielded."""

            def __init__(self, i):
                self._orig_iter = iter(i)
                self.count = 0

            def __iter__(self):
                return self

            def __next__(self):
                # On exhaustion next() raises before the count is touched.
                item = next(self._orig_iter)
                self.count += 1
                return item


def add_dirs(zf):
    """
    Add an explicit, empty directory entry to the writable zip file
    zf for every directory that its members merely imply, then
    return zf.
    """
    implied = zipfile.CompleteDirs._implied_dirs(zf.namelist())
    for dirname in implied:
        zf.writestr(dirname, b"")
    return zf


def build_alpharep_fixture():
    """
    Return an in-memory, still-writable zip file laid out like this:

    .
    ├── a.txt
    ├── b
    │   ├── c.txt
    │   ├── d
    │   │   └── e.txt
    │   └── f.txt
    └── g
        └── h
            └── i.txt

    The layout deliberately includes:

    - a file at the root (a)
    - a file two levels deep (b/d/e)
    - several files in one directory (b/c, b/f)
    - a directory holding nothing but a directory (g/h)

    Only file members are written; no directory entries are added here.

    "alpha" for the alphabetical names, "rep" for representative.
    """
    members = (
        ("a.txt", b"content of a"),
        ("b/c.txt", b"content of c"),
        ("b/d/e.txt", b"content of e"),
        ("b/f.txt", b"content of f"),
        ("g/h/i.txt", b"content of i"),
    )
    archive = zipfile.ZipFile(io.BytesIO(), "w")
    for arcname, content in members:
        archive.writestr(arcname, content)
    archive.filename = "alpharep.zip"
    return archive


def pass_alpharep(meth):
    """
    Decorate a test method so that it is called once for each
    fixture produced by self.zipfile_alpharep(), receiving the
    fixture as the keyword argument ``alpharep``.
    """

    @functools.wraps(meth)
    def run_for_each_fixture(self):
        for fixture in self.zipfile_alpharep():
            meth(self, alpharep=fixture)

    return run_for_each_fixture
class TestPath(unittest.TestCase):
    """Tests for zipfile.Path.

    Methods decorated with ``pass_alpharep`` run once per fixture yielded by
    ``zipfile_alpharep``.  Checks use unittest assertion methods rather than
    bare ``assert`` statements so they still run under ``python -O``.
    """

    def setUp(self):
        # Context managers entered during a test (e.g. temporary
        # directories) are unwound when the test finishes.
        self.fixtures = contextlib.ExitStack()
        self.addCleanup(self.fixtures.close)

    def zipfile_alpharep(self):
        """Yield the alpharep fixture without, then with, directory entries."""
        with self.subTest():
            yield build_alpharep_fixture()
        with self.subTest():
            yield add_dirs(build_alpharep_fixture())

    def zipfile_ondisk(self, alpharep):
        """Close *alpharep*, write its bytes to a temp dir, return the path."""
        tmpdir = pathlib.Path(self.fixtures.enter_context(temp_dir()))
        # Grab the underlying buffer before close() detaches it.
        buffer = alpharep.fp
        alpharep.close()
        path = tmpdir / alpharep.filename
        with path.open("wb") as strm:
            strm.write(buffer.getvalue())
        return path

    @pass_alpharep
    def test_iterdir_and_types(self, alpharep):
        root = zipfile.Path(alpharep)
        self.assertTrue(root.is_dir())
        a, b, g = root.iterdir()
        self.assertTrue(a.is_file())
        self.assertTrue(b.is_dir())
        self.assertTrue(g.is_dir())
        c, f, d = b.iterdir()
        self.assertTrue(c.is_file())
        self.assertTrue(f.is_file())
        (e,) = d.iterdir()
        self.assertTrue(e.is_file())
        (h,) = g.iterdir()
        (i,) = h.iterdir()
        self.assertTrue(i.is_file())

    @pass_alpharep
    def test_is_file_missing(self, alpharep):
        root = zipfile.Path(alpharep)
        self.assertFalse(root.joinpath('missing.txt').is_file())

    @pass_alpharep
    def test_iterdir_on_file(self, alpharep):
        root = zipfile.Path(alpharep)
        a, b, g = root.iterdir()
        with self.assertRaises(ValueError):
            a.iterdir()

    @pass_alpharep
    def test_subdir_is_dir(self, alpharep):
        root = zipfile.Path(alpharep)
        self.assertTrue((root / 'b').is_dir())
        self.assertTrue((root / 'b/').is_dir())
        self.assertTrue((root / 'g').is_dir())
        self.assertTrue((root / 'g/').is_dir())

    @pass_alpharep
    def test_open(self, alpharep):
        root = zipfile.Path(alpharep)
        a, b, g = root.iterdir()
        with a.open(encoding="utf-8") as strm:
            data = strm.read()
        self.assertEqual(data, "content of a")
        with a.open('r', "utf-8") as strm:  # not a kw, no gh-101144 TypeError
            data = strm.read()
        self.assertEqual(data, "content of a")

    def test_open_encoding_utf16(self):
        in_memory_file = io.BytesIO()
        zf = zipfile.ZipFile(in_memory_file, "w")
        zf.writestr("path/16.txt", "This was utf-16".encode("utf-16"))
        zf.filename = "test_open_utf16.zip"
        root = zipfile.Path(zf)
        (path,) = root.iterdir()
        u16 = path.joinpath("16.txt")
        with u16.open('r', "utf-16") as strm:
            data = strm.read()
        self.assertEqual(data, "This was utf-16")
        with u16.open(encoding="utf-16") as strm:
            data = strm.read()
        self.assertEqual(data, "This was utf-16")

    def test_open_encoding_errors(self):
        in_memory_file = io.BytesIO()
        zf = zipfile.ZipFile(in_memory_file, "w")
        zf.writestr("path/bad-utf8.bin", b"invalid utf-8: \xff\xff.")
        zf.filename = "test_read_text_encoding_errors.zip"
        root = zipfile.Path(zf)
        (path,) = root.iterdir()
        bad_utf8 = path.joinpath("bad-utf8.bin")

        # encoding= as a positional argument for gh-101144.
        data = bad_utf8.read_text("utf-8", errors="ignore")
        self.assertEqual(data, "invalid utf-8: .")
        with bad_utf8.open("r", "utf-8", errors="surrogateescape") as f:
            self.assertEqual(f.read(), "invalid utf-8: \udcff\udcff.")

        # encoding= both positional and keyword is an error; gh-101144.
        with self.assertRaisesRegex(TypeError, "encoding"):
            data = bad_utf8.read_text("utf-8", encoding="utf-8")

        # both keyword arguments work.
        with bad_utf8.open("r", encoding="utf-8", errors="strict") as f:
            # error during decoding with wrong codec.
            with self.assertRaises(UnicodeDecodeError):
                f.read()

    def test_encoding_warnings(self):
        """EncodingWarning must blame the read_text and open calls."""
        # The asserted line numbers (8 and 9) refer to lines of this snippet.
        code = '''\
import io, zipfile
with zipfile.ZipFile(io.BytesIO(), "w") as zf:
    zf.filename = '<test_encoding_warnings in memory zip file>'
    zf.writestr("path/file.txt", b"Spanish Inquisition")
    root = zipfile.Path(zf)
    (path,) = root.iterdir()
    file_path = path.joinpath("file.txt")
    unused = file_path.read_text()  # should warn
    file_path.open("r").close()  # should warn
'''
        proc = assert_python_ok('-X', 'warn_default_encoding', '-c', code)
        warnings = proc.err.splitlines()
        self.assertEqual(len(warnings), 2, proc.err)
        self.assertRegex(warnings[0], rb"^<string>:8: EncodingWarning:")
        self.assertRegex(warnings[1], rb"^<string>:9: EncodingWarning:")

    def test_open_write(self):
        """
        If the zipfile is open for write, it should be possible to
        write bytes or text to it.
        """
        zf = zipfile.Path(zipfile.ZipFile(io.BytesIO(), mode='w'))
        with zf.joinpath('file.bin').open('wb') as strm:
            strm.write(b'binary contents')
        with zf.joinpath('file.txt').open('w', encoding="utf-8") as strm:
            strm.write('text file')

    def test_open_extant_directory(self):
        """
        Attempting to open a directory raises IsADirectoryError.
        """
        zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
        with self.assertRaises(IsADirectoryError):
            zf.joinpath('b').open()

    @pass_alpharep
    def test_open_binary_invalid_args(self, alpharep):
        root = zipfile.Path(alpharep)
        # An encoding makes no sense in binary mode, keyword or positional.
        with self.assertRaises(ValueError):
            root.joinpath('a.txt').open('rb', encoding='utf-8')
        with self.assertRaises(ValueError):
            root.joinpath('a.txt').open('rb', 'utf-8')

    def test_open_missing_directory(self):
        """
        Attempting to open a missing directory raises FileNotFoundError.
        """
        zf = zipfile.Path(add_dirs(build_alpharep_fixture()))
        with self.assertRaises(FileNotFoundError):
            zf.joinpath('z').open()

    @pass_alpharep
    def test_read(self, alpharep):
        root = zipfile.Path(alpharep)
        a, b, g = root.iterdir()
        self.assertEqual(a.read_text(encoding="utf-8"), "content of a")
        a.read_text("utf-8")  # No positional arg TypeError per gh-101144.
        self.assertEqual(a.read_bytes(), b"content of a")

    @pass_alpharep
    def test_joinpath(self, alpharep):
        root = zipfile.Path(alpharep)
        a = root.joinpath("a.txt")
        self.assertTrue(a.is_file())
        e = root.joinpath("b").joinpath("d").joinpath("e.txt")
        self.assertEqual(e.read_text(encoding="utf-8"), "content of e")

    @pass_alpharep
    def test_joinpath_multiple(self, alpharep):
        root = zipfile.Path(alpharep)
        e = root.joinpath("b", "d", "e.txt")
        self.assertEqual(e.read_text(encoding="utf-8"), "content of e")

    @pass_alpharep
    def test_traverse_truediv(self, alpharep):
        root = zipfile.Path(alpharep)
        a = root / "a.txt"
        self.assertTrue(a.is_file())
        e = root / "b" / "d" / "e.txt"
        self.assertEqual(e.read_text(encoding="utf-8"), "content of e")

    @pass_alpharep
    def test_traverse_simplediv(self, alpharep):
        """
        Disable the __future__.division when testing traversal.
        """
        # dont_inherit=True compiles the expression without any future
        # statements in effect in this module.
        code = compile(
            source="zipfile.Path(alpharep) / 'a'",
            filename="(test)",
            mode="eval",
            dont_inherit=True,
        )
        eval(code)

    @pass_alpharep
    def test_pathlike_construction(self, alpharep):
        """
        zipfile.Path should be constructable from a path-like object
        """
        zipfile_ondisk = self.zipfile_ondisk(alpharep)
        pathlike = pathlib.Path(str(zipfile_ondisk))
        zipfile.Path(pathlike)

    @pass_alpharep
    def test_traverse_pathlike(self, alpharep):
        root = zipfile.Path(alpharep)
        root / pathlib.Path("a")

    @pass_alpharep
    def test_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        self.assertEqual((root / 'a').parent.at, '')
        self.assertEqual((root / 'a' / 'b').parent.at, 'a/')

    @pass_alpharep
    def test_dir_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        self.assertEqual((root / 'b').parent.at, '')
        self.assertEqual((root / 'b/').parent.at, '')

    @pass_alpharep
    def test_missing_dir_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        self.assertEqual((root / 'missing dir/').parent.at, '')

    @pass_alpharep
    def test_mutability(self, alpharep):
        """
        If the underlying zipfile is changed, the Path object should
        reflect that change.
        """
        root = zipfile.Path(alpharep)
        a, b, g = root.iterdir()
        alpharep.writestr('foo.txt', 'foo')
        alpharep.writestr('bar/baz.txt', 'baz')
        self.assertIn('foo.txt', [child.name for child in root.iterdir()])
        self.assertEqual((root / 'foo.txt').read_text(encoding="utf-8"), 'foo')
        (baz,) = (root / 'bar').iterdir()
        self.assertEqual(baz.read_text(encoding="utf-8"), 'baz')

    HUGE_ZIPFILE_NUM_ENTRIES = 2 ** 13

    def huge_zipfile(self):
        """Create a read-only zipfile with a huge number of entries."""
        strm = io.BytesIO()
        zf = zipfile.ZipFile(strm, "w")
        for entry in map(str, range(self.HUGE_ZIPFILE_NUM_ENTRIES)):
            zf.writestr(entry, entry)
        # Flip the mode attribute after writing so the object presents
        # itself as opened for reading.
        zf.mode = 'r'
        return zf

    def test_joinpath_constant_time(self):
        """
        Ensure joinpath on each item is constant time, so that doing it
        for every item in the zipfile is linear overall.

        Nothing is timed here; a regression shows up as a very slow test.
        """
        root = zipfile.Path(self.huge_zipfile())
        entries = jaraco.itertools.Counter(root.iterdir())
        for entry in entries:
            entry.joinpath('suffix')
        # Check the file iterated all items
        self.assertEqual(entries.count, self.HUGE_ZIPFILE_NUM_ENTRIES)

    # @func_timeout.func_set_timeout(3)
    def test_implied_dirs_performance(self):
        # 10000 names, each nested one directory per ASCII lowercase letter.
        data = ['/'.join(string.ascii_lowercase + str(n)) for n in range(10000)]
        zipfile.CompleteDirs._implied_dirs(data)

    @pass_alpharep
    def test_read_does_not_close(self, alpharep):
        alpharep = self.zipfile_ondisk(alpharep)
        with zipfile.ZipFile(alpharep) as file:
            # The second pass fails if the first read closed the archive.
            for rep in range(2):
                zipfile.Path(file, 'a.txt').read_text(encoding="utf-8")

    @pass_alpharep
    def test_subclass(self, alpharep):
        class Subclass(zipfile.Path):
            pass

        root = Subclass(alpharep)
        self.assertIsInstance(root / 'b', Subclass)

    @pass_alpharep
    def test_filename(self, alpharep):
        root = zipfile.Path(alpharep)
        self.assertEqual(root.filename, pathlib.Path('alpharep.zip'))

    @pass_alpharep
    def test_root_name(self, alpharep):
        """
        The name of the root should be the name of the zipfile
        """
        root = zipfile.Path(alpharep)
        self.assertEqual(root.name, 'alpharep.zip')
        self.assertEqual(root.filename.name, 'alpharep.zip')

    @pass_alpharep
    def test_suffix(self, alpharep):
        """
        The suffix of the root should be the suffix of the zipfile.
        The suffix of each nested file is the final component's last suffix, if any.
        Includes the leading period, just like pathlib.Path.
        """
        root = zipfile.Path(alpharep)
        self.assertEqual(root.suffix, '.zip')
        self.assertEqual(root.filename.suffix, '.zip')

        b = root / "b.txt"
        self.assertEqual(b.suffix, ".txt")

        c = root / "c" / "filename.tar.gz"
        self.assertEqual(c.suffix, ".gz")

        d = root / "d"
        self.assertEqual(d.suffix, "")

    @pass_alpharep
    def test_suffixes(self, alpharep):
        """
        The suffixes of the root should be the suffixes of the zipfile.
        The suffixes of each nested file are all suffixes of the final
        component, if any, each including its leading period, just like
        pathlib.Path.  A name that is only a leading-dot name has none.
        """
        root = zipfile.Path(alpharep)
        self.assertEqual(root.suffixes, ['.zip'])
        self.assertEqual(root.filename.suffixes, ['.zip'])

        b = root / 'b.txt'
        self.assertEqual(b.suffixes, ['.txt'])

        c = root / 'c' / 'filename.tar.gz'
        self.assertEqual(c.suffixes, ['.tar', '.gz'])

        d = root / 'd'
        self.assertEqual(d.suffixes, [])

        e = root / '.hgrc'
        self.assertEqual(e.suffixes, [])

    @pass_alpharep
    def test_stem(self, alpharep):
        """
        The final path component, without its suffix
        """
        root = zipfile.Path(alpharep)
        self.assertEqual(root.stem, 'alpharep')
        self.assertEqual(root.filename.stem, 'alpharep')

        b = root / "b.txt"
        self.assertEqual(b.stem, "b")

        c = root / "c" / "filename.tar.gz"
        self.assertEqual(c.stem, "filename.tar")

        d = root / "d"
        self.assertEqual(d.stem, "d")

    @pass_alpharep
    def test_root_parent(self, alpharep):
        root = zipfile.Path(alpharep)
        self.assertEqual(root.parent, pathlib.Path('.'))
        root.root.filename = 'foo/bar.zip'
        self.assertEqual(root.parent, pathlib.Path('foo'))

    @pass_alpharep
    def test_root_unnamed(self, alpharep):
        """
        It is an error to attempt to get the name
        or parent of an unnamed zipfile.
        """
        alpharep.filename = None
        root = zipfile.Path(alpharep)
        with self.assertRaises(TypeError):
            root.name
        with self.assertRaises(TypeError):
            root.parent

        # .name and .parent should still work on subs
        sub = root / "b"
        self.assertEqual(sub.name, "b")
        self.assertTrue(sub.parent)

    @pass_alpharep
    def test_inheritance(self, alpharep):
        # pass_alpharep already supplies every fixture; iterating
        # zipfile_alpharep() again here would only repeat the same check.
        cls = type('PathChild', (zipfile.Path,), {})
        file = cls(alpharep).joinpath('some dir').parent
        self.assertIsInstance(file, cls)

    @pass_alpharep
    def test_extract_orig_with_implied_dirs(self, alpharep):
        """
        A zip file wrapped in a Path should extract even with implied dirs.
        """
        source_path = self.zipfile_ondisk(alpharep)
        # Use a context manager so the on-disk archive is closed again.
        with zipfile.ZipFile(source_path) as zf:
            # wrap the zipfile for its side effect
            zipfile.Path(zf)
            zf.extractall(source_path.parent)

    def test_malformed_paths(self):
        """
        Path should handle malformed paths.
        """
        data = io.BytesIO()
        zf = zipfile.ZipFile(data, "w")
        zf.writestr("/one-slash.txt", b"content")
        zf.writestr("//two-slash.txt", b"content")
        zf.writestr("../parent.txt", b"content")
        zf.filename = ''
        root = zipfile.Path(zf)
        self.assertEqual(
            list(map(str, root.iterdir())),
            [
                'one-slash.txt',
                'two-slash.txt',
                'parent.txt',
            ],
        )
3606 with open(TESTFN, "rb") as tf: 3607 data = tf.read() 3608 for name, temp in zip(self.file_names, placeholders[:2]): 3609 data = data.replace(temp.encode('ascii'), 3610 name.encode('shift_jis')) 3611 with open(TESTFN, "wb") as tf: 3612 tf.write(data) 3613 3614 def _test_read(self, zipfp, expected_names, expected_content): 3615 # Check the namelist 3616 names = zipfp.namelist() 3617 self.assertEqual(sorted(names), sorted(expected_names)) 3618 3619 # Check infolist 3620 infos = zipfp.infolist() 3621 names = [zi.filename for zi in infos] 3622 self.assertEqual(sorted(names), sorted(expected_names)) 3623 3624 # check getinfo 3625 for name, content in zip(expected_names, expected_content): 3626 info = zipfp.getinfo(name) 3627 self.assertEqual(info.filename, name) 3628 self.assertEqual(info.file_size, len(content)) 3629 self.assertEqual(zipfp.read(name), content) 3630 3631 def test_read_with_metadata_encoding(self): 3632 # Read the ZIP archive with correct metadata_encoding 3633 with zipfile.ZipFile(TESTFN, "r", metadata_encoding='shift_jis') as zipfp: 3634 self._test_read(zipfp, self.file_names, self.file_content) 3635 3636 def test_read_without_metadata_encoding(self): 3637 # Read the ZIP archive without metadata_encoding 3638 expected_names = [name.encode('shift_jis').decode('cp437') 3639 for name in self.file_names[:2]] + self.file_names[2:] 3640 with zipfile.ZipFile(TESTFN, "r") as zipfp: 3641 self._test_read(zipfp, expected_names, self.file_content) 3642 3643 def test_read_with_incorrect_metadata_encoding(self): 3644 # Read the ZIP archive with incorrect metadata_encoding 3645 expected_names = [name.encode('shift_jis').decode('koi8-u') 3646 for name in self.file_names[:2]] + self.file_names[2:] 3647 with zipfile.ZipFile(TESTFN, "r", metadata_encoding='koi8-u') as zipfp: 3648 self._test_read(zipfp, expected_names, self.file_content) 3649 3650 def test_read_with_unsuitable_metadata_encoding(self): 3651 # Read the ZIP archive with metadata_encoding unsuitable for 
3652 # decoding metadata 3653 with self.assertRaises(UnicodeDecodeError): 3654 zipfile.ZipFile(TESTFN, "r", metadata_encoding='ascii') 3655 with self.assertRaises(UnicodeDecodeError): 3656 zipfile.ZipFile(TESTFN, "r", metadata_encoding='utf-8') 3657 3658 def test_read_after_append(self): 3659 newname = '\u56db' # Han 'four' 3660 expected_names = [name.encode('shift_jis').decode('cp437') 3661 for name in self.file_names[:2]] + self.file_names[2:] 3662 expected_names.append(newname) 3663 expected_content = (*self.file_content, b"newcontent") 3664 3665 with zipfile.ZipFile(TESTFN, "a") as zipfp: 3666 zipfp.writestr(newname, "newcontent") 3667 self.assertEqual(sorted(zipfp.namelist()), sorted(expected_names)) 3668 3669 with zipfile.ZipFile(TESTFN, "r") as zipfp: 3670 self._test_read(zipfp, expected_names, expected_content) 3671 3672 with zipfile.ZipFile(TESTFN, "r", metadata_encoding='shift_jis') as zipfp: 3673 self.assertEqual(sorted(zipfp.namelist()), sorted(expected_names)) 3674 for i, (name, content) in enumerate(zip(expected_names, expected_content)): 3675 info = zipfp.getinfo(name) 3676 self.assertEqual(info.filename, name) 3677 self.assertEqual(info.file_size, len(content)) 3678 if i < 2: 3679 with self.assertRaises(zipfile.BadZipFile): 3680 zipfp.read(name) 3681 else: 3682 self.assertEqual(zipfp.read(name), content) 3683 3684 def test_write_with_metadata_encoding(self): 3685 ZF = zipfile.ZipFile 3686 for mode in ("w", "x", "a"): 3687 with self.assertRaisesRegex(ValueError, 3688 "^metadata_encoding is only"): 3689 ZF("nonesuch.zip", mode, metadata_encoding="shift_jis") 3690 3691 def test_cli_with_metadata_encoding(self): 3692 errmsg = "Non-conforming encodings not supported with -c." 
3693 args = ["--metadata-encoding=shift_jis", "-c", "nonesuch", "nonesuch"] 3694 with captured_stdout() as stdout: 3695 with captured_stderr() as stderr: 3696 self.assertRaises(SystemExit, zipfile.main, args) 3697 self.assertEqual(stdout.getvalue(), "") 3698 self.assertIn(errmsg, stderr.getvalue()) 3699 3700 with captured_stdout() as stdout: 3701 zipfile.main(["--metadata-encoding=shift_jis", "-t", TESTFN]) 3702 listing = stdout.getvalue() 3703 3704 with captured_stdout() as stdout: 3705 zipfile.main(["--metadata-encoding=shift_jis", "-l", TESTFN]) 3706 listing = stdout.getvalue() 3707 for name in self.file_names: 3708 self.assertIn(name, listing) 3709 3710 def test_cli_with_metadata_encoding_extract(self): 3711 os.mkdir(TESTFN2) 3712 self.addCleanup(rmtree, TESTFN2) 3713 # Depending on locale, extracted file names can be not encodable 3714 # with the filesystem encoding. 3715 for fn in self.file_names: 3716 try: 3717 os.stat(os.path.join(TESTFN2, fn)) 3718 except OSError: 3719 pass 3720 except UnicodeEncodeError: 3721 self.skipTest(f'cannot encode file name {fn!r}') 3722 3723 zipfile.main(["--metadata-encoding=shift_jis", "-e", TESTFN, TESTFN2]) 3724 listing = os.listdir(TESTFN2) 3725 for name in self.file_names: 3726 self.assertIn(name, listing) 3727 3728 3729class StripExtraTests(unittest.TestCase): 3730 # Note: all of the "z" characters are technically invalid, but up 3731 # to 3 bytes at the end of the extra will be passed through as they 3732 # are too short to encode a valid extra. 
3733 3734 ZIP64_EXTRA = 1 3735 3736 def test_no_data(self): 3737 s = struct.Struct("<HH") 3738 a = s.pack(self.ZIP64_EXTRA, 0) 3739 b = s.pack(2, 0) 3740 c = s.pack(3, 0) 3741 3742 self.assertEqual(b'', zipfile._strip_extra(a, (self.ZIP64_EXTRA,))) 3743 self.assertEqual(b, zipfile._strip_extra(b, (self.ZIP64_EXTRA,))) 3744 self.assertEqual( 3745 b+b"z", zipfile._strip_extra(b+b"z", (self.ZIP64_EXTRA,))) 3746 3747 self.assertEqual(b+c, zipfile._strip_extra(a+b+c, (self.ZIP64_EXTRA,))) 3748 self.assertEqual(b+c, zipfile._strip_extra(b+a+c, (self.ZIP64_EXTRA,))) 3749 self.assertEqual(b+c, zipfile._strip_extra(b+c+a, (self.ZIP64_EXTRA,))) 3750 3751 def test_with_data(self): 3752 s = struct.Struct("<HH") 3753 a = s.pack(self.ZIP64_EXTRA, 1) + b"a" 3754 b = s.pack(2, 2) + b"bb" 3755 c = s.pack(3, 3) + b"ccc" 3756 3757 self.assertEqual(b"", zipfile._strip_extra(a, (self.ZIP64_EXTRA,))) 3758 self.assertEqual(b, zipfile._strip_extra(b, (self.ZIP64_EXTRA,))) 3759 self.assertEqual( 3760 b+b"z", zipfile._strip_extra(b+b"z", (self.ZIP64_EXTRA,))) 3761 3762 self.assertEqual(b+c, zipfile._strip_extra(a+b+c, (self.ZIP64_EXTRA,))) 3763 self.assertEqual(b+c, zipfile._strip_extra(b+a+c, (self.ZIP64_EXTRA,))) 3764 self.assertEqual(b+c, zipfile._strip_extra(b+c+a, (self.ZIP64_EXTRA,))) 3765 3766 def test_multiples(self): 3767 s = struct.Struct("<HH") 3768 a = s.pack(self.ZIP64_EXTRA, 1) + b"a" 3769 b = s.pack(2, 2) + b"bb" 3770 3771 self.assertEqual(b"", zipfile._strip_extra(a+a, (self.ZIP64_EXTRA,))) 3772 self.assertEqual(b"", zipfile._strip_extra(a+a+a, (self.ZIP64_EXTRA,))) 3773 self.assertEqual( 3774 b"z", zipfile._strip_extra(a+a+b"z", (self.ZIP64_EXTRA,))) 3775 self.assertEqual( 3776 b+b"z", zipfile._strip_extra(a+a+b+b"z", (self.ZIP64_EXTRA,))) 3777 3778 self.assertEqual(b, zipfile._strip_extra(a+a+b, (self.ZIP64_EXTRA,))) 3779 self.assertEqual(b, zipfile._strip_extra(a+b+a, (self.ZIP64_EXTRA,))) 3780 self.assertEqual(b, zipfile._strip_extra(b+a+a, (self.ZIP64_EXTRA,))) 3781 
3782 def test_too_short(self): 3783 self.assertEqual(b"", zipfile._strip_extra(b"", (self.ZIP64_EXTRA,))) 3784 self.assertEqual(b"z", zipfile._strip_extra(b"z", (self.ZIP64_EXTRA,))) 3785 self.assertEqual( 3786 b"zz", zipfile._strip_extra(b"zz", (self.ZIP64_EXTRA,))) 3787 self.assertEqual( 3788 b"zzz", zipfile._strip_extra(b"zzz", (self.ZIP64_EXTRA,))) 3789 3790 3791if __name__ == "__main__": 3792 unittest.main() 3793