"""Test largefile support on systems where this makes sense.
"""
37db96d56Sopenharmony_ci
47db96d56Sopenharmony_ciimport os
57db96d56Sopenharmony_ciimport stat
67db96d56Sopenharmony_ciimport sys
77db96d56Sopenharmony_ciimport unittest
87db96d56Sopenharmony_ciimport socket
97db96d56Sopenharmony_ciimport shutil
107db96d56Sopenharmony_ciimport threading
117db96d56Sopenharmony_cifrom test.support import requires, bigmemtest
127db96d56Sopenharmony_cifrom test.support import SHORT_TIMEOUT
137db96d56Sopenharmony_cifrom test.support import socket_helper
147db96d56Sopenharmony_cifrom test.support.os_helper import TESTFN, unlink
157db96d56Sopenharmony_ciimport io  # C implementation of io
167db96d56Sopenharmony_ciimport _pyio as pyio # Python implementation of io
177db96d56Sopenharmony_ci
# Offset of the last byte written to the test file.  It must lie beyond
# 2 GiB (2 GiB == 2**31 == 2,147,483,648 bytes); the resulting file is
# size + 1 bytes long.
size = 2_500_000_000
# Second scratch file, used as the destination of copy/sendfile tests.
TESTFN2 = f'{TESTFN}2'
217db96d56Sopenharmony_ci
227db96d56Sopenharmony_ci
class LargeFileTest:
    """Mixin that makes sure TESTFN is a (size + 1)-byte file.

    Subclasses provide ``open`` (the file-opening callable under test) and
    also inherit from unittest.TestCase.
    """

    def setUp(self):
        # Reuse an existing file so the large file is not rebuilt for
        # every single test.
        mode = 'r+b' if os.path.exists(TESTFN) else 'w+b'
        expected = size + 1

        with self.open(TESTFN, mode) as fobj:
            existing = os.fstat(fobj.fileno()).st_size
            if existing == expected:
                # Already in the expected state; nothing to do.
                return

            if existing == 0:
                # Marker byte at the very start of a fresh file.
                fobj.write(b'z')

            # Write the marker byte at offset `size`, which extends the
            # file to size + 1 bytes.
            fobj.seek(0)
            fobj.seek(size)
            fobj.write(b'a')
            fobj.flush()
            self.assertEqual(os.fstat(fobj.fileno()).st_size, expected)

    @classmethod
    def tearDownClass(cls):
        # Opening with 'wb' must truncate the large file to zero bytes.
        with cls.open(TESTFN, 'wb'):
            pass
        if os.stat(TESTFN).st_size != 0:
            raise cls.failureException('File was not truncated by opening '
                                       'with mode "wb"')
        unlink(TESTFN2)
537db96d56Sopenharmony_ci
547db96d56Sopenharmony_ci
class TestFileMethods(LargeFileTest):
    """Check that read, seek, tell, lseek, truncate and seekable behave
    correctly on a file larger than 2 GiB.
    """

    # _pyio.FileIO.readall() fills a temporary bytearray which is then
    # converted to bytes, hence memuse=2.
    @bigmemtest(size=size, memuse=2, dry_run=False)
    def test_large_read(self, _size):
        # bpo-24658: a single read of more than 2 GiB must not fail.
        expected = size + 1
        with self.open(TESTFN, "rb") as fobj:
            self.assertEqual(len(fobj.read()), expected)
            self.assertEqual(fobj.tell(), expected)

    def test_osstat(self):
        self.assertEqual(os.stat(TESTFN).st_size, size + 1)

    def test_seek_read(self):
        end = size + 1
        # (arguments for seek(), position tell() must report afterwards);
        # the steps are order-dependent.
        steps = (
            ((0,), 0),
            ((0, os.SEEK_SET), 0),
            ((42,), 42),
            ((42, os.SEEK_SET), 42),
            ((42, os.SEEK_CUR), 84),
            ((0, os.SEEK_CUR), 84),
            ((0, os.SEEK_END), end),
            ((-10, os.SEEK_END), end - 10),
            ((-size - 1, os.SEEK_END), 0),
            ((size,), size),
        )
        with self.open(TESTFN, 'rb') as fobj:
            self.assertEqual(fobj.tell(), 0)
            self.assertEqual(fobj.read(1), b'z')
            self.assertEqual(fobj.tell(), 1)
            for seek_args, position in steps:
                fobj.seek(*seek_args)
                self.assertEqual(fobj.tell(), position)
            # the 'a' that setUp() wrote as the last byte of the file
            self.assertEqual(fobj.read(1), b'a')
            # Relative seek from the end of the file back to its start.
            fobj.seek(-size - 1, os.SEEK_CUR)
            self.assertEqual(fobj.read(1), b'z')
            self.assertEqual(fobj.tell(), 1)

    def test_lseek(self):
        end = size + 1
        # (offset, whence, position os.lseek() must return); the steps are
        # order-dependent.
        steps = (
            (0, os.SEEK_SET, 0),
            (42, os.SEEK_SET, 42),
            (42, os.SEEK_CUR, 84),
            (0, os.SEEK_CUR, 84),
            (0, os.SEEK_END, end),
            (-10, os.SEEK_END, end - 10),
            (-size - 1, os.SEEK_END, 0),
            (size, os.SEEK_SET, size),
        )
        with self.open(TESTFN, 'rb') as fobj:
            for offset, whence, position in steps:
                self.assertEqual(os.lseek(fobj.fileno(), offset, whence),
                                 position)
            # the 'a' that setUp() wrote as the last byte of the file
            self.assertEqual(fobj.read(1), b'a')

    def test_truncate(self):
        with self.open(TESTFN, 'r+b') as fobj:
            if not hasattr(fobj, 'truncate'):
                raise unittest.SkipTest("open().truncate() not available "
                                        "on this system")
            fobj.seek(0, os.SEEK_END)
            # Otherwise we have lost track of the true size.
            self.assertEqual(fobj.tell(), size + 1)

            # Shrink via seek + truncate() without an argument.
            target = size - 10
            fobj.seek(target)
            fobj.truncate()
            # The file pointer must not have moved ...
            self.assertEqual(fobj.tell(), target)
            fobj.seek(0, os.SEEK_END)
            # ... and the file must really have been cut.
            self.assertEqual(fobj.tell(), target)

            # truncate(n) with n below the true size shrinks the file and
            # leaves the file pointer where it was.
            target -= 1
            fobj.seek(42)
            fobj.truncate(target)
            self.assertEqual(fobj.tell(), 42)
            fobj.seek(0, os.SEEK_END)
            self.assertEqual(fobj.tell(), target)

            # XXX truncate(larger than true size) is ill-defined across
            # platforms, so only cut the file way back instead.
            fobj.seek(0)
            fobj.truncate(1)
            self.assertEqual(fobj.tell(), 0)       # else pointer moved
            fobj.seek(0)
            self.assertEqual(len(fobj.read()), 1)  # else wasn't truncated

    def test_seekable(self):
        # Issue #5016: seekable() could return False when the current
        # position becomes negative once truncated to a C int.
        boundary = 2**31
        for position in (boundary - 1, boundary, boundary + 1):
            with self.open(TESTFN, 'rb') as fobj:
                fobj.seek(position)
                self.assertTrue(fobj.seekable())
1547db96d56Sopenharmony_ci
1557db96d56Sopenharmony_ci
def skip_no_disk_space(path, required):
    """Return a decorator that skips a test when free disk space is short.

    The check is performed each time the decorated callable is invoked,
    not at decoration time.

    Args:
        path: Filesystem path whose disk is inspected; it is resolved with
            os.path.realpath() before being given to shutil.disk_usage().
        required: Minimum number of free bytes needed (int or float).

    Returns:
        A decorator.  The decorated callable raises unittest.SkipTest when
        shutil.disk_usage is unavailable or when fewer than *required*
        bytes are free; otherwise it calls the original callable and
        returns its result.
    """
    import functools

    def decorator(fun):
        # functools.wraps keeps the wrapped test's __name__, __doc__ and
        # __wrapped__, so reports and introspection show the real test
        # instead of an anonymous "wrapper".
        @functools.wraps(fun)
        def wrapper(*args, **kwargs):
            if not hasattr(shutil, "disk_usage"):
                raise unittest.SkipTest("requires shutil.disk_usage")
            if shutil.disk_usage(os.path.realpath(path)).free < required:
                # Report the requirement in whole MiB.
                hsize = int(required / 1024 / 1024)
                raise unittest.SkipTest(
                    f"required {hsize} MiB of free disk space")
            return fun(*args, **kwargs)
        return wrapper
    return decorator
1687db96d56Sopenharmony_ci
1697db96d56Sopenharmony_ci
class TestCopyfile(LargeFileTest, unittest.TestCase):
    """Check that shutil.copyfile() copies a file larger than 2 GiB."""
    open = staticmethod(io.open)

    # Strictly speaking (size * 2) bytes are needed; ask for a little
    # headroom on top of that.
    @skip_no_disk_space(TESTFN, size * 2.5)
    def test_it(self):
        # shutil.copyfile() may internally rely on "fast copy" methods
        # such as os.sendfile().
        source_size = os.path.getsize(TESTFN)
        shutil.copyfile(TESTFN, TESTFN2)
        self.assertEqual(os.path.getsize(TESTFN2), source_size)
        with open(TESTFN2, 'rb') as copy:
            # Marker byte at the start, followed by zero bytes.
            self.assertEqual(copy.read(5), b'z\x00\x00\x00\x00')
            # Zero bytes followed by the marker byte at the very end.
            copy.seek(source_size - 5)
            self.assertEqual(copy.read(), b'\x00\x00\x00\x00a')
1867db96d56Sopenharmony_ci
1877db96d56Sopenharmony_ci
@unittest.skipUnless(hasattr(os, 'sendfile'), 'sendfile not supported')
class TestSocketSendfile(LargeFileTest, unittest.TestCase):
    """Check that socket.sendfile() transfers a file larger than 2 GiB."""
    open = staticmethod(io.open)
    timeout = SHORT_TIMEOUT

    def setUp(self):
        super().setUp()
        # Receiver thread; created by tcp_server().
        self.thread = None

    def tearDown(self):
        super().tearDown()
        worker = self.thread
        if worker is not None:
            worker.join(self.timeout)
            self.thread = None

    def tcp_server(self, sock):
        """Start a thread that accepts one connection on *sock* and writes
        everything it receives to TESTFN2.
        """
        ready = threading.Event()

        def serve(listener):
            with listener:
                conn, _ = listener.accept()
                conn.settimeout(self.timeout)
                with conn, open(TESTFN2, 'wb') as sink:
                    ready.wait(self.timeout)
                    # An empty chunk means the peer closed the connection.
                    while chunk := conn.recv(65536):
                        sink.write(chunk)

        sock.settimeout(self.timeout)
        self.thread = threading.Thread(target=serve, args=(sock, ))
        self.thread.start()
        ready.set()

    # Strictly speaking (size * 2) bytes are needed; ask for a little
    # headroom on top of that.
    @skip_no_disk_space(TESTFN, size * 2.5)
    def test_it(self):
        port = socket_helper.find_unused_port()
        with socket.create_server(("", port)) as listener:
            self.tcp_server(listener)
            with socket.create_connection(("127.0.0.1", port)) as client, \
                    open(TESTFN, 'rb') as source:
                client.sendfile(source)
        # Join the receiver thread before inspecting what it wrote.
        self.tearDown()

        source_size = os.path.getsize(TESTFN)
        self.assertEqual(os.path.getsize(TESTFN2), source_size)
        with open(TESTFN2, 'rb') as received:
            # Marker byte at the start, followed by zero bytes.
            self.assertEqual(received.read(5), b'z\x00\x00\x00\x00')
            # Zero bytes followed by the marker byte at the very end.
            received.seek(source_size - 5)
            self.assertEqual(received.read(), b'\x00\x00\x00\x00a')
2407db96d56Sopenharmony_ci
2417db96d56Sopenharmony_ci
def setUpModule():
    """Prepare the process for the tests and skip the whole module when
    large files cannot be tested here.
    """
    # SIGXFSZ aborts the process by default.  When it is ignored, system
    # calls that exceed the file size resource limit raise OSError instead
    # of crashing the interpreter.
    try:
        import signal
        signal.signal(signal.SIGXFSZ, signal.SIG_IGN)
    except (ImportError, AttributeError):
        pass

    # On Windows and macOS this test is expensive: building the >2 GiB
    # file takes a long time and >2 GiB of disk space, so the 'largefile'
    # resource has to be enabled.  No filesystem probe is done on these
    # platforms.
    if sys.platform.startswith('win') or sys.platform == 'darwin':
        requires('largefile',
                 'test requires %s bytes and a long time to run' % str(size))
        return

    # Elsewhere, only run if the current filesystem supports large files.
    probe = open(TESTFN, 'wb', buffering=0)
    try:
        # One byte past 2**31 == 2147483648.
        probe.seek(2**31 + 1)
        # Seeking alone proves nothing: write and flush as well.
        probe.write(b'x')
        probe.flush()
    except (OSError, OverflowError):
        raise unittest.SkipTest("filesystem does not have "
                                "largefile support")
    finally:
        probe.close()
        unlink(TESTFN)
2767db96d56Sopenharmony_ci
2777db96d56Sopenharmony_ci
class CLargeFileTest(TestFileMethods, unittest.TestCase):
    """Run the large-file method tests against io.open."""
    # staticmethod() keeps the callable from being bound to the instance
    # when it is looked up as self.open.
    open = staticmethod(io.open)
2807db96d56Sopenharmony_ci
2817db96d56Sopenharmony_ci
class PyLargeFileTest(TestFileMethods, unittest.TestCase):
    """Run the large-file method tests against _pyio.open."""
    # staticmethod() keeps the callable from being bound to the instance
    # when it is looked up as self.open.
    open = staticmethod(pyio.open)
2847db96d56Sopenharmony_ci
2857db96d56Sopenharmony_ci
def tearDownModule():
    """Remove both scratch files once every test in the module has run."""
    for leftover in (TESTFN, TESTFN2):
        unlink(leftover)
2897db96d56Sopenharmony_ci
2907db96d56Sopenharmony_ci
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
293