17db96d56Sopenharmony_ci"""Test largefile support on system where this makes sense. 27db96d56Sopenharmony_ci""" 37db96d56Sopenharmony_ci 47db96d56Sopenharmony_ciimport os 57db96d56Sopenharmony_ciimport stat 67db96d56Sopenharmony_ciimport sys 77db96d56Sopenharmony_ciimport unittest 87db96d56Sopenharmony_ciimport socket 97db96d56Sopenharmony_ciimport shutil 107db96d56Sopenharmony_ciimport threading 117db96d56Sopenharmony_cifrom test.support import requires, bigmemtest 127db96d56Sopenharmony_cifrom test.support import SHORT_TIMEOUT 137db96d56Sopenharmony_cifrom test.support import socket_helper 147db96d56Sopenharmony_cifrom test.support.os_helper import TESTFN, unlink 157db96d56Sopenharmony_ciimport io # C implementation of io 167db96d56Sopenharmony_ciimport _pyio as pyio # Python implementation of io 177db96d56Sopenharmony_ci 187db96d56Sopenharmony_ci# size of file to create (>2 GiB; 2 GiB == 2,147,483,648 bytes) 197db96d56Sopenharmony_cisize = 2_500_000_000 207db96d56Sopenharmony_ciTESTFN2 = TESTFN + '2' 217db96d56Sopenharmony_ci 227db96d56Sopenharmony_ci 237db96d56Sopenharmony_ciclass LargeFileTest: 247db96d56Sopenharmony_ci 257db96d56Sopenharmony_ci def setUp(self): 267db96d56Sopenharmony_ci if os.path.exists(TESTFN): 277db96d56Sopenharmony_ci mode = 'r+b' 287db96d56Sopenharmony_ci else: 297db96d56Sopenharmony_ci mode = 'w+b' 307db96d56Sopenharmony_ci 317db96d56Sopenharmony_ci with self.open(TESTFN, mode) as f: 327db96d56Sopenharmony_ci current_size = os.fstat(f.fileno())[stat.ST_SIZE] 337db96d56Sopenharmony_ci if current_size == size+1: 347db96d56Sopenharmony_ci return 357db96d56Sopenharmony_ci 367db96d56Sopenharmony_ci if current_size == 0: 377db96d56Sopenharmony_ci f.write(b'z') 387db96d56Sopenharmony_ci 397db96d56Sopenharmony_ci f.seek(0) 407db96d56Sopenharmony_ci f.seek(size) 417db96d56Sopenharmony_ci f.write(b'a') 427db96d56Sopenharmony_ci f.flush() 437db96d56Sopenharmony_ci self.assertEqual(os.fstat(f.fileno())[stat.ST_SIZE], size+1) 447db96d56Sopenharmony_ci 457db96d56Sopenharmony_ci @classmethod 467db96d56Sopenharmony_ci def tearDownClass(cls): 477db96d56Sopenharmony_ci with cls.open(TESTFN, 'wb'): 487db96d56Sopenharmony_ci pass 497db96d56Sopenharmony_ci if not os.stat(TESTFN)[stat.ST_SIZE] == 0: 507db96d56Sopenharmony_ci raise cls.failureException('File was not truncated by opening ' 517db96d56Sopenharmony_ci 'with mode "wb"') 527db96d56Sopenharmony_ci unlink(TESTFN2) 537db96d56Sopenharmony_ci 547db96d56Sopenharmony_ci 557db96d56Sopenharmony_ciclass TestFileMethods(LargeFileTest): 567db96d56Sopenharmony_ci """Test that each file function works as expected for large 577db96d56Sopenharmony_ci (i.e. > 2 GiB) files. 587db96d56Sopenharmony_ci """ 597db96d56Sopenharmony_ci 607db96d56Sopenharmony_ci # _pyio.FileIO.readall() uses a temporary bytearray then casted to bytes, 617db96d56Sopenharmony_ci # so memuse=2 is needed 627db96d56Sopenharmony_ci @bigmemtest(size=size, memuse=2, dry_run=False) 637db96d56Sopenharmony_ci def test_large_read(self, _size): 647db96d56Sopenharmony_ci # bpo-24658: Test that a read greater than 2GB does not fail. 657db96d56Sopenharmony_ci with self.open(TESTFN, "rb") as f: 667db96d56Sopenharmony_ci self.assertEqual(len(f.read()), size + 1) 677db96d56Sopenharmony_ci self.assertEqual(f.tell(), size + 1) 687db96d56Sopenharmony_ci 697db96d56Sopenharmony_ci def test_osstat(self): 707db96d56Sopenharmony_ci self.assertEqual(os.stat(TESTFN)[stat.ST_SIZE], size+1) 717db96d56Sopenharmony_ci 727db96d56Sopenharmony_ci def test_seek_read(self): 737db96d56Sopenharmony_ci with self.open(TESTFN, 'rb') as f: 747db96d56Sopenharmony_ci self.assertEqual(f.tell(), 0) 757db96d56Sopenharmony_ci self.assertEqual(f.read(1), b'z') 767db96d56Sopenharmony_ci self.assertEqual(f.tell(), 1) 777db96d56Sopenharmony_ci f.seek(0) 787db96d56Sopenharmony_ci self.assertEqual(f.tell(), 0) 797db96d56Sopenharmony_ci f.seek(0, 0) 807db96d56Sopenharmony_ci self.assertEqual(f.tell(), 0) 817db96d56Sopenharmony_ci f.seek(42) 827db96d56Sopenharmony_ci self.assertEqual(f.tell(), 42) 837db96d56Sopenharmony_ci f.seek(42, 0) 847db96d56Sopenharmony_ci self.assertEqual(f.tell(), 42) 857db96d56Sopenharmony_ci f.seek(42, 1) 867db96d56Sopenharmony_ci self.assertEqual(f.tell(), 84) 877db96d56Sopenharmony_ci f.seek(0, 1) 887db96d56Sopenharmony_ci self.assertEqual(f.tell(), 84) 897db96d56Sopenharmony_ci f.seek(0, 2) # seek from the end 907db96d56Sopenharmony_ci self.assertEqual(f.tell(), size + 1 + 0) 917db96d56Sopenharmony_ci f.seek(-10, 2) 927db96d56Sopenharmony_ci self.assertEqual(f.tell(), size + 1 - 10) 937db96d56Sopenharmony_ci f.seek(-size-1, 2) 947db96d56Sopenharmony_ci self.assertEqual(f.tell(), 0) 957db96d56Sopenharmony_ci f.seek(size) 967db96d56Sopenharmony_ci self.assertEqual(f.tell(), size) 977db96d56Sopenharmony_ci # the 'a' that was written at the end of file above 987db96d56Sopenharmony_ci self.assertEqual(f.read(1), b'a') 997db96d56Sopenharmony_ci f.seek(-size-1, 1) 1007db96d56Sopenharmony_ci self.assertEqual(f.read(1), b'z') 1017db96d56Sopenharmony_ci self.assertEqual(f.tell(), 1) 1027db96d56Sopenharmony_ci 1037db96d56Sopenharmony_ci def test_lseek(self): 1047db96d56Sopenharmony_ci with self.open(TESTFN, 'rb') as f: 1057db96d56Sopenharmony_ci self.assertEqual(os.lseek(f.fileno(), 0, 0), 0) 1067db96d56Sopenharmony_ci self.assertEqual(os.lseek(f.fileno(), 42, 0), 42) 1077db96d56Sopenharmony_ci self.assertEqual(os.lseek(f.fileno(), 42, 1), 84) 1087db96d56Sopenharmony_ci self.assertEqual(os.lseek(f.fileno(), 0, 1), 84) 1097db96d56Sopenharmony_ci self.assertEqual(os.lseek(f.fileno(), 0, 2), size+1+0) 1107db96d56Sopenharmony_ci self.assertEqual(os.lseek(f.fileno(), -10, 2), size+1-10) 1117db96d56Sopenharmony_ci self.assertEqual(os.lseek(f.fileno(), -size-1, 2), 0) 1127db96d56Sopenharmony_ci self.assertEqual(os.lseek(f.fileno(), size, 0), size) 1137db96d56Sopenharmony_ci # the 'a' that was written at the end of file above 1147db96d56Sopenharmony_ci self.assertEqual(f.read(1), b'a') 1157db96d56Sopenharmony_ci 1167db96d56Sopenharmony_ci def test_truncate(self): 1177db96d56Sopenharmony_ci with self.open(TESTFN, 'r+b') as f: 1187db96d56Sopenharmony_ci if not hasattr(f, 'truncate'): 1197db96d56Sopenharmony_ci raise unittest.SkipTest("open().truncate() not available " 1207db96d56Sopenharmony_ci "on this system") 1217db96d56Sopenharmony_ci f.seek(0, 2) 1227db96d56Sopenharmony_ci # else we've lost track of the true size 1237db96d56Sopenharmony_ci self.assertEqual(f.tell(), size+1) 1247db96d56Sopenharmony_ci # Cut it back via seek + truncate with no argument. 1257db96d56Sopenharmony_ci newsize = size - 10 1267db96d56Sopenharmony_ci f.seek(newsize) 1277db96d56Sopenharmony_ci f.truncate() 1287db96d56Sopenharmony_ci self.assertEqual(f.tell(), newsize) # else pointer moved 1297db96d56Sopenharmony_ci f.seek(0, 2) 1307db96d56Sopenharmony_ci self.assertEqual(f.tell(), newsize) # else wasn't truncated 1317db96d56Sopenharmony_ci # Ensure that truncate(smaller than true size) shrinks 1327db96d56Sopenharmony_ci # the file. 1337db96d56Sopenharmony_ci newsize -= 1 1347db96d56Sopenharmony_ci f.seek(42) 1357db96d56Sopenharmony_ci f.truncate(newsize) 1367db96d56Sopenharmony_ci self.assertEqual(f.tell(), 42) 1377db96d56Sopenharmony_ci f.seek(0, 2) 1387db96d56Sopenharmony_ci self.assertEqual(f.tell(), newsize) 1397db96d56Sopenharmony_ci # XXX truncate(larger than true size) is ill-defined 1407db96d56Sopenharmony_ci # across platform; cut it waaaaay back 1417db96d56Sopenharmony_ci f.seek(0) 1427db96d56Sopenharmony_ci f.truncate(1) 1437db96d56Sopenharmony_ci self.assertEqual(f.tell(), 0) # else pointer moved 1447db96d56Sopenharmony_ci f.seek(0) 1457db96d56Sopenharmony_ci self.assertEqual(len(f.read()), 1) # else wasn't truncated 1467db96d56Sopenharmony_ci 1477db96d56Sopenharmony_ci def test_seekable(self): 1487db96d56Sopenharmony_ci # Issue #5016; seekable() can return False when the current position 1497db96d56Sopenharmony_ci # is negative when truncated to an int. 1507db96d56Sopenharmony_ci for pos in (2**31-1, 2**31, 2**31+1): 1517db96d56Sopenharmony_ci with self.open(TESTFN, 'rb') as f: 1527db96d56Sopenharmony_ci f.seek(pos) 1537db96d56Sopenharmony_ci self.assertTrue(f.seekable()) 1547db96d56Sopenharmony_ci 1557db96d56Sopenharmony_ci 1567db96d56Sopenharmony_cidef skip_no_disk_space(path, required): 1577db96d56Sopenharmony_ci def decorator(fun): 1587db96d56Sopenharmony_ci def wrapper(*args, **kwargs): 1597db96d56Sopenharmony_ci if not hasattr(shutil, "disk_usage"): 1607db96d56Sopenharmony_ci raise unittest.SkipTest("requires shutil.disk_usage") 1617db96d56Sopenharmony_ci if shutil.disk_usage(os.path.realpath(path)).free < required: 1627db96d56Sopenharmony_ci hsize = int(required / 1024 / 1024) 1637db96d56Sopenharmony_ci raise unittest.SkipTest( 1647db96d56Sopenharmony_ci f"required {hsize} MiB of free disk space") 1657db96d56Sopenharmony_ci return fun(*args, **kwargs) 1667db96d56Sopenharmony_ci return wrapper 1677db96d56Sopenharmony_ci return decorator 1687db96d56Sopenharmony_ci 1697db96d56Sopenharmony_ci 1707db96d56Sopenharmony_ciclass TestCopyfile(LargeFileTest, unittest.TestCase): 1717db96d56Sopenharmony_ci open = staticmethod(io.open) 1727db96d56Sopenharmony_ci 1737db96d56Sopenharmony_ci # Exact required disk space would be (size * 2), but let's give it a 1747db96d56Sopenharmony_ci # bit more tolerance. 1757db96d56Sopenharmony_ci @skip_no_disk_space(TESTFN, size * 2.5) 1767db96d56Sopenharmony_ci def test_it(self): 1777db96d56Sopenharmony_ci # Internally shutil.copyfile() can use "fast copy" methods like 1787db96d56Sopenharmony_ci # os.sendfile(). 1797db96d56Sopenharmony_ci size = os.path.getsize(TESTFN) 1807db96d56Sopenharmony_ci shutil.copyfile(TESTFN, TESTFN2) 1817db96d56Sopenharmony_ci self.assertEqual(os.path.getsize(TESTFN2), size) 1827db96d56Sopenharmony_ci with open(TESTFN2, 'rb') as f: 1837db96d56Sopenharmony_ci self.assertEqual(f.read(5), b'z\x00\x00\x00\x00') 1847db96d56Sopenharmony_ci f.seek(size - 5) 1857db96d56Sopenharmony_ci self.assertEqual(f.read(), b'\x00\x00\x00\x00a') 1867db96d56Sopenharmony_ci 1877db96d56Sopenharmony_ci 1887db96d56Sopenharmony_ci@unittest.skipIf(not hasattr(os, 'sendfile'), 'sendfile not supported') 1897db96d56Sopenharmony_ciclass TestSocketSendfile(LargeFileTest, unittest.TestCase): 1907db96d56Sopenharmony_ci open = staticmethod(io.open) 1917db96d56Sopenharmony_ci timeout = SHORT_TIMEOUT 1927db96d56Sopenharmony_ci 1937db96d56Sopenharmony_ci def setUp(self): 1947db96d56Sopenharmony_ci super().setUp() 1957db96d56Sopenharmony_ci self.thread = None 1967db96d56Sopenharmony_ci 1977db96d56Sopenharmony_ci def tearDown(self): 1987db96d56Sopenharmony_ci super().tearDown() 1997db96d56Sopenharmony_ci if self.thread is not None: 2007db96d56Sopenharmony_ci self.thread.join(self.timeout) 2017db96d56Sopenharmony_ci self.thread = None 2027db96d56Sopenharmony_ci 2037db96d56Sopenharmony_ci def tcp_server(self, sock): 2047db96d56Sopenharmony_ci def run(sock): 2057db96d56Sopenharmony_ci with sock: 2067db96d56Sopenharmony_ci conn, _ = sock.accept() 2077db96d56Sopenharmony_ci conn.settimeout(self.timeout) 2087db96d56Sopenharmony_ci with conn, open(TESTFN2, 'wb') as f: 2097db96d56Sopenharmony_ci event.wait(self.timeout) 2107db96d56Sopenharmony_ci while True: 2117db96d56Sopenharmony_ci chunk = conn.recv(65536) 2127db96d56Sopenharmony_ci if not chunk: 2137db96d56Sopenharmony_ci return 2147db96d56Sopenharmony_ci f.write(chunk) 2157db96d56Sopenharmony_ci 2167db96d56Sopenharmony_ci event = threading.Event() 2177db96d56Sopenharmony_ci sock.settimeout(self.timeout) 2187db96d56Sopenharmony_ci self.thread = threading.Thread(target=run, args=(sock, )) 2197db96d56Sopenharmony_ci self.thread.start() 2207db96d56Sopenharmony_ci event.set() 2217db96d56Sopenharmony_ci 2227db96d56Sopenharmony_ci # Exact required disk space would be (size * 2), but let's give it a 2237db96d56Sopenharmony_ci # bit more tolerance. 2247db96d56Sopenharmony_ci @skip_no_disk_space(TESTFN, size * 2.5) 2257db96d56Sopenharmony_ci def test_it(self): 2267db96d56Sopenharmony_ci port = socket_helper.find_unused_port() 2277db96d56Sopenharmony_ci with socket.create_server(("", port)) as sock: 2287db96d56Sopenharmony_ci self.tcp_server(sock) 2297db96d56Sopenharmony_ci with socket.create_connection(("127.0.0.1", port)) as client: 2307db96d56Sopenharmony_ci with open(TESTFN, 'rb') as f: 2317db96d56Sopenharmony_ci client.sendfile(f) 2327db96d56Sopenharmony_ci self.tearDown() 2337db96d56Sopenharmony_ci 2347db96d56Sopenharmony_ci size = os.path.getsize(TESTFN) 2357db96d56Sopenharmony_ci self.assertEqual(os.path.getsize(TESTFN2), size) 2367db96d56Sopenharmony_ci with open(TESTFN2, 'rb') as f: 2377db96d56Sopenharmony_ci self.assertEqual(f.read(5), b'z\x00\x00\x00\x00') 2387db96d56Sopenharmony_ci f.seek(size - 5) 2397db96d56Sopenharmony_ci self.assertEqual(f.read(), b'\x00\x00\x00\x00a') 2407db96d56Sopenharmony_ci 2417db96d56Sopenharmony_ci 2427db96d56Sopenharmony_cidef setUpModule(): 2437db96d56Sopenharmony_ci try: 2447db96d56Sopenharmony_ci import signal 2457db96d56Sopenharmony_ci # The default handler for SIGXFSZ is to abort the process. 2467db96d56Sopenharmony_ci # By ignoring it, system calls exceeding the file size resource 2477db96d56Sopenharmony_ci # limit will raise OSError instead of crashing the interpreter. 2487db96d56Sopenharmony_ci signal.signal(signal.SIGXFSZ, signal.SIG_IGN) 2497db96d56Sopenharmony_ci except (ImportError, AttributeError): 2507db96d56Sopenharmony_ci pass 2517db96d56Sopenharmony_ci 2527db96d56Sopenharmony_ci # On Windows and Mac OSX this test consumes large resources; It 2537db96d56Sopenharmony_ci # takes a long time to build the >2 GiB file and takes >2 GiB of disk 2547db96d56Sopenharmony_ci # space therefore the resource must be enabled to run this test. 2557db96d56Sopenharmony_ci # If not, nothing after this line stanza will be executed. 2567db96d56Sopenharmony_ci if sys.platform[:3] == 'win' or sys.platform == 'darwin': 2577db96d56Sopenharmony_ci requires('largefile', 2587db96d56Sopenharmony_ci 'test requires %s bytes and a long time to run' % str(size)) 2597db96d56Sopenharmony_ci else: 2607db96d56Sopenharmony_ci # Only run if the current filesystem supports large files. 2617db96d56Sopenharmony_ci # (Skip this test on Windows, since we now always support 2627db96d56Sopenharmony_ci # large files.) 2637db96d56Sopenharmony_ci f = open(TESTFN, 'wb', buffering=0) 2647db96d56Sopenharmony_ci try: 2657db96d56Sopenharmony_ci # 2**31 == 2147483648 2667db96d56Sopenharmony_ci f.seek(2147483649) 2677db96d56Sopenharmony_ci # Seeking is not enough of a test: you must write and flush, too! 2687db96d56Sopenharmony_ci f.write(b'x') 2697db96d56Sopenharmony_ci f.flush() 2707db96d56Sopenharmony_ci except (OSError, OverflowError): 2717db96d56Sopenharmony_ci raise unittest.SkipTest("filesystem does not have " 2727db96d56Sopenharmony_ci "largefile support") 2737db96d56Sopenharmony_ci finally: 2747db96d56Sopenharmony_ci f.close() 2757db96d56Sopenharmony_ci unlink(TESTFN) 2767db96d56Sopenharmony_ci 2777db96d56Sopenharmony_ci 2787db96d56Sopenharmony_ciclass CLargeFileTest(TestFileMethods, unittest.TestCase): 2797db96d56Sopenharmony_ci open = staticmethod(io.open) 2807db96d56Sopenharmony_ci 2817db96d56Sopenharmony_ci 2827db96d56Sopenharmony_ciclass PyLargeFileTest(TestFileMethods, unittest.TestCase): 2837db96d56Sopenharmony_ci open = staticmethod(pyio.open) 2847db96d56Sopenharmony_ci 2857db96d56Sopenharmony_ci 2867db96d56Sopenharmony_cidef tearDownModule(): 2877db96d56Sopenharmony_ci unlink(TESTFN) 2887db96d56Sopenharmony_ci unlink(TESTFN2) 2897db96d56Sopenharmony_ci 2907db96d56Sopenharmony_ci 2917db96d56Sopenharmony_ciif __name__ == '__main__': 2927db96d56Sopenharmony_ci unittest.main() 293