"""Tests for the bz2 module: BZ2File, BZ2Compressor, BZ2Decompressor,
and the module-level compress()/decompress()/open() functions."""

from test import support
from test.support import bigmemtest, _4G

import array
import unittest
from io import BytesIO, DEFAULT_BUFFER_SIZE
import os
import pickle
import glob
import tempfile
import pathlib
import random
import shutil
import subprocess
import threading
from test.support import import_helper
from test.support import threading_helper
from test.support.os_helper import unlink
import _compression
import sys


# Skip tests if the bz2 module doesn't exist.
bz2 = import_helper.import_module('bz2')
from bz2 import BZ2File, BZ2Compressor, BZ2Decompressor

# Lazily-determined flag: None until first use, then True/False depending on
# whether a command-line bunzip2 binary is available for cross-checking.
has_cmdline_bunzip2 = None

def ext_decompress(data):
    """Decompress *data* with an external bunzip2 if available, else bz2.

    Using an external tool (when present) gives an independent check that
    the module's compressed output is valid bzip2 data.
    """
    global has_cmdline_bunzip2
    if has_cmdline_bunzip2 is None:
        has_cmdline_bunzip2 = bool(shutil.which('bunzip2'))
    if has_cmdline_bunzip2:
        return subprocess.check_output(['bunzip2'], input=data)
    else:
        return bz2.decompress(data)

class BaseTest(unittest.TestCase):
    "Base for other testcases."

    # Uncompressed reference text, as individual lines (used by the
    # readline/readlines/iteration tests) and joined (TEXT).
    TEXT_LINES = [
        b'root:x:0:0:root:/root:/bin/bash\n',
        b'bin:x:1:1:bin:/bin:\n',
        b'daemon:x:2:2:daemon:/sbin:\n',
        b'adm:x:3:4:adm:/var/adm:\n',
        b'lp:x:4:7:lp:/var/spool/lpd:\n',
        b'sync:x:5:0:sync:/sbin:/bin/sync\n',
        b'shutdown:x:6:0:shutdown:/sbin:/sbin/shutdown\n',
        b'halt:x:7:0:halt:/sbin:/sbin/halt\n',
        b'mail:x:8:12:mail:/var/spool/mail:\n',
        b'news:x:9:13:news:/var/spool/news:\n',
        b'uucp:x:10:14:uucp:/var/spool/uucp:\n',
        b'operator:x:11:0:operator:/root:\n',
        b'games:x:12:100:games:/usr/games:\n',
        b'gopher:x:13:30:gopher:/usr/lib/gopher-data:\n',
        b'ftp:x:14:50:FTP User:/var/ftp:/bin/bash\n',
        b'nobody:x:65534:65534:Nobody:/home:\n',
        b'postfix:x:100:101:postfix:/var/spool/postfix:\n',
        b'niemeyer:x:500:500::/home/niemeyer:/bin/bash\n',
        b'postgres:x:101:102:PostgreSQL Server:/var/lib/pgsql:/bin/bash\n',
        b'mysql:x:102:103:MySQL server:/var/lib/mysql:/bin/bash\n',
        b'www:x:103:104::/var/www:/bin/false\n',
        ]
    TEXT = b''.join(TEXT_LINES)
    # DATA is TEXT compressed with bzip2 (a single stream); kept as a fixed
    # literal so the decompression tests don't depend on compression working.
    DATA = b'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`'
    EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00'
    BAD_DATA = b'this is not a valid bzip2 file'

    # Some tests need more than one block of uncompressed data. Since one block
    # is at least 100,000 bytes, we gather some data dynamically and compress it.
    # Note that this assumes that compression works correctly, so we cannot
    # simply use the bigger test data for all tests.
    test_size = 0
    BIG_TEXT = bytearray(128*1024)
    for fname in glob.glob(os.path.join(glob.escape(os.path.dirname(__file__)), '*.py')):
        with open(fname, 'rb') as fh:
            test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:])
        if test_size > 128*1024:
            break
    BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1)

    def setUp(self):
        # Create an empty temporary file; individual tests fill it as needed.
        fd, self.filename = tempfile.mkstemp()
        os.close(fd)

    def tearDown(self):
        unlink(self.filename)


class BZ2FileTest(BaseTest):
    "Test the BZ2File class."

    def createTempFile(self, streams=1, suffix=b""):
        # Write *streams* concatenated copies of DATA, plus optional raw
        # trailing bytes, to the temporary file.
        with open(self.filename, "wb") as f:
            f.write(self.DATA * streams)
            f.write(suffix)

    def testBadArgs(self):
        self.assertRaises(TypeError, BZ2File, 123.456)
        self.assertRaises(ValueError, BZ2File, os.devnull, "z")
        self.assertRaises(ValueError, BZ2File, os.devnull, "rx")
        self.assertRaises(ValueError, BZ2File, os.devnull, "rbt")
        self.assertRaises(ValueError, BZ2File, os.devnull, compresslevel=0)
        self.assertRaises(ValueError, BZ2File, os.devnull, compresslevel=10)

        # compresslevel is keyword-only
        self.assertRaises(TypeError, BZ2File, os.devnull, "r", 3)

    def testRead(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.read, float())
            self.assertEqual(bz2f.read(), self.TEXT)

    def testReadBadFile(self):
        self.createTempFile(streams=0, suffix=self.BAD_DATA)
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(OSError, bz2f.read)

    def testReadMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.read, float())
            self.assertEqual(bz2f.read(), self.TEXT * 5)

    def testReadMonkeyMultiStream(self):
        # Test BZ2File.read() on a multi-stream archive where a stream
        # boundary coincides with the end of the raw read buffer.
        buffer_size = _compression.BUFFER_SIZE
        _compression.BUFFER_SIZE = len(self.DATA)
        try:
            self.createTempFile(streams=5)
            with BZ2File(self.filename) as bz2f:
                self.assertRaises(TypeError, bz2f.read, float())
                self.assertEqual(bz2f.read(), self.TEXT * 5)
        finally:
            _compression.BUFFER_SIZE = buffer_size

    def testReadTrailingJunk(self):
        self.createTempFile(suffix=self.BAD_DATA)
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(bz2f.read(), self.TEXT)

    def testReadMultiStreamTrailingJunk(self):
        self.createTempFile(streams=5, suffix=self.BAD_DATA)
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(bz2f.read(), self.TEXT * 5)

    def testRead0(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.read, float())
            self.assertEqual(bz2f.read(0), b"")

    def testReadChunk10(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            text = b''
            while True:
                str = bz2f.read(10)
                if not str:
                    break
                text += str
            self.assertEqual(text, self.TEXT)

    def testReadChunk10MultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            text = b''
            while True:
                str = bz2f.read(10)
                if not str:
                    break
                text += str
            self.assertEqual(text, self.TEXT * 5)

    def testRead100(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(bz2f.read(100), self.TEXT[:100])

    def testPeek(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            pdata = bz2f.peek()
            self.assertNotEqual(len(pdata), 0)
            self.assertTrue(self.TEXT.startswith(pdata))
            self.assertEqual(bz2f.read(), self.TEXT)

    def testReadInto(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            n = 128
            b = bytearray(n)
            self.assertEqual(bz2f.readinto(b), n)
            self.assertEqual(b, self.TEXT[:n])
            n = len(self.TEXT) - n
            b = bytearray(len(self.TEXT))
            self.assertEqual(bz2f.readinto(b), n)
            self.assertEqual(b[:n], self.TEXT[-n:])

    def testReadLine(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.readline, None)
            for line in self.TEXT_LINES:
                self.assertEqual(bz2f.readline(), line)

    def testReadLineMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.readline, None)
            for line in self.TEXT_LINES * 5:
                self.assertEqual(bz2f.readline(), line)

    def testReadLines(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.readlines, None)
            self.assertEqual(bz2f.readlines(), self.TEXT_LINES)

    def testReadLinesMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.readlines, None)
            self.assertEqual(bz2f.readlines(), self.TEXT_LINES * 5)

    def testIterator(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(list(iter(bz2f)), self.TEXT_LINES)

    def testIteratorMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(list(iter(bz2f)), self.TEXT_LINES * 5)

    def testClosedIteratorDeadlock(self):
        # Issue #3309: Iteration on a closed BZ2File should release the lock.
        self.createTempFile()
        bz2f = BZ2File(self.filename)
        bz2f.close()
        self.assertRaises(ValueError, next, bz2f)
        # This call will deadlock if the above call failed to release the lock.
        self.assertRaises(ValueError, bz2f.readlines)

    def testWrite(self):
        with BZ2File(self.filename, "w") as bz2f:
            self.assertRaises(TypeError, bz2f.write)
            bz2f.write(self.TEXT)
        with open(self.filename, 'rb') as f:
            self.assertEqual(ext_decompress(f.read()), self.TEXT)

    def testWriteChunks10(self):
        with BZ2File(self.filename, "w") as bz2f:
            n = 0
            while True:
                str = self.TEXT[n*10:(n+1)*10]
                if not str:
                    break
                bz2f.write(str)
                n += 1
        with open(self.filename, 'rb') as f:
            self.assertEqual(ext_decompress(f.read()), self.TEXT)

    def testWriteNonDefaultCompressLevel(self):
        expected = bz2.compress(self.TEXT, compresslevel=5)
        with BZ2File(self.filename, "w", compresslevel=5) as bz2f:
            bz2f.write(self.TEXT)
        with open(self.filename, "rb") as f:
            self.assertEqual(f.read(), expected)

    def testWriteLines(self):
        with BZ2File(self.filename, "w") as bz2f:
            self.assertRaises(TypeError, bz2f.writelines)
            bz2f.writelines(self.TEXT_LINES)
        # Issue #1535500: Calling writelines() on a closed BZ2File
        # should raise an exception.
        self.assertRaises(ValueError, bz2f.writelines, ["a"])
        with open(self.filename, 'rb') as f:
            self.assertEqual(ext_decompress(f.read()), self.TEXT)

    def testWriteMethodsOnReadOnlyFile(self):
        with BZ2File(self.filename, "w") as bz2f:
            bz2f.write(b"abc")

        with BZ2File(self.filename, "r") as bz2f:
            self.assertRaises(OSError, bz2f.write, b"a")
            self.assertRaises(OSError, bz2f.writelines, [b"a"])

    def testAppend(self):
        with BZ2File(self.filename, "w") as bz2f:
            self.assertRaises(TypeError, bz2f.write)
            bz2f.write(self.TEXT)
        with BZ2File(self.filename, "a") as bz2f:
            self.assertRaises(TypeError, bz2f.write)
            bz2f.write(self.TEXT)
        with open(self.filename, 'rb') as f:
            self.assertEqual(ext_decompress(f.read()), self.TEXT * 2)

    def testSeekForward(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.seek)
            bz2f.seek(150)
            self.assertEqual(bz2f.read(), self.TEXT[150:])

    def testSeekForwardAcrossStreams(self):
        self.createTempFile(streams=2)
        with BZ2File(self.filename) as bz2f:
            self.assertRaises(TypeError, bz2f.seek)
            bz2f.seek(len(self.TEXT) + 150)
            self.assertEqual(bz2f.read(), self.TEXT[150:])

    def testSeekBackwards(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            bz2f.read(500)
            bz2f.seek(-150, 1)
            self.assertEqual(bz2f.read(), self.TEXT[500-150:])

    def testSeekBackwardsAcrossStreams(self):
        self.createTempFile(streams=2)
        with BZ2File(self.filename) as bz2f:
            readto = len(self.TEXT) + 100
            while readto > 0:
                readto -= len(bz2f.read(readto))
            bz2f.seek(-150, 1)
            self.assertEqual(bz2f.read(), self.TEXT[100-150:] + self.TEXT)

    def testSeekBackwardsFromEnd(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(-150, 2)
            self.assertEqual(bz2f.read(), self.TEXT[len(self.TEXT)-150:])

    def testSeekBackwardsFromEndAcrossStreams(self):
        self.createTempFile(streams=2)
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(-1000, 2)
            self.assertEqual(bz2f.read(), (self.TEXT * 2)[-1000:])

    def testSeekPostEnd(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(150000)
            self.assertEqual(bz2f.tell(), len(self.TEXT))
            self.assertEqual(bz2f.read(), b"")

    def testSeekPostEndMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(150000)
            self.assertEqual(bz2f.tell(), len(self.TEXT) * 5)
            self.assertEqual(bz2f.read(), b"")

    def testSeekPostEndTwice(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(150000)
            bz2f.seek(150000)
            self.assertEqual(bz2f.tell(), len(self.TEXT))
            self.assertEqual(bz2f.read(), b"")

    def testSeekPostEndTwiceMultiStream(self):
        self.createTempFile(streams=5)
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(150000)
            bz2f.seek(150000)
            self.assertEqual(bz2f.tell(), len(self.TEXT) * 5)
            self.assertEqual(bz2f.read(), b"")

    def testSeekPreStart(self):
        self.createTempFile()
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(-150)
            self.assertEqual(bz2f.tell(), 0)
            self.assertEqual(bz2f.read(), self.TEXT)

    def testSeekPreStartMultiStream(self):
        self.createTempFile(streams=2)
        with BZ2File(self.filename) as bz2f:
            bz2f.seek(-150)
            self.assertEqual(bz2f.tell(), 0)
            self.assertEqual(bz2f.read(), self.TEXT * 2)

    def testFileno(self):
        self.createTempFile()
        with open(self.filename, 'rb') as rawf:
            bz2f = BZ2File(rawf)
            try:
                self.assertEqual(bz2f.fileno(), rawf.fileno())
            finally:
                bz2f.close()
        self.assertRaises(ValueError, bz2f.fileno)

    def testSeekable(self):
        bz2f = BZ2File(BytesIO(self.DATA))
        try:
            self.assertTrue(bz2f.seekable())
            bz2f.read()
            self.assertTrue(bz2f.seekable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.seekable)

        bz2f = BZ2File(BytesIO(), "w")
        try:
            self.assertFalse(bz2f.seekable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.seekable)

        src = BytesIO(self.DATA)
        src.seekable = lambda: False
        bz2f = BZ2File(src)
        try:
            self.assertFalse(bz2f.seekable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.seekable)

    def testReadable(self):
        bz2f = BZ2File(BytesIO(self.DATA))
        try:
            self.assertTrue(bz2f.readable())
            bz2f.read()
            self.assertTrue(bz2f.readable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.readable)

        bz2f = BZ2File(BytesIO(), "w")
        try:
            self.assertFalse(bz2f.readable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.readable)

    def testWritable(self):
        bz2f = BZ2File(BytesIO(self.DATA))
        try:
            self.assertFalse(bz2f.writable())
            bz2f.read()
            self.assertFalse(bz2f.writable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.writable)

        bz2f = BZ2File(BytesIO(), "w")
        try:
            self.assertTrue(bz2f.writable())
        finally:
            bz2f.close()
        self.assertRaises(ValueError, bz2f.writable)

    def testOpenDel(self):
        self.createTempFile()
        for i in range(10000):
            o = BZ2File(self.filename)
            del o

    def testOpenNonexistent(self):
        self.assertRaises(OSError, BZ2File, "/non/existent")

    def testReadlinesNoNewline(self):
        # Issue #1191043: readlines() fails on a file containing no newline.
        data = b'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00 \x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t'
        with open(self.filename, "wb") as f:
            f.write(data)
        with BZ2File(self.filename) as bz2f:
            lines = bz2f.readlines()
        self.assertEqual(lines, [b'Test'])
        with BZ2File(self.filename) as bz2f:
            xlines = list(bz2f.readlines())
        self.assertEqual(xlines, [b'Test'])

    def testContextProtocol(self):
        f = None
        with BZ2File(self.filename, "wb") as f:
            f.write(b"xxx")
        f = BZ2File(self.filename, "rb")
        f.close()
        try:
            with f:
                pass
        except ValueError:
            pass
        else:
            self.fail("__enter__ on a closed file didn't raise an exception")
        try:
            with BZ2File(self.filename, "wb") as f:
                1/0
        except ZeroDivisionError:
            pass
        else:
            self.fail("1/0 didn't raise an exception")

    @threading_helper.requires_working_threading()
    def testThreading(self):
        # Issue #7205: Using a BZ2File from several threads shouldn't deadlock.
        data = b"1" * 2**20
        nthreads = 10
        with BZ2File(self.filename, 'wb') as f:
            def comp():
                for i in range(5):
                    f.write(data)
            threads = [threading.Thread(target=comp) for i in range(nthreads)]
            with threading_helper.start_threads(threads):
                pass

    def testMixedIterationAndReads(self):
        self.createTempFile()
        linelen = len(self.TEXT_LINES[0])
        halflen = linelen // 2
        with BZ2File(self.filename) as bz2f:
            bz2f.read(halflen)
            self.assertEqual(next(bz2f), self.TEXT_LINES[0][halflen:])
            self.assertEqual(bz2f.read(), self.TEXT[linelen:])
        with BZ2File(self.filename) as bz2f:
            bz2f.readline()
            self.assertEqual(next(bz2f), self.TEXT_LINES[1])
            self.assertEqual(bz2f.readline(), self.TEXT_LINES[2])
        with BZ2File(self.filename) as bz2f:
            bz2f.readlines()
            self.assertRaises(StopIteration, next, bz2f)
            self.assertEqual(bz2f.readlines(), [])

    def testMultiStreamOrdering(self):
        # Test the ordering of streams when reading a multi-stream archive.
        data1 = b"foo" * 1000
        data2 = b"bar" * 1000
        with BZ2File(self.filename, "w") as bz2f:
            bz2f.write(data1)
        with BZ2File(self.filename, "a") as bz2f:
            bz2f.write(data2)
        with BZ2File(self.filename) as bz2f:
            self.assertEqual(bz2f.read(), data1 + data2)

    def testOpenBytesFilename(self):
        str_filename = self.filename
        try:
            bytes_filename = str_filename.encode("ascii")
        except UnicodeEncodeError:
            self.skipTest("Temporary file name needs to be ASCII")
        with BZ2File(bytes_filename, "wb") as f:
            f.write(self.DATA)
        with BZ2File(bytes_filename, "rb") as f:
            self.assertEqual(f.read(), self.DATA)
        # Sanity check that we are actually operating on the right file.
        with BZ2File(str_filename, "rb") as f:
            self.assertEqual(f.read(), self.DATA)

    def testOpenPathLikeFilename(self):
        filename = pathlib.Path(self.filename)
        with BZ2File(filename, "wb") as f:
            f.write(self.DATA)
        with BZ2File(filename, "rb") as f:
            self.assertEqual(f.read(), self.DATA)

    def testDecompressLimited(self):
        """Decompressed data buffering should be limited"""
        bomb = bz2.compress(b'\0' * int(2e6), compresslevel=9)
        self.assertLess(len(bomb), _compression.BUFFER_SIZE)

        decomp = BZ2File(BytesIO(bomb))
        self.assertEqual(decomp.read(1), b'\0')
        max_decomp = 1 + DEFAULT_BUFFER_SIZE
        self.assertLessEqual(decomp._buffer.raw.tell(), max_decomp,
            "Excessive amount of data was decompressed")


    # Tests for a BZ2File wrapping another file object:

    def testReadBytesIO(self):
        with BytesIO(self.DATA) as bio:
            with BZ2File(bio) as bz2f:
                self.assertRaises(TypeError, bz2f.read, float())
                self.assertEqual(bz2f.read(), self.TEXT)
            self.assertFalse(bio.closed)

    def testPeekBytesIO(self):
        with BytesIO(self.DATA) as bio:
            with BZ2File(bio) as bz2f:
                pdata = bz2f.peek()
                self.assertNotEqual(len(pdata), 0)
                self.assertTrue(self.TEXT.startswith(pdata))
                self.assertEqual(bz2f.read(), self.TEXT)

    def testWriteBytesIO(self):
        with BytesIO() as bio:
            with BZ2File(bio, "w") as bz2f:
                self.assertRaises(TypeError, bz2f.write)
                bz2f.write(self.TEXT)
            self.assertEqual(ext_decompress(bio.getvalue()), self.TEXT)
            self.assertFalse(bio.closed)

    def testSeekForwardBytesIO(self):
        with BytesIO(self.DATA) as bio:
            with BZ2File(bio) as bz2f:
                self.assertRaises(TypeError, bz2f.seek)
                bz2f.seek(150)
                self.assertEqual(bz2f.read(), self.TEXT[150:])

    def testSeekBackwardsBytesIO(self):
        with BytesIO(self.DATA) as bio:
            with BZ2File(bio) as bz2f:
                bz2f.read(500)
                bz2f.seek(-150, 1)
                self.assertEqual(bz2f.read(), self.TEXT[500-150:])

    def test_read_truncated(self):
        # Drop the eos_magic field (6 bytes) and CRC (4 bytes).
        truncated = self.DATA[:-10]
        with BZ2File(BytesIO(truncated)) as f:
            self.assertRaises(EOFError, f.read)
        with BZ2File(BytesIO(truncated)) as f:
            self.assertEqual(f.read(len(self.TEXT)), self.TEXT)
            self.assertRaises(EOFError, f.read, 1)
        # Incomplete 4-byte file header, and block header of at least 146 bits.
        for i in range(22):
            with BZ2File(BytesIO(truncated[:i])) as f:
                self.assertRaises(EOFError, f.read, 1)

    def test_issue44439(self):
        q = array.array('Q', [1, 2, 3, 4, 5])
        LENGTH = len(q) * q.itemsize

        with BZ2File(BytesIO(), 'w') as f:
            self.assertEqual(f.write(q), LENGTH)
            self.assertEqual(f.tell(), LENGTH)


class BZ2CompressorTest(BaseTest):
    "Test the BZ2Compressor class."

    def testCompress(self):
        bz2c = BZ2Compressor()
        self.assertRaises(TypeError, bz2c.compress)
        data = bz2c.compress(self.TEXT)
        data += bz2c.flush()
        self.assertEqual(ext_decompress(data), self.TEXT)

    def testCompressEmptyString(self):
        bz2c = BZ2Compressor()
        data = bz2c.compress(b'')
        data += bz2c.flush()
        self.assertEqual(data, self.EMPTY_DATA)

    def testCompressChunks10(self):
        bz2c = BZ2Compressor()
        n = 0
        data = b''
        while True:
            str = self.TEXT[n*10:(n+1)*10]
            if not str:
                break
            data += bz2c.compress(str)
            n += 1
        data += bz2c.flush()
        self.assertEqual(ext_decompress(data), self.TEXT)

    @support.skip_if_pgo_task
    @bigmemtest(size=_4G + 100, memuse=2)
    def testCompress4G(self, size):
        # "Test BZ2Compressor.compress()/flush() with >4GiB input"
        bz2c = BZ2Compressor()
        data = b"x" * size
        try:
            compressed = bz2c.compress(data)
            compressed += bz2c.flush()
        finally:
            data = None  # Release memory
        data = bz2.decompress(compressed)
        try:
            self.assertEqual(len(data), size)
            self.assertEqual(len(data.strip(b"x")), 0)
        finally:
            data = None

    def testPickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises(TypeError):
                pickle.dumps(BZ2Compressor(), proto)


class BZ2DecompressorTest(BaseTest):
    "Test the BZ2Decompressor class."

    def test_Constructor(self):
        self.assertRaises(TypeError, BZ2Decompressor, 42)

    def testDecompress(self):
        bz2d = BZ2Decompressor()
        self.assertRaises(TypeError, bz2d.decompress)
        text = bz2d.decompress(self.DATA)
        self.assertEqual(text, self.TEXT)

    def testDecompressChunks10(self):
        bz2d = BZ2Decompressor()
        text = b''
        n = 0
        while True:
            str = self.DATA[n*10:(n+1)*10]
            if not str:
                break
            text += bz2d.decompress(str)
            n += 1
        self.assertEqual(text, self.TEXT)

    def testDecompressUnusedData(self):
        bz2d = BZ2Decompressor()
        unused_data = b"this is unused data"
        text = bz2d.decompress(self.DATA+unused_data)
        self.assertEqual(text, self.TEXT)
        self.assertEqual(bz2d.unused_data, unused_data)

    def testEOFError(self):
        bz2d = BZ2Decompressor()
        text = bz2d.decompress(self.DATA)
        self.assertRaises(EOFError, bz2d.decompress, b"anything")
        self.assertRaises(EOFError, bz2d.decompress, b"")

    @support.skip_if_pgo_task
    @bigmemtest(size=_4G + 100, memuse=3.3)
    def testDecompress4G(self, size):
        # "Test BZ2Decompressor.decompress() with >4GiB input"
        blocksize = 10 * 1024 * 1024
        block = random.randbytes(blocksize)
        try:
            data = block * (size // blocksize + 1)
            compressed = bz2.compress(data)
            bz2d = BZ2Decompressor()
            decompressed = bz2d.decompress(compressed)
            self.assertTrue(decompressed == data)
        finally:
            data = None
            compressed = None
            decompressed = None

    def testPickle(self):
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.assertRaises(TypeError):
                pickle.dumps(BZ2Decompressor(), proto)

    def testDecompressorChunksMaxsize(self):
        bzd = BZ2Decompressor()
        max_length = 100
        out = []

        # Feed some input
        len_ = len(self.BIG_DATA) - 64
        out.append(bzd.decompress(self.BIG_DATA[:len_],
                                  max_length=max_length))
        self.assertFalse(bzd.needs_input)
        self.assertEqual(len(out[-1]), max_length)

        # Retrieve more data without providing more input
        out.append(bzd.decompress(b'', max_length=max_length))
        self.assertFalse(bzd.needs_input)
        self.assertEqual(len(out[-1]), max_length)

        # Retrieve more data while providing more input
        out.append(bzd.decompress(self.BIG_DATA[len_:],
                                  max_length=max_length))
        self.assertLessEqual(len(out[-1]), max_length)

        # Retrieve remaining uncompressed data
        while not bzd.eof:
            out.append(bzd.decompress(b'', max_length=max_length))
            self.assertLessEqual(len(out[-1]), max_length)

        out = b"".join(out)
        self.assertEqual(out, self.BIG_TEXT)
        self.assertEqual(bzd.unused_data, b"")

    def test_decompressor_inputbuf_1(self):
        # Test reusing input buffer after moving existing
        # contents to beginning
        bzd = BZ2Decompressor()
        out = []

        # Create input buffer and fill it
        self.assertEqual(bzd.decompress(self.DATA[:100],
                                        max_length=0), b'')

        # Retrieve some results, freeing capacity at beginning
        # of input buffer
        out.append(bzd.decompress(b'', 2))

        # Add more data that fits into input buffer after
        # moving existing data to beginning
        out.append(bzd.decompress(self.DATA[100:105], 15))

        # Decompress rest of data
        out.append(bzd.decompress(self.DATA[105:]))
        self.assertEqual(b''.join(out), self.TEXT)

    def test_decompressor_inputbuf_2(self):
        # Test reusing input buffer by appending data at the
        # end right away
        bzd = BZ2Decompressor()
        out = []

        # Create input buffer and empty it
        self.assertEqual(bzd.decompress(self.DATA[:200],
                                        max_length=0), b'')
        out.append(bzd.decompress(b''))

        # Fill buffer with new data
        out.append(bzd.decompress(self.DATA[200:280], 2))

        # Append some more data, not enough to require resize
        out.append(bzd.decompress(self.DATA[280:300], 2))

        # Decompress rest of data
        out.append(bzd.decompress(self.DATA[300:]))
        self.assertEqual(b''.join(out), self.TEXT)

    def test_decompressor_inputbuf_3(self):
        # Test reusing input buffer after extending it

        bzd = BZ2Decompressor()
        out = []

        # Create almost full input buffer
        out.append(bzd.decompress(self.DATA[:200], 5))

        # Add even more data to it, requiring resize
        out.append(bzd.decompress(self.DATA[200:300], 5))

        # Decompress rest of data
        out.append(bzd.decompress(self.DATA[300:]))
        self.assertEqual(b''.join(out), self.TEXT)

    def test_failure(self):
        bzd = BZ2Decompressor()
        self.assertRaises(Exception, bzd.decompress, self.BAD_DATA * 30)
        # Previously, a second call could crash due to internal inconsistency
        self.assertRaises(Exception, bzd.decompress, self.BAD_DATA * 30)

    @support.refcount_test
    def test_refleaks_in___init__(self):
        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
        bzd = BZ2Decompressor()
        refs_before = gettotalrefcount()
        for i in range(100):
            bzd.__init__()
        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)


class CompressDecompressTest(BaseTest):
    "Test the module-level compress() and decompress() functions."

    def testCompress(self):
        data = bz2.compress(self.TEXT)
        self.assertEqual(ext_decompress(data), self.TEXT)

    def testCompressEmptyString(self):
        text = bz2.compress(b'')
        self.assertEqual(text, self.EMPTY_DATA)

    def testDecompress(self):
        text = bz2.decompress(self.DATA)
        self.assertEqual(text, self.TEXT)

    def testDecompressEmpty(self):
        text = bz2.decompress(b"")
        self.assertEqual(text, b"")

    def testDecompressToEmptyString(self):
        text = bz2.decompress(self.EMPTY_DATA)
        self.assertEqual(text, b'')

    def testDecompressIncomplete(self):
        self.assertRaises(ValueError, bz2.decompress, self.DATA[:-10])

    def testDecompressBadData(self):
        self.assertRaises(OSError, bz2.decompress, self.BAD_DATA)

    def testDecompressMultiStream(self):
        text = bz2.decompress(self.DATA * 5)
        self.assertEqual(text, self.TEXT * 5)

    def testDecompressTrailingJunk(self):
        text = bz2.decompress(self.DATA + self.BAD_DATA)
        self.assertEqual(text, self.TEXT)

    def testDecompressMultiStreamTrailingJunk(self):
        text = bz2.decompress(self.DATA * 5 + self.BAD_DATA)
        self.assertEqual(text, self.TEXT * 5)


class OpenTest(BaseTest):
    "Test the open function."

    def open(self, *args, **kwargs):
        return bz2.open(*args, **kwargs)

    def test_binary_modes(self):
        for mode in ("wb", "xb"):
            if mode == "xb":
                unlink(self.filename)
            with self.open(self.filename, mode) as f:
                f.write(self.TEXT)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read())
                self.assertEqual(file_data, self.TEXT)
            with self.open(self.filename, "rb") as f:
                self.assertEqual(f.read(), self.TEXT)
            with self.open(self.filename, "ab") as f:
                f.write(self.TEXT)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read())
                self.assertEqual(file_data, self.TEXT * 2)

    def test_implicit_binary_modes(self):
        # Test implicit binary modes (no "b" or "t" in mode string).
        for mode in ("w", "x"):
            if mode == "x":
                unlink(self.filename)
            with self.open(self.filename, mode) as f:
                f.write(self.TEXT)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read())
                self.assertEqual(file_data, self.TEXT)
            with self.open(self.filename, "r") as f:
                self.assertEqual(f.read(), self.TEXT)
            with self.open(self.filename, "a") as f:
                f.write(self.TEXT)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read())
                self.assertEqual(file_data, self.TEXT * 2)

    def test_text_modes(self):
        text = self.TEXT.decode("ascii")
        text_native_eol = text.replace("\n", os.linesep)
        for mode in ("wt", "xt"):
            if mode == "xt":
                unlink(self.filename)
            with self.open(self.filename, mode, encoding="ascii") as f:
                f.write(text)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read()).decode("ascii")
                self.assertEqual(file_data, text_native_eol)
            with self.open(self.filename, "rt", encoding="ascii") as f:
                self.assertEqual(f.read(), text)
            with self.open(self.filename, "at", encoding="ascii") as f:
                f.write(text)
            with open(self.filename, "rb") as f:
                file_data = ext_decompress(f.read()).decode("ascii")
                self.assertEqual(file_data, text_native_eol * 2)

    def test_x_mode(self):
        for mode in ("x", "xb", "xt"):
            unlink(self.filename)
            encoding = "utf-8" if "t" in mode else None
            with self.open(self.filename, mode, encoding=encoding) as f:
                pass
            with self.assertRaises(FileExistsError):
                with self.open(self.filename, mode) as f:
                    pass

    def test_fileobj(self):
        with self.open(BytesIO(self.DATA), "r") as f:
            self.assertEqual(f.read(), self.TEXT)
        with self.open(BytesIO(self.DATA), "rb") as f:
            self.assertEqual(f.read(), self.TEXT)
        text = self.TEXT.decode("ascii")
        with self.open(BytesIO(self.DATA), "rt", encoding="utf-8") as f:
            self.assertEqual(f.read(), text)

    def test_bad_params(self):
        # Test invalid parameter combinations.
        self.assertRaises(ValueError,
                          self.open, self.filename, "wbt")
        self.assertRaises(ValueError,
                          self.open, self.filename, "xbt")
        self.assertRaises(ValueError,
                          self.open, self.filename, "rb", encoding="utf-8")
        self.assertRaises(ValueError,
                          self.open, self.filename, "rb", errors="ignore")
        self.assertRaises(ValueError,
                          self.open, self.filename, "rb", newline="\n")

    def test_encoding(self):
        # Test non-default encoding.
        text = self.TEXT.decode("ascii")
        text_native_eol = text.replace("\n", os.linesep)
        with self.open(self.filename, "wt", encoding="utf-16-le") as f:
            f.write(text)
        with open(self.filename, "rb") as f:
            file_data = ext_decompress(f.read()).decode("utf-16-le")
            self.assertEqual(file_data, text_native_eol)
        with self.open(self.filename, "rt", encoding="utf-16-le") as f:
            self.assertEqual(f.read(), text)

    def test_encoding_error_handler(self):
        # Test with non-default encoding error handler.
        with self.open(self.filename, "wb") as f:
            f.write(b"foo\xffbar")
        with self.open(self.filename, "rt", encoding="ascii", errors="ignore") \
                as f:
            self.assertEqual(f.read(), "foobar")

    def test_newline(self):
        # Test with explicit newline (universal newline mode disabled).
        text = self.TEXT.decode("ascii")
        with self.open(self.filename, "wt", encoding="utf-8", newline="\n") as f:
            f.write(text)
        with self.open(self.filename, "rt", encoding="utf-8", newline="\r") as f:
            self.assertEqual(f.readlines(), [text])


def tearDownModule():
    support.reap_children()


if __name__ == '__main__':
    unittest.main()