17db96d56Sopenharmony_ci# Copyright (C) 2003 Python Software Foundation 27db96d56Sopenharmony_ci 37db96d56Sopenharmony_ciimport unittest 47db96d56Sopenharmony_ciimport unittest.mock 57db96d56Sopenharmony_ciimport shutil 67db96d56Sopenharmony_ciimport tempfile 77db96d56Sopenharmony_ciimport sys 87db96d56Sopenharmony_ciimport stat 97db96d56Sopenharmony_ciimport os 107db96d56Sopenharmony_ciimport os.path 117db96d56Sopenharmony_ciimport errno 127db96d56Sopenharmony_ciimport functools 137db96d56Sopenharmony_ciimport pathlib 147db96d56Sopenharmony_ciimport subprocess 157db96d56Sopenharmony_ciimport random 167db96d56Sopenharmony_ciimport string 177db96d56Sopenharmony_ciimport contextlib 187db96d56Sopenharmony_ciimport io 197db96d56Sopenharmony_cifrom shutil import (make_archive, 207db96d56Sopenharmony_ci register_archive_format, unregister_archive_format, 217db96d56Sopenharmony_ci get_archive_formats, Error, unpack_archive, 227db96d56Sopenharmony_ci register_unpack_format, RegistryError, 237db96d56Sopenharmony_ci unregister_unpack_format, get_unpack_formats, 247db96d56Sopenharmony_ci SameFileError, _GiveupOnFastCopy) 257db96d56Sopenharmony_ciimport tarfile 267db96d56Sopenharmony_ciimport zipfile 277db96d56Sopenharmony_citry: 287db96d56Sopenharmony_ci import posix 297db96d56Sopenharmony_ciexcept ImportError: 307db96d56Sopenharmony_ci posix = None 317db96d56Sopenharmony_ci 327db96d56Sopenharmony_cifrom test import support 337db96d56Sopenharmony_cifrom test.support import os_helper 347db96d56Sopenharmony_cifrom test.support.os_helper import TESTFN, FakePath 357db96d56Sopenharmony_cifrom test.support import warnings_helper 367db96d56Sopenharmony_ci 377db96d56Sopenharmony_ciTESTFN2 = TESTFN + "2" 387db96d56Sopenharmony_ciTESTFN_SRC = TESTFN + "_SRC" 397db96d56Sopenharmony_ciTESTFN_DST = TESTFN + "_DST" 407db96d56Sopenharmony_ciMACOS = sys.platform.startswith("darwin") 417db96d56Sopenharmony_ciSOLARIS = sys.platform.startswith("sunos") 427db96d56Sopenharmony_ciAIX = sys.platform[:3] == 'aix' 437db96d56Sopenharmony_citry: 447db96d56Sopenharmony_ci import grp 457db96d56Sopenharmony_ci import pwd 467db96d56Sopenharmony_ci UID_GID_SUPPORT = True 477db96d56Sopenharmony_ciexcept ImportError: 487db96d56Sopenharmony_ci UID_GID_SUPPORT = False 497db96d56Sopenharmony_ci 507db96d56Sopenharmony_citry: 517db96d56Sopenharmony_ci import _winapi 527db96d56Sopenharmony_ciexcept ImportError: 537db96d56Sopenharmony_ci _winapi = None 547db96d56Sopenharmony_ci 557db96d56Sopenharmony_cino_chdir = unittest.mock.patch('os.chdir', 567db96d56Sopenharmony_ci side_effect=AssertionError("shouldn't call os.chdir()")) 577db96d56Sopenharmony_ci 587db96d56Sopenharmony_cidef _fake_rename(*args, **kwargs): 597db96d56Sopenharmony_ci # Pretend the destination path is on a different filesystem. 607db96d56Sopenharmony_ci raise OSError(getattr(errno, 'EXDEV', 18), "Invalid cross-device link") 617db96d56Sopenharmony_ci 627db96d56Sopenharmony_cidef mock_rename(func): 637db96d56Sopenharmony_ci @functools.wraps(func) 647db96d56Sopenharmony_ci def wrap(*args, **kwargs): 657db96d56Sopenharmony_ci try: 667db96d56Sopenharmony_ci builtin_rename = os.rename 677db96d56Sopenharmony_ci os.rename = _fake_rename 687db96d56Sopenharmony_ci return func(*args, **kwargs) 697db96d56Sopenharmony_ci finally: 707db96d56Sopenharmony_ci os.rename = builtin_rename 717db96d56Sopenharmony_ci return wrap 727db96d56Sopenharmony_ci 737db96d56Sopenharmony_cidef write_file(path, content, binary=False): 747db96d56Sopenharmony_ci """Write *content* to a file located at *path*. 757db96d56Sopenharmony_ci 767db96d56Sopenharmony_ci If *path* is a tuple instead of a string, os.path.join will be used to 777db96d56Sopenharmony_ci make a path. If *binary* is true, the file will be opened in binary 787db96d56Sopenharmony_ci mode. 797db96d56Sopenharmony_ci """ 807db96d56Sopenharmony_ci if isinstance(path, tuple): 817db96d56Sopenharmony_ci path = os.path.join(*path) 827db96d56Sopenharmony_ci mode = 'wb' if binary else 'w' 837db96d56Sopenharmony_ci encoding = None if binary else "utf-8" 847db96d56Sopenharmony_ci with open(path, mode, encoding=encoding) as fp: 857db96d56Sopenharmony_ci fp.write(content) 867db96d56Sopenharmony_ci 877db96d56Sopenharmony_cidef write_test_file(path, size): 887db96d56Sopenharmony_ci """Create a test file with an arbitrary size and random text content.""" 897db96d56Sopenharmony_ci def chunks(total, step): 907db96d56Sopenharmony_ci assert total >= step 917db96d56Sopenharmony_ci while total > step: 927db96d56Sopenharmony_ci yield step 937db96d56Sopenharmony_ci total -= step 947db96d56Sopenharmony_ci if total: 957db96d56Sopenharmony_ci yield total 967db96d56Sopenharmony_ci 977db96d56Sopenharmony_ci bufsize = min(size, 8192) 987db96d56Sopenharmony_ci chunk = b"".join([random.choice(string.ascii_letters).encode() 997db96d56Sopenharmony_ci for i in range(bufsize)]) 1007db96d56Sopenharmony_ci with open(path, 'wb') as f: 1017db96d56Sopenharmony_ci for csize in chunks(size, bufsize): 1027db96d56Sopenharmony_ci f.write(chunk) 1037db96d56Sopenharmony_ci assert os.path.getsize(path) == size 1047db96d56Sopenharmony_ci 1057db96d56Sopenharmony_cidef read_file(path, binary=False): 1067db96d56Sopenharmony_ci """Return contents from a file located at *path*. 1077db96d56Sopenharmony_ci 1087db96d56Sopenharmony_ci If *path* is a tuple instead of a string, os.path.join will be used to 1097db96d56Sopenharmony_ci make a path. If *binary* is true, the file will be opened in binary 1107db96d56Sopenharmony_ci mode. 1117db96d56Sopenharmony_ci """ 1127db96d56Sopenharmony_ci if isinstance(path, tuple): 1137db96d56Sopenharmony_ci path = os.path.join(*path) 1147db96d56Sopenharmony_ci mode = 'rb' if binary else 'r' 1157db96d56Sopenharmony_ci encoding = None if binary else "utf-8" 1167db96d56Sopenharmony_ci with open(path, mode, encoding=encoding) as fp: 1177db96d56Sopenharmony_ci return fp.read() 1187db96d56Sopenharmony_ci 1197db96d56Sopenharmony_cidef rlistdir(path): 1207db96d56Sopenharmony_ci res = [] 1217db96d56Sopenharmony_ci for name in sorted(os.listdir(path)): 1227db96d56Sopenharmony_ci p = os.path.join(path, name) 1237db96d56Sopenharmony_ci if os.path.isdir(p) and not os.path.islink(p): 1247db96d56Sopenharmony_ci res.append(name + '/') 1257db96d56Sopenharmony_ci for n in rlistdir(p): 1267db96d56Sopenharmony_ci res.append(name + '/' + n) 1277db96d56Sopenharmony_ci else: 1287db96d56Sopenharmony_ci res.append(name) 1297db96d56Sopenharmony_ci return res 1307db96d56Sopenharmony_ci 1317db96d56Sopenharmony_cidef supports_file2file_sendfile(): 1327db96d56Sopenharmony_ci # ...apparently Linux and Solaris are the only ones 1337db96d56Sopenharmony_ci if not hasattr(os, "sendfile"): 1347db96d56Sopenharmony_ci return False 1357db96d56Sopenharmony_ci srcname = None 1367db96d56Sopenharmony_ci dstname = None 1377db96d56Sopenharmony_ci try: 1387db96d56Sopenharmony_ci with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as f: 1397db96d56Sopenharmony_ci srcname = f.name 1407db96d56Sopenharmony_ci f.write(b"0123456789") 1417db96d56Sopenharmony_ci 1427db96d56Sopenharmony_ci with open(srcname, "rb") as src: 1437db96d56Sopenharmony_ci with tempfile.NamedTemporaryFile("wb", dir=os.getcwd(), delete=False) as dst: 1447db96d56Sopenharmony_ci dstname = dst.name 1457db96d56Sopenharmony_ci infd = src.fileno() 1467db96d56Sopenharmony_ci outfd = dst.fileno() 1477db96d56Sopenharmony_ci try: 1487db96d56Sopenharmony_ci os.sendfile(outfd, infd, 0, 2) 1497db96d56Sopenharmony_ci except OSError: 1507db96d56Sopenharmony_ci return False 1517db96d56Sopenharmony_ci else: 1527db96d56Sopenharmony_ci return True 1537db96d56Sopenharmony_ci finally: 1547db96d56Sopenharmony_ci if srcname is not None: 1557db96d56Sopenharmony_ci os_helper.unlink(srcname) 1567db96d56Sopenharmony_ci if dstname is not None: 1577db96d56Sopenharmony_ci os_helper.unlink(dstname) 1587db96d56Sopenharmony_ci 1597db96d56Sopenharmony_ci 1607db96d56Sopenharmony_ciSUPPORTS_SENDFILE = supports_file2file_sendfile() 1617db96d56Sopenharmony_ci 1627db96d56Sopenharmony_ci# AIX 32-bit mode, by default, lacks enough memory for the xz/lzma compiler test 1637db96d56Sopenharmony_ci# The AIX command 'dump -o program' gives XCOFF header information 1647db96d56Sopenharmony_ci# The second word of the last line in the maxdata value 1657db96d56Sopenharmony_ci# when 32-bit maxdata must be greater than 0x1000000 for the xz test to succeed 1667db96d56Sopenharmony_cidef _maxdataOK(): 1677db96d56Sopenharmony_ci if AIX and sys.maxsize == 2147483647: 1687db96d56Sopenharmony_ci hdrs=subprocess.getoutput("/usr/bin/dump -o %s" % sys.executable) 1697db96d56Sopenharmony_ci maxdata=hdrs.split("\n")[-1].split()[1] 1707db96d56Sopenharmony_ci return int(maxdata,16) >= 0x20000000 1717db96d56Sopenharmony_ci else: 1727db96d56Sopenharmony_ci return True 1737db96d56Sopenharmony_ci 1747db96d56Sopenharmony_ci 1757db96d56Sopenharmony_ciclass BaseTest: 1767db96d56Sopenharmony_ci 1777db96d56Sopenharmony_ci def mkdtemp(self, prefix=None): 1787db96d56Sopenharmony_ci """Create a temporary directory that will be cleaned up. 1797db96d56Sopenharmony_ci 1807db96d56Sopenharmony_ci Returns the path of the directory. 1817db96d56Sopenharmony_ci """ 1827db96d56Sopenharmony_ci d = tempfile.mkdtemp(prefix=prefix, dir=os.getcwd()) 1837db96d56Sopenharmony_ci self.addCleanup(os_helper.rmtree, d) 1847db96d56Sopenharmony_ci return d 1857db96d56Sopenharmony_ci 1867db96d56Sopenharmony_ci 1877db96d56Sopenharmony_ciclass TestRmTree(BaseTest, unittest.TestCase): 1887db96d56Sopenharmony_ci 1897db96d56Sopenharmony_ci def test_rmtree_works_on_bytes(self): 1907db96d56Sopenharmony_ci tmp = self.mkdtemp() 1917db96d56Sopenharmony_ci victim = os.path.join(tmp, 'killme') 1927db96d56Sopenharmony_ci os.mkdir(victim) 1937db96d56Sopenharmony_ci write_file(os.path.join(victim, 'somefile'), 'foo') 1947db96d56Sopenharmony_ci victim = os.fsencode(victim) 1957db96d56Sopenharmony_ci self.assertIsInstance(victim, bytes) 1967db96d56Sopenharmony_ci shutil.rmtree(victim) 1977db96d56Sopenharmony_ci 1987db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 1997db96d56Sopenharmony_ci def test_rmtree_fails_on_symlink(self): 2007db96d56Sopenharmony_ci tmp = self.mkdtemp() 2017db96d56Sopenharmony_ci dir_ = os.path.join(tmp, 'dir') 2027db96d56Sopenharmony_ci os.mkdir(dir_) 2037db96d56Sopenharmony_ci link = os.path.join(tmp, 'link') 2047db96d56Sopenharmony_ci os.symlink(dir_, link) 2057db96d56Sopenharmony_ci self.assertRaises(OSError, shutil.rmtree, link) 2067db96d56Sopenharmony_ci self.assertTrue(os.path.exists(dir_)) 2077db96d56Sopenharmony_ci self.assertTrue(os.path.lexists(link)) 2087db96d56Sopenharmony_ci errors = [] 2097db96d56Sopenharmony_ci def onerror(*args): 2107db96d56Sopenharmony_ci errors.append(args) 2117db96d56Sopenharmony_ci shutil.rmtree(link, onerror=onerror) 2127db96d56Sopenharmony_ci self.assertEqual(len(errors), 1) 2137db96d56Sopenharmony_ci self.assertIs(errors[0][0], os.path.islink) 2147db96d56Sopenharmony_ci self.assertEqual(errors[0][1], link) 2157db96d56Sopenharmony_ci self.assertIsInstance(errors[0][2][1], OSError) 2167db96d56Sopenharmony_ci 2177db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 2187db96d56Sopenharmony_ci def test_rmtree_works_on_symlinks(self): 2197db96d56Sopenharmony_ci tmp = self.mkdtemp() 2207db96d56Sopenharmony_ci dir1 = os.path.join(tmp, 'dir1') 2217db96d56Sopenharmony_ci dir2 = os.path.join(dir1, 'dir2') 2227db96d56Sopenharmony_ci dir3 = os.path.join(tmp, 'dir3') 2237db96d56Sopenharmony_ci for d in dir1, dir2, dir3: 2247db96d56Sopenharmony_ci os.mkdir(d) 2257db96d56Sopenharmony_ci file1 = os.path.join(tmp, 'file1') 2267db96d56Sopenharmony_ci write_file(file1, 'foo') 2277db96d56Sopenharmony_ci link1 = os.path.join(dir1, 'link1') 2287db96d56Sopenharmony_ci os.symlink(dir2, link1) 2297db96d56Sopenharmony_ci link2 = os.path.join(dir1, 'link2') 2307db96d56Sopenharmony_ci os.symlink(dir3, link2) 2317db96d56Sopenharmony_ci link3 = os.path.join(dir1, 'link3') 2327db96d56Sopenharmony_ci os.symlink(file1, link3) 2337db96d56Sopenharmony_ci # make sure symlinks are removed but not followed 2347db96d56Sopenharmony_ci shutil.rmtree(dir1) 2357db96d56Sopenharmony_ci self.assertFalse(os.path.exists(dir1)) 2367db96d56Sopenharmony_ci self.assertTrue(os.path.exists(dir3)) 2377db96d56Sopenharmony_ci self.assertTrue(os.path.exists(file1)) 2387db96d56Sopenharmony_ci 2397db96d56Sopenharmony_ci @unittest.skipUnless(_winapi, 'only relevant on Windows') 2407db96d56Sopenharmony_ci def test_rmtree_fails_on_junctions(self): 2417db96d56Sopenharmony_ci tmp = self.mkdtemp() 2427db96d56Sopenharmony_ci dir_ = os.path.join(tmp, 'dir') 2437db96d56Sopenharmony_ci os.mkdir(dir_) 2447db96d56Sopenharmony_ci link = os.path.join(tmp, 'link') 2457db96d56Sopenharmony_ci _winapi.CreateJunction(dir_, link) 2467db96d56Sopenharmony_ci self.addCleanup(os_helper.unlink, link) 2477db96d56Sopenharmony_ci self.assertRaises(OSError, shutil.rmtree, link) 2487db96d56Sopenharmony_ci self.assertTrue(os.path.exists(dir_)) 2497db96d56Sopenharmony_ci self.assertTrue(os.path.lexists(link)) 2507db96d56Sopenharmony_ci errors = [] 2517db96d56Sopenharmony_ci def onerror(*args): 2527db96d56Sopenharmony_ci errors.append(args) 2537db96d56Sopenharmony_ci shutil.rmtree(link, onerror=onerror) 2547db96d56Sopenharmony_ci self.assertEqual(len(errors), 1) 2557db96d56Sopenharmony_ci self.assertIs(errors[0][0], os.path.islink) 2567db96d56Sopenharmony_ci self.assertEqual(errors[0][1], link) 2577db96d56Sopenharmony_ci self.assertIsInstance(errors[0][2][1], OSError) 2587db96d56Sopenharmony_ci 2597db96d56Sopenharmony_ci @unittest.skipUnless(_winapi, 'only relevant on Windows') 2607db96d56Sopenharmony_ci def test_rmtree_works_on_junctions(self): 2617db96d56Sopenharmony_ci tmp = self.mkdtemp() 2627db96d56Sopenharmony_ci dir1 = os.path.join(tmp, 'dir1') 2637db96d56Sopenharmony_ci dir2 = os.path.join(dir1, 'dir2') 2647db96d56Sopenharmony_ci dir3 = os.path.join(tmp, 'dir3') 2657db96d56Sopenharmony_ci for d in dir1, dir2, dir3: 2667db96d56Sopenharmony_ci os.mkdir(d) 2677db96d56Sopenharmony_ci file1 = os.path.join(tmp, 'file1') 2687db96d56Sopenharmony_ci write_file(file1, 'foo') 2697db96d56Sopenharmony_ci link1 = os.path.join(dir1, 'link1') 2707db96d56Sopenharmony_ci _winapi.CreateJunction(dir2, link1) 2717db96d56Sopenharmony_ci link2 = os.path.join(dir1, 'link2') 2727db96d56Sopenharmony_ci _winapi.CreateJunction(dir3, link2) 2737db96d56Sopenharmony_ci link3 = os.path.join(dir1, 'link3') 2747db96d56Sopenharmony_ci _winapi.CreateJunction(file1, link3) 2757db96d56Sopenharmony_ci # make sure junctions are removed but not followed 2767db96d56Sopenharmony_ci shutil.rmtree(dir1) 2777db96d56Sopenharmony_ci self.assertFalse(os.path.exists(dir1)) 2787db96d56Sopenharmony_ci self.assertTrue(os.path.exists(dir3)) 2797db96d56Sopenharmony_ci self.assertTrue(os.path.exists(file1)) 2807db96d56Sopenharmony_ci 2817db96d56Sopenharmony_ci def test_rmtree_errors(self): 2827db96d56Sopenharmony_ci # filename is guaranteed not to exist 2837db96d56Sopenharmony_ci filename = tempfile.mktemp(dir=self.mkdtemp()) 2847db96d56Sopenharmony_ci self.assertRaises(FileNotFoundError, shutil.rmtree, filename) 2857db96d56Sopenharmony_ci # test that ignore_errors option is honored 2867db96d56Sopenharmony_ci shutil.rmtree(filename, ignore_errors=True) 2877db96d56Sopenharmony_ci 2887db96d56Sopenharmony_ci # existing file 2897db96d56Sopenharmony_ci tmpdir = self.mkdtemp() 2907db96d56Sopenharmony_ci write_file((tmpdir, "tstfile"), "") 2917db96d56Sopenharmony_ci filename = os.path.join(tmpdir, "tstfile") 2927db96d56Sopenharmony_ci with self.assertRaises(NotADirectoryError) as cm: 2937db96d56Sopenharmony_ci shutil.rmtree(filename) 2947db96d56Sopenharmony_ci self.assertEqual(cm.exception.filename, filename) 2957db96d56Sopenharmony_ci self.assertTrue(os.path.exists(filename)) 2967db96d56Sopenharmony_ci # test that ignore_errors option is honored 2977db96d56Sopenharmony_ci shutil.rmtree(filename, ignore_errors=True) 2987db96d56Sopenharmony_ci self.assertTrue(os.path.exists(filename)) 2997db96d56Sopenharmony_ci errors = [] 3007db96d56Sopenharmony_ci def onerror(*args): 3017db96d56Sopenharmony_ci errors.append(args) 3027db96d56Sopenharmony_ci shutil.rmtree(filename, onerror=onerror) 3037db96d56Sopenharmony_ci self.assertEqual(len(errors), 2) 3047db96d56Sopenharmony_ci self.assertIs(errors[0][0], os.scandir) 3057db96d56Sopenharmony_ci self.assertEqual(errors[0][1], filename) 3067db96d56Sopenharmony_ci self.assertIsInstance(errors[0][2][1], NotADirectoryError) 3077db96d56Sopenharmony_ci self.assertEqual(errors[0][2][1].filename, filename) 3087db96d56Sopenharmony_ci self.assertIs(errors[1][0], os.rmdir) 3097db96d56Sopenharmony_ci self.assertEqual(errors[1][1], filename) 3107db96d56Sopenharmony_ci self.assertIsInstance(errors[1][2][1], NotADirectoryError) 3117db96d56Sopenharmony_ci self.assertEqual(errors[1][2][1].filename, filename) 3127db96d56Sopenharmony_ci 3137db96d56Sopenharmony_ci 3147db96d56Sopenharmony_ci @unittest.skipIf(sys.platform[:6] == 'cygwin', 3157db96d56Sopenharmony_ci "This test can't be run on Cygwin (issue #1071513).") 3167db96d56Sopenharmony_ci @os_helper.skip_if_dac_override 3177db96d56Sopenharmony_ci @os_helper.skip_unless_working_chmod 3187db96d56Sopenharmony_ci def test_on_error(self): 3197db96d56Sopenharmony_ci self.errorState = 0 3207db96d56Sopenharmony_ci os.mkdir(TESTFN) 3217db96d56Sopenharmony_ci self.addCleanup(shutil.rmtree, TESTFN) 3227db96d56Sopenharmony_ci 3237db96d56Sopenharmony_ci self.child_file_path = os.path.join(TESTFN, 'a') 3247db96d56Sopenharmony_ci self.child_dir_path = os.path.join(TESTFN, 'b') 3257db96d56Sopenharmony_ci os_helper.create_empty_file(self.child_file_path) 3267db96d56Sopenharmony_ci os.mkdir(self.child_dir_path) 3277db96d56Sopenharmony_ci old_dir_mode = os.stat(TESTFN).st_mode 3287db96d56Sopenharmony_ci old_child_file_mode = os.stat(self.child_file_path).st_mode 3297db96d56Sopenharmony_ci old_child_dir_mode = os.stat(self.child_dir_path).st_mode 3307db96d56Sopenharmony_ci # Make unwritable. 3317db96d56Sopenharmony_ci new_mode = stat.S_IREAD|stat.S_IEXEC 3327db96d56Sopenharmony_ci os.chmod(self.child_file_path, new_mode) 3337db96d56Sopenharmony_ci os.chmod(self.child_dir_path, new_mode) 3347db96d56Sopenharmony_ci os.chmod(TESTFN, new_mode) 3357db96d56Sopenharmony_ci 3367db96d56Sopenharmony_ci self.addCleanup(os.chmod, TESTFN, old_dir_mode) 3377db96d56Sopenharmony_ci self.addCleanup(os.chmod, self.child_file_path, old_child_file_mode) 3387db96d56Sopenharmony_ci self.addCleanup(os.chmod, self.child_dir_path, old_child_dir_mode) 3397db96d56Sopenharmony_ci 3407db96d56Sopenharmony_ci shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror) 3417db96d56Sopenharmony_ci # Test whether onerror has actually been called. 3427db96d56Sopenharmony_ci self.assertEqual(self.errorState, 3, 3437db96d56Sopenharmony_ci "Expected call to onerror function did not happen.") 3447db96d56Sopenharmony_ci 3457db96d56Sopenharmony_ci def check_args_to_onerror(self, func, arg, exc): 3467db96d56Sopenharmony_ci # test_rmtree_errors deliberately runs rmtree 3477db96d56Sopenharmony_ci # on a directory that is chmod 500, which will fail. 3487db96d56Sopenharmony_ci # This function is run when shutil.rmtree fails. 3497db96d56Sopenharmony_ci # 99.9% of the time it initially fails to remove 3507db96d56Sopenharmony_ci # a file in the directory, so the first time through 3517db96d56Sopenharmony_ci # func is os.remove. 3527db96d56Sopenharmony_ci # However, some Linux machines running ZFS on 3537db96d56Sopenharmony_ci # FUSE experienced a failure earlier in the process 3547db96d56Sopenharmony_ci # at os.listdir. The first failure may legally 3557db96d56Sopenharmony_ci # be either. 3567db96d56Sopenharmony_ci if self.errorState < 2: 3577db96d56Sopenharmony_ci if func is os.unlink: 3587db96d56Sopenharmony_ci self.assertEqual(arg, self.child_file_path) 3597db96d56Sopenharmony_ci elif func is os.rmdir: 3607db96d56Sopenharmony_ci self.assertEqual(arg, self.child_dir_path) 3617db96d56Sopenharmony_ci else: 3627db96d56Sopenharmony_ci self.assertIs(func, os.listdir) 3637db96d56Sopenharmony_ci self.assertIn(arg, [TESTFN, self.child_dir_path]) 3647db96d56Sopenharmony_ci self.assertTrue(issubclass(exc[0], OSError)) 3657db96d56Sopenharmony_ci self.errorState += 1 3667db96d56Sopenharmony_ci else: 3677db96d56Sopenharmony_ci self.assertEqual(func, os.rmdir) 3687db96d56Sopenharmony_ci self.assertEqual(arg, TESTFN) 3697db96d56Sopenharmony_ci self.assertTrue(issubclass(exc[0], OSError)) 3707db96d56Sopenharmony_ci self.errorState = 3 3717db96d56Sopenharmony_ci 3727db96d56Sopenharmony_ci def test_rmtree_does_not_choke_on_failing_lstat(self): 3737db96d56Sopenharmony_ci try: 3747db96d56Sopenharmony_ci orig_lstat = os.lstat 3757db96d56Sopenharmony_ci def raiser(fn, *args, **kwargs): 3767db96d56Sopenharmony_ci if fn != TESTFN: 3777db96d56Sopenharmony_ci raise OSError() 3787db96d56Sopenharmony_ci else: 3797db96d56Sopenharmony_ci return orig_lstat(fn) 3807db96d56Sopenharmony_ci os.lstat = raiser 3817db96d56Sopenharmony_ci 3827db96d56Sopenharmony_ci os.mkdir(TESTFN) 3837db96d56Sopenharmony_ci write_file((TESTFN, 'foo'), 'foo') 3847db96d56Sopenharmony_ci shutil.rmtree(TESTFN) 3857db96d56Sopenharmony_ci finally: 3867db96d56Sopenharmony_ci os.lstat = orig_lstat 3877db96d56Sopenharmony_ci 3887db96d56Sopenharmony_ci def test_rmtree_uses_safe_fd_version_if_available(self): 3897db96d56Sopenharmony_ci _use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <= 3907db96d56Sopenharmony_ci os.supports_dir_fd and 3917db96d56Sopenharmony_ci os.listdir in os.supports_fd and 3927db96d56Sopenharmony_ci os.stat in os.supports_follow_symlinks) 3937db96d56Sopenharmony_ci if _use_fd_functions: 3947db96d56Sopenharmony_ci self.assertTrue(shutil._use_fd_functions) 3957db96d56Sopenharmony_ci self.assertTrue(shutil.rmtree.avoids_symlink_attacks) 3967db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 3977db96d56Sopenharmony_ci d = os.path.join(tmp_dir, 'a') 3987db96d56Sopenharmony_ci os.mkdir(d) 3997db96d56Sopenharmony_ci try: 4007db96d56Sopenharmony_ci real_rmtree = shutil._rmtree_safe_fd 4017db96d56Sopenharmony_ci class Called(Exception): pass 4027db96d56Sopenharmony_ci def _raiser(*args, **kwargs): 4037db96d56Sopenharmony_ci raise Called 4047db96d56Sopenharmony_ci shutil._rmtree_safe_fd = _raiser 4057db96d56Sopenharmony_ci self.assertRaises(Called, shutil.rmtree, d) 4067db96d56Sopenharmony_ci finally: 4077db96d56Sopenharmony_ci shutil._rmtree_safe_fd = real_rmtree 4087db96d56Sopenharmony_ci else: 4097db96d56Sopenharmony_ci self.assertFalse(shutil._use_fd_functions) 4107db96d56Sopenharmony_ci self.assertFalse(shutil.rmtree.avoids_symlink_attacks) 4117db96d56Sopenharmony_ci 4127db96d56Sopenharmony_ci @unittest.skipUnless(shutil._use_fd_functions, "dir_fd is not supported") 4137db96d56Sopenharmony_ci def test_rmtree_with_dir_fd(self): 4147db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 4157db96d56Sopenharmony_ci victim = 'killme' 4167db96d56Sopenharmony_ci fullname = os.path.join(tmp_dir, victim) 4177db96d56Sopenharmony_ci dir_fd = os.open(tmp_dir, os.O_RDONLY) 4187db96d56Sopenharmony_ci self.addCleanup(os.close, dir_fd) 4197db96d56Sopenharmony_ci os.mkdir(fullname) 4207db96d56Sopenharmony_ci os.mkdir(os.path.join(fullname, 'subdir')) 4217db96d56Sopenharmony_ci write_file(os.path.join(fullname, 'subdir', 'somefile'), 'foo') 4227db96d56Sopenharmony_ci self.assertTrue(os.path.exists(fullname)) 4237db96d56Sopenharmony_ci shutil.rmtree(victim, dir_fd=dir_fd) 4247db96d56Sopenharmony_ci self.assertFalse(os.path.exists(fullname)) 4257db96d56Sopenharmony_ci 4267db96d56Sopenharmony_ci @unittest.skipIf(shutil._use_fd_functions, "dir_fd is supported") 4277db96d56Sopenharmony_ci def test_rmtree_with_dir_fd_unsupported(self): 4287db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 4297db96d56Sopenharmony_ci with self.assertRaises(NotImplementedError): 4307db96d56Sopenharmony_ci shutil.rmtree(tmp_dir, dir_fd=0) 4317db96d56Sopenharmony_ci self.assertTrue(os.path.exists(tmp_dir)) 4327db96d56Sopenharmony_ci 4337db96d56Sopenharmony_ci def test_rmtree_dont_delete_file(self): 4347db96d56Sopenharmony_ci # When called on a file instead of a directory, don't delete it. 4357db96d56Sopenharmony_ci handle, path = tempfile.mkstemp(dir=self.mkdtemp()) 4367db96d56Sopenharmony_ci os.close(handle) 4377db96d56Sopenharmony_ci self.assertRaises(NotADirectoryError, shutil.rmtree, path) 4387db96d56Sopenharmony_ci os.remove(path) 4397db96d56Sopenharmony_ci 4407db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 4417db96d56Sopenharmony_ci def test_rmtree_on_symlink(self): 4427db96d56Sopenharmony_ci # bug 1669. 4437db96d56Sopenharmony_ci os.mkdir(TESTFN) 4447db96d56Sopenharmony_ci try: 4457db96d56Sopenharmony_ci src = os.path.join(TESTFN, 'cheese') 4467db96d56Sopenharmony_ci dst = os.path.join(TESTFN, 'shop') 4477db96d56Sopenharmony_ci os.mkdir(src) 4487db96d56Sopenharmony_ci os.symlink(src, dst) 4497db96d56Sopenharmony_ci self.assertRaises(OSError, shutil.rmtree, dst) 4507db96d56Sopenharmony_ci shutil.rmtree(dst, ignore_errors=True) 4517db96d56Sopenharmony_ci finally: 4527db96d56Sopenharmony_ci shutil.rmtree(TESTFN, ignore_errors=True) 4537db96d56Sopenharmony_ci 4547db96d56Sopenharmony_ci @unittest.skipUnless(_winapi, 'only relevant on Windows') 4557db96d56Sopenharmony_ci def test_rmtree_on_junction(self): 4567db96d56Sopenharmony_ci os.mkdir(TESTFN) 4577db96d56Sopenharmony_ci try: 4587db96d56Sopenharmony_ci src = os.path.join(TESTFN, 'cheese') 4597db96d56Sopenharmony_ci dst = os.path.join(TESTFN, 'shop') 4607db96d56Sopenharmony_ci os.mkdir(src) 4617db96d56Sopenharmony_ci open(os.path.join(src, 'spam'), 'wb').close() 4627db96d56Sopenharmony_ci _winapi.CreateJunction(src, dst) 4637db96d56Sopenharmony_ci self.assertRaises(OSError, shutil.rmtree, dst) 4647db96d56Sopenharmony_ci shutil.rmtree(dst, ignore_errors=True) 4657db96d56Sopenharmony_ci finally: 4667db96d56Sopenharmony_ci shutil.rmtree(TESTFN, ignore_errors=True) 4677db96d56Sopenharmony_ci 4687db96d56Sopenharmony_ci 4697db96d56Sopenharmony_ciclass TestCopyTree(BaseTest, unittest.TestCase): 4707db96d56Sopenharmony_ci 4717db96d56Sopenharmony_ci def test_copytree_simple(self): 4727db96d56Sopenharmony_ci src_dir = self.mkdtemp() 4737db96d56Sopenharmony_ci dst_dir = os.path.join(self.mkdtemp(), 'destination') 4747db96d56Sopenharmony_ci self.addCleanup(shutil.rmtree, src_dir) 4757db96d56Sopenharmony_ci self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir)) 4767db96d56Sopenharmony_ci write_file((src_dir, 'test.txt'), '123') 4777db96d56Sopenharmony_ci os.mkdir(os.path.join(src_dir, 'test_dir')) 4787db96d56Sopenharmony_ci write_file((src_dir, 'test_dir', 'test.txt'), '456') 4797db96d56Sopenharmony_ci 4807db96d56Sopenharmony_ci shutil.copytree(src_dir, dst_dir) 4817db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt'))) 4827db96d56Sopenharmony_ci self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir'))) 4837db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir', 4847db96d56Sopenharmony_ci 'test.txt'))) 4857db96d56Sopenharmony_ci actual = read_file((dst_dir, 'test.txt')) 4867db96d56Sopenharmony_ci self.assertEqual(actual, '123') 4877db96d56Sopenharmony_ci actual = read_file((dst_dir, 'test_dir', 'test.txt')) 4887db96d56Sopenharmony_ci self.assertEqual(actual, '456') 4897db96d56Sopenharmony_ci 4907db96d56Sopenharmony_ci def test_copytree_dirs_exist_ok(self): 4917db96d56Sopenharmony_ci src_dir = self.mkdtemp() 4927db96d56Sopenharmony_ci dst_dir = self.mkdtemp() 4937db96d56Sopenharmony_ci self.addCleanup(shutil.rmtree, src_dir) 4947db96d56Sopenharmony_ci self.addCleanup(shutil.rmtree, dst_dir) 4957db96d56Sopenharmony_ci 4967db96d56Sopenharmony_ci write_file((src_dir, 'nonexisting.txt'), '123') 4977db96d56Sopenharmony_ci os.mkdir(os.path.join(src_dir, 'existing_dir')) 4987db96d56Sopenharmony_ci os.mkdir(os.path.join(dst_dir, 'existing_dir')) 4997db96d56Sopenharmony_ci write_file((dst_dir, 'existing_dir', 'existing.txt'), 'will be replaced') 5007db96d56Sopenharmony_ci write_file((src_dir, 'existing_dir', 'existing.txt'), 'has been replaced') 5017db96d56Sopenharmony_ci 5027db96d56Sopenharmony_ci shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True) 5037db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'nonexisting.txt'))) 5047db96d56Sopenharmony_ci self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'existing_dir'))) 5057db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'existing_dir', 5067db96d56Sopenharmony_ci 'existing.txt'))) 5077db96d56Sopenharmony_ci actual = read_file((dst_dir, 'nonexisting.txt')) 5087db96d56Sopenharmony_ci self.assertEqual(actual, '123') 5097db96d56Sopenharmony_ci actual = read_file((dst_dir, 'existing_dir', 'existing.txt')) 5107db96d56Sopenharmony_ci self.assertEqual(actual, 'has been replaced') 5117db96d56Sopenharmony_ci 5127db96d56Sopenharmony_ci with self.assertRaises(FileExistsError): 5137db96d56Sopenharmony_ci shutil.copytree(src_dir, dst_dir, dirs_exist_ok=False) 5147db96d56Sopenharmony_ci 5157db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 5167db96d56Sopenharmony_ci def test_copytree_symlinks(self): 5177db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 5187db96d56Sopenharmony_ci src_dir = os.path.join(tmp_dir, 'src') 5197db96d56Sopenharmony_ci dst_dir = os.path.join(tmp_dir, 'dst') 5207db96d56Sopenharmony_ci sub_dir = os.path.join(src_dir, 'sub') 5217db96d56Sopenharmony_ci os.mkdir(src_dir) 5227db96d56Sopenharmony_ci os.mkdir(sub_dir) 5237db96d56Sopenharmony_ci write_file((src_dir, 'file.txt'), 'foo') 5247db96d56Sopenharmony_ci src_link = os.path.join(sub_dir, 'link') 5257db96d56Sopenharmony_ci dst_link = os.path.join(dst_dir, 'sub/link') 5267db96d56Sopenharmony_ci os.symlink(os.path.join(src_dir, 'file.txt'), 5277db96d56Sopenharmony_ci src_link) 5287db96d56Sopenharmony_ci if hasattr(os, 'lchmod'): 5297db96d56Sopenharmony_ci os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 5307db96d56Sopenharmony_ci if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 5317db96d56Sopenharmony_ci os.lchflags(src_link, stat.UF_NODUMP) 5327db96d56Sopenharmony_ci src_stat = os.lstat(src_link) 5337db96d56Sopenharmony_ci shutil.copytree(src_dir, dst_dir, symlinks=True) 5347db96d56Sopenharmony_ci self.assertTrue(os.path.islink(os.path.join(dst_dir, 'sub', 'link'))) 5357db96d56Sopenharmony_ci actual = os.readlink(os.path.join(dst_dir, 'sub', 'link')) 5367db96d56Sopenharmony_ci # Bad practice to blindly strip the prefix as it may be required to 5377db96d56Sopenharmony_ci # correctly refer to the file, but we're only comparing paths here. 5387db96d56Sopenharmony_ci if os.name == 'nt' and actual.startswith('\\\\?\\'): 5397db96d56Sopenharmony_ci actual = actual[4:] 5407db96d56Sopenharmony_ci self.assertEqual(actual, os.path.join(src_dir, 'file.txt')) 5417db96d56Sopenharmony_ci dst_stat = os.lstat(dst_link) 5427db96d56Sopenharmony_ci if hasattr(os, 'lchmod'): 5437db96d56Sopenharmony_ci self.assertEqual(dst_stat.st_mode, src_stat.st_mode) 5447db96d56Sopenharmony_ci if hasattr(os, 'lchflags'): 5457db96d56Sopenharmony_ci self.assertEqual(dst_stat.st_flags, src_stat.st_flags) 5467db96d56Sopenharmony_ci 5477db96d56Sopenharmony_ci def test_copytree_with_exclude(self): 5487db96d56Sopenharmony_ci # creating data 5497db96d56Sopenharmony_ci join = os.path.join 5507db96d56Sopenharmony_ci exists = os.path.exists 5517db96d56Sopenharmony_ci src_dir = self.mkdtemp() 5527db96d56Sopenharmony_ci try: 5537db96d56Sopenharmony_ci dst_dir = join(self.mkdtemp(), 'destination') 5547db96d56Sopenharmony_ci write_file((src_dir, 'test.txt'), '123') 5557db96d56Sopenharmony_ci write_file((src_dir, 'test.tmp'), '123') 5567db96d56Sopenharmony_ci os.mkdir(join(src_dir, 'test_dir')) 5577db96d56Sopenharmony_ci write_file((src_dir, 'test_dir', 'test.txt'), '456') 5587db96d56Sopenharmony_ci os.mkdir(join(src_dir, 'test_dir2')) 5597db96d56Sopenharmony_ci write_file((src_dir, 'test_dir2', 'test.txt'), '456') 5607db96d56Sopenharmony_ci os.mkdir(join(src_dir, 'test_dir2', 'subdir')) 5617db96d56Sopenharmony_ci os.mkdir(join(src_dir, 'test_dir2', 'subdir2')) 5627db96d56Sopenharmony_ci write_file((src_dir, 'test_dir2', 'subdir', 'test.txt'), '456') 5637db96d56Sopenharmony_ci write_file((src_dir, 'test_dir2', 'subdir2', 'test.py'), '456') 5647db96d56Sopenharmony_ci 5657db96d56Sopenharmony_ci # testing glob-like patterns 5667db96d56Sopenharmony_ci try: 5677db96d56Sopenharmony_ci patterns = shutil.ignore_patterns('*.tmp', 'test_dir2') 5687db96d56Sopenharmony_ci shutil.copytree(src_dir, dst_dir, ignore=patterns) 5697db96d56Sopenharmony_ci # checking the result: some elements should not be copied 5707db96d56Sopenharmony_ci self.assertTrue(exists(join(dst_dir, 'test.txt'))) 5717db96d56Sopenharmony_ci self.assertFalse(exists(join(dst_dir, 'test.tmp'))) 5727db96d56Sopenharmony_ci self.assertFalse(exists(join(dst_dir, 'test_dir2'))) 5737db96d56Sopenharmony_ci finally: 5747db96d56Sopenharmony_ci shutil.rmtree(dst_dir) 5757db96d56Sopenharmony_ci try: 5767db96d56Sopenharmony_ci patterns = shutil.ignore_patterns('*.tmp', 'subdir*') 5777db96d56Sopenharmony_ci shutil.copytree(src_dir, dst_dir, ignore=patterns) 5787db96d56Sopenharmony_ci # checking the result: some elements should not be copied 5797db96d56Sopenharmony_ci self.assertFalse(exists(join(dst_dir, 'test.tmp'))) 5807db96d56Sopenharmony_ci self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2'))) 5817db96d56Sopenharmony_ci self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir'))) 5827db96d56Sopenharmony_ci finally: 5837db96d56Sopenharmony_ci shutil.rmtree(dst_dir) 5847db96d56Sopenharmony_ci 5857db96d56Sopenharmony_ci # testing callable-style 5867db96d56Sopenharmony_ci try: 5877db96d56Sopenharmony_ci def _filter(src, names): 5887db96d56Sopenharmony_ci res = [] 5897db96d56Sopenharmony_ci for name in names: 5907db96d56Sopenharmony_ci path = os.path.join(src, name) 5917db96d56Sopenharmony_ci 5927db96d56Sopenharmony_ci if (os.path.isdir(path) and 5937db96d56Sopenharmony_ci path.split()[-1] == 'subdir'): 5947db96d56Sopenharmony_ci res.append(name) 5957db96d56Sopenharmony_ci elif os.path.splitext(path)[-1] in ('.py'): 5967db96d56Sopenharmony_ci res.append(name) 5977db96d56Sopenharmony_ci return res 5987db96d56Sopenharmony_ci 5997db96d56Sopenharmony_ci shutil.copytree(src_dir, dst_dir, ignore=_filter) 6007db96d56Sopenharmony_ci 6017db96d56Sopenharmony_ci # checking the result: some elements should not be copied 6027db96d56Sopenharmony_ci self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir2', 6037db96d56Sopenharmony_ci 'test.py'))) 6047db96d56Sopenharmony_ci self.assertFalse(exists(join(dst_dir, 'test_dir2', 'subdir'))) 6057db96d56Sopenharmony_ci 6067db96d56Sopenharmony_ci finally: 6077db96d56Sopenharmony_ci shutil.rmtree(dst_dir) 6087db96d56Sopenharmony_ci finally: 6097db96d56Sopenharmony_ci shutil.rmtree(src_dir) 6107db96d56Sopenharmony_ci shutil.rmtree(os.path.dirname(dst_dir)) 6117db96d56Sopenharmony_ci 6127db96d56Sopenharmony_ci def test_copytree_arg_types_of_ignore(self): 6137db96d56Sopenharmony_ci join = os.path.join 6147db96d56Sopenharmony_ci exists = os.path.exists 6157db96d56Sopenharmony_ci 6167db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 6177db96d56Sopenharmony_ci src_dir = join(tmp_dir, "source") 6187db96d56Sopenharmony_ci 6197db96d56Sopenharmony_ci os.mkdir(join(src_dir)) 6207db96d56Sopenharmony_ci os.mkdir(join(src_dir, 'test_dir')) 6217db96d56Sopenharmony_ci os.mkdir(os.path.join(src_dir, 'test_dir', 'subdir')) 6227db96d56Sopenharmony_ci write_file((src_dir, 'test_dir', 'subdir', 'test.txt'), '456') 6237db96d56Sopenharmony_ci 6247db96d56Sopenharmony_ci invokations = [] 6257db96d56Sopenharmony_ci 6267db96d56Sopenharmony_ci def _ignore(src, names): 6277db96d56Sopenharmony_ci invokations.append(src) 6287db96d56Sopenharmony_ci self.assertIsInstance(src, str) 6297db96d56Sopenharmony_ci self.assertIsInstance(names, list) 6307db96d56Sopenharmony_ci self.assertEqual(len(names), len(set(names))) 6317db96d56Sopenharmony_ci for name in names: 6327db96d56Sopenharmony_ci self.assertIsInstance(name, str) 6337db96d56Sopenharmony_ci return [] 6347db96d56Sopenharmony_ci 6357db96d56Sopenharmony_ci dst_dir = join(self.mkdtemp(), 'destination') 6367db96d56Sopenharmony_ci shutil.copytree(src_dir, dst_dir, ignore=_ignore) 6377db96d56Sopenharmony_ci self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir', 6387db96d56Sopenharmony_ci 'test.txt'))) 6397db96d56Sopenharmony_ci 6407db96d56Sopenharmony_ci dst_dir = join(self.mkdtemp(), 'destination') 6417db96d56Sopenharmony_ci shutil.copytree(pathlib.Path(src_dir), dst_dir, ignore=_ignore) 6427db96d56Sopenharmony_ci self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir', 6437db96d56Sopenharmony_ci 'test.txt'))) 6447db96d56Sopenharmony_ci 6457db96d56Sopenharmony_ci dst_dir = join(self.mkdtemp(), 'destination') 6467db96d56Sopenharmony_ci src_dir_entry = list(os.scandir(tmp_dir))[0] 6477db96d56Sopenharmony_ci self.assertIsInstance(src_dir_entry, os.DirEntry) 6487db96d56Sopenharmony_ci shutil.copytree(src_dir_entry, dst_dir, ignore=_ignore) 6497db96d56Sopenharmony_ci self.assertTrue(exists(join(dst_dir, 'test_dir', 'subdir', 6507db96d56Sopenharmony_ci 'test.txt'))) 6517db96d56Sopenharmony_ci 6527db96d56Sopenharmony_ci self.assertEqual(len(invokations), 9) 6537db96d56Sopenharmony_ci 6547db96d56Sopenharmony_ci def test_copytree_retains_permissions(self): 6557db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 6567db96d56Sopenharmony_ci src_dir = os.path.join(tmp_dir, 'source') 6577db96d56Sopenharmony_ci os.mkdir(src_dir) 6587db96d56Sopenharmony_ci dst_dir = os.path.join(tmp_dir, 'destination') 6597db96d56Sopenharmony_ci self.addCleanup(shutil.rmtree, tmp_dir) 6607db96d56Sopenharmony_ci 6617db96d56Sopenharmony_ci os.chmod(src_dir, 0o777) 6627db96d56Sopenharmony_ci write_file((src_dir, 'permissive.txt'), '123') 6637db96d56Sopenharmony_ci os.chmod(os.path.join(src_dir, 'permissive.txt'), 0o777) 6647db96d56Sopenharmony_ci write_file((src_dir, 'restrictive.txt'), '456') 6657db96d56Sopenharmony_ci os.chmod(os.path.join(src_dir, 'restrictive.txt'), 0o600) 6667db96d56Sopenharmony_ci restrictive_subdir = tempfile.mkdtemp(dir=src_dir) 6677db96d56Sopenharmony_ci self.addCleanup(os_helper.rmtree, restrictive_subdir) 6687db96d56Sopenharmony_ci os.chmod(restrictive_subdir, 0o600) 6697db96d56Sopenharmony_ci 6707db96d56Sopenharmony_ci shutil.copytree(src_dir, dst_dir) 6717db96d56Sopenharmony_ci self.assertEqual(os.stat(src_dir).st_mode, os.stat(dst_dir).st_mode) 6727db96d56Sopenharmony_ci self.assertEqual(os.stat(os.path.join(src_dir, 'permissive.txt')).st_mode, 6737db96d56Sopenharmony_ci os.stat(os.path.join(dst_dir, 'permissive.txt')).st_mode) 6747db96d56Sopenharmony_ci self.assertEqual(os.stat(os.path.join(src_dir, 'restrictive.txt')).st_mode, 6757db96d56Sopenharmony_ci os.stat(os.path.join(dst_dir, 'restrictive.txt')).st_mode) 6767db96d56Sopenharmony_ci restrictive_subdir_dst = os.path.join(dst_dir, 6777db96d56Sopenharmony_ci os.path.split(restrictive_subdir)[1]) 6787db96d56Sopenharmony_ci self.assertEqual(os.stat(restrictive_subdir).st_mode, 6797db96d56Sopenharmony_ci os.stat(restrictive_subdir_dst).st_mode) 6807db96d56Sopenharmony_ci 6817db96d56Sopenharmony_ci @unittest.mock.patch('os.chmod') 6827db96d56Sopenharmony_ci def test_copytree_winerror(self, mock_patch): 6837db96d56Sopenharmony_ci # When copying to VFAT, copystat() raises OSError. On Windows, the 6847db96d56Sopenharmony_ci # exception object has a meaningful 'winerror' attribute, but not 6857db96d56Sopenharmony_ci # on other operating systems. Do not assume 'winerror' is set. 6867db96d56Sopenharmony_ci src_dir = self.mkdtemp() 6877db96d56Sopenharmony_ci dst_dir = os.path.join(self.mkdtemp(), 'destination') 6887db96d56Sopenharmony_ci self.addCleanup(shutil.rmtree, src_dir) 6897db96d56Sopenharmony_ci self.addCleanup(shutil.rmtree, os.path.dirname(dst_dir)) 6907db96d56Sopenharmony_ci 6917db96d56Sopenharmony_ci mock_patch.side_effect = PermissionError('ka-boom') 6927db96d56Sopenharmony_ci with self.assertRaises(shutil.Error): 6937db96d56Sopenharmony_ci shutil.copytree(src_dir, dst_dir) 6947db96d56Sopenharmony_ci 6957db96d56Sopenharmony_ci def test_copytree_custom_copy_function(self): 6967db96d56Sopenharmony_ci # See: https://bugs.python.org/issue35648 6977db96d56Sopenharmony_ci def custom_cpfun(a, b): 6987db96d56Sopenharmony_ci flag.append(None) 6997db96d56Sopenharmony_ci self.assertIsInstance(a, str) 7007db96d56Sopenharmony_ci self.assertIsInstance(b, str) 7017db96d56Sopenharmony_ci self.assertEqual(a, os.path.join(src, 'foo')) 7027db96d56Sopenharmony_ci self.assertEqual(b, os.path.join(dst, 'foo')) 7037db96d56Sopenharmony_ci 7047db96d56Sopenharmony_ci flag = [] 7057db96d56Sopenharmony_ci src = self.mkdtemp() 7067db96d56Sopenharmony_ci dst = tempfile.mktemp(dir=self.mkdtemp()) 7077db96d56Sopenharmony_ci with open(os.path.join(src, 'foo'), 'w', encoding='utf-8') as f: 7087db96d56Sopenharmony_ci f.close() 7097db96d56Sopenharmony_ci shutil.copytree(src, dst, copy_function=custom_cpfun) 7107db96d56Sopenharmony_ci self.assertEqual(len(flag), 1) 7117db96d56Sopenharmony_ci 7127db96d56Sopenharmony_ci # Issue #3002: copyfile and copytree block indefinitely on named pipes 7137db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') 7147db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 7157db96d56Sopenharmony_ci @unittest.skipIf(sys.platform == "vxworks", 7167db96d56Sopenharmony_ci "fifo requires special path on VxWorks") 7177db96d56Sopenharmony_ci def test_copytree_named_pipe(self): 7187db96d56Sopenharmony_ci os.mkdir(TESTFN) 7197db96d56Sopenharmony_ci try: 7207db96d56Sopenharmony_ci subdir = os.path.join(TESTFN, "subdir") 7217db96d56Sopenharmony_ci os.mkdir(subdir) 7227db96d56Sopenharmony_ci pipe = os.path.join(subdir, "mypipe") 7237db96d56Sopenharmony_ci try: 7247db96d56Sopenharmony_ci os.mkfifo(pipe) 7257db96d56Sopenharmony_ci except PermissionError as e: 7267db96d56Sopenharmony_ci self.skipTest('os.mkfifo(): %s' % e) 7277db96d56Sopenharmony_ci try: 7287db96d56Sopenharmony_ci shutil.copytree(TESTFN, TESTFN2) 7297db96d56Sopenharmony_ci except shutil.Error as e: 7307db96d56Sopenharmony_ci errors = e.args[0] 7317db96d56Sopenharmony_ci self.assertEqual(len(errors), 1) 7327db96d56Sopenharmony_ci src, dst, error_msg = errors[0] 7337db96d56Sopenharmony_ci self.assertEqual("`%s` is a named pipe" % pipe, error_msg) 7347db96d56Sopenharmony_ci else: 7357db96d56Sopenharmony_ci self.fail("shutil.Error should have been raised") 7367db96d56Sopenharmony_ci finally: 7377db96d56Sopenharmony_ci shutil.rmtree(TESTFN, ignore_errors=True) 7387db96d56Sopenharmony_ci shutil.rmtree(TESTFN2, ignore_errors=True) 7397db96d56Sopenharmony_ci 7407db96d56Sopenharmony_ci def test_copytree_special_func(self): 7417db96d56Sopenharmony_ci src_dir = self.mkdtemp() 7427db96d56Sopenharmony_ci dst_dir = os.path.join(self.mkdtemp(), 'destination') 7437db96d56Sopenharmony_ci write_file((src_dir, 'test.txt'), '123') 7447db96d56Sopenharmony_ci os.mkdir(os.path.join(src_dir, 'test_dir')) 7457db96d56Sopenharmony_ci write_file((src_dir, 'test_dir', 'test.txt'), '456') 7467db96d56Sopenharmony_ci 7477db96d56Sopenharmony_ci copied = [] 7487db96d56Sopenharmony_ci def _copy(src, dst): 7497db96d56Sopenharmony_ci copied.append((src, dst)) 7507db96d56Sopenharmony_ci 7517db96d56Sopenharmony_ci shutil.copytree(src_dir, dst_dir, copy_function=_copy) 7527db96d56Sopenharmony_ci self.assertEqual(len(copied), 2) 7537db96d56Sopenharmony_ci 7547db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 7557db96d56Sopenharmony_ci def test_copytree_dangling_symlinks(self): 7567db96d56Sopenharmony_ci src_dir = self.mkdtemp() 7577db96d56Sopenharmony_ci valid_file = os.path.join(src_dir, 'test.txt') 7587db96d56Sopenharmony_ci write_file(valid_file, 'abc') 7597db96d56Sopenharmony_ci dir_a = os.path.join(src_dir, 'dir_a') 7607db96d56Sopenharmony_ci os.mkdir(dir_a) 7617db96d56Sopenharmony_ci for d in src_dir, dir_a: 7627db96d56Sopenharmony_ci os.symlink('IDONTEXIST', os.path.join(d, 'broken')) 7637db96d56Sopenharmony_ci os.symlink(valid_file, os.path.join(d, 'valid')) 7647db96d56Sopenharmony_ci 7657db96d56Sopenharmony_ci # A dangling symlink should raise an error. 7667db96d56Sopenharmony_ci dst_dir = os.path.join(self.mkdtemp(), 'destination') 7677db96d56Sopenharmony_ci self.assertRaises(Error, shutil.copytree, src_dir, dst_dir) 7687db96d56Sopenharmony_ci 7697db96d56Sopenharmony_ci # Dangling symlinks should be ignored with the proper flag. 7707db96d56Sopenharmony_ci dst_dir = os.path.join(self.mkdtemp(), 'destination2') 7717db96d56Sopenharmony_ci shutil.copytree(src_dir, dst_dir, ignore_dangling_symlinks=True) 7727db96d56Sopenharmony_ci for root, dirs, files in os.walk(dst_dir): 7737db96d56Sopenharmony_ci self.assertNotIn('broken', files) 7747db96d56Sopenharmony_ci self.assertIn('valid', files) 7757db96d56Sopenharmony_ci 7767db96d56Sopenharmony_ci # a dangling symlink is copied if symlinks=True 7777db96d56Sopenharmony_ci dst_dir = os.path.join(self.mkdtemp(), 'destination3') 7787db96d56Sopenharmony_ci shutil.copytree(src_dir, dst_dir, symlinks=True) 7797db96d56Sopenharmony_ci self.assertIn('test.txt', os.listdir(dst_dir)) 7807db96d56Sopenharmony_ci 7817db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 7827db96d56Sopenharmony_ci def test_copytree_symlink_dir(self): 7837db96d56Sopenharmony_ci src_dir = self.mkdtemp() 7847db96d56Sopenharmony_ci dst_dir = os.path.join(self.mkdtemp(), 'destination') 7857db96d56Sopenharmony_ci os.mkdir(os.path.join(src_dir, 'real_dir')) 7867db96d56Sopenharmony_ci with open(os.path.join(src_dir, 'real_dir', 'test.txt'), 'wb'): 7877db96d56Sopenharmony_ci pass 7887db96d56Sopenharmony_ci os.symlink(os.path.join(src_dir, 'real_dir'), 7897db96d56Sopenharmony_ci os.path.join(src_dir, 'link_to_dir'), 7907db96d56Sopenharmony_ci target_is_directory=True) 7917db96d56Sopenharmony_ci 7927db96d56Sopenharmony_ci shutil.copytree(src_dir, dst_dir, symlinks=False) 7937db96d56Sopenharmony_ci self.assertFalse(os.path.islink(os.path.join(dst_dir, 'link_to_dir'))) 7947db96d56Sopenharmony_ci self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir'))) 7957db96d56Sopenharmony_ci 7967db96d56Sopenharmony_ci dst_dir = os.path.join(self.mkdtemp(), 'destination2') 7977db96d56Sopenharmony_ci shutil.copytree(src_dir, dst_dir, symlinks=True) 7987db96d56Sopenharmony_ci self.assertTrue(os.path.islink(os.path.join(dst_dir, 'link_to_dir'))) 7997db96d56Sopenharmony_ci self.assertIn('test.txt', os.listdir(os.path.join(dst_dir, 'link_to_dir'))) 8007db96d56Sopenharmony_ci 8017db96d56Sopenharmony_ci def test_copytree_return_value(self): 8027db96d56Sopenharmony_ci # copytree returns its destination path. 8037db96d56Sopenharmony_ci src_dir = self.mkdtemp() 8047db96d56Sopenharmony_ci dst_dir = src_dir + "dest" 8057db96d56Sopenharmony_ci self.addCleanup(shutil.rmtree, dst_dir, True) 8067db96d56Sopenharmony_ci src = os.path.join(src_dir, 'foo') 8077db96d56Sopenharmony_ci write_file(src, 'foo') 8087db96d56Sopenharmony_ci rv = shutil.copytree(src_dir, dst_dir) 8097db96d56Sopenharmony_ci self.assertEqual(['foo'], os.listdir(rv)) 8107db96d56Sopenharmony_ci 8117db96d56Sopenharmony_ci def test_copytree_subdirectory(self): 8127db96d56Sopenharmony_ci # copytree where dst is a subdirectory of src, see Issue 38688 8137db96d56Sopenharmony_ci base_dir = self.mkdtemp() 8147db96d56Sopenharmony_ci self.addCleanup(shutil.rmtree, base_dir, ignore_errors=True) 8157db96d56Sopenharmony_ci src_dir = os.path.join(base_dir, "t", "pg") 8167db96d56Sopenharmony_ci dst_dir = os.path.join(src_dir, "somevendor", "1.0") 8177db96d56Sopenharmony_ci os.makedirs(src_dir) 8187db96d56Sopenharmony_ci src = os.path.join(src_dir, 'pol') 8197db96d56Sopenharmony_ci write_file(src, 'pol') 8207db96d56Sopenharmony_ci rv = shutil.copytree(src_dir, dst_dir) 8217db96d56Sopenharmony_ci self.assertEqual(['pol'], os.listdir(rv)) 8227db96d56Sopenharmony_ci 8237db96d56Sopenharmony_ciclass TestCopy(BaseTest, unittest.TestCase): 8247db96d56Sopenharmony_ci 8257db96d56Sopenharmony_ci ### shutil.copymode 8267db96d56Sopenharmony_ci 8277db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 8287db96d56Sopenharmony_ci def test_copymode_follow_symlinks(self): 8297db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 8307db96d56Sopenharmony_ci src = os.path.join(tmp_dir, 'foo') 8317db96d56Sopenharmony_ci dst = os.path.join(tmp_dir, 'bar') 8327db96d56Sopenharmony_ci src_link = os.path.join(tmp_dir, 'baz') 8337db96d56Sopenharmony_ci dst_link = os.path.join(tmp_dir, 'quux') 8347db96d56Sopenharmony_ci write_file(src, 'foo') 8357db96d56Sopenharmony_ci write_file(dst, 'foo') 8367db96d56Sopenharmony_ci os.symlink(src, src_link) 8377db96d56Sopenharmony_ci os.symlink(dst, dst_link) 8387db96d56Sopenharmony_ci os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) 8397db96d56Sopenharmony_ci # file to file 8407db96d56Sopenharmony_ci os.chmod(dst, stat.S_IRWXO) 8417db96d56Sopenharmony_ci self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 8427db96d56Sopenharmony_ci shutil.copymode(src, dst) 8437db96d56Sopenharmony_ci self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 8447db96d56Sopenharmony_ci # On Windows, os.chmod does not follow symlinks (issue #15411) 8457db96d56Sopenharmony_ci if os.name != 'nt': 8467db96d56Sopenharmony_ci # follow src link 8477db96d56Sopenharmony_ci os.chmod(dst, stat.S_IRWXO) 8487db96d56Sopenharmony_ci shutil.copymode(src_link, dst) 8497db96d56Sopenharmony_ci self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 8507db96d56Sopenharmony_ci # follow dst link 8517db96d56Sopenharmony_ci os.chmod(dst, stat.S_IRWXO) 8527db96d56Sopenharmony_ci shutil.copymode(src, dst_link) 8537db96d56Sopenharmony_ci self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 8547db96d56Sopenharmony_ci # follow both links 8557db96d56Sopenharmony_ci os.chmod(dst, stat.S_IRWXO) 8567db96d56Sopenharmony_ci shutil.copymode(src_link, dst_link) 8577db96d56Sopenharmony_ci self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 8587db96d56Sopenharmony_ci 8597db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(os, 'lchmod'), 'requires os.lchmod') 8607db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 8617db96d56Sopenharmony_ci def test_copymode_symlink_to_symlink(self): 8627db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 8637db96d56Sopenharmony_ci src = os.path.join(tmp_dir, 'foo') 8647db96d56Sopenharmony_ci dst = os.path.join(tmp_dir, 'bar') 8657db96d56Sopenharmony_ci src_link = os.path.join(tmp_dir, 'baz') 8667db96d56Sopenharmony_ci dst_link = os.path.join(tmp_dir, 'quux') 8677db96d56Sopenharmony_ci write_file(src, 'foo') 8687db96d56Sopenharmony_ci write_file(dst, 'foo') 8697db96d56Sopenharmony_ci os.symlink(src, src_link) 8707db96d56Sopenharmony_ci os.symlink(dst, dst_link) 8717db96d56Sopenharmony_ci os.chmod(src, stat.S_IRWXU|stat.S_IRWXG) 8727db96d56Sopenharmony_ci os.chmod(dst, stat.S_IRWXU) 8737db96d56Sopenharmony_ci os.lchmod(src_link, stat.S_IRWXO|stat.S_IRWXG) 8747db96d56Sopenharmony_ci # link to link 8757db96d56Sopenharmony_ci os.lchmod(dst_link, stat.S_IRWXO) 8767db96d56Sopenharmony_ci shutil.copymode(src_link, dst_link, follow_symlinks=False) 8777db96d56Sopenharmony_ci self.assertEqual(os.lstat(src_link).st_mode, 8787db96d56Sopenharmony_ci os.lstat(dst_link).st_mode) 8797db96d56Sopenharmony_ci self.assertNotEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 8807db96d56Sopenharmony_ci # src link - use chmod 8817db96d56Sopenharmony_ci os.lchmod(dst_link, stat.S_IRWXO) 8827db96d56Sopenharmony_ci shutil.copymode(src_link, dst, follow_symlinks=False) 8837db96d56Sopenharmony_ci self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 8847db96d56Sopenharmony_ci # dst link - use chmod 8857db96d56Sopenharmony_ci os.lchmod(dst_link, stat.S_IRWXO) 8867db96d56Sopenharmony_ci shutil.copymode(src, dst_link, follow_symlinks=False) 8877db96d56Sopenharmony_ci self.assertEqual(os.stat(src).st_mode, os.stat(dst).st_mode) 8887db96d56Sopenharmony_ci 8897db96d56Sopenharmony_ci @unittest.skipIf(hasattr(os, 'lchmod'), 'requires os.lchmod to be missing') 8907db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 8917db96d56Sopenharmony_ci def test_copymode_symlink_to_symlink_wo_lchmod(self): 8927db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 8937db96d56Sopenharmony_ci src = os.path.join(tmp_dir, 'foo') 8947db96d56Sopenharmony_ci dst = os.path.join(tmp_dir, 'bar') 8957db96d56Sopenharmony_ci src_link = os.path.join(tmp_dir, 'baz') 8967db96d56Sopenharmony_ci dst_link = os.path.join(tmp_dir, 'quux') 8977db96d56Sopenharmony_ci write_file(src, 'foo') 8987db96d56Sopenharmony_ci write_file(dst, 'foo') 8997db96d56Sopenharmony_ci os.symlink(src, src_link) 9007db96d56Sopenharmony_ci os.symlink(dst, dst_link) 9017db96d56Sopenharmony_ci shutil.copymode(src_link, dst_link, follow_symlinks=False) # silent fail 9027db96d56Sopenharmony_ci 9037db96d56Sopenharmony_ci ### shutil.copystat 9047db96d56Sopenharmony_ci 9057db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 9067db96d56Sopenharmony_ci def test_copystat_symlinks(self): 9077db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 9087db96d56Sopenharmony_ci src = os.path.join(tmp_dir, 'foo') 9097db96d56Sopenharmony_ci dst = os.path.join(tmp_dir, 'bar') 9107db96d56Sopenharmony_ci src_link = os.path.join(tmp_dir, 'baz') 9117db96d56Sopenharmony_ci dst_link = os.path.join(tmp_dir, 'qux') 9127db96d56Sopenharmony_ci write_file(src, 'foo') 9137db96d56Sopenharmony_ci src_stat = os.stat(src) 9147db96d56Sopenharmony_ci os.utime(src, (src_stat.st_atime, 9157db96d56Sopenharmony_ci src_stat.st_mtime - 42.0)) # ensure different mtimes 9167db96d56Sopenharmony_ci write_file(dst, 'bar') 9177db96d56Sopenharmony_ci self.assertNotEqual(os.stat(src).st_mtime, os.stat(dst).st_mtime) 9187db96d56Sopenharmony_ci os.symlink(src, src_link) 9197db96d56Sopenharmony_ci os.symlink(dst, dst_link) 9207db96d56Sopenharmony_ci if hasattr(os, 'lchmod'): 9217db96d56Sopenharmony_ci os.lchmod(src_link, stat.S_IRWXO) 9227db96d56Sopenharmony_ci if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 9237db96d56Sopenharmony_ci os.lchflags(src_link, stat.UF_NODUMP) 9247db96d56Sopenharmony_ci src_link_stat = os.lstat(src_link) 9257db96d56Sopenharmony_ci # follow 9267db96d56Sopenharmony_ci if hasattr(os, 'lchmod'): 9277db96d56Sopenharmony_ci shutil.copystat(src_link, dst_link, follow_symlinks=True) 9287db96d56Sopenharmony_ci self.assertNotEqual(src_link_stat.st_mode, os.stat(dst).st_mode) 9297db96d56Sopenharmony_ci # don't follow 9307db96d56Sopenharmony_ci shutil.copystat(src_link, dst_link, follow_symlinks=False) 9317db96d56Sopenharmony_ci dst_link_stat = os.lstat(dst_link) 9327db96d56Sopenharmony_ci if os.utime in os.supports_follow_symlinks: 9337db96d56Sopenharmony_ci for attr in 'st_atime', 'st_mtime': 9347db96d56Sopenharmony_ci # The modification times may be truncated in the new file. 9357db96d56Sopenharmony_ci self.assertLessEqual(getattr(src_link_stat, attr), 9367db96d56Sopenharmony_ci getattr(dst_link_stat, attr) + 1) 9377db96d56Sopenharmony_ci if hasattr(os, 'lchmod'): 9387db96d56Sopenharmony_ci self.assertEqual(src_link_stat.st_mode, dst_link_stat.st_mode) 9397db96d56Sopenharmony_ci if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): 9407db96d56Sopenharmony_ci self.assertEqual(src_link_stat.st_flags, dst_link_stat.st_flags) 9417db96d56Sopenharmony_ci # tell to follow but dst is not a link 9427db96d56Sopenharmony_ci shutil.copystat(src_link, dst, follow_symlinks=False) 9437db96d56Sopenharmony_ci self.assertTrue(abs(os.stat(src).st_mtime - os.stat(dst).st_mtime) < 9447db96d56Sopenharmony_ci 00000.1) 9457db96d56Sopenharmony_ci 9467db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(os, 'chflags') and 9477db96d56Sopenharmony_ci hasattr(errno, 'EOPNOTSUPP') and 9487db96d56Sopenharmony_ci hasattr(errno, 'ENOTSUP'), 9497db96d56Sopenharmony_ci "requires os.chflags, EOPNOTSUPP & ENOTSUP") 9507db96d56Sopenharmony_ci def test_copystat_handles_harmless_chflags_errors(self): 9517db96d56Sopenharmony_ci tmpdir = self.mkdtemp() 9527db96d56Sopenharmony_ci file1 = os.path.join(tmpdir, 'file1') 9537db96d56Sopenharmony_ci file2 = os.path.join(tmpdir, 'file2') 9547db96d56Sopenharmony_ci write_file(file1, 'xxx') 9557db96d56Sopenharmony_ci write_file(file2, 'xxx') 9567db96d56Sopenharmony_ci 9577db96d56Sopenharmony_ci def make_chflags_raiser(err): 9587db96d56Sopenharmony_ci ex = OSError() 9597db96d56Sopenharmony_ci 9607db96d56Sopenharmony_ci def _chflags_raiser(path, flags, *, follow_symlinks=True): 9617db96d56Sopenharmony_ci ex.errno = err 9627db96d56Sopenharmony_ci raise ex 9637db96d56Sopenharmony_ci return _chflags_raiser 9647db96d56Sopenharmony_ci old_chflags = os.chflags 9657db96d56Sopenharmony_ci try: 9667db96d56Sopenharmony_ci for err in errno.EOPNOTSUPP, errno.ENOTSUP: 9677db96d56Sopenharmony_ci os.chflags = make_chflags_raiser(err) 9687db96d56Sopenharmony_ci shutil.copystat(file1, file2) 9697db96d56Sopenharmony_ci # assert others errors break it 9707db96d56Sopenharmony_ci os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP) 9717db96d56Sopenharmony_ci self.assertRaises(OSError, shutil.copystat, file1, file2) 9727db96d56Sopenharmony_ci finally: 9737db96d56Sopenharmony_ci os.chflags = old_chflags 9747db96d56Sopenharmony_ci 9757db96d56Sopenharmony_ci ### shutil.copyxattr 9767db96d56Sopenharmony_ci 9777db96d56Sopenharmony_ci @os_helper.skip_unless_xattr 9787db96d56Sopenharmony_ci def test_copyxattr(self): 9797db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 9807db96d56Sopenharmony_ci src = os.path.join(tmp_dir, 'foo') 9817db96d56Sopenharmony_ci write_file(src, 'foo') 9827db96d56Sopenharmony_ci dst = os.path.join(tmp_dir, 'bar') 9837db96d56Sopenharmony_ci write_file(dst, 'bar') 9847db96d56Sopenharmony_ci 9857db96d56Sopenharmony_ci # no xattr == no problem 9867db96d56Sopenharmony_ci shutil._copyxattr(src, dst) 9877db96d56Sopenharmony_ci # common case 9887db96d56Sopenharmony_ci os.setxattr(src, 'user.foo', b'42') 9897db96d56Sopenharmony_ci os.setxattr(src, 'user.bar', b'43') 9907db96d56Sopenharmony_ci shutil._copyxattr(src, dst) 9917db96d56Sopenharmony_ci self.assertEqual(sorted(os.listxattr(src)), sorted(os.listxattr(dst))) 9927db96d56Sopenharmony_ci self.assertEqual( 9937db96d56Sopenharmony_ci os.getxattr(src, 'user.foo'), 9947db96d56Sopenharmony_ci os.getxattr(dst, 'user.foo')) 9957db96d56Sopenharmony_ci # check errors don't affect other attrs 9967db96d56Sopenharmony_ci os.remove(dst) 9977db96d56Sopenharmony_ci write_file(dst, 'bar') 9987db96d56Sopenharmony_ci os_error = OSError(errno.EPERM, 'EPERM') 9997db96d56Sopenharmony_ci 10007db96d56Sopenharmony_ci def _raise_on_user_foo(fname, attr, val, **kwargs): 10017db96d56Sopenharmony_ci if attr == 'user.foo': 10027db96d56Sopenharmony_ci raise os_error 10037db96d56Sopenharmony_ci else: 10047db96d56Sopenharmony_ci orig_setxattr(fname, attr, val, **kwargs) 10057db96d56Sopenharmony_ci try: 10067db96d56Sopenharmony_ci orig_setxattr = os.setxattr 10077db96d56Sopenharmony_ci os.setxattr = _raise_on_user_foo 10087db96d56Sopenharmony_ci shutil._copyxattr(src, dst) 10097db96d56Sopenharmony_ci self.assertIn('user.bar', os.listxattr(dst)) 10107db96d56Sopenharmony_ci finally: 10117db96d56Sopenharmony_ci os.setxattr = orig_setxattr 10127db96d56Sopenharmony_ci # the source filesystem not supporting xattrs should be ok, too. 10137db96d56Sopenharmony_ci def _raise_on_src(fname, *, follow_symlinks=True): 10147db96d56Sopenharmony_ci if fname == src: 10157db96d56Sopenharmony_ci raise OSError(errno.ENOTSUP, 'Operation not supported') 10167db96d56Sopenharmony_ci return orig_listxattr(fname, follow_symlinks=follow_symlinks) 10177db96d56Sopenharmony_ci try: 10187db96d56Sopenharmony_ci orig_listxattr = os.listxattr 10197db96d56Sopenharmony_ci os.listxattr = _raise_on_src 10207db96d56Sopenharmony_ci shutil._copyxattr(src, dst) 10217db96d56Sopenharmony_ci finally: 10227db96d56Sopenharmony_ci os.listxattr = orig_listxattr 10237db96d56Sopenharmony_ci 10247db96d56Sopenharmony_ci # test that shutil.copystat copies xattrs 10257db96d56Sopenharmony_ci src = os.path.join(tmp_dir, 'the_original') 10267db96d56Sopenharmony_ci srcro = os.path.join(tmp_dir, 'the_original_ro') 10277db96d56Sopenharmony_ci write_file(src, src) 10287db96d56Sopenharmony_ci write_file(srcro, srcro) 10297db96d56Sopenharmony_ci os.setxattr(src, 'user.the_value', b'fiddly') 10307db96d56Sopenharmony_ci os.setxattr(srcro, 'user.the_value', b'fiddly') 10317db96d56Sopenharmony_ci os.chmod(srcro, 0o444) 10327db96d56Sopenharmony_ci dst = os.path.join(tmp_dir, 'the_copy') 10337db96d56Sopenharmony_ci dstro = os.path.join(tmp_dir, 'the_copy_ro') 10347db96d56Sopenharmony_ci write_file(dst, dst) 10357db96d56Sopenharmony_ci write_file(dstro, dstro) 10367db96d56Sopenharmony_ci shutil.copystat(src, dst) 10377db96d56Sopenharmony_ci shutil.copystat(srcro, dstro) 10387db96d56Sopenharmony_ci self.assertEqual(os.getxattr(dst, 'user.the_value'), b'fiddly') 10397db96d56Sopenharmony_ci self.assertEqual(os.getxattr(dstro, 'user.the_value'), b'fiddly') 10407db96d56Sopenharmony_ci 10417db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 10427db96d56Sopenharmony_ci @os_helper.skip_unless_xattr 10437db96d56Sopenharmony_ci @os_helper.skip_unless_dac_override 10447db96d56Sopenharmony_ci def test_copyxattr_symlinks(self): 10457db96d56Sopenharmony_ci # On Linux, it's only possible to access non-user xattr for symlinks; 10467db96d56Sopenharmony_ci # which in turn require root privileges. This test should be expanded 10477db96d56Sopenharmony_ci # as soon as other platforms gain support for extended attributes. 10487db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 10497db96d56Sopenharmony_ci src = os.path.join(tmp_dir, 'foo') 10507db96d56Sopenharmony_ci src_link = os.path.join(tmp_dir, 'baz') 10517db96d56Sopenharmony_ci write_file(src, 'foo') 10527db96d56Sopenharmony_ci os.symlink(src, src_link) 10537db96d56Sopenharmony_ci os.setxattr(src, 'trusted.foo', b'42') 10547db96d56Sopenharmony_ci os.setxattr(src_link, 'trusted.foo', b'43', follow_symlinks=False) 10557db96d56Sopenharmony_ci dst = os.path.join(tmp_dir, 'bar') 10567db96d56Sopenharmony_ci dst_link = os.path.join(tmp_dir, 'qux') 10577db96d56Sopenharmony_ci write_file(dst, 'bar') 10587db96d56Sopenharmony_ci os.symlink(dst, dst_link) 10597db96d56Sopenharmony_ci shutil._copyxattr(src_link, dst_link, follow_symlinks=False) 10607db96d56Sopenharmony_ci self.assertEqual(os.getxattr(dst_link, 'trusted.foo', follow_symlinks=False), b'43') 10617db96d56Sopenharmony_ci self.assertRaises(OSError, os.getxattr, dst, 'trusted.foo') 10627db96d56Sopenharmony_ci shutil._copyxattr(src_link, dst, follow_symlinks=False) 10637db96d56Sopenharmony_ci self.assertEqual(os.getxattr(dst, 'trusted.foo'), b'43') 10647db96d56Sopenharmony_ci 10657db96d56Sopenharmony_ci ### shutil.copy 10667db96d56Sopenharmony_ci 10677db96d56Sopenharmony_ci def _copy_file(self, method): 10687db96d56Sopenharmony_ci fname = 'test.txt' 10697db96d56Sopenharmony_ci tmpdir = self.mkdtemp() 10707db96d56Sopenharmony_ci write_file((tmpdir, fname), 'xxx') 10717db96d56Sopenharmony_ci file1 = os.path.join(tmpdir, fname) 10727db96d56Sopenharmony_ci tmpdir2 = self.mkdtemp() 10737db96d56Sopenharmony_ci method(file1, tmpdir2) 10747db96d56Sopenharmony_ci file2 = os.path.join(tmpdir2, fname) 10757db96d56Sopenharmony_ci return (file1, file2) 10767db96d56Sopenharmony_ci 10777db96d56Sopenharmony_ci def test_copy(self): 10787db96d56Sopenharmony_ci # Ensure that the copied file exists and has the same mode bits. 10797db96d56Sopenharmony_ci file1, file2 = self._copy_file(shutil.copy) 10807db96d56Sopenharmony_ci self.assertTrue(os.path.exists(file2)) 10817db96d56Sopenharmony_ci self.assertEqual(os.stat(file1).st_mode, os.stat(file2).st_mode) 10827db96d56Sopenharmony_ci 10837db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 10847db96d56Sopenharmony_ci def test_copy_symlinks(self): 10857db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 10867db96d56Sopenharmony_ci src = os.path.join(tmp_dir, 'foo') 10877db96d56Sopenharmony_ci dst = os.path.join(tmp_dir, 'bar') 10887db96d56Sopenharmony_ci src_link = os.path.join(tmp_dir, 'baz') 10897db96d56Sopenharmony_ci write_file(src, 'foo') 10907db96d56Sopenharmony_ci os.symlink(src, src_link) 10917db96d56Sopenharmony_ci if hasattr(os, 'lchmod'): 10927db96d56Sopenharmony_ci os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 10937db96d56Sopenharmony_ci # don't follow 10947db96d56Sopenharmony_ci shutil.copy(src_link, dst, follow_symlinks=True) 10957db96d56Sopenharmony_ci self.assertFalse(os.path.islink(dst)) 10967db96d56Sopenharmony_ci self.assertEqual(read_file(src), read_file(dst)) 10977db96d56Sopenharmony_ci os.remove(dst) 10987db96d56Sopenharmony_ci # follow 10997db96d56Sopenharmony_ci shutil.copy(src_link, dst, follow_symlinks=False) 11007db96d56Sopenharmony_ci self.assertTrue(os.path.islink(dst)) 11017db96d56Sopenharmony_ci self.assertEqual(os.readlink(dst), os.readlink(src_link)) 11027db96d56Sopenharmony_ci if hasattr(os, 'lchmod'): 11037db96d56Sopenharmony_ci self.assertEqual(os.lstat(src_link).st_mode, 11047db96d56Sopenharmony_ci os.lstat(dst).st_mode) 11057db96d56Sopenharmony_ci 11067db96d56Sopenharmony_ci ### shutil.copy2 11077db96d56Sopenharmony_ci 11087db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(os, 'utime'), 'requires os.utime') 11097db96d56Sopenharmony_ci def test_copy2(self): 11107db96d56Sopenharmony_ci # Ensure that the copied file exists and has the same mode and 11117db96d56Sopenharmony_ci # modification time bits. 11127db96d56Sopenharmony_ci file1, file2 = self._copy_file(shutil.copy2) 11137db96d56Sopenharmony_ci self.assertTrue(os.path.exists(file2)) 11147db96d56Sopenharmony_ci file1_stat = os.stat(file1) 11157db96d56Sopenharmony_ci file2_stat = os.stat(file2) 11167db96d56Sopenharmony_ci self.assertEqual(file1_stat.st_mode, file2_stat.st_mode) 11177db96d56Sopenharmony_ci for attr in 'st_atime', 'st_mtime': 11187db96d56Sopenharmony_ci # The modification times may be truncated in the new file. 11197db96d56Sopenharmony_ci self.assertLessEqual(getattr(file1_stat, attr), 11207db96d56Sopenharmony_ci getattr(file2_stat, attr) + 1) 11217db96d56Sopenharmony_ci if hasattr(os, 'chflags') and hasattr(file1_stat, 'st_flags'): 11227db96d56Sopenharmony_ci self.assertEqual(getattr(file1_stat, 'st_flags'), 11237db96d56Sopenharmony_ci getattr(file2_stat, 'st_flags')) 11247db96d56Sopenharmony_ci 11257db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 11267db96d56Sopenharmony_ci def test_copy2_symlinks(self): 11277db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 11287db96d56Sopenharmony_ci src = os.path.join(tmp_dir, 'foo') 11297db96d56Sopenharmony_ci dst = os.path.join(tmp_dir, 'bar') 11307db96d56Sopenharmony_ci src_link = os.path.join(tmp_dir, 'baz') 11317db96d56Sopenharmony_ci write_file(src, 'foo') 11327db96d56Sopenharmony_ci os.symlink(src, src_link) 11337db96d56Sopenharmony_ci if hasattr(os, 'lchmod'): 11347db96d56Sopenharmony_ci os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO) 11357db96d56Sopenharmony_ci if hasattr(os, 'lchflags') and hasattr(stat, 'UF_NODUMP'): 11367db96d56Sopenharmony_ci os.lchflags(src_link, stat.UF_NODUMP) 11377db96d56Sopenharmony_ci src_stat = os.stat(src) 11387db96d56Sopenharmony_ci src_link_stat = os.lstat(src_link) 11397db96d56Sopenharmony_ci # follow 11407db96d56Sopenharmony_ci shutil.copy2(src_link, dst, follow_symlinks=True) 11417db96d56Sopenharmony_ci self.assertFalse(os.path.islink(dst)) 11427db96d56Sopenharmony_ci self.assertEqual(read_file(src), read_file(dst)) 11437db96d56Sopenharmony_ci os.remove(dst) 11447db96d56Sopenharmony_ci # don't follow 11457db96d56Sopenharmony_ci shutil.copy2(src_link, dst, follow_symlinks=False) 11467db96d56Sopenharmony_ci self.assertTrue(os.path.islink(dst)) 11477db96d56Sopenharmony_ci self.assertEqual(os.readlink(dst), os.readlink(src_link)) 11487db96d56Sopenharmony_ci dst_stat = os.lstat(dst) 11497db96d56Sopenharmony_ci if os.utime in os.supports_follow_symlinks: 11507db96d56Sopenharmony_ci for attr in 'st_atime', 'st_mtime': 11517db96d56Sopenharmony_ci # The modification times may be truncated in the new file. 11527db96d56Sopenharmony_ci self.assertLessEqual(getattr(src_link_stat, attr), 11537db96d56Sopenharmony_ci getattr(dst_stat, attr) + 1) 11547db96d56Sopenharmony_ci if hasattr(os, 'lchmod'): 11557db96d56Sopenharmony_ci self.assertEqual(src_link_stat.st_mode, dst_stat.st_mode) 11567db96d56Sopenharmony_ci self.assertNotEqual(src_stat.st_mode, dst_stat.st_mode) 11577db96d56Sopenharmony_ci if hasattr(os, 'lchflags') and hasattr(src_link_stat, 'st_flags'): 11587db96d56Sopenharmony_ci self.assertEqual(src_link_stat.st_flags, dst_stat.st_flags) 11597db96d56Sopenharmony_ci 11607db96d56Sopenharmony_ci @os_helper.skip_unless_xattr 11617db96d56Sopenharmony_ci def test_copy2_xattr(self): 11627db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 11637db96d56Sopenharmony_ci src = os.path.join(tmp_dir, 'foo') 11647db96d56Sopenharmony_ci dst = os.path.join(tmp_dir, 'bar') 11657db96d56Sopenharmony_ci write_file(src, 'foo') 11667db96d56Sopenharmony_ci os.setxattr(src, 'user.foo', b'42') 11677db96d56Sopenharmony_ci shutil.copy2(src, dst) 11687db96d56Sopenharmony_ci self.assertEqual( 11697db96d56Sopenharmony_ci os.getxattr(src, 'user.foo'), 11707db96d56Sopenharmony_ci os.getxattr(dst, 'user.foo')) 11717db96d56Sopenharmony_ci os.remove(dst) 11727db96d56Sopenharmony_ci 11737db96d56Sopenharmony_ci def test_copy_return_value(self): 11747db96d56Sopenharmony_ci # copy and copy2 both return their destination path. 11757db96d56Sopenharmony_ci for fn in (shutil.copy, shutil.copy2): 11767db96d56Sopenharmony_ci src_dir = self.mkdtemp() 11777db96d56Sopenharmony_ci dst_dir = self.mkdtemp() 11787db96d56Sopenharmony_ci src = os.path.join(src_dir, 'foo') 11797db96d56Sopenharmony_ci write_file(src, 'foo') 11807db96d56Sopenharmony_ci rv = fn(src, dst_dir) 11817db96d56Sopenharmony_ci self.assertEqual(rv, os.path.join(dst_dir, 'foo')) 11827db96d56Sopenharmony_ci rv = fn(src, os.path.join(dst_dir, 'bar')) 11837db96d56Sopenharmony_ci self.assertEqual(rv, os.path.join(dst_dir, 'bar')) 11847db96d56Sopenharmony_ci 11857db96d56Sopenharmony_ci def test_copy_dir(self): 11867db96d56Sopenharmony_ci self._test_copy_dir(shutil.copy) 11877db96d56Sopenharmony_ci 11887db96d56Sopenharmony_ci def test_copy2_dir(self): 11897db96d56Sopenharmony_ci self._test_copy_dir(shutil.copy2) 11907db96d56Sopenharmony_ci 11917db96d56Sopenharmony_ci def _test_copy_dir(self, copy_func): 11927db96d56Sopenharmony_ci src_dir = self.mkdtemp() 11937db96d56Sopenharmony_ci src_file = os.path.join(src_dir, 'foo') 11947db96d56Sopenharmony_ci dir2 = self.mkdtemp() 11957db96d56Sopenharmony_ci dst = os.path.join(src_dir, 'does_not_exist/') 11967db96d56Sopenharmony_ci write_file(src_file, 'foo') 11977db96d56Sopenharmony_ci if sys.platform == "win32": 11987db96d56Sopenharmony_ci err = PermissionError 11997db96d56Sopenharmony_ci else: 12007db96d56Sopenharmony_ci err = IsADirectoryError 12017db96d56Sopenharmony_ci self.assertRaises(err, copy_func, dir2, src_dir) 12027db96d56Sopenharmony_ci 12037db96d56Sopenharmony_ci # raise *err* because of src rather than FileNotFoundError because of dst 12047db96d56Sopenharmony_ci self.assertRaises(err, copy_func, dir2, dst) 12057db96d56Sopenharmony_ci copy_func(src_file, dir2) # should not raise exceptions 12067db96d56Sopenharmony_ci 12077db96d56Sopenharmony_ci ### shutil.copyfile 12087db96d56Sopenharmony_ci 12097db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 12107db96d56Sopenharmony_ci def test_copyfile_symlinks(self): 12117db96d56Sopenharmony_ci tmp_dir = self.mkdtemp() 12127db96d56Sopenharmony_ci src = os.path.join(tmp_dir, 'src') 12137db96d56Sopenharmony_ci dst = os.path.join(tmp_dir, 'dst') 12147db96d56Sopenharmony_ci dst_link = os.path.join(tmp_dir, 'dst_link') 12157db96d56Sopenharmony_ci link = os.path.join(tmp_dir, 'link') 12167db96d56Sopenharmony_ci write_file(src, 'foo') 12177db96d56Sopenharmony_ci os.symlink(src, link) 12187db96d56Sopenharmony_ci # don't follow 12197db96d56Sopenharmony_ci shutil.copyfile(link, dst_link, follow_symlinks=False) 12207db96d56Sopenharmony_ci self.assertTrue(os.path.islink(dst_link)) 12217db96d56Sopenharmony_ci self.assertEqual(os.readlink(link), os.readlink(dst_link)) 12227db96d56Sopenharmony_ci # follow 12237db96d56Sopenharmony_ci shutil.copyfile(link, dst) 12247db96d56Sopenharmony_ci self.assertFalse(os.path.islink(dst)) 12257db96d56Sopenharmony_ci 12267db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(os, 'link'), 'requires os.link') 12277db96d56Sopenharmony_ci def test_dont_copy_file_onto_link_to_itself(self): 12287db96d56Sopenharmony_ci # bug 851123. 12297db96d56Sopenharmony_ci os.mkdir(TESTFN) 12307db96d56Sopenharmony_ci src = os.path.join(TESTFN, 'cheese') 12317db96d56Sopenharmony_ci dst = os.path.join(TESTFN, 'shop') 12327db96d56Sopenharmony_ci try: 12337db96d56Sopenharmony_ci with open(src, 'w', encoding='utf-8') as f: 12347db96d56Sopenharmony_ci f.write('cheddar') 12357db96d56Sopenharmony_ci try: 12367db96d56Sopenharmony_ci os.link(src, dst) 12377db96d56Sopenharmony_ci except PermissionError as e: 12387db96d56Sopenharmony_ci self.skipTest('os.link(): %s' % e) 12397db96d56Sopenharmony_ci self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) 12407db96d56Sopenharmony_ci with open(src, 'r', encoding='utf-8') as f: 12417db96d56Sopenharmony_ci self.assertEqual(f.read(), 'cheddar') 12427db96d56Sopenharmony_ci os.remove(dst) 12437db96d56Sopenharmony_ci finally: 12447db96d56Sopenharmony_ci shutil.rmtree(TESTFN, ignore_errors=True) 12457db96d56Sopenharmony_ci 12467db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 12477db96d56Sopenharmony_ci def test_dont_copy_file_onto_symlink_to_itself(self): 12487db96d56Sopenharmony_ci # bug 851123. 12497db96d56Sopenharmony_ci os.mkdir(TESTFN) 12507db96d56Sopenharmony_ci src = os.path.join(TESTFN, 'cheese') 12517db96d56Sopenharmony_ci dst = os.path.join(TESTFN, 'shop') 12527db96d56Sopenharmony_ci try: 12537db96d56Sopenharmony_ci with open(src, 'w', encoding='utf-8') as f: 12547db96d56Sopenharmony_ci f.write('cheddar') 12557db96d56Sopenharmony_ci # Using `src` here would mean we end up with a symlink pointing 12567db96d56Sopenharmony_ci # to TESTFN/TESTFN/cheese, while it should point at 12577db96d56Sopenharmony_ci # TESTFN/cheese. 12587db96d56Sopenharmony_ci os.symlink('cheese', dst) 12597db96d56Sopenharmony_ci self.assertRaises(shutil.SameFileError, shutil.copyfile, src, dst) 12607db96d56Sopenharmony_ci with open(src, 'r', encoding='utf-8') as f: 12617db96d56Sopenharmony_ci self.assertEqual(f.read(), 'cheddar') 12627db96d56Sopenharmony_ci os.remove(dst) 12637db96d56Sopenharmony_ci finally: 12647db96d56Sopenharmony_ci shutil.rmtree(TESTFN, ignore_errors=True) 12657db96d56Sopenharmony_ci 12667db96d56Sopenharmony_ci # Issue #3002: copyfile and copytree block indefinitely on named pipes 12677db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()') 12687db96d56Sopenharmony_ci @unittest.skipIf(sys.platform == "vxworks", 12697db96d56Sopenharmony_ci "fifo requires special path on VxWorks") 12707db96d56Sopenharmony_ci def test_copyfile_named_pipe(self): 12717db96d56Sopenharmony_ci try: 12727db96d56Sopenharmony_ci os.mkfifo(TESTFN) 12737db96d56Sopenharmony_ci except PermissionError as e: 12747db96d56Sopenharmony_ci self.skipTest('os.mkfifo(): %s' % e) 12757db96d56Sopenharmony_ci try: 12767db96d56Sopenharmony_ci self.assertRaises(shutil.SpecialFileError, 12777db96d56Sopenharmony_ci shutil.copyfile, TESTFN, TESTFN2) 12787db96d56Sopenharmony_ci self.assertRaises(shutil.SpecialFileError, 12797db96d56Sopenharmony_ci shutil.copyfile, __file__, TESTFN) 12807db96d56Sopenharmony_ci finally: 12817db96d56Sopenharmony_ci os.remove(TESTFN) 12827db96d56Sopenharmony_ci 12837db96d56Sopenharmony_ci def test_copyfile_return_value(self): 12847db96d56Sopenharmony_ci # copytree returns its destination path. 12857db96d56Sopenharmony_ci src_dir = self.mkdtemp() 12867db96d56Sopenharmony_ci dst_dir = self.mkdtemp() 12877db96d56Sopenharmony_ci dst_file = os.path.join(dst_dir, 'bar') 12887db96d56Sopenharmony_ci src_file = os.path.join(src_dir, 'foo') 12897db96d56Sopenharmony_ci write_file(src_file, 'foo') 12907db96d56Sopenharmony_ci rv = shutil.copyfile(src_file, dst_file) 12917db96d56Sopenharmony_ci self.assertTrue(os.path.exists(rv)) 12927db96d56Sopenharmony_ci self.assertEqual(read_file(src_file), read_file(dst_file)) 12937db96d56Sopenharmony_ci 12947db96d56Sopenharmony_ci def test_copyfile_same_file(self): 12957db96d56Sopenharmony_ci # copyfile() should raise SameFileError if the source and destination 12967db96d56Sopenharmony_ci # are the same. 12977db96d56Sopenharmony_ci src_dir = self.mkdtemp() 12987db96d56Sopenharmony_ci src_file = os.path.join(src_dir, 'foo') 12997db96d56Sopenharmony_ci write_file(src_file, 'foo') 13007db96d56Sopenharmony_ci self.assertRaises(SameFileError, shutil.copyfile, src_file, src_file) 13017db96d56Sopenharmony_ci # But Error should work too, to stay backward compatible. 13027db96d56Sopenharmony_ci self.assertRaises(Error, shutil.copyfile, src_file, src_file) 13037db96d56Sopenharmony_ci # Make sure file is not corrupted. 13047db96d56Sopenharmony_ci self.assertEqual(read_file(src_file), 'foo') 13057db96d56Sopenharmony_ci 13067db96d56Sopenharmony_ci @unittest.skipIf(MACOS or SOLARIS or _winapi, 'On MACOS, Solaris and Windows the errors are not confusing (though different)') 13077db96d56Sopenharmony_ci # gh-92670: The test uses a trailing slash to force the OS consider 13087db96d56Sopenharmony_ci # the path as a directory, but on AIX the trailing slash has no effect 13097db96d56Sopenharmony_ci # and is considered as a file. 13107db96d56Sopenharmony_ci @unittest.skipIf(AIX, 'Not valid on AIX, see gh-92670') 13117db96d56Sopenharmony_ci def test_copyfile_nonexistent_dir(self): 13127db96d56Sopenharmony_ci # Issue 43219 13137db96d56Sopenharmony_ci src_dir = self.mkdtemp() 13147db96d56Sopenharmony_ci src_file = os.path.join(src_dir, 'foo') 13157db96d56Sopenharmony_ci dst = os.path.join(src_dir, 'does_not_exist/') 13167db96d56Sopenharmony_ci write_file(src_file, 'foo') 13177db96d56Sopenharmony_ci self.assertRaises(FileNotFoundError, shutil.copyfile, src_file, dst) 13187db96d56Sopenharmony_ci 13197db96d56Sopenharmony_ci def test_copyfile_copy_dir(self): 13207db96d56Sopenharmony_ci # Issue 45234 13217db96d56Sopenharmony_ci # test copy() and copyfile() raising proper exceptions when src and/or 13227db96d56Sopenharmony_ci # dst are directories 13237db96d56Sopenharmony_ci src_dir = self.mkdtemp() 13247db96d56Sopenharmony_ci src_file = os.path.join(src_dir, 'foo') 13257db96d56Sopenharmony_ci dir2 = self.mkdtemp() 13267db96d56Sopenharmony_ci dst = os.path.join(src_dir, 'does_not_exist/') 13277db96d56Sopenharmony_ci write_file(src_file, 'foo') 13287db96d56Sopenharmony_ci if sys.platform == "win32": 13297db96d56Sopenharmony_ci err = PermissionError 13307db96d56Sopenharmony_ci else: 13317db96d56Sopenharmony_ci err = IsADirectoryError 13327db96d56Sopenharmony_ci 13337db96d56Sopenharmony_ci self.assertRaises(err, shutil.copyfile, src_dir, dst) 13347db96d56Sopenharmony_ci self.assertRaises(err, shutil.copyfile, src_file, src_dir) 13357db96d56Sopenharmony_ci self.assertRaises(err, shutil.copyfile, dir2, src_dir) 13367db96d56Sopenharmony_ci 13377db96d56Sopenharmony_ci 13387db96d56Sopenharmony_ciclass TestArchives(BaseTest, unittest.TestCase): 13397db96d56Sopenharmony_ci 13407db96d56Sopenharmony_ci ### shutil.make_archive 13417db96d56Sopenharmony_ci 13427db96d56Sopenharmony_ci @support.requires_zlib() 13437db96d56Sopenharmony_ci def test_make_tarball(self): 13447db96d56Sopenharmony_ci # creating something to tar 13457db96d56Sopenharmony_ci root_dir, base_dir = self._create_files('') 13467db96d56Sopenharmony_ci 13477db96d56Sopenharmony_ci tmpdir2 = self.mkdtemp() 13487db96d56Sopenharmony_ci # force shutil to create the directory 13497db96d56Sopenharmony_ci os.rmdir(tmpdir2) 13507db96d56Sopenharmony_ci # working with relative paths 13517db96d56Sopenharmony_ci work_dir = os.path.dirname(tmpdir2) 13527db96d56Sopenharmony_ci rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') 13537db96d56Sopenharmony_ci 13547db96d56Sopenharmony_ci with os_helper.change_cwd(work_dir), no_chdir: 13557db96d56Sopenharmony_ci base_name = os.path.abspath(rel_base_name) 13567db96d56Sopenharmony_ci tarball = make_archive(rel_base_name, 'gztar', root_dir, '.') 13577db96d56Sopenharmony_ci 13587db96d56Sopenharmony_ci # check if the compressed tarball was created 13597db96d56Sopenharmony_ci self.assertEqual(tarball, base_name + '.tar.gz') 13607db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(tarball)) 13617db96d56Sopenharmony_ci self.assertTrue(tarfile.is_tarfile(tarball)) 13627db96d56Sopenharmony_ci with tarfile.open(tarball, 'r:gz') as tf: 13637db96d56Sopenharmony_ci self.assertCountEqual(tf.getnames(), 13647db96d56Sopenharmony_ci ['.', './sub', './sub2', 13657db96d56Sopenharmony_ci './file1', './file2', './sub/file3']) 13667db96d56Sopenharmony_ci 13677db96d56Sopenharmony_ci # trying an uncompressed one 13687db96d56Sopenharmony_ci with os_helper.change_cwd(work_dir), no_chdir: 13697db96d56Sopenharmony_ci tarball = make_archive(rel_base_name, 'tar', root_dir, '.') 13707db96d56Sopenharmony_ci self.assertEqual(tarball, base_name + '.tar') 13717db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(tarball)) 13727db96d56Sopenharmony_ci self.assertTrue(tarfile.is_tarfile(tarball)) 13737db96d56Sopenharmony_ci with tarfile.open(tarball, 'r') as tf: 13747db96d56Sopenharmony_ci self.assertCountEqual(tf.getnames(), 13757db96d56Sopenharmony_ci ['.', './sub', './sub2', 13767db96d56Sopenharmony_ci './file1', './file2', './sub/file3']) 13777db96d56Sopenharmony_ci 13787db96d56Sopenharmony_ci def _tarinfo(self, path): 13797db96d56Sopenharmony_ci with tarfile.open(path) as tar: 13807db96d56Sopenharmony_ci names = tar.getnames() 13817db96d56Sopenharmony_ci names.sort() 13827db96d56Sopenharmony_ci return tuple(names) 13837db96d56Sopenharmony_ci 13847db96d56Sopenharmony_ci def _create_files(self, base_dir='dist'): 13857db96d56Sopenharmony_ci # creating something to tar 13867db96d56Sopenharmony_ci root_dir = self.mkdtemp() 13877db96d56Sopenharmony_ci dist = os.path.join(root_dir, base_dir) 13887db96d56Sopenharmony_ci os.makedirs(dist, exist_ok=True) 13897db96d56Sopenharmony_ci write_file((dist, 'file1'), 'xxx') 13907db96d56Sopenharmony_ci write_file((dist, 'file2'), 'xxx') 13917db96d56Sopenharmony_ci os.mkdir(os.path.join(dist, 'sub')) 13927db96d56Sopenharmony_ci write_file((dist, 'sub', 'file3'), 'xxx') 13937db96d56Sopenharmony_ci os.mkdir(os.path.join(dist, 'sub2')) 13947db96d56Sopenharmony_ci if base_dir: 13957db96d56Sopenharmony_ci write_file((root_dir, 'outer'), 'xxx') 13967db96d56Sopenharmony_ci return root_dir, base_dir 13977db96d56Sopenharmony_ci 13987db96d56Sopenharmony_ci @support.requires_zlib() 13997db96d56Sopenharmony_ci @unittest.skipUnless(shutil.which('tar'), 14007db96d56Sopenharmony_ci 'Need the tar command to run') 14017db96d56Sopenharmony_ci def test_tarfile_vs_tar(self): 14027db96d56Sopenharmony_ci root_dir, base_dir = self._create_files() 14037db96d56Sopenharmony_ci base_name = os.path.join(self.mkdtemp(), 'archive') 14047db96d56Sopenharmony_ci with no_chdir: 14057db96d56Sopenharmony_ci tarball = make_archive(base_name, 'gztar', root_dir, base_dir) 14067db96d56Sopenharmony_ci 14077db96d56Sopenharmony_ci # check if the compressed tarball was created 14087db96d56Sopenharmony_ci self.assertEqual(tarball, base_name + '.tar.gz') 14097db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(tarball)) 14107db96d56Sopenharmony_ci 14117db96d56Sopenharmony_ci # now create another tarball using `tar` 14127db96d56Sopenharmony_ci tarball2 = os.path.join(root_dir, 'archive2.tar') 14137db96d56Sopenharmony_ci tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir] 14147db96d56Sopenharmony_ci subprocess.check_call(tar_cmd, cwd=root_dir, 14157db96d56Sopenharmony_ci stdout=subprocess.DEVNULL) 14167db96d56Sopenharmony_ci 14177db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(tarball2)) 14187db96d56Sopenharmony_ci # let's compare both tarballs 14197db96d56Sopenharmony_ci self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2)) 14207db96d56Sopenharmony_ci 14217db96d56Sopenharmony_ci # trying an uncompressed one 14227db96d56Sopenharmony_ci with no_chdir: 14237db96d56Sopenharmony_ci tarball = make_archive(base_name, 'tar', root_dir, base_dir) 14247db96d56Sopenharmony_ci self.assertEqual(tarball, base_name + '.tar') 14257db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(tarball)) 14267db96d56Sopenharmony_ci 14277db96d56Sopenharmony_ci # now for a dry_run 14287db96d56Sopenharmony_ci with no_chdir: 14297db96d56Sopenharmony_ci tarball = make_archive(base_name, 'tar', root_dir, base_dir, 14307db96d56Sopenharmony_ci dry_run=True) 14317db96d56Sopenharmony_ci self.assertEqual(tarball, base_name + '.tar') 14327db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(tarball)) 14337db96d56Sopenharmony_ci 14347db96d56Sopenharmony_ci @support.requires_zlib() 14357db96d56Sopenharmony_ci def test_make_zipfile(self): 14367db96d56Sopenharmony_ci # creating something to zip 14377db96d56Sopenharmony_ci root_dir, base_dir = self._create_files() 14387db96d56Sopenharmony_ci 14397db96d56Sopenharmony_ci tmpdir2 = self.mkdtemp() 14407db96d56Sopenharmony_ci # force shutil to create the directory 14417db96d56Sopenharmony_ci os.rmdir(tmpdir2) 14427db96d56Sopenharmony_ci # working with relative paths 14437db96d56Sopenharmony_ci work_dir = os.path.dirname(tmpdir2) 14447db96d56Sopenharmony_ci rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive') 14457db96d56Sopenharmony_ci 14467db96d56Sopenharmony_ci with os_helper.change_cwd(work_dir), no_chdir: 14477db96d56Sopenharmony_ci base_name = os.path.abspath(rel_base_name) 14487db96d56Sopenharmony_ci res = make_archive(rel_base_name, 'zip', root_dir) 14497db96d56Sopenharmony_ci 14507db96d56Sopenharmony_ci self.assertEqual(res, base_name + '.zip') 14517db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(res)) 14527db96d56Sopenharmony_ci self.assertTrue(zipfile.is_zipfile(res)) 14537db96d56Sopenharmony_ci with zipfile.ZipFile(res) as zf: 14547db96d56Sopenharmony_ci self.assertCountEqual(zf.namelist(), 14557db96d56Sopenharmony_ci ['dist/', 'dist/sub/', 'dist/sub2/', 14567db96d56Sopenharmony_ci 'dist/file1', 'dist/file2', 'dist/sub/file3', 14577db96d56Sopenharmony_ci 'outer']) 14587db96d56Sopenharmony_ci 14597db96d56Sopenharmony_ci with os_helper.change_cwd(work_dir), no_chdir: 14607db96d56Sopenharmony_ci base_name = os.path.abspath(rel_base_name) 14617db96d56Sopenharmony_ci res = make_archive(rel_base_name, 'zip', root_dir, base_dir) 14627db96d56Sopenharmony_ci 14637db96d56Sopenharmony_ci self.assertEqual(res, base_name + '.zip') 14647db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(res)) 14657db96d56Sopenharmony_ci self.assertTrue(zipfile.is_zipfile(res)) 14667db96d56Sopenharmony_ci with zipfile.ZipFile(res) as zf: 14677db96d56Sopenharmony_ci self.assertCountEqual(zf.namelist(), 14687db96d56Sopenharmony_ci ['dist/', 'dist/sub/', 'dist/sub2/', 14697db96d56Sopenharmony_ci 'dist/file1', 'dist/file2', 'dist/sub/file3']) 14707db96d56Sopenharmony_ci 14717db96d56Sopenharmony_ci @support.requires_zlib() 14727db96d56Sopenharmony_ci @unittest.skipUnless(shutil.which('zip'), 14737db96d56Sopenharmony_ci 'Need the zip command to run') 14747db96d56Sopenharmony_ci def test_zipfile_vs_zip(self): 14757db96d56Sopenharmony_ci root_dir, base_dir = self._create_files() 14767db96d56Sopenharmony_ci base_name = os.path.join(self.mkdtemp(), 'archive') 14777db96d56Sopenharmony_ci with no_chdir: 14787db96d56Sopenharmony_ci archive = make_archive(base_name, 'zip', root_dir, base_dir) 14797db96d56Sopenharmony_ci 14807db96d56Sopenharmony_ci # check if ZIP file was created 14817db96d56Sopenharmony_ci self.assertEqual(archive, base_name + '.zip') 14827db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(archive)) 14837db96d56Sopenharmony_ci 14847db96d56Sopenharmony_ci # now create another ZIP file using `zip` 14857db96d56Sopenharmony_ci archive2 = os.path.join(root_dir, 'archive2.zip') 14867db96d56Sopenharmony_ci zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir] 14877db96d56Sopenharmony_ci subprocess.check_call(zip_cmd, cwd=root_dir, 14887db96d56Sopenharmony_ci stdout=subprocess.DEVNULL) 14897db96d56Sopenharmony_ci 14907db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(archive2)) 14917db96d56Sopenharmony_ci # let's compare both ZIP files 14927db96d56Sopenharmony_ci with zipfile.ZipFile(archive) as zf: 14937db96d56Sopenharmony_ci names = zf.namelist() 14947db96d56Sopenharmony_ci with zipfile.ZipFile(archive2) as zf: 14957db96d56Sopenharmony_ci names2 = zf.namelist() 14967db96d56Sopenharmony_ci self.assertEqual(sorted(names), sorted(names2)) 14977db96d56Sopenharmony_ci 14987db96d56Sopenharmony_ci @support.requires_zlib() 14997db96d56Sopenharmony_ci @unittest.skipUnless(shutil.which('unzip'), 15007db96d56Sopenharmony_ci 'Need the unzip command to run') 15017db96d56Sopenharmony_ci def test_unzip_zipfile(self): 15027db96d56Sopenharmony_ci root_dir, base_dir = self._create_files() 15037db96d56Sopenharmony_ci base_name = os.path.join(self.mkdtemp(), 'archive') 15047db96d56Sopenharmony_ci with no_chdir: 15057db96d56Sopenharmony_ci archive = make_archive(base_name, 'zip', root_dir, base_dir) 15067db96d56Sopenharmony_ci 15077db96d56Sopenharmony_ci # check if ZIP file was created 15087db96d56Sopenharmony_ci self.assertEqual(archive, base_name + '.zip') 15097db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(archive)) 15107db96d56Sopenharmony_ci 15117db96d56Sopenharmony_ci # now check the ZIP file using `unzip -t` 15127db96d56Sopenharmony_ci zip_cmd = ['unzip', '-t', archive] 15137db96d56Sopenharmony_ci with os_helper.change_cwd(root_dir): 15147db96d56Sopenharmony_ci try: 15157db96d56Sopenharmony_ci subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT) 15167db96d56Sopenharmony_ci except subprocess.CalledProcessError as exc: 15177db96d56Sopenharmony_ci details = exc.output.decode(errors="replace") 15187db96d56Sopenharmony_ci if 'unrecognized option: t' in details: 15197db96d56Sopenharmony_ci self.skipTest("unzip doesn't support -t") 15207db96d56Sopenharmony_ci msg = "{}\n\n**Unzip Output**\n{}" 15217db96d56Sopenharmony_ci self.fail(msg.format(exc, details)) 15227db96d56Sopenharmony_ci 15237db96d56Sopenharmony_ci def test_make_archive(self): 15247db96d56Sopenharmony_ci tmpdir = self.mkdtemp() 15257db96d56Sopenharmony_ci base_name = os.path.join(tmpdir, 'archive') 15267db96d56Sopenharmony_ci self.assertRaises(ValueError, make_archive, base_name, 'xxx') 15277db96d56Sopenharmony_ci 15287db96d56Sopenharmony_ci @support.requires_zlib() 15297db96d56Sopenharmony_ci def test_make_archive_owner_group(self): 15307db96d56Sopenharmony_ci # testing make_archive with owner and group, with various combinations 15317db96d56Sopenharmony_ci # this works even if there's not gid/uid support 15327db96d56Sopenharmony_ci if UID_GID_SUPPORT: 15337db96d56Sopenharmony_ci group = grp.getgrgid(0)[0] 15347db96d56Sopenharmony_ci owner = pwd.getpwuid(0)[0] 15357db96d56Sopenharmony_ci else: 15367db96d56Sopenharmony_ci group = owner = 'root' 15377db96d56Sopenharmony_ci 15387db96d56Sopenharmony_ci root_dir, base_dir = self._create_files() 15397db96d56Sopenharmony_ci base_name = os.path.join(self.mkdtemp(), 'archive') 15407db96d56Sopenharmony_ci res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner, 15417db96d56Sopenharmony_ci group=group) 15427db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(res)) 15437db96d56Sopenharmony_ci 15447db96d56Sopenharmony_ci res = make_archive(base_name, 'zip', root_dir, base_dir) 15457db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(res)) 15467db96d56Sopenharmony_ci 15477db96d56Sopenharmony_ci res = make_archive(base_name, 'tar', root_dir, base_dir, 15487db96d56Sopenharmony_ci owner=owner, group=group) 15497db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(res)) 15507db96d56Sopenharmony_ci 15517db96d56Sopenharmony_ci res = make_archive(base_name, 'tar', root_dir, base_dir, 15527db96d56Sopenharmony_ci owner='kjhkjhkjg', group='oihohoh') 15537db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(res)) 15547db96d56Sopenharmony_ci 15557db96d56Sopenharmony_ci 15567db96d56Sopenharmony_ci @support.requires_zlib() 15577db96d56Sopenharmony_ci @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") 15587db96d56Sopenharmony_ci def test_tarfile_root_owner(self): 15597db96d56Sopenharmony_ci root_dir, base_dir = self._create_files() 15607db96d56Sopenharmony_ci base_name = os.path.join(self.mkdtemp(), 'archive') 15617db96d56Sopenharmony_ci group = grp.getgrgid(0)[0] 15627db96d56Sopenharmony_ci owner = pwd.getpwuid(0)[0] 15637db96d56Sopenharmony_ci with os_helper.change_cwd(root_dir), no_chdir: 15647db96d56Sopenharmony_ci archive_name = make_archive(base_name, 'gztar', root_dir, 'dist', 15657db96d56Sopenharmony_ci owner=owner, group=group) 15667db96d56Sopenharmony_ci 15677db96d56Sopenharmony_ci # check if the compressed tarball was created 15687db96d56Sopenharmony_ci self.assertTrue(os.path.isfile(archive_name)) 15697db96d56Sopenharmony_ci 15707db96d56Sopenharmony_ci # now checks the rights 15717db96d56Sopenharmony_ci archive = tarfile.open(archive_name) 15727db96d56Sopenharmony_ci try: 15737db96d56Sopenharmony_ci for member in archive.getmembers(): 15747db96d56Sopenharmony_ci self.assertEqual(member.uid, 0) 15757db96d56Sopenharmony_ci self.assertEqual(member.gid, 0) 15767db96d56Sopenharmony_ci finally: 15777db96d56Sopenharmony_ci archive.close() 15787db96d56Sopenharmony_ci 15797db96d56Sopenharmony_ci def test_make_archive_cwd(self): 15807db96d56Sopenharmony_ci current_dir = os.getcwd() 15817db96d56Sopenharmony_ci root_dir = self.mkdtemp() 15827db96d56Sopenharmony_ci def _breaks(*args, **kw): 15837db96d56Sopenharmony_ci raise RuntimeError() 15847db96d56Sopenharmony_ci dirs = [] 15857db96d56Sopenharmony_ci def _chdir(path): 15867db96d56Sopenharmony_ci dirs.append(path) 15877db96d56Sopenharmony_ci orig_chdir(path) 15887db96d56Sopenharmony_ci 15897db96d56Sopenharmony_ci register_archive_format('xxx', _breaks, [], 'xxx file') 15907db96d56Sopenharmony_ci try: 15917db96d56Sopenharmony_ci with support.swap_attr(os, 'chdir', _chdir) as orig_chdir: 15927db96d56Sopenharmony_ci try: 15937db96d56Sopenharmony_ci make_archive('xxx', 'xxx', root_dir=root_dir) 15947db96d56Sopenharmony_ci except Exception: 15957db96d56Sopenharmony_ci pass 15967db96d56Sopenharmony_ci self.assertEqual(os.getcwd(), current_dir) 15977db96d56Sopenharmony_ci self.assertEqual(dirs, [root_dir, current_dir]) 15987db96d56Sopenharmony_ci finally: 15997db96d56Sopenharmony_ci unregister_archive_format('xxx') 16007db96d56Sopenharmony_ci 16017db96d56Sopenharmony_ci def test_make_tarfile_in_curdir(self): 16027db96d56Sopenharmony_ci # Issue #21280 16037db96d56Sopenharmony_ci root_dir = self.mkdtemp() 16047db96d56Sopenharmony_ci with os_helper.change_cwd(root_dir), no_chdir: 16057db96d56Sopenharmony_ci self.assertEqual(make_archive('test', 'tar'), 'test.tar') 16067db96d56Sopenharmony_ci self.assertTrue(os.path.isfile('test.tar')) 16077db96d56Sopenharmony_ci 16087db96d56Sopenharmony_ci @support.requires_zlib() 16097db96d56Sopenharmony_ci def test_make_zipfile_in_curdir(self): 16107db96d56Sopenharmony_ci # Issue #21280 16117db96d56Sopenharmony_ci root_dir = self.mkdtemp() 16127db96d56Sopenharmony_ci with os_helper.change_cwd(root_dir), no_chdir: 16137db96d56Sopenharmony_ci self.assertEqual(make_archive('test', 'zip'), 'test.zip') 16147db96d56Sopenharmony_ci self.assertTrue(os.path.isfile('test.zip')) 16157db96d56Sopenharmony_ci 16167db96d56Sopenharmony_ci def test_register_archive_format(self): 16177db96d56Sopenharmony_ci 16187db96d56Sopenharmony_ci self.assertRaises(TypeError, register_archive_format, 'xxx', 1) 16197db96d56Sopenharmony_ci self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, 16207db96d56Sopenharmony_ci 1) 16217db96d56Sopenharmony_ci self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x, 16227db96d56Sopenharmony_ci [(1, 2), (1, 2, 3)]) 16237db96d56Sopenharmony_ci 16247db96d56Sopenharmony_ci register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file') 16257db96d56Sopenharmony_ci formats = [name for name, params in get_archive_formats()] 16267db96d56Sopenharmony_ci self.assertIn('xxx', formats) 16277db96d56Sopenharmony_ci 16287db96d56Sopenharmony_ci unregister_archive_format('xxx') 16297db96d56Sopenharmony_ci formats = [name for name, params in get_archive_formats()] 16307db96d56Sopenharmony_ci self.assertNotIn('xxx', formats) 16317db96d56Sopenharmony_ci 16327db96d56Sopenharmony_ci ### shutil.unpack_archive 16337db96d56Sopenharmony_ci 16347db96d56Sopenharmony_ci def check_unpack_archive(self, format, **kwargs): 16357db96d56Sopenharmony_ci self.check_unpack_archive_with_converter( 16367db96d56Sopenharmony_ci format, lambda path: path, **kwargs) 16377db96d56Sopenharmony_ci self.check_unpack_archive_with_converter( 16387db96d56Sopenharmony_ci format, pathlib.Path, **kwargs) 16397db96d56Sopenharmony_ci self.check_unpack_archive_with_converter(format, FakePath, **kwargs) 16407db96d56Sopenharmony_ci 16417db96d56Sopenharmony_ci def check_unpack_archive_with_converter(self, format, converter, **kwargs): 16427db96d56Sopenharmony_ci root_dir, base_dir = self._create_files() 16437db96d56Sopenharmony_ci expected = rlistdir(root_dir) 16447db96d56Sopenharmony_ci expected.remove('outer') 16457db96d56Sopenharmony_ci 16467db96d56Sopenharmony_ci base_name = os.path.join(self.mkdtemp(), 'archive') 16477db96d56Sopenharmony_ci filename = make_archive(base_name, format, root_dir, base_dir) 16487db96d56Sopenharmony_ci 16497db96d56Sopenharmony_ci # let's try to unpack it now 16507db96d56Sopenharmony_ci tmpdir2 = self.mkdtemp() 16517db96d56Sopenharmony_ci unpack_archive(converter(filename), converter(tmpdir2), **kwargs) 16527db96d56Sopenharmony_ci self.assertEqual(rlistdir(tmpdir2), expected) 16537db96d56Sopenharmony_ci 16547db96d56Sopenharmony_ci # and again, this time with the format specified 16557db96d56Sopenharmony_ci tmpdir3 = self.mkdtemp() 16567db96d56Sopenharmony_ci unpack_archive(converter(filename), converter(tmpdir3), format=format, 16577db96d56Sopenharmony_ci **kwargs) 16587db96d56Sopenharmony_ci self.assertEqual(rlistdir(tmpdir3), expected) 16597db96d56Sopenharmony_ci 16607db96d56Sopenharmony_ci with self.assertRaises(shutil.ReadError): 16617db96d56Sopenharmony_ci unpack_archive(converter(TESTFN), **kwargs) 16627db96d56Sopenharmony_ci with self.assertRaises(ValueError): 16637db96d56Sopenharmony_ci unpack_archive(converter(TESTFN), format='xxx', **kwargs) 16647db96d56Sopenharmony_ci 16657db96d56Sopenharmony_ci def check_unpack_tarball(self, format): 16667db96d56Sopenharmony_ci self.check_unpack_archive(format, filter='fully_trusted') 16677db96d56Sopenharmony_ci self.check_unpack_archive(format, filter='data') 16687db96d56Sopenharmony_ci with warnings_helper.check_no_warnings(self): 16697db96d56Sopenharmony_ci self.check_unpack_archive(format) 16707db96d56Sopenharmony_ci 16717db96d56Sopenharmony_ci def test_unpack_archive_tar(self): 16727db96d56Sopenharmony_ci self.check_unpack_tarball('tar') 16737db96d56Sopenharmony_ci 16747db96d56Sopenharmony_ci @support.requires_zlib() 16757db96d56Sopenharmony_ci def test_unpack_archive_gztar(self): 16767db96d56Sopenharmony_ci self.check_unpack_tarball('gztar') 16777db96d56Sopenharmony_ci 16787db96d56Sopenharmony_ci @support.requires_bz2() 16797db96d56Sopenharmony_ci def test_unpack_archive_bztar(self): 16807db96d56Sopenharmony_ci self.check_unpack_tarball('bztar') 16817db96d56Sopenharmony_ci 16827db96d56Sopenharmony_ci @support.requires_lzma() 16837db96d56Sopenharmony_ci @unittest.skipIf(AIX and not _maxdataOK(), "AIX MAXDATA must be 0x20000000 or larger") 16847db96d56Sopenharmony_ci def test_unpack_archive_xztar(self): 16857db96d56Sopenharmony_ci self.check_unpack_tarball('xztar') 16867db96d56Sopenharmony_ci 16877db96d56Sopenharmony_ci @support.requires_zlib() 16887db96d56Sopenharmony_ci def test_unpack_archive_zip(self): 16897db96d56Sopenharmony_ci self.check_unpack_archive('zip') 16907db96d56Sopenharmony_ci with self.assertRaises(TypeError): 16917db96d56Sopenharmony_ci self.check_unpack_archive('zip', filter='data') 16927db96d56Sopenharmony_ci 16937db96d56Sopenharmony_ci def test_unpack_registry(self): 16947db96d56Sopenharmony_ci 16957db96d56Sopenharmony_ci formats = get_unpack_formats() 16967db96d56Sopenharmony_ci 16977db96d56Sopenharmony_ci def _boo(filename, extract_dir, extra): 16987db96d56Sopenharmony_ci self.assertEqual(extra, 1) 16997db96d56Sopenharmony_ci self.assertEqual(filename, 'stuff.boo') 17007db96d56Sopenharmony_ci self.assertEqual(extract_dir, 'xx') 17017db96d56Sopenharmony_ci 17027db96d56Sopenharmony_ci register_unpack_format('Boo', ['.boo', '.b2'], _boo, [('extra', 1)]) 17037db96d56Sopenharmony_ci unpack_archive('stuff.boo', 'xx') 17047db96d56Sopenharmony_ci 17057db96d56Sopenharmony_ci # trying to register a .boo unpacker again 17067db96d56Sopenharmony_ci self.assertRaises(RegistryError, register_unpack_format, 'Boo2', 17077db96d56Sopenharmony_ci ['.boo'], _boo) 17087db96d56Sopenharmony_ci 17097db96d56Sopenharmony_ci # should work now 17107db96d56Sopenharmony_ci unregister_unpack_format('Boo') 17117db96d56Sopenharmony_ci register_unpack_format('Boo2', ['.boo'], _boo) 17127db96d56Sopenharmony_ci self.assertIn(('Boo2', ['.boo'], ''), get_unpack_formats()) 17137db96d56Sopenharmony_ci self.assertNotIn(('Boo', ['.boo'], ''), get_unpack_formats()) 17147db96d56Sopenharmony_ci 17157db96d56Sopenharmony_ci # let's leave a clean state 17167db96d56Sopenharmony_ci unregister_unpack_format('Boo2') 17177db96d56Sopenharmony_ci self.assertEqual(get_unpack_formats(), formats) 17187db96d56Sopenharmony_ci 17197db96d56Sopenharmony_ci 17207db96d56Sopenharmony_ciclass TestMisc(BaseTest, unittest.TestCase): 17217db96d56Sopenharmony_ci 17227db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(shutil, 'disk_usage'), 17237db96d56Sopenharmony_ci "disk_usage not available on this platform") 17247db96d56Sopenharmony_ci def test_disk_usage(self): 17257db96d56Sopenharmony_ci usage = shutil.disk_usage(os.path.dirname(__file__)) 17267db96d56Sopenharmony_ci for attr in ('total', 'used', 'free'): 17277db96d56Sopenharmony_ci self.assertIsInstance(getattr(usage, attr), int) 17287db96d56Sopenharmony_ci self.assertGreater(usage.total, 0) 17297db96d56Sopenharmony_ci self.assertGreater(usage.used, 0) 17307db96d56Sopenharmony_ci self.assertGreaterEqual(usage.free, 0) 17317db96d56Sopenharmony_ci self.assertGreaterEqual(usage.total, usage.used) 17327db96d56Sopenharmony_ci self.assertGreater(usage.total, usage.free) 17337db96d56Sopenharmony_ci 17347db96d56Sopenharmony_ci # bpo-32557: Check that disk_usage() also accepts a filename 17357db96d56Sopenharmony_ci shutil.disk_usage(__file__) 17367db96d56Sopenharmony_ci 17377db96d56Sopenharmony_ci @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support") 17387db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(os, 'chown'), 'requires os.chown') 17397db96d56Sopenharmony_ci def test_chown(self): 17407db96d56Sopenharmony_ci dirname = self.mkdtemp() 17417db96d56Sopenharmony_ci filename = tempfile.mktemp(dir=dirname) 17427db96d56Sopenharmony_ci write_file(filename, 'testing chown function') 17437db96d56Sopenharmony_ci 17447db96d56Sopenharmony_ci with self.assertRaises(ValueError): 17457db96d56Sopenharmony_ci shutil.chown(filename) 17467db96d56Sopenharmony_ci 17477db96d56Sopenharmony_ci with self.assertRaises(LookupError): 17487db96d56Sopenharmony_ci shutil.chown(filename, user='non-existing username') 17497db96d56Sopenharmony_ci 17507db96d56Sopenharmony_ci with self.assertRaises(LookupError): 17517db96d56Sopenharmony_ci shutil.chown(filename, group='non-existing groupname') 17527db96d56Sopenharmony_ci 17537db96d56Sopenharmony_ci with self.assertRaises(TypeError): 17547db96d56Sopenharmony_ci shutil.chown(filename, b'spam') 17557db96d56Sopenharmony_ci 17567db96d56Sopenharmony_ci with self.assertRaises(TypeError): 17577db96d56Sopenharmony_ci shutil.chown(filename, 3.14) 17587db96d56Sopenharmony_ci 17597db96d56Sopenharmony_ci uid = os.getuid() 17607db96d56Sopenharmony_ci gid = os.getgid() 17617db96d56Sopenharmony_ci 17627db96d56Sopenharmony_ci def check_chown(path, uid=None, gid=None): 17637db96d56Sopenharmony_ci s = os.stat(filename) 17647db96d56Sopenharmony_ci if uid is not None: 17657db96d56Sopenharmony_ci self.assertEqual(uid, s.st_uid) 17667db96d56Sopenharmony_ci if gid is not None: 17677db96d56Sopenharmony_ci self.assertEqual(gid, s.st_gid) 17687db96d56Sopenharmony_ci 17697db96d56Sopenharmony_ci shutil.chown(filename, uid, gid) 17707db96d56Sopenharmony_ci check_chown(filename, uid, gid) 17717db96d56Sopenharmony_ci shutil.chown(filename, uid) 17727db96d56Sopenharmony_ci check_chown(filename, uid) 17737db96d56Sopenharmony_ci shutil.chown(filename, user=uid) 17747db96d56Sopenharmony_ci check_chown(filename, uid) 17757db96d56Sopenharmony_ci shutil.chown(filename, group=gid) 17767db96d56Sopenharmony_ci check_chown(filename, gid=gid) 17777db96d56Sopenharmony_ci 17787db96d56Sopenharmony_ci shutil.chown(dirname, uid, gid) 17797db96d56Sopenharmony_ci check_chown(dirname, uid, gid) 17807db96d56Sopenharmony_ci shutil.chown(dirname, uid) 17817db96d56Sopenharmony_ci check_chown(dirname, uid) 17827db96d56Sopenharmony_ci shutil.chown(dirname, user=uid) 17837db96d56Sopenharmony_ci check_chown(dirname, uid) 17847db96d56Sopenharmony_ci shutil.chown(dirname, group=gid) 17857db96d56Sopenharmony_ci check_chown(dirname, gid=gid) 17867db96d56Sopenharmony_ci 17877db96d56Sopenharmony_ci try: 17887db96d56Sopenharmony_ci user = pwd.getpwuid(uid)[0] 17897db96d56Sopenharmony_ci group = grp.getgrgid(gid)[0] 17907db96d56Sopenharmony_ci except KeyError: 17917db96d56Sopenharmony_ci # On some systems uid/gid cannot be resolved. 17927db96d56Sopenharmony_ci pass 17937db96d56Sopenharmony_ci else: 17947db96d56Sopenharmony_ci shutil.chown(filename, user, group) 17957db96d56Sopenharmony_ci check_chown(filename, uid, gid) 17967db96d56Sopenharmony_ci shutil.chown(dirname, user, group) 17977db96d56Sopenharmony_ci check_chown(dirname, uid, gid) 17987db96d56Sopenharmony_ci 17997db96d56Sopenharmony_ci 18007db96d56Sopenharmony_ciclass TestWhich(BaseTest, unittest.TestCase): 18017db96d56Sopenharmony_ci 18027db96d56Sopenharmony_ci def setUp(self): 18037db96d56Sopenharmony_ci self.temp_dir = self.mkdtemp(prefix="Tmp") 18047db96d56Sopenharmony_ci # Give the temp_file an ".exe" suffix for all. 18057db96d56Sopenharmony_ci # It's needed on Windows and not harmful on other platforms. 18067db96d56Sopenharmony_ci self.temp_file = tempfile.NamedTemporaryFile(dir=self.temp_dir, 18077db96d56Sopenharmony_ci prefix="Tmp", 18087db96d56Sopenharmony_ci suffix=".Exe") 18097db96d56Sopenharmony_ci os.chmod(self.temp_file.name, stat.S_IXUSR) 18107db96d56Sopenharmony_ci self.addCleanup(self.temp_file.close) 18117db96d56Sopenharmony_ci self.dir, self.file = os.path.split(self.temp_file.name) 18127db96d56Sopenharmony_ci self.env_path = self.dir 18137db96d56Sopenharmony_ci self.curdir = os.curdir 18147db96d56Sopenharmony_ci self.ext = ".EXE" 18157db96d56Sopenharmony_ci 18167db96d56Sopenharmony_ci def test_basic(self): 18177db96d56Sopenharmony_ci # Given an EXE in a directory, it should be returned. 18187db96d56Sopenharmony_ci rv = shutil.which(self.file, path=self.dir) 18197db96d56Sopenharmony_ci self.assertEqual(rv, self.temp_file.name) 18207db96d56Sopenharmony_ci 18217db96d56Sopenharmony_ci def test_absolute_cmd(self): 18227db96d56Sopenharmony_ci # When given the fully qualified path to an executable that exists, 18237db96d56Sopenharmony_ci # it should be returned. 18247db96d56Sopenharmony_ci rv = shutil.which(self.temp_file.name, path=self.temp_dir) 18257db96d56Sopenharmony_ci self.assertEqual(rv, self.temp_file.name) 18267db96d56Sopenharmony_ci 18277db96d56Sopenharmony_ci def test_relative_cmd(self): 18287db96d56Sopenharmony_ci # When given the relative path with a directory part to an executable 18297db96d56Sopenharmony_ci # that exists, it should be returned. 18307db96d56Sopenharmony_ci base_dir, tail_dir = os.path.split(self.dir) 18317db96d56Sopenharmony_ci relpath = os.path.join(tail_dir, self.file) 18327db96d56Sopenharmony_ci with os_helper.change_cwd(path=base_dir): 18337db96d56Sopenharmony_ci rv = shutil.which(relpath, path=self.temp_dir) 18347db96d56Sopenharmony_ci self.assertEqual(rv, relpath) 18357db96d56Sopenharmony_ci # But it shouldn't be searched in PATH directories (issue #16957). 18367db96d56Sopenharmony_ci with os_helper.change_cwd(path=self.dir): 18377db96d56Sopenharmony_ci rv = shutil.which(relpath, path=base_dir) 18387db96d56Sopenharmony_ci self.assertIsNone(rv) 18397db96d56Sopenharmony_ci 18407db96d56Sopenharmony_ci def test_cwd(self): 18417db96d56Sopenharmony_ci # Issue #16957 18427db96d56Sopenharmony_ci base_dir = os.path.dirname(self.dir) 18437db96d56Sopenharmony_ci with os_helper.change_cwd(path=self.dir): 18447db96d56Sopenharmony_ci rv = shutil.which(self.file, path=base_dir) 18457db96d56Sopenharmony_ci if sys.platform == "win32": 18467db96d56Sopenharmony_ci # Windows: current directory implicitly on PATH 18477db96d56Sopenharmony_ci self.assertEqual(rv, os.path.join(self.curdir, self.file)) 18487db96d56Sopenharmony_ci else: 18497db96d56Sopenharmony_ci # Other platforms: shouldn't match in the current directory. 18507db96d56Sopenharmony_ci self.assertIsNone(rv) 18517db96d56Sopenharmony_ci 18527db96d56Sopenharmony_ci @os_helper.skip_if_dac_override 18537db96d56Sopenharmony_ci def test_non_matching_mode(self): 18547db96d56Sopenharmony_ci # Set the file read-only and ask for writeable files. 18557db96d56Sopenharmony_ci os.chmod(self.temp_file.name, stat.S_IREAD) 18567db96d56Sopenharmony_ci if os.access(self.temp_file.name, os.W_OK): 18577db96d56Sopenharmony_ci self.skipTest("can't set the file read-only") 18587db96d56Sopenharmony_ci rv = shutil.which(self.file, path=self.dir, mode=os.W_OK) 18597db96d56Sopenharmony_ci self.assertIsNone(rv) 18607db96d56Sopenharmony_ci 18617db96d56Sopenharmony_ci def test_relative_path(self): 18627db96d56Sopenharmony_ci base_dir, tail_dir = os.path.split(self.dir) 18637db96d56Sopenharmony_ci with os_helper.change_cwd(path=base_dir): 18647db96d56Sopenharmony_ci rv = shutil.which(self.file, path=tail_dir) 18657db96d56Sopenharmony_ci self.assertEqual(rv, os.path.join(tail_dir, self.file)) 18667db96d56Sopenharmony_ci 18677db96d56Sopenharmony_ci def test_nonexistent_file(self): 18687db96d56Sopenharmony_ci # Return None when no matching executable file is found on the path. 18697db96d56Sopenharmony_ci rv = shutil.which("foo.exe", path=self.dir) 18707db96d56Sopenharmony_ci self.assertIsNone(rv) 18717db96d56Sopenharmony_ci 18727db96d56Sopenharmony_ci @unittest.skipUnless(sys.platform == "win32", 18737db96d56Sopenharmony_ci "pathext check is Windows-only") 18747db96d56Sopenharmony_ci def test_pathext_checking(self): 18757db96d56Sopenharmony_ci # Ask for the file without the ".exe" extension, then ensure that 18767db96d56Sopenharmony_ci # it gets found properly with the extension. 18777db96d56Sopenharmony_ci rv = shutil.which(self.file[:-4], path=self.dir) 18787db96d56Sopenharmony_ci self.assertEqual(rv, self.temp_file.name[:-4] + self.ext) 18797db96d56Sopenharmony_ci 18807db96d56Sopenharmony_ci def test_environ_path(self): 18817db96d56Sopenharmony_ci with os_helper.EnvironmentVarGuard() as env: 18827db96d56Sopenharmony_ci env['PATH'] = self.env_path 18837db96d56Sopenharmony_ci rv = shutil.which(self.file) 18847db96d56Sopenharmony_ci self.assertEqual(rv, self.temp_file.name) 18857db96d56Sopenharmony_ci 18867db96d56Sopenharmony_ci def test_environ_path_empty(self): 18877db96d56Sopenharmony_ci # PATH='': no match 18887db96d56Sopenharmony_ci with os_helper.EnvironmentVarGuard() as env: 18897db96d56Sopenharmony_ci env['PATH'] = '' 18907db96d56Sopenharmony_ci with unittest.mock.patch('os.confstr', return_value=self.dir, \ 18917db96d56Sopenharmony_ci create=True), \ 18927db96d56Sopenharmony_ci support.swap_attr(os, 'defpath', self.dir), \ 18937db96d56Sopenharmony_ci os_helper.change_cwd(self.dir): 18947db96d56Sopenharmony_ci rv = shutil.which(self.file) 18957db96d56Sopenharmony_ci self.assertIsNone(rv) 18967db96d56Sopenharmony_ci 18977db96d56Sopenharmony_ci def test_environ_path_cwd(self): 18987db96d56Sopenharmony_ci expected_cwd = os.path.basename(self.temp_file.name) 18997db96d56Sopenharmony_ci if sys.platform == "win32": 19007db96d56Sopenharmony_ci curdir = os.curdir 19017db96d56Sopenharmony_ci if isinstance(expected_cwd, bytes): 19027db96d56Sopenharmony_ci curdir = os.fsencode(curdir) 19037db96d56Sopenharmony_ci expected_cwd = os.path.join(curdir, expected_cwd) 19047db96d56Sopenharmony_ci 19057db96d56Sopenharmony_ci # PATH=':': explicitly looks in the current directory 19067db96d56Sopenharmony_ci with os_helper.EnvironmentVarGuard() as env: 19077db96d56Sopenharmony_ci env['PATH'] = os.pathsep 19087db96d56Sopenharmony_ci with unittest.mock.patch('os.confstr', return_value=self.dir, \ 19097db96d56Sopenharmony_ci create=True), \ 19107db96d56Sopenharmony_ci support.swap_attr(os, 'defpath', self.dir): 19117db96d56Sopenharmony_ci rv = shutil.which(self.file) 19127db96d56Sopenharmony_ci self.assertIsNone(rv) 19137db96d56Sopenharmony_ci 19147db96d56Sopenharmony_ci # look in current directory 19157db96d56Sopenharmony_ci with os_helper.change_cwd(self.dir): 19167db96d56Sopenharmony_ci rv = shutil.which(self.file) 19177db96d56Sopenharmony_ci self.assertEqual(rv, expected_cwd) 19187db96d56Sopenharmony_ci 19197db96d56Sopenharmony_ci def test_environ_path_missing(self): 19207db96d56Sopenharmony_ci with os_helper.EnvironmentVarGuard() as env: 19217db96d56Sopenharmony_ci env.pop('PATH', None) 19227db96d56Sopenharmony_ci 19237db96d56Sopenharmony_ci # without confstr 19247db96d56Sopenharmony_ci with unittest.mock.patch('os.confstr', side_effect=ValueError, \ 19257db96d56Sopenharmony_ci create=True), \ 19267db96d56Sopenharmony_ci support.swap_attr(os, 'defpath', self.dir): 19277db96d56Sopenharmony_ci rv = shutil.which(self.file) 19287db96d56Sopenharmony_ci self.assertEqual(rv, self.temp_file.name) 19297db96d56Sopenharmony_ci 19307db96d56Sopenharmony_ci # with confstr 19317db96d56Sopenharmony_ci with unittest.mock.patch('os.confstr', return_value=self.dir, \ 19327db96d56Sopenharmony_ci create=True), \ 19337db96d56Sopenharmony_ci support.swap_attr(os, 'defpath', ''): 19347db96d56Sopenharmony_ci rv = shutil.which(self.file) 19357db96d56Sopenharmony_ci self.assertEqual(rv, self.temp_file.name) 19367db96d56Sopenharmony_ci 19377db96d56Sopenharmony_ci def test_empty_path(self): 19387db96d56Sopenharmony_ci base_dir = os.path.dirname(self.dir) 19397db96d56Sopenharmony_ci with os_helper.change_cwd(path=self.dir), \ 19407db96d56Sopenharmony_ci os_helper.EnvironmentVarGuard() as env: 19417db96d56Sopenharmony_ci env['PATH'] = self.env_path 19427db96d56Sopenharmony_ci rv = shutil.which(self.file, path='') 19437db96d56Sopenharmony_ci self.assertIsNone(rv) 19447db96d56Sopenharmony_ci 19457db96d56Sopenharmony_ci def test_empty_path_no_PATH(self): 19467db96d56Sopenharmony_ci with os_helper.EnvironmentVarGuard() as env: 19477db96d56Sopenharmony_ci env.pop('PATH', None) 19487db96d56Sopenharmony_ci rv = shutil.which(self.file) 19497db96d56Sopenharmony_ci self.assertIsNone(rv) 19507db96d56Sopenharmony_ci 19517db96d56Sopenharmony_ci @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows') 19527db96d56Sopenharmony_ci def test_pathext(self): 19537db96d56Sopenharmony_ci ext = ".xyz" 19547db96d56Sopenharmony_ci temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir, 19557db96d56Sopenharmony_ci prefix="Tmp2", suffix=ext) 19567db96d56Sopenharmony_ci os.chmod(temp_filexyz.name, stat.S_IXUSR) 19577db96d56Sopenharmony_ci self.addCleanup(temp_filexyz.close) 19587db96d56Sopenharmony_ci 19597db96d56Sopenharmony_ci # strip path and extension 19607db96d56Sopenharmony_ci program = os.path.basename(temp_filexyz.name) 19617db96d56Sopenharmony_ci program = os.path.splitext(program)[0] 19627db96d56Sopenharmony_ci 19637db96d56Sopenharmony_ci with os_helper.EnvironmentVarGuard() as env: 19647db96d56Sopenharmony_ci env['PATHEXT'] = ext 19657db96d56Sopenharmony_ci rv = shutil.which(program, path=self.temp_dir) 19667db96d56Sopenharmony_ci self.assertEqual(rv, temp_filexyz.name) 19677db96d56Sopenharmony_ci 19687db96d56Sopenharmony_ci # Issue 40592: See https://bugs.python.org/issue40592 19697db96d56Sopenharmony_ci @unittest.skipUnless(sys.platform == "win32", 'test specific to Windows') 19707db96d56Sopenharmony_ci def test_pathext_with_empty_str(self): 19717db96d56Sopenharmony_ci ext = ".xyz" 19727db96d56Sopenharmony_ci temp_filexyz = tempfile.NamedTemporaryFile(dir=self.temp_dir, 19737db96d56Sopenharmony_ci prefix="Tmp2", suffix=ext) 19747db96d56Sopenharmony_ci self.addCleanup(temp_filexyz.close) 19757db96d56Sopenharmony_ci 19767db96d56Sopenharmony_ci # strip path and extension 19777db96d56Sopenharmony_ci program = os.path.basename(temp_filexyz.name) 19787db96d56Sopenharmony_ci program = os.path.splitext(program)[0] 19797db96d56Sopenharmony_ci 19807db96d56Sopenharmony_ci with os_helper.EnvironmentVarGuard() as env: 19817db96d56Sopenharmony_ci env['PATHEXT'] = f"{ext};" # note the ; 19827db96d56Sopenharmony_ci rv = shutil.which(program, path=self.temp_dir) 19837db96d56Sopenharmony_ci self.assertEqual(rv, temp_filexyz.name) 19847db96d56Sopenharmony_ci 19857db96d56Sopenharmony_ci 19867db96d56Sopenharmony_ciclass TestWhichBytes(TestWhich): 19877db96d56Sopenharmony_ci def setUp(self): 19887db96d56Sopenharmony_ci TestWhich.setUp(self) 19897db96d56Sopenharmony_ci self.dir = os.fsencode(self.dir) 19907db96d56Sopenharmony_ci self.file = os.fsencode(self.file) 19917db96d56Sopenharmony_ci self.temp_file.name = os.fsencode(self.temp_file.name) 19927db96d56Sopenharmony_ci self.curdir = os.fsencode(self.curdir) 19937db96d56Sopenharmony_ci self.ext = os.fsencode(self.ext) 19947db96d56Sopenharmony_ci 19957db96d56Sopenharmony_ci 19967db96d56Sopenharmony_ciclass TestMove(BaseTest, unittest.TestCase): 19977db96d56Sopenharmony_ci 19987db96d56Sopenharmony_ci def setUp(self): 19997db96d56Sopenharmony_ci filename = "foo" 20007db96d56Sopenharmony_ci self.src_dir = self.mkdtemp() 20017db96d56Sopenharmony_ci self.dst_dir = self.mkdtemp() 20027db96d56Sopenharmony_ci self.src_file = os.path.join(self.src_dir, filename) 20037db96d56Sopenharmony_ci self.dst_file = os.path.join(self.dst_dir, filename) 20047db96d56Sopenharmony_ci with open(self.src_file, "wb") as f: 20057db96d56Sopenharmony_ci f.write(b"spam") 20067db96d56Sopenharmony_ci 20077db96d56Sopenharmony_ci def _check_move_file(self, src, dst, real_dst): 20087db96d56Sopenharmony_ci with open(src, "rb") as f: 20097db96d56Sopenharmony_ci contents = f.read() 20107db96d56Sopenharmony_ci shutil.move(src, dst) 20117db96d56Sopenharmony_ci with open(real_dst, "rb") as f: 20127db96d56Sopenharmony_ci self.assertEqual(contents, f.read()) 20137db96d56Sopenharmony_ci self.assertFalse(os.path.exists(src)) 20147db96d56Sopenharmony_ci 20157db96d56Sopenharmony_ci def _check_move_dir(self, src, dst, real_dst): 20167db96d56Sopenharmony_ci contents = sorted(os.listdir(src)) 20177db96d56Sopenharmony_ci shutil.move(src, dst) 20187db96d56Sopenharmony_ci self.assertEqual(contents, sorted(os.listdir(real_dst))) 20197db96d56Sopenharmony_ci self.assertFalse(os.path.exists(src)) 20207db96d56Sopenharmony_ci 20217db96d56Sopenharmony_ci def test_move_file(self): 20227db96d56Sopenharmony_ci # Move a file to another location on the same filesystem. 20237db96d56Sopenharmony_ci self._check_move_file(self.src_file, self.dst_file, self.dst_file) 20247db96d56Sopenharmony_ci 20257db96d56Sopenharmony_ci def test_move_file_to_dir(self): 20267db96d56Sopenharmony_ci # Move a file inside an existing dir on the same filesystem. 20277db96d56Sopenharmony_ci self._check_move_file(self.src_file, self.dst_dir, self.dst_file) 20287db96d56Sopenharmony_ci 20297db96d56Sopenharmony_ci def test_move_file_to_dir_pathlike_src(self): 20307db96d56Sopenharmony_ci # Move a pathlike file to another location on the same filesystem. 20317db96d56Sopenharmony_ci src = pathlib.Path(self.src_file) 20327db96d56Sopenharmony_ci self._check_move_file(src, self.dst_dir, self.dst_file) 20337db96d56Sopenharmony_ci 20347db96d56Sopenharmony_ci def test_move_file_to_dir_pathlike_dst(self): 20357db96d56Sopenharmony_ci # Move a file to another pathlike location on the same filesystem. 20367db96d56Sopenharmony_ci dst = pathlib.Path(self.dst_dir) 20377db96d56Sopenharmony_ci self._check_move_file(self.src_file, dst, self.dst_file) 20387db96d56Sopenharmony_ci 20397db96d56Sopenharmony_ci @mock_rename 20407db96d56Sopenharmony_ci def test_move_file_other_fs(self): 20417db96d56Sopenharmony_ci # Move a file to an existing dir on another filesystem. 20427db96d56Sopenharmony_ci self.test_move_file() 20437db96d56Sopenharmony_ci 20447db96d56Sopenharmony_ci @mock_rename 20457db96d56Sopenharmony_ci def test_move_file_to_dir_other_fs(self): 20467db96d56Sopenharmony_ci # Move a file to another location on another filesystem. 20477db96d56Sopenharmony_ci self.test_move_file_to_dir() 20487db96d56Sopenharmony_ci 20497db96d56Sopenharmony_ci def test_move_dir(self): 20507db96d56Sopenharmony_ci # Move a dir to another location on the same filesystem. 20517db96d56Sopenharmony_ci dst_dir = tempfile.mktemp(dir=self.mkdtemp()) 20527db96d56Sopenharmony_ci try: 20537db96d56Sopenharmony_ci self._check_move_dir(self.src_dir, dst_dir, dst_dir) 20547db96d56Sopenharmony_ci finally: 20557db96d56Sopenharmony_ci os_helper.rmtree(dst_dir) 20567db96d56Sopenharmony_ci 20577db96d56Sopenharmony_ci @mock_rename 20587db96d56Sopenharmony_ci def test_move_dir_other_fs(self): 20597db96d56Sopenharmony_ci # Move a dir to another location on another filesystem. 20607db96d56Sopenharmony_ci self.test_move_dir() 20617db96d56Sopenharmony_ci 20627db96d56Sopenharmony_ci def test_move_dir_to_dir(self): 20637db96d56Sopenharmony_ci # Move a dir inside an existing dir on the same filesystem. 20647db96d56Sopenharmony_ci self._check_move_dir(self.src_dir, self.dst_dir, 20657db96d56Sopenharmony_ci os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 20667db96d56Sopenharmony_ci 20677db96d56Sopenharmony_ci @mock_rename 20687db96d56Sopenharmony_ci def test_move_dir_to_dir_other_fs(self): 20697db96d56Sopenharmony_ci # Move a dir inside an existing dir on another filesystem. 20707db96d56Sopenharmony_ci self.test_move_dir_to_dir() 20717db96d56Sopenharmony_ci 20727db96d56Sopenharmony_ci def test_move_dir_sep_to_dir(self): 20737db96d56Sopenharmony_ci self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir, 20747db96d56Sopenharmony_ci os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 20757db96d56Sopenharmony_ci 20767db96d56Sopenharmony_ci @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep') 20777db96d56Sopenharmony_ci def test_move_dir_altsep_to_dir(self): 20787db96d56Sopenharmony_ci self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir, 20797db96d56Sopenharmony_ci os.path.join(self.dst_dir, os.path.basename(self.src_dir))) 20807db96d56Sopenharmony_ci 20817db96d56Sopenharmony_ci def test_existing_file_inside_dest_dir(self): 20827db96d56Sopenharmony_ci # A file with the same name inside the destination dir already exists. 20837db96d56Sopenharmony_ci with open(self.dst_file, "wb"): 20847db96d56Sopenharmony_ci pass 20857db96d56Sopenharmony_ci self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir) 20867db96d56Sopenharmony_ci 20877db96d56Sopenharmony_ci def test_dont_move_dir_in_itself(self): 20887db96d56Sopenharmony_ci # Moving a dir inside itself raises an Error. 20897db96d56Sopenharmony_ci dst = os.path.join(self.src_dir, "bar") 20907db96d56Sopenharmony_ci self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst) 20917db96d56Sopenharmony_ci 20927db96d56Sopenharmony_ci def test_destinsrc_false_negative(self): 20937db96d56Sopenharmony_ci os.mkdir(TESTFN) 20947db96d56Sopenharmony_ci try: 20957db96d56Sopenharmony_ci for src, dst in [('srcdir', 'srcdir/dest')]: 20967db96d56Sopenharmony_ci src = os.path.join(TESTFN, src) 20977db96d56Sopenharmony_ci dst = os.path.join(TESTFN, dst) 20987db96d56Sopenharmony_ci self.assertTrue(shutil._destinsrc(src, dst), 20997db96d56Sopenharmony_ci msg='_destinsrc() wrongly concluded that ' 21007db96d56Sopenharmony_ci 'dst (%s) is not in src (%s)' % (dst, src)) 21017db96d56Sopenharmony_ci finally: 21027db96d56Sopenharmony_ci os_helper.rmtree(TESTFN) 21037db96d56Sopenharmony_ci 21047db96d56Sopenharmony_ci def test_destinsrc_false_positive(self): 21057db96d56Sopenharmony_ci os.mkdir(TESTFN) 21067db96d56Sopenharmony_ci try: 21077db96d56Sopenharmony_ci for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]: 21087db96d56Sopenharmony_ci src = os.path.join(TESTFN, src) 21097db96d56Sopenharmony_ci dst = os.path.join(TESTFN, dst) 21107db96d56Sopenharmony_ci self.assertFalse(shutil._destinsrc(src, dst), 21117db96d56Sopenharmony_ci msg='_destinsrc() wrongly concluded that ' 21127db96d56Sopenharmony_ci 'dst (%s) is in src (%s)' % (dst, src)) 21137db96d56Sopenharmony_ci finally: 21147db96d56Sopenharmony_ci os_helper.rmtree(TESTFN) 21157db96d56Sopenharmony_ci 21167db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 21177db96d56Sopenharmony_ci @mock_rename 21187db96d56Sopenharmony_ci def test_move_file_symlink(self): 21197db96d56Sopenharmony_ci dst = os.path.join(self.src_dir, 'bar') 21207db96d56Sopenharmony_ci os.symlink(self.src_file, dst) 21217db96d56Sopenharmony_ci shutil.move(dst, self.dst_file) 21227db96d56Sopenharmony_ci self.assertTrue(os.path.islink(self.dst_file)) 21237db96d56Sopenharmony_ci self.assertTrue(os.path.samefile(self.src_file, self.dst_file)) 21247db96d56Sopenharmony_ci 21257db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 21267db96d56Sopenharmony_ci @mock_rename 21277db96d56Sopenharmony_ci def test_move_file_symlink_to_dir(self): 21287db96d56Sopenharmony_ci filename = "bar" 21297db96d56Sopenharmony_ci dst = os.path.join(self.src_dir, filename) 21307db96d56Sopenharmony_ci os.symlink(self.src_file, dst) 21317db96d56Sopenharmony_ci shutil.move(dst, self.dst_dir) 21327db96d56Sopenharmony_ci final_link = os.path.join(self.dst_dir, filename) 21337db96d56Sopenharmony_ci self.assertTrue(os.path.islink(final_link)) 21347db96d56Sopenharmony_ci self.assertTrue(os.path.samefile(self.src_file, final_link)) 21357db96d56Sopenharmony_ci 21367db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 21377db96d56Sopenharmony_ci @mock_rename 21387db96d56Sopenharmony_ci def test_move_dangling_symlink(self): 21397db96d56Sopenharmony_ci src = os.path.join(self.src_dir, 'baz') 21407db96d56Sopenharmony_ci dst = os.path.join(self.src_dir, 'bar') 21417db96d56Sopenharmony_ci os.symlink(src, dst) 21427db96d56Sopenharmony_ci dst_link = os.path.join(self.dst_dir, 'quux') 21437db96d56Sopenharmony_ci shutil.move(dst, dst_link) 21447db96d56Sopenharmony_ci self.assertTrue(os.path.islink(dst_link)) 21457db96d56Sopenharmony_ci self.assertEqual(os.path.realpath(src), os.path.realpath(dst_link)) 21467db96d56Sopenharmony_ci 21477db96d56Sopenharmony_ci @os_helper.skip_unless_symlink 21487db96d56Sopenharmony_ci @mock_rename 21497db96d56Sopenharmony_ci def test_move_dir_symlink(self): 21507db96d56Sopenharmony_ci src = os.path.join(self.src_dir, 'baz') 21517db96d56Sopenharmony_ci dst = os.path.join(self.src_dir, 'bar') 21527db96d56Sopenharmony_ci os.mkdir(src) 21537db96d56Sopenharmony_ci os.symlink(src, dst) 21547db96d56Sopenharmony_ci dst_link = os.path.join(self.dst_dir, 'quux') 21557db96d56Sopenharmony_ci shutil.move(dst, dst_link) 21567db96d56Sopenharmony_ci self.assertTrue(os.path.islink(dst_link)) 21577db96d56Sopenharmony_ci self.assertTrue(os.path.samefile(src, dst_link)) 21587db96d56Sopenharmony_ci 21597db96d56Sopenharmony_ci def test_move_return_value(self): 21607db96d56Sopenharmony_ci rv = shutil.move(self.src_file, self.dst_dir) 21617db96d56Sopenharmony_ci self.assertEqual(rv, 21627db96d56Sopenharmony_ci os.path.join(self.dst_dir, os.path.basename(self.src_file))) 21637db96d56Sopenharmony_ci 21647db96d56Sopenharmony_ci def test_move_as_rename_return_value(self): 21657db96d56Sopenharmony_ci rv = shutil.move(self.src_file, os.path.join(self.dst_dir, 'bar')) 21667db96d56Sopenharmony_ci self.assertEqual(rv, os.path.join(self.dst_dir, 'bar')) 21677db96d56Sopenharmony_ci 21687db96d56Sopenharmony_ci @mock_rename 21697db96d56Sopenharmony_ci def test_move_file_special_function(self): 21707db96d56Sopenharmony_ci moved = [] 21717db96d56Sopenharmony_ci def _copy(src, dst): 21727db96d56Sopenharmony_ci moved.append((src, dst)) 21737db96d56Sopenharmony_ci shutil.move(self.src_file, self.dst_dir, copy_function=_copy) 21747db96d56Sopenharmony_ci self.assertEqual(len(moved), 1) 21757db96d56Sopenharmony_ci 21767db96d56Sopenharmony_ci @mock_rename 21777db96d56Sopenharmony_ci def test_move_dir_special_function(self): 21787db96d56Sopenharmony_ci moved = [] 21797db96d56Sopenharmony_ci def _copy(src, dst): 21807db96d56Sopenharmony_ci moved.append((src, dst)) 21817db96d56Sopenharmony_ci os_helper.create_empty_file(os.path.join(self.src_dir, 'child')) 21827db96d56Sopenharmony_ci os_helper.create_empty_file(os.path.join(self.src_dir, 'child1')) 21837db96d56Sopenharmony_ci shutil.move(self.src_dir, self.dst_dir, copy_function=_copy) 21847db96d56Sopenharmony_ci self.assertEqual(len(moved), 3) 21857db96d56Sopenharmony_ci 21867db96d56Sopenharmony_ci def test_move_dir_caseinsensitive(self): 21877db96d56Sopenharmony_ci # Renames a folder to the same name 21887db96d56Sopenharmony_ci # but a different case. 21897db96d56Sopenharmony_ci 21907db96d56Sopenharmony_ci self.src_dir = self.mkdtemp() 21917db96d56Sopenharmony_ci dst_dir = os.path.join( 21927db96d56Sopenharmony_ci os.path.dirname(self.src_dir), 21937db96d56Sopenharmony_ci os.path.basename(self.src_dir).upper()) 21947db96d56Sopenharmony_ci self.assertNotEqual(self.src_dir, dst_dir) 21957db96d56Sopenharmony_ci 21967db96d56Sopenharmony_ci try: 21977db96d56Sopenharmony_ci shutil.move(self.src_dir, dst_dir) 21987db96d56Sopenharmony_ci self.assertTrue(os.path.isdir(dst_dir)) 21997db96d56Sopenharmony_ci finally: 22007db96d56Sopenharmony_ci os.rmdir(dst_dir) 22017db96d56Sopenharmony_ci 22027db96d56Sopenharmony_ci 22037db96d56Sopenharmony_ci @os_helper.skip_unless_dac_override 22047db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(os, 'lchflags') 22057db96d56Sopenharmony_ci and hasattr(stat, 'SF_IMMUTABLE') 22067db96d56Sopenharmony_ci and hasattr(stat, 'UF_OPAQUE'), 22077db96d56Sopenharmony_ci 'requires lchflags') 22087db96d56Sopenharmony_ci def test_move_dir_permission_denied(self): 22097db96d56Sopenharmony_ci # bpo-42782: shutil.move should not create destination directories 22107db96d56Sopenharmony_ci # if the source directory cannot be removed. 22117db96d56Sopenharmony_ci try: 22127db96d56Sopenharmony_ci os.mkdir(TESTFN_SRC) 22137db96d56Sopenharmony_ci os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE) 22147db96d56Sopenharmony_ci 22157db96d56Sopenharmony_ci # Testing on an empty immutable directory 22167db96d56Sopenharmony_ci # TESTFN_DST should not exist if shutil.move failed 22177db96d56Sopenharmony_ci self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST) 22187db96d56Sopenharmony_ci self.assertFalse(TESTFN_DST in os.listdir()) 22197db96d56Sopenharmony_ci 22207db96d56Sopenharmony_ci # Create a file and keep the directory immutable 22217db96d56Sopenharmony_ci os.lchflags(TESTFN_SRC, stat.UF_OPAQUE) 22227db96d56Sopenharmony_ci os_helper.create_empty_file(os.path.join(TESTFN_SRC, 'child')) 22237db96d56Sopenharmony_ci os.lchflags(TESTFN_SRC, stat.SF_IMMUTABLE) 22247db96d56Sopenharmony_ci 22257db96d56Sopenharmony_ci # Testing on a non-empty immutable directory 22267db96d56Sopenharmony_ci # TESTFN_DST should not exist if shutil.move failed 22277db96d56Sopenharmony_ci self.assertRaises(PermissionError, shutil.move, TESTFN_SRC, TESTFN_DST) 22287db96d56Sopenharmony_ci self.assertFalse(TESTFN_DST in os.listdir()) 22297db96d56Sopenharmony_ci finally: 22307db96d56Sopenharmony_ci if os.path.exists(TESTFN_SRC): 22317db96d56Sopenharmony_ci os.lchflags(TESTFN_SRC, stat.UF_OPAQUE) 22327db96d56Sopenharmony_ci os_helper.rmtree(TESTFN_SRC) 22337db96d56Sopenharmony_ci if os.path.exists(TESTFN_DST): 22347db96d56Sopenharmony_ci os.lchflags(TESTFN_DST, stat.UF_OPAQUE) 22357db96d56Sopenharmony_ci os_helper.rmtree(TESTFN_DST) 22367db96d56Sopenharmony_ci 22377db96d56Sopenharmony_ci 22387db96d56Sopenharmony_ciclass TestCopyFile(unittest.TestCase): 22397db96d56Sopenharmony_ci 22407db96d56Sopenharmony_ci class Faux(object): 22417db96d56Sopenharmony_ci _entered = False 22427db96d56Sopenharmony_ci _exited_with = None 22437db96d56Sopenharmony_ci _raised = False 22447db96d56Sopenharmony_ci def __init__(self, raise_in_exit=False, suppress_at_exit=True): 22457db96d56Sopenharmony_ci self._raise_in_exit = raise_in_exit 22467db96d56Sopenharmony_ci self._suppress_at_exit = suppress_at_exit 22477db96d56Sopenharmony_ci def read(self, *args): 22487db96d56Sopenharmony_ci return '' 22497db96d56Sopenharmony_ci def __enter__(self): 22507db96d56Sopenharmony_ci self._entered = True 22517db96d56Sopenharmony_ci def __exit__(self, exc_type, exc_val, exc_tb): 22527db96d56Sopenharmony_ci self._exited_with = exc_type, exc_val, exc_tb 22537db96d56Sopenharmony_ci if self._raise_in_exit: 22547db96d56Sopenharmony_ci self._raised = True 22557db96d56Sopenharmony_ci raise OSError("Cannot close") 22567db96d56Sopenharmony_ci return self._suppress_at_exit 22577db96d56Sopenharmony_ci 22587db96d56Sopenharmony_ci def test_w_source_open_fails(self): 22597db96d56Sopenharmony_ci def _open(filename, mode='r'): 22607db96d56Sopenharmony_ci if filename == 'srcfile': 22617db96d56Sopenharmony_ci raise OSError('Cannot open "srcfile"') 22627db96d56Sopenharmony_ci assert 0 # shouldn't reach here. 22637db96d56Sopenharmony_ci 22647db96d56Sopenharmony_ci with support.swap_attr(shutil, 'open', _open): 22657db96d56Sopenharmony_ci with self.assertRaises(OSError): 22667db96d56Sopenharmony_ci shutil.copyfile('srcfile', 'destfile') 22677db96d56Sopenharmony_ci 22687db96d56Sopenharmony_ci @unittest.skipIf(MACOS, "skipped on macOS") 22697db96d56Sopenharmony_ci def test_w_dest_open_fails(self): 22707db96d56Sopenharmony_ci srcfile = self.Faux() 22717db96d56Sopenharmony_ci 22727db96d56Sopenharmony_ci def _open(filename, mode='r'): 22737db96d56Sopenharmony_ci if filename == 'srcfile': 22747db96d56Sopenharmony_ci return srcfile 22757db96d56Sopenharmony_ci if filename == 'destfile': 22767db96d56Sopenharmony_ci raise OSError('Cannot open "destfile"') 22777db96d56Sopenharmony_ci assert 0 # shouldn't reach here. 22787db96d56Sopenharmony_ci 22797db96d56Sopenharmony_ci with support.swap_attr(shutil, 'open', _open): 22807db96d56Sopenharmony_ci shutil.copyfile('srcfile', 'destfile') 22817db96d56Sopenharmony_ci self.assertTrue(srcfile._entered) 22827db96d56Sopenharmony_ci self.assertTrue(srcfile._exited_with[0] is OSError) 22837db96d56Sopenharmony_ci self.assertEqual(srcfile._exited_with[1].args, 22847db96d56Sopenharmony_ci ('Cannot open "destfile"',)) 22857db96d56Sopenharmony_ci 22867db96d56Sopenharmony_ci @unittest.skipIf(MACOS, "skipped on macOS") 22877db96d56Sopenharmony_ci def test_w_dest_close_fails(self): 22887db96d56Sopenharmony_ci srcfile = self.Faux() 22897db96d56Sopenharmony_ci destfile = self.Faux(True) 22907db96d56Sopenharmony_ci 22917db96d56Sopenharmony_ci def _open(filename, mode='r'): 22927db96d56Sopenharmony_ci if filename == 'srcfile': 22937db96d56Sopenharmony_ci return srcfile 22947db96d56Sopenharmony_ci if filename == 'destfile': 22957db96d56Sopenharmony_ci return destfile 22967db96d56Sopenharmony_ci assert 0 # shouldn't reach here. 22977db96d56Sopenharmony_ci 22987db96d56Sopenharmony_ci with support.swap_attr(shutil, 'open', _open): 22997db96d56Sopenharmony_ci shutil.copyfile('srcfile', 'destfile') 23007db96d56Sopenharmony_ci self.assertTrue(srcfile._entered) 23017db96d56Sopenharmony_ci self.assertTrue(destfile._entered) 23027db96d56Sopenharmony_ci self.assertTrue(destfile._raised) 23037db96d56Sopenharmony_ci self.assertTrue(srcfile._exited_with[0] is OSError) 23047db96d56Sopenharmony_ci self.assertEqual(srcfile._exited_with[1].args, 23057db96d56Sopenharmony_ci ('Cannot close',)) 23067db96d56Sopenharmony_ci 23077db96d56Sopenharmony_ci @unittest.skipIf(MACOS, "skipped on macOS") 23087db96d56Sopenharmony_ci def test_w_source_close_fails(self): 23097db96d56Sopenharmony_ci 23107db96d56Sopenharmony_ci srcfile = self.Faux(True) 23117db96d56Sopenharmony_ci destfile = self.Faux() 23127db96d56Sopenharmony_ci 23137db96d56Sopenharmony_ci def _open(filename, mode='r'): 23147db96d56Sopenharmony_ci if filename == 'srcfile': 23157db96d56Sopenharmony_ci return srcfile 23167db96d56Sopenharmony_ci if filename == 'destfile': 23177db96d56Sopenharmony_ci return destfile 23187db96d56Sopenharmony_ci assert 0 # shouldn't reach here. 23197db96d56Sopenharmony_ci 23207db96d56Sopenharmony_ci with support.swap_attr(shutil, 'open', _open): 23217db96d56Sopenharmony_ci with self.assertRaises(OSError): 23227db96d56Sopenharmony_ci shutil.copyfile('srcfile', 'destfile') 23237db96d56Sopenharmony_ci self.assertTrue(srcfile._entered) 23247db96d56Sopenharmony_ci self.assertTrue(destfile._entered) 23257db96d56Sopenharmony_ci self.assertFalse(destfile._raised) 23267db96d56Sopenharmony_ci self.assertTrue(srcfile._exited_with[0] is None) 23277db96d56Sopenharmony_ci self.assertTrue(srcfile._raised) 23287db96d56Sopenharmony_ci 23297db96d56Sopenharmony_ci 23307db96d56Sopenharmony_ciclass TestCopyFileObj(unittest.TestCase): 23317db96d56Sopenharmony_ci FILESIZE = 2 * 1024 * 1024 23327db96d56Sopenharmony_ci 23337db96d56Sopenharmony_ci @classmethod 23347db96d56Sopenharmony_ci def setUpClass(cls): 23357db96d56Sopenharmony_ci write_test_file(TESTFN, cls.FILESIZE) 23367db96d56Sopenharmony_ci 23377db96d56Sopenharmony_ci @classmethod 23387db96d56Sopenharmony_ci def tearDownClass(cls): 23397db96d56Sopenharmony_ci os_helper.unlink(TESTFN) 23407db96d56Sopenharmony_ci os_helper.unlink(TESTFN2) 23417db96d56Sopenharmony_ci 23427db96d56Sopenharmony_ci def tearDown(self): 23437db96d56Sopenharmony_ci os_helper.unlink(TESTFN2) 23447db96d56Sopenharmony_ci 23457db96d56Sopenharmony_ci @contextlib.contextmanager 23467db96d56Sopenharmony_ci def get_files(self): 23477db96d56Sopenharmony_ci with open(TESTFN, "rb") as src: 23487db96d56Sopenharmony_ci with open(TESTFN2, "wb") as dst: 23497db96d56Sopenharmony_ci yield (src, dst) 23507db96d56Sopenharmony_ci 23517db96d56Sopenharmony_ci def assert_files_eq(self, src, dst): 23527db96d56Sopenharmony_ci with open(src, 'rb') as fsrc: 23537db96d56Sopenharmony_ci with open(dst, 'rb') as fdst: 23547db96d56Sopenharmony_ci self.assertEqual(fsrc.read(), fdst.read()) 23557db96d56Sopenharmony_ci 23567db96d56Sopenharmony_ci def test_content(self): 23577db96d56Sopenharmony_ci with self.get_files() as (src, dst): 23587db96d56Sopenharmony_ci shutil.copyfileobj(src, dst) 23597db96d56Sopenharmony_ci self.assert_files_eq(TESTFN, TESTFN2) 23607db96d56Sopenharmony_ci 23617db96d56Sopenharmony_ci def test_file_not_closed(self): 23627db96d56Sopenharmony_ci with self.get_files() as (src, dst): 23637db96d56Sopenharmony_ci shutil.copyfileobj(src, dst) 23647db96d56Sopenharmony_ci assert not src.closed 23657db96d56Sopenharmony_ci assert not dst.closed 23667db96d56Sopenharmony_ci 23677db96d56Sopenharmony_ci def test_file_offset(self): 23687db96d56Sopenharmony_ci with self.get_files() as (src, dst): 23697db96d56Sopenharmony_ci shutil.copyfileobj(src, dst) 23707db96d56Sopenharmony_ci self.assertEqual(src.tell(), self.FILESIZE) 23717db96d56Sopenharmony_ci self.assertEqual(dst.tell(), self.FILESIZE) 23727db96d56Sopenharmony_ci 23737db96d56Sopenharmony_ci @unittest.skipIf(os.name != 'nt', "Windows only") 23747db96d56Sopenharmony_ci def test_win_impl(self): 23757db96d56Sopenharmony_ci # Make sure alternate Windows implementation is called. 23767db96d56Sopenharmony_ci with unittest.mock.patch("shutil._copyfileobj_readinto") as m: 23777db96d56Sopenharmony_ci shutil.copyfile(TESTFN, TESTFN2) 23787db96d56Sopenharmony_ci assert m.called 23797db96d56Sopenharmony_ci 23807db96d56Sopenharmony_ci # File size is 2 MiB but max buf size should be 1 MiB. 23817db96d56Sopenharmony_ci self.assertEqual(m.call_args[0][2], 1 * 1024 * 1024) 23827db96d56Sopenharmony_ci 23837db96d56Sopenharmony_ci # If file size < 1 MiB memoryview() length must be equal to 23847db96d56Sopenharmony_ci # the actual file size. 23857db96d56Sopenharmony_ci with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f: 23867db96d56Sopenharmony_ci f.write(b'foo') 23877db96d56Sopenharmony_ci fname = f.name 23887db96d56Sopenharmony_ci self.addCleanup(os_helper.unlink, fname) 23897db96d56Sopenharmony_ci with unittest.mock.patch("shutil._copyfileobj_readinto") as m: 23907db96d56Sopenharmony_ci shutil.copyfile(fname, TESTFN2) 23917db96d56Sopenharmony_ci self.assertEqual(m.call_args[0][2], 3) 23927db96d56Sopenharmony_ci 23937db96d56Sopenharmony_ci # Empty files should not rely on readinto() variant. 23947db96d56Sopenharmony_ci with tempfile.NamedTemporaryFile(dir=os.getcwd(), delete=False) as f: 23957db96d56Sopenharmony_ci pass 23967db96d56Sopenharmony_ci fname = f.name 23977db96d56Sopenharmony_ci self.addCleanup(os_helper.unlink, fname) 23987db96d56Sopenharmony_ci with unittest.mock.patch("shutil._copyfileobj_readinto") as m: 23997db96d56Sopenharmony_ci shutil.copyfile(fname, TESTFN2) 24007db96d56Sopenharmony_ci assert not m.called 24017db96d56Sopenharmony_ci self.assert_files_eq(fname, TESTFN2) 24027db96d56Sopenharmony_ci 24037db96d56Sopenharmony_ci 24047db96d56Sopenharmony_ciclass _ZeroCopyFileTest(object): 24057db96d56Sopenharmony_ci """Tests common to all zero-copy APIs.""" 24067db96d56Sopenharmony_ci FILESIZE = (10 * 1024 * 1024) # 10 MiB 24077db96d56Sopenharmony_ci FILEDATA = b"" 24087db96d56Sopenharmony_ci PATCHPOINT = "" 24097db96d56Sopenharmony_ci 24107db96d56Sopenharmony_ci @classmethod 24117db96d56Sopenharmony_ci def setUpClass(cls): 24127db96d56Sopenharmony_ci write_test_file(TESTFN, cls.FILESIZE) 24137db96d56Sopenharmony_ci with open(TESTFN, 'rb') as f: 24147db96d56Sopenharmony_ci cls.FILEDATA = f.read() 24157db96d56Sopenharmony_ci assert len(cls.FILEDATA) == cls.FILESIZE 24167db96d56Sopenharmony_ci 24177db96d56Sopenharmony_ci @classmethod 24187db96d56Sopenharmony_ci def tearDownClass(cls): 24197db96d56Sopenharmony_ci os_helper.unlink(TESTFN) 24207db96d56Sopenharmony_ci 24217db96d56Sopenharmony_ci def tearDown(self): 24227db96d56Sopenharmony_ci os_helper.unlink(TESTFN2) 24237db96d56Sopenharmony_ci 24247db96d56Sopenharmony_ci @contextlib.contextmanager 24257db96d56Sopenharmony_ci def get_files(self): 24267db96d56Sopenharmony_ci with open(TESTFN, "rb") as src: 24277db96d56Sopenharmony_ci with open(TESTFN2, "wb") as dst: 24287db96d56Sopenharmony_ci yield (src, dst) 24297db96d56Sopenharmony_ci 24307db96d56Sopenharmony_ci def zerocopy_fun(self, *args, **kwargs): 24317db96d56Sopenharmony_ci raise NotImplementedError("must be implemented in subclass") 24327db96d56Sopenharmony_ci 24337db96d56Sopenharmony_ci def reset(self): 24347db96d56Sopenharmony_ci self.tearDown() 24357db96d56Sopenharmony_ci self.tearDownClass() 24367db96d56Sopenharmony_ci self.setUpClass() 24377db96d56Sopenharmony_ci self.setUp() 24387db96d56Sopenharmony_ci 24397db96d56Sopenharmony_ci # --- 24407db96d56Sopenharmony_ci 24417db96d56Sopenharmony_ci def test_regular_copy(self): 24427db96d56Sopenharmony_ci with self.get_files() as (src, dst): 24437db96d56Sopenharmony_ci self.zerocopy_fun(src, dst) 24447db96d56Sopenharmony_ci self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 24457db96d56Sopenharmony_ci # Make sure the fallback function is not called. 24467db96d56Sopenharmony_ci with self.get_files() as (src, dst): 24477db96d56Sopenharmony_ci with unittest.mock.patch('shutil.copyfileobj') as m: 24487db96d56Sopenharmony_ci shutil.copyfile(TESTFN, TESTFN2) 24497db96d56Sopenharmony_ci assert not m.called 24507db96d56Sopenharmony_ci 24517db96d56Sopenharmony_ci def test_same_file(self): 24527db96d56Sopenharmony_ci self.addCleanup(self.reset) 24537db96d56Sopenharmony_ci with self.get_files() as (src, dst): 24547db96d56Sopenharmony_ci with self.assertRaises(Exception): 24557db96d56Sopenharmony_ci self.zerocopy_fun(src, src) 24567db96d56Sopenharmony_ci # Make sure src file is not corrupted. 24577db96d56Sopenharmony_ci self.assertEqual(read_file(TESTFN, binary=True), self.FILEDATA) 24587db96d56Sopenharmony_ci 24597db96d56Sopenharmony_ci def test_non_existent_src(self): 24607db96d56Sopenharmony_ci name = tempfile.mktemp(dir=os.getcwd()) 24617db96d56Sopenharmony_ci with self.assertRaises(FileNotFoundError) as cm: 24627db96d56Sopenharmony_ci shutil.copyfile(name, "new") 24637db96d56Sopenharmony_ci self.assertEqual(cm.exception.filename, name) 24647db96d56Sopenharmony_ci 24657db96d56Sopenharmony_ci def test_empty_file(self): 24667db96d56Sopenharmony_ci srcname = TESTFN + 'src' 24677db96d56Sopenharmony_ci dstname = TESTFN + 'dst' 24687db96d56Sopenharmony_ci self.addCleanup(lambda: os_helper.unlink(srcname)) 24697db96d56Sopenharmony_ci self.addCleanup(lambda: os_helper.unlink(dstname)) 24707db96d56Sopenharmony_ci with open(srcname, "wb"): 24717db96d56Sopenharmony_ci pass 24727db96d56Sopenharmony_ci 24737db96d56Sopenharmony_ci with open(srcname, "rb") as src: 24747db96d56Sopenharmony_ci with open(dstname, "wb") as dst: 24757db96d56Sopenharmony_ci self.zerocopy_fun(src, dst) 24767db96d56Sopenharmony_ci 24777db96d56Sopenharmony_ci self.assertEqual(read_file(dstname, binary=True), b"") 24787db96d56Sopenharmony_ci 24797db96d56Sopenharmony_ci def test_unhandled_exception(self): 24807db96d56Sopenharmony_ci with unittest.mock.patch(self.PATCHPOINT, 24817db96d56Sopenharmony_ci side_effect=ZeroDivisionError): 24827db96d56Sopenharmony_ci self.assertRaises(ZeroDivisionError, 24837db96d56Sopenharmony_ci shutil.copyfile, TESTFN, TESTFN2) 24847db96d56Sopenharmony_ci 24857db96d56Sopenharmony_ci def test_exception_on_first_call(self): 24867db96d56Sopenharmony_ci # Emulate a case where the first call to the zero-copy 24877db96d56Sopenharmony_ci # function raises an exception in which case the function is 24887db96d56Sopenharmony_ci # supposed to give up immediately. 24897db96d56Sopenharmony_ci with unittest.mock.patch(self.PATCHPOINT, 24907db96d56Sopenharmony_ci side_effect=OSError(errno.EINVAL, "yo")): 24917db96d56Sopenharmony_ci with self.get_files() as (src, dst): 24927db96d56Sopenharmony_ci with self.assertRaises(_GiveupOnFastCopy): 24937db96d56Sopenharmony_ci self.zerocopy_fun(src, dst) 24947db96d56Sopenharmony_ci 24957db96d56Sopenharmony_ci def test_filesystem_full(self): 24967db96d56Sopenharmony_ci # Emulate a case where filesystem is full and sendfile() fails 24977db96d56Sopenharmony_ci # on first call. 24987db96d56Sopenharmony_ci with unittest.mock.patch(self.PATCHPOINT, 24997db96d56Sopenharmony_ci side_effect=OSError(errno.ENOSPC, "yo")): 25007db96d56Sopenharmony_ci with self.get_files() as (src, dst): 25017db96d56Sopenharmony_ci self.assertRaises(OSError, self.zerocopy_fun, src, dst) 25027db96d56Sopenharmony_ci 25037db96d56Sopenharmony_ci 25047db96d56Sopenharmony_ci@unittest.skipIf(not SUPPORTS_SENDFILE, 'os.sendfile() not supported') 25057db96d56Sopenharmony_ciclass TestZeroCopySendfile(_ZeroCopyFileTest, unittest.TestCase): 25067db96d56Sopenharmony_ci PATCHPOINT = "os.sendfile" 25077db96d56Sopenharmony_ci 25087db96d56Sopenharmony_ci def zerocopy_fun(self, fsrc, fdst): 25097db96d56Sopenharmony_ci return shutil._fastcopy_sendfile(fsrc, fdst) 25107db96d56Sopenharmony_ci 25117db96d56Sopenharmony_ci def test_non_regular_file_src(self): 25127db96d56Sopenharmony_ci with io.BytesIO(self.FILEDATA) as src: 25137db96d56Sopenharmony_ci with open(TESTFN2, "wb") as dst: 25147db96d56Sopenharmony_ci with self.assertRaises(_GiveupOnFastCopy): 25157db96d56Sopenharmony_ci self.zerocopy_fun(src, dst) 25167db96d56Sopenharmony_ci shutil.copyfileobj(src, dst) 25177db96d56Sopenharmony_ci 25187db96d56Sopenharmony_ci self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 25197db96d56Sopenharmony_ci 25207db96d56Sopenharmony_ci def test_non_regular_file_dst(self): 25217db96d56Sopenharmony_ci with open(TESTFN, "rb") as src: 25227db96d56Sopenharmony_ci with io.BytesIO() as dst: 25237db96d56Sopenharmony_ci with self.assertRaises(_GiveupOnFastCopy): 25247db96d56Sopenharmony_ci self.zerocopy_fun(src, dst) 25257db96d56Sopenharmony_ci shutil.copyfileobj(src, dst) 25267db96d56Sopenharmony_ci dst.seek(0) 25277db96d56Sopenharmony_ci self.assertEqual(dst.read(), self.FILEDATA) 25287db96d56Sopenharmony_ci 25297db96d56Sopenharmony_ci def test_exception_on_second_call(self): 25307db96d56Sopenharmony_ci def sendfile(*args, **kwargs): 25317db96d56Sopenharmony_ci if not flag: 25327db96d56Sopenharmony_ci flag.append(None) 25337db96d56Sopenharmony_ci return orig_sendfile(*args, **kwargs) 25347db96d56Sopenharmony_ci else: 25357db96d56Sopenharmony_ci raise OSError(errno.EBADF, "yo") 25367db96d56Sopenharmony_ci 25377db96d56Sopenharmony_ci flag = [] 25387db96d56Sopenharmony_ci orig_sendfile = os.sendfile 25397db96d56Sopenharmony_ci with unittest.mock.patch('os.sendfile', create=True, 25407db96d56Sopenharmony_ci side_effect=sendfile): 25417db96d56Sopenharmony_ci with self.get_files() as (src, dst): 25427db96d56Sopenharmony_ci with self.assertRaises(OSError) as cm: 25437db96d56Sopenharmony_ci shutil._fastcopy_sendfile(src, dst) 25447db96d56Sopenharmony_ci assert flag 25457db96d56Sopenharmony_ci self.assertEqual(cm.exception.errno, errno.EBADF) 25467db96d56Sopenharmony_ci 25477db96d56Sopenharmony_ci def test_cant_get_size(self): 25487db96d56Sopenharmony_ci # Emulate a case where src file size cannot be determined. 25497db96d56Sopenharmony_ci # Internally bufsize will be set to a small value and 25507db96d56Sopenharmony_ci # sendfile() will be called repeatedly. 25517db96d56Sopenharmony_ci with unittest.mock.patch('os.fstat', side_effect=OSError) as m: 25527db96d56Sopenharmony_ci with self.get_files() as (src, dst): 25537db96d56Sopenharmony_ci shutil._fastcopy_sendfile(src, dst) 25547db96d56Sopenharmony_ci assert m.called 25557db96d56Sopenharmony_ci self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 25567db96d56Sopenharmony_ci 25577db96d56Sopenharmony_ci def test_small_chunks(self): 25587db96d56Sopenharmony_ci # Force internal file size detection to be smaller than the 25597db96d56Sopenharmony_ci # actual file size. We want to force sendfile() to be called 25607db96d56Sopenharmony_ci # multiple times, also in order to emulate a src fd which gets 25617db96d56Sopenharmony_ci # bigger while it is being copied. 25627db96d56Sopenharmony_ci mock = unittest.mock.Mock() 25637db96d56Sopenharmony_ci mock.st_size = 65536 + 1 25647db96d56Sopenharmony_ci with unittest.mock.patch('os.fstat', return_value=mock) as m: 25657db96d56Sopenharmony_ci with self.get_files() as (src, dst): 25667db96d56Sopenharmony_ci shutil._fastcopy_sendfile(src, dst) 25677db96d56Sopenharmony_ci assert m.called 25687db96d56Sopenharmony_ci self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 25697db96d56Sopenharmony_ci 25707db96d56Sopenharmony_ci def test_big_chunk(self): 25717db96d56Sopenharmony_ci # Force internal file size detection to be +100MB bigger than 25727db96d56Sopenharmony_ci # the actual file size. Make sure sendfile() does not rely on 25737db96d56Sopenharmony_ci # file size value except for (maybe) a better throughput / 25747db96d56Sopenharmony_ci # performance. 25757db96d56Sopenharmony_ci mock = unittest.mock.Mock() 25767db96d56Sopenharmony_ci mock.st_size = self.FILESIZE + (100 * 1024 * 1024) 25777db96d56Sopenharmony_ci with unittest.mock.patch('os.fstat', return_value=mock) as m: 25787db96d56Sopenharmony_ci with self.get_files() as (src, dst): 25797db96d56Sopenharmony_ci shutil._fastcopy_sendfile(src, dst) 25807db96d56Sopenharmony_ci assert m.called 25817db96d56Sopenharmony_ci self.assertEqual(read_file(TESTFN2, binary=True), self.FILEDATA) 25827db96d56Sopenharmony_ci 25837db96d56Sopenharmony_ci def test_blocksize_arg(self): 25847db96d56Sopenharmony_ci with unittest.mock.patch('os.sendfile', 25857db96d56Sopenharmony_ci side_effect=ZeroDivisionError) as m: 25867db96d56Sopenharmony_ci self.assertRaises(ZeroDivisionError, 25877db96d56Sopenharmony_ci shutil.copyfile, TESTFN, TESTFN2) 25887db96d56Sopenharmony_ci blocksize = m.call_args[0][3] 25897db96d56Sopenharmony_ci # Make sure file size and the block size arg passed to 25907db96d56Sopenharmony_ci # sendfile() are the same. 25917db96d56Sopenharmony_ci self.assertEqual(blocksize, os.path.getsize(TESTFN)) 25927db96d56Sopenharmony_ci # ...unless we're dealing with a small file. 25937db96d56Sopenharmony_ci os_helper.unlink(TESTFN2) 25947db96d56Sopenharmony_ci write_file(TESTFN2, b"hello", binary=True) 25957db96d56Sopenharmony_ci self.addCleanup(os_helper.unlink, TESTFN2 + '3') 25967db96d56Sopenharmony_ci self.assertRaises(ZeroDivisionError, 25977db96d56Sopenharmony_ci shutil.copyfile, TESTFN2, TESTFN2 + '3') 25987db96d56Sopenharmony_ci blocksize = m.call_args[0][3] 25997db96d56Sopenharmony_ci self.assertEqual(blocksize, 2 ** 23) 26007db96d56Sopenharmony_ci 26017db96d56Sopenharmony_ci def test_file2file_not_supported(self): 26027db96d56Sopenharmony_ci # Emulate a case where sendfile() only support file->socket 26037db96d56Sopenharmony_ci # fds. In such a case copyfile() is supposed to skip the 26047db96d56Sopenharmony_ci # fast-copy attempt from then on. 26057db96d56Sopenharmony_ci assert shutil._USE_CP_SENDFILE 26067db96d56Sopenharmony_ci try: 26077db96d56Sopenharmony_ci with unittest.mock.patch( 26087db96d56Sopenharmony_ci self.PATCHPOINT, 26097db96d56Sopenharmony_ci side_effect=OSError(errno.ENOTSOCK, "yo")) as m: 26107db96d56Sopenharmony_ci with self.get_files() as (src, dst): 26117db96d56Sopenharmony_ci with self.assertRaises(_GiveupOnFastCopy): 26127db96d56Sopenharmony_ci shutil._fastcopy_sendfile(src, dst) 26137db96d56Sopenharmony_ci assert m.called 26147db96d56Sopenharmony_ci assert not shutil._USE_CP_SENDFILE 26157db96d56Sopenharmony_ci 26167db96d56Sopenharmony_ci with unittest.mock.patch(self.PATCHPOINT) as m: 26177db96d56Sopenharmony_ci shutil.copyfile(TESTFN, TESTFN2) 26187db96d56Sopenharmony_ci assert not m.called 26197db96d56Sopenharmony_ci finally: 26207db96d56Sopenharmony_ci shutil._USE_CP_SENDFILE = True 26217db96d56Sopenharmony_ci 26227db96d56Sopenharmony_ci 26237db96d56Sopenharmony_ci@unittest.skipIf(not MACOS, 'macOS only') 26247db96d56Sopenharmony_ciclass TestZeroCopyMACOS(_ZeroCopyFileTest, unittest.TestCase): 26257db96d56Sopenharmony_ci PATCHPOINT = "posix._fcopyfile" 26267db96d56Sopenharmony_ci 26277db96d56Sopenharmony_ci def zerocopy_fun(self, src, dst): 26287db96d56Sopenharmony_ci return shutil._fastcopy_fcopyfile(src, dst, posix._COPYFILE_DATA) 26297db96d56Sopenharmony_ci 26307db96d56Sopenharmony_ci 26317db96d56Sopenharmony_ciclass TestGetTerminalSize(unittest.TestCase): 26327db96d56Sopenharmony_ci def test_does_not_crash(self): 26337db96d56Sopenharmony_ci """Check if get_terminal_size() returns a meaningful value. 26347db96d56Sopenharmony_ci 26357db96d56Sopenharmony_ci There's no easy portable way to actually check the size of the 26367db96d56Sopenharmony_ci terminal, so let's check if it returns something sensible instead. 26377db96d56Sopenharmony_ci """ 26387db96d56Sopenharmony_ci size = shutil.get_terminal_size() 26397db96d56Sopenharmony_ci self.assertGreaterEqual(size.columns, 0) 26407db96d56Sopenharmony_ci self.assertGreaterEqual(size.lines, 0) 26417db96d56Sopenharmony_ci 26427db96d56Sopenharmony_ci def test_os_environ_first(self): 26437db96d56Sopenharmony_ci "Check if environment variables have precedence" 26447db96d56Sopenharmony_ci 26457db96d56Sopenharmony_ci with os_helper.EnvironmentVarGuard() as env: 26467db96d56Sopenharmony_ci env['COLUMNS'] = '777' 26477db96d56Sopenharmony_ci del env['LINES'] 26487db96d56Sopenharmony_ci size = shutil.get_terminal_size() 26497db96d56Sopenharmony_ci self.assertEqual(size.columns, 777) 26507db96d56Sopenharmony_ci 26517db96d56Sopenharmony_ci with os_helper.EnvironmentVarGuard() as env: 26527db96d56Sopenharmony_ci del env['COLUMNS'] 26537db96d56Sopenharmony_ci env['LINES'] = '888' 26547db96d56Sopenharmony_ci size = shutil.get_terminal_size() 26557db96d56Sopenharmony_ci self.assertEqual(size.lines, 888) 26567db96d56Sopenharmony_ci 26577db96d56Sopenharmony_ci def test_bad_environ(self): 26587db96d56Sopenharmony_ci with os_helper.EnvironmentVarGuard() as env: 26597db96d56Sopenharmony_ci env['COLUMNS'] = 'xxx' 26607db96d56Sopenharmony_ci env['LINES'] = 'yyy' 26617db96d56Sopenharmony_ci size = shutil.get_terminal_size() 26627db96d56Sopenharmony_ci self.assertGreaterEqual(size.columns, 0) 26637db96d56Sopenharmony_ci self.assertGreaterEqual(size.lines, 0) 26647db96d56Sopenharmony_ci 26657db96d56Sopenharmony_ci @unittest.skipUnless(os.isatty(sys.__stdout__.fileno()), "not on tty") 26667db96d56Sopenharmony_ci @unittest.skipUnless(hasattr(os, 'get_terminal_size'), 26677db96d56Sopenharmony_ci 'need os.get_terminal_size()') 26687db96d56Sopenharmony_ci def test_stty_match(self): 26697db96d56Sopenharmony_ci """Check if stty returns the same results ignoring env 26707db96d56Sopenharmony_ci 26717db96d56Sopenharmony_ci This test will fail if stdin and stdout are connected to 26727db96d56Sopenharmony_ci different terminals with different sizes. Nevertheless, such 26737db96d56Sopenharmony_ci situations should be pretty rare. 26747db96d56Sopenharmony_ci """ 26757db96d56Sopenharmony_ci try: 26767db96d56Sopenharmony_ci size = subprocess.check_output(['stty', 'size']).decode().split() 26777db96d56Sopenharmony_ci except (FileNotFoundError, PermissionError, 26787db96d56Sopenharmony_ci subprocess.CalledProcessError): 26797db96d56Sopenharmony_ci self.skipTest("stty invocation failed") 26807db96d56Sopenharmony_ci expected = (int(size[1]), int(size[0])) # reversed order 26817db96d56Sopenharmony_ci 26827db96d56Sopenharmony_ci with os_helper.EnvironmentVarGuard() as env: 26837db96d56Sopenharmony_ci del env['LINES'] 26847db96d56Sopenharmony_ci del env['COLUMNS'] 26857db96d56Sopenharmony_ci actual = shutil.get_terminal_size() 26867db96d56Sopenharmony_ci 26877db96d56Sopenharmony_ci self.assertEqual(expected, actual) 26887db96d56Sopenharmony_ci 26897db96d56Sopenharmony_ci @unittest.skipIf(support.is_wasi, "WASI has no /dev/null") 26907db96d56Sopenharmony_ci def test_fallback(self): 26917db96d56Sopenharmony_ci with os_helper.EnvironmentVarGuard() as env: 26927db96d56Sopenharmony_ci del env['LINES'] 26937db96d56Sopenharmony_ci del env['COLUMNS'] 26947db96d56Sopenharmony_ci 26957db96d56Sopenharmony_ci # sys.__stdout__ has no fileno() 26967db96d56Sopenharmony_ci with support.swap_attr(sys, '__stdout__', None): 26977db96d56Sopenharmony_ci size = shutil.get_terminal_size(fallback=(10, 20)) 26987db96d56Sopenharmony_ci self.assertEqual(size.columns, 10) 26997db96d56Sopenharmony_ci self.assertEqual(size.lines, 20) 27007db96d56Sopenharmony_ci 27017db96d56Sopenharmony_ci # sys.__stdout__ is not a terminal on Unix 27027db96d56Sopenharmony_ci # or fileno() not in (0, 1, 2) on Windows 27037db96d56Sopenharmony_ci with open(os.devnull, 'w', encoding='utf-8') as f, \ 27047db96d56Sopenharmony_ci support.swap_attr(sys, '__stdout__', f): 27057db96d56Sopenharmony_ci size = shutil.get_terminal_size(fallback=(30, 40)) 27067db96d56Sopenharmony_ci self.assertEqual(size.columns, 30) 27077db96d56Sopenharmony_ci self.assertEqual(size.lines, 40) 27087db96d56Sopenharmony_ci 27097db96d56Sopenharmony_ci 27107db96d56Sopenharmony_ciclass PublicAPITests(unittest.TestCase): 27117db96d56Sopenharmony_ci """Ensures that the correct values are exposed in the public API.""" 27127db96d56Sopenharmony_ci 27137db96d56Sopenharmony_ci def test_module_all_attribute(self): 27147db96d56Sopenharmony_ci self.assertTrue(hasattr(shutil, '__all__')) 27157db96d56Sopenharmony_ci target_api = ['copyfileobj', 'copyfile', 'copymode', 'copystat', 27167db96d56Sopenharmony_ci 'copy', 'copy2', 'copytree', 'move', 'rmtree', 'Error', 27177db96d56Sopenharmony_ci 'SpecialFileError', 'ExecError', 'make_archive', 27187db96d56Sopenharmony_ci 'get_archive_formats', 'register_archive_format', 27197db96d56Sopenharmony_ci 'unregister_archive_format', 'get_unpack_formats', 27207db96d56Sopenharmony_ci 'register_unpack_format', 'unregister_unpack_format', 27217db96d56Sopenharmony_ci 'unpack_archive', 'ignore_patterns', 'chown', 'which', 27227db96d56Sopenharmony_ci 'get_terminal_size', 'SameFileError'] 27237db96d56Sopenharmony_ci if hasattr(os, 'statvfs') or os.name == 'nt': 27247db96d56Sopenharmony_ci target_api.append('disk_usage') 27257db96d56Sopenharmony_ci self.assertEqual(set(shutil.__all__), set(target_api)) 27267db96d56Sopenharmony_ci 27277db96d56Sopenharmony_ci 27287db96d56Sopenharmony_ciif __name__ == '__main__': 27297db96d56Sopenharmony_ci unittest.main() 2730