"""Utility functions for copying and archiving files and directory trees.

XXX The functions here don't copy the resource fork or other metadata on Mac.

"""

import os
import sys
import stat
import fnmatch
import collections
import errno

# Probe for the optional compression modules.  Only the outcome is kept, in
# the matching _*_SUPPORTED flag; the module name itself is deleted again
# right after the import succeeds.
try:
    import zlib
    del zlib
    _ZLIB_SUPPORTED = True
except ImportError:
    _ZLIB_SUPPORTED = False

try:
    import bz2
    del bz2
    _BZ2_SUPPORTED = True
except ImportError:
    _BZ2_SUPPORTED = False

try:
    import lzma
    del lzma
    _LZMA_SUPPORTED = True
except ImportError:
    _LZMA_SUPPORTED = False

_WINDOWS = os.name == 'nt'
# Platform-specific low-level module: `posix` is imported when os.name is
# 'posix', `nt` when it is 'nt'.  Whichever is not imported stays None.
posix = nt = None
if os.name == 'posix':
    import posix
elif _WINDOWS:
    import nt

# Default chunk size for buffered copies: 1 MiB on Windows, 64 KiB elsewhere.
COPY_BUFSIZE = 1024 * 1024 if _WINDOWS else 64 * 1024
# This should never be removed, see rationale in:
# https://bugs.python.org/issue43743#msg393429
_USE_CP_SENDFILE = hasattr(os, "sendfile") and sys.platform.startswith("linux")
467db96d56Sopenharmony_ci_HAS_FCOPYFILE = posix and hasattr(posix, "_fcopyfile") # macOS 477db96d56Sopenharmony_ci 487db96d56Sopenharmony_ci# CMD defaults in Windows 10 497db96d56Sopenharmony_ci_WIN_DEFAULT_PATHEXT = ".COM;.EXE;.BAT;.CMD;.VBS;.JS;.WS;.MSC" 507db96d56Sopenharmony_ci 517db96d56Sopenharmony_ci__all__ = ["copyfileobj", "copyfile", "copymode", "copystat", "copy", "copy2", 527db96d56Sopenharmony_ci "copytree", "move", "rmtree", "Error", "SpecialFileError", 537db96d56Sopenharmony_ci "ExecError", "make_archive", "get_archive_formats", 547db96d56Sopenharmony_ci "register_archive_format", "unregister_archive_format", 557db96d56Sopenharmony_ci "get_unpack_formats", "register_unpack_format", 567db96d56Sopenharmony_ci "unregister_unpack_format", "unpack_archive", 577db96d56Sopenharmony_ci "ignore_patterns", "chown", "which", "get_terminal_size", 587db96d56Sopenharmony_ci "SameFileError"] 597db96d56Sopenharmony_ci # disk_usage is added later, if available on the platform 607db96d56Sopenharmony_ci 617db96d56Sopenharmony_ciclass Error(OSError): 627db96d56Sopenharmony_ci pass 637db96d56Sopenharmony_ci 647db96d56Sopenharmony_ciclass SameFileError(Error): 657db96d56Sopenharmony_ci """Raised when source and destination are the same file.""" 667db96d56Sopenharmony_ci 677db96d56Sopenharmony_ciclass SpecialFileError(OSError): 687db96d56Sopenharmony_ci """Raised when trying to do a kind of operation (e.g. copying) which is 697db96d56Sopenharmony_ci not supported on a special file (e.g. 
a named pipe)""" 707db96d56Sopenharmony_ci 717db96d56Sopenharmony_ciclass ExecError(OSError): 727db96d56Sopenharmony_ci """Raised when a command could not be executed""" 737db96d56Sopenharmony_ci 747db96d56Sopenharmony_ciclass ReadError(OSError): 757db96d56Sopenharmony_ci """Raised when an archive cannot be read""" 767db96d56Sopenharmony_ci 777db96d56Sopenharmony_ciclass RegistryError(Exception): 787db96d56Sopenharmony_ci """Raised when a registry operation with the archiving 797db96d56Sopenharmony_ci and unpacking registries fails""" 807db96d56Sopenharmony_ci 817db96d56Sopenharmony_ciclass _GiveupOnFastCopy(Exception): 827db96d56Sopenharmony_ci """Raised as a signal to fallback on using raw read()/write() 837db96d56Sopenharmony_ci file copy when fast-copy functions fail to do so. 847db96d56Sopenharmony_ci """ 857db96d56Sopenharmony_ci 867db96d56Sopenharmony_cidef _fastcopy_fcopyfile(fsrc, fdst, flags): 877db96d56Sopenharmony_ci """Copy a regular file content or metadata by using high-performance 887db96d56Sopenharmony_ci fcopyfile(3) syscall (macOS). 
897db96d56Sopenharmony_ci """ 907db96d56Sopenharmony_ci try: 917db96d56Sopenharmony_ci infd = fsrc.fileno() 927db96d56Sopenharmony_ci outfd = fdst.fileno() 937db96d56Sopenharmony_ci except Exception as err: 947db96d56Sopenharmony_ci raise _GiveupOnFastCopy(err) # not a regular file 957db96d56Sopenharmony_ci 967db96d56Sopenharmony_ci try: 977db96d56Sopenharmony_ci posix._fcopyfile(infd, outfd, flags) 987db96d56Sopenharmony_ci except OSError as err: 997db96d56Sopenharmony_ci err.filename = fsrc.name 1007db96d56Sopenharmony_ci err.filename2 = fdst.name 1017db96d56Sopenharmony_ci if err.errno in {errno.EINVAL, errno.ENOTSUP}: 1027db96d56Sopenharmony_ci raise _GiveupOnFastCopy(err) 1037db96d56Sopenharmony_ci else: 1047db96d56Sopenharmony_ci raise err from None 1057db96d56Sopenharmony_ci 1067db96d56Sopenharmony_cidef _fastcopy_sendfile(fsrc, fdst): 1077db96d56Sopenharmony_ci """Copy data from one regular mmap-like fd to another by using 1087db96d56Sopenharmony_ci high-performance sendfile(2) syscall. 1097db96d56Sopenharmony_ci This should work on Linux >= 2.6.33 only. 1107db96d56Sopenharmony_ci """ 1117db96d56Sopenharmony_ci # Note: copyfileobj() is left alone in order to not introduce any 1127db96d56Sopenharmony_ci # unexpected breakage. Possible risks by using zero-copy calls 1137db96d56Sopenharmony_ci # in copyfileobj() are: 1147db96d56Sopenharmony_ci # - fdst cannot be open in "a"(ppend) mode 1157db96d56Sopenharmony_ci # - fsrc and fdst may be open in "t"(ext) mode 1167db96d56Sopenharmony_ci # - fsrc may be a BufferedReader (which hides unread data in a buffer), 1177db96d56Sopenharmony_ci # GzipFile (which decompresses data), HTTPResponse (which decodes 1187db96d56Sopenharmony_ci # chunks). 1197db96d56Sopenharmony_ci # - possibly others (e.g. encrypted fs/partition?) 
1207db96d56Sopenharmony_ci global _USE_CP_SENDFILE 1217db96d56Sopenharmony_ci try: 1227db96d56Sopenharmony_ci infd = fsrc.fileno() 1237db96d56Sopenharmony_ci outfd = fdst.fileno() 1247db96d56Sopenharmony_ci except Exception as err: 1257db96d56Sopenharmony_ci raise _GiveupOnFastCopy(err) # not a regular file 1267db96d56Sopenharmony_ci 1277db96d56Sopenharmony_ci # Hopefully the whole file will be copied in a single call. 1287db96d56Sopenharmony_ci # sendfile() is called in a loop 'till EOF is reached (0 return) 1297db96d56Sopenharmony_ci # so a bufsize smaller or bigger than the actual file size 1307db96d56Sopenharmony_ci # should not make any difference, also in case the file content 1317db96d56Sopenharmony_ci # changes while being copied. 1327db96d56Sopenharmony_ci try: 1337db96d56Sopenharmony_ci blocksize = max(os.fstat(infd).st_size, 2 ** 23) # min 8MiB 1347db96d56Sopenharmony_ci except OSError: 1357db96d56Sopenharmony_ci blocksize = 2 ** 27 # 128MiB 1367db96d56Sopenharmony_ci # On 32-bit architectures truncate to 1GiB to avoid OverflowError, 1377db96d56Sopenharmony_ci # see bpo-38319. 1387db96d56Sopenharmony_ci if sys.maxsize < 2 ** 32: 1397db96d56Sopenharmony_ci blocksize = min(blocksize, 2 ** 30) 1407db96d56Sopenharmony_ci 1417db96d56Sopenharmony_ci offset = 0 1427db96d56Sopenharmony_ci while True: 1437db96d56Sopenharmony_ci try: 1447db96d56Sopenharmony_ci sent = os.sendfile(outfd, infd, offset, blocksize) 1457db96d56Sopenharmony_ci except OSError as err: 1467db96d56Sopenharmony_ci # ...in oder to have a more informative exception. 1477db96d56Sopenharmony_ci err.filename = fsrc.name 1487db96d56Sopenharmony_ci err.filename2 = fdst.name 1497db96d56Sopenharmony_ci 1507db96d56Sopenharmony_ci if err.errno == errno.ENOTSOCK: 1517db96d56Sopenharmony_ci # sendfile() on this platform (probably Linux < 2.6.33) 1527db96d56Sopenharmony_ci # does not support copies between regular files (only 1537db96d56Sopenharmony_ci # sockets). 
1547db96d56Sopenharmony_ci _USE_CP_SENDFILE = False 1557db96d56Sopenharmony_ci raise _GiveupOnFastCopy(err) 1567db96d56Sopenharmony_ci 1577db96d56Sopenharmony_ci if err.errno == errno.ENOSPC: # filesystem is full 1587db96d56Sopenharmony_ci raise err from None 1597db96d56Sopenharmony_ci 1607db96d56Sopenharmony_ci # Give up on first call and if no data was copied. 1617db96d56Sopenharmony_ci if offset == 0 and os.lseek(outfd, 0, os.SEEK_CUR) == 0: 1627db96d56Sopenharmony_ci raise _GiveupOnFastCopy(err) 1637db96d56Sopenharmony_ci 1647db96d56Sopenharmony_ci raise err 1657db96d56Sopenharmony_ci else: 1667db96d56Sopenharmony_ci if sent == 0: 1677db96d56Sopenharmony_ci break # EOF 1687db96d56Sopenharmony_ci offset += sent 1697db96d56Sopenharmony_ci 1707db96d56Sopenharmony_cidef _copyfileobj_readinto(fsrc, fdst, length=COPY_BUFSIZE): 1717db96d56Sopenharmony_ci """readinto()/memoryview() based variant of copyfileobj(). 1727db96d56Sopenharmony_ci *fsrc* must support readinto() method and both files must be 1737db96d56Sopenharmony_ci open in binary mode. 1747db96d56Sopenharmony_ci """ 1757db96d56Sopenharmony_ci # Localize variable access to minimize overhead. 
1767db96d56Sopenharmony_ci fsrc_readinto = fsrc.readinto 1777db96d56Sopenharmony_ci fdst_write = fdst.write 1787db96d56Sopenharmony_ci with memoryview(bytearray(length)) as mv: 1797db96d56Sopenharmony_ci while True: 1807db96d56Sopenharmony_ci n = fsrc_readinto(mv) 1817db96d56Sopenharmony_ci if not n: 1827db96d56Sopenharmony_ci break 1837db96d56Sopenharmony_ci elif n < length: 1847db96d56Sopenharmony_ci with mv[:n] as smv: 1857db96d56Sopenharmony_ci fdst.write(smv) 1867db96d56Sopenharmony_ci else: 1877db96d56Sopenharmony_ci fdst_write(mv) 1887db96d56Sopenharmony_ci 1897db96d56Sopenharmony_cidef copyfileobj(fsrc, fdst, length=0): 1907db96d56Sopenharmony_ci """copy data from file-like object fsrc to file-like object fdst""" 1917db96d56Sopenharmony_ci if not length: 1927db96d56Sopenharmony_ci length = COPY_BUFSIZE 1937db96d56Sopenharmony_ci # Localize variable access to minimize overhead. 1947db96d56Sopenharmony_ci fsrc_read = fsrc.read 1957db96d56Sopenharmony_ci fdst_write = fdst.write 1967db96d56Sopenharmony_ci while True: 1977db96d56Sopenharmony_ci buf = fsrc_read(length) 1987db96d56Sopenharmony_ci if not buf: 1997db96d56Sopenharmony_ci break 2007db96d56Sopenharmony_ci fdst_write(buf) 2017db96d56Sopenharmony_ci 2027db96d56Sopenharmony_cidef _samefile(src, dst): 2037db96d56Sopenharmony_ci # Macintosh, Unix. 2047db96d56Sopenharmony_ci if isinstance(src, os.DirEntry) and hasattr(os.path, 'samestat'): 2057db96d56Sopenharmony_ci try: 2067db96d56Sopenharmony_ci return os.path.samestat(src.stat(), os.stat(dst)) 2077db96d56Sopenharmony_ci except OSError: 2087db96d56Sopenharmony_ci return False 2097db96d56Sopenharmony_ci 2107db96d56Sopenharmony_ci if hasattr(os.path, 'samefile'): 2117db96d56Sopenharmony_ci try: 2127db96d56Sopenharmony_ci return os.path.samefile(src, dst) 2137db96d56Sopenharmony_ci except OSError: 2147db96d56Sopenharmony_ci return False 2157db96d56Sopenharmony_ci 2167db96d56Sopenharmony_ci # All other platforms: check for same pathname. 
2177db96d56Sopenharmony_ci return (os.path.normcase(os.path.abspath(src)) == 2187db96d56Sopenharmony_ci os.path.normcase(os.path.abspath(dst))) 2197db96d56Sopenharmony_ci 2207db96d56Sopenharmony_cidef _stat(fn): 2217db96d56Sopenharmony_ci return fn.stat() if isinstance(fn, os.DirEntry) else os.stat(fn) 2227db96d56Sopenharmony_ci 2237db96d56Sopenharmony_cidef _islink(fn): 2247db96d56Sopenharmony_ci return fn.is_symlink() if isinstance(fn, os.DirEntry) else os.path.islink(fn) 2257db96d56Sopenharmony_ci 2267db96d56Sopenharmony_cidef copyfile(src, dst, *, follow_symlinks=True): 2277db96d56Sopenharmony_ci """Copy data from src to dst in the most efficient way possible. 2287db96d56Sopenharmony_ci 2297db96d56Sopenharmony_ci If follow_symlinks is not set and src is a symbolic link, a new 2307db96d56Sopenharmony_ci symlink will be created instead of copying the file it points to. 2317db96d56Sopenharmony_ci 2327db96d56Sopenharmony_ci """ 2337db96d56Sopenharmony_ci sys.audit("shutil.copyfile", src, dst) 2347db96d56Sopenharmony_ci 2357db96d56Sopenharmony_ci if _samefile(src, dst): 2367db96d56Sopenharmony_ci raise SameFileError("{!r} and {!r} are the same file".format(src, dst)) 2377db96d56Sopenharmony_ci 2387db96d56Sopenharmony_ci file_size = 0 2397db96d56Sopenharmony_ci for i, fn in enumerate([src, dst]): 2407db96d56Sopenharmony_ci try: 2417db96d56Sopenharmony_ci st = _stat(fn) 2427db96d56Sopenharmony_ci except OSError: 2437db96d56Sopenharmony_ci # File most likely does not exist 2447db96d56Sopenharmony_ci pass 2457db96d56Sopenharmony_ci else: 2467db96d56Sopenharmony_ci # XXX What about other special files? (sockets, devices...) 
2477db96d56Sopenharmony_ci if stat.S_ISFIFO(st.st_mode): 2487db96d56Sopenharmony_ci fn = fn.path if isinstance(fn, os.DirEntry) else fn 2497db96d56Sopenharmony_ci raise SpecialFileError("`%s` is a named pipe" % fn) 2507db96d56Sopenharmony_ci if _WINDOWS and i == 0: 2517db96d56Sopenharmony_ci file_size = st.st_size 2527db96d56Sopenharmony_ci 2537db96d56Sopenharmony_ci if not follow_symlinks and _islink(src): 2547db96d56Sopenharmony_ci os.symlink(os.readlink(src), dst) 2557db96d56Sopenharmony_ci else: 2567db96d56Sopenharmony_ci with open(src, 'rb') as fsrc: 2577db96d56Sopenharmony_ci try: 2587db96d56Sopenharmony_ci with open(dst, 'wb') as fdst: 2597db96d56Sopenharmony_ci # macOS 2607db96d56Sopenharmony_ci if _HAS_FCOPYFILE: 2617db96d56Sopenharmony_ci try: 2627db96d56Sopenharmony_ci _fastcopy_fcopyfile(fsrc, fdst, posix._COPYFILE_DATA) 2637db96d56Sopenharmony_ci return dst 2647db96d56Sopenharmony_ci except _GiveupOnFastCopy: 2657db96d56Sopenharmony_ci pass 2667db96d56Sopenharmony_ci # Linux 2677db96d56Sopenharmony_ci elif _USE_CP_SENDFILE: 2687db96d56Sopenharmony_ci try: 2697db96d56Sopenharmony_ci _fastcopy_sendfile(fsrc, fdst) 2707db96d56Sopenharmony_ci return dst 2717db96d56Sopenharmony_ci except _GiveupOnFastCopy: 2727db96d56Sopenharmony_ci pass 2737db96d56Sopenharmony_ci # Windows, see: 2747db96d56Sopenharmony_ci # https://github.com/python/cpython/pull/7160#discussion_r195405230 2757db96d56Sopenharmony_ci elif _WINDOWS and file_size > 0: 2767db96d56Sopenharmony_ci _copyfileobj_readinto(fsrc, fdst, min(file_size, COPY_BUFSIZE)) 2777db96d56Sopenharmony_ci return dst 2787db96d56Sopenharmony_ci 2797db96d56Sopenharmony_ci copyfileobj(fsrc, fdst) 2807db96d56Sopenharmony_ci 2817db96d56Sopenharmony_ci # Issue 43219, raise a less confusing exception 2827db96d56Sopenharmony_ci except IsADirectoryError as e: 2837db96d56Sopenharmony_ci if not os.path.exists(dst): 2847db96d56Sopenharmony_ci raise FileNotFoundError(f'Directory does not exist: {dst}') from e 
2857db96d56Sopenharmony_ci else: 2867db96d56Sopenharmony_ci raise 2877db96d56Sopenharmony_ci 2887db96d56Sopenharmony_ci return dst 2897db96d56Sopenharmony_ci 2907db96d56Sopenharmony_cidef copymode(src, dst, *, follow_symlinks=True): 2917db96d56Sopenharmony_ci """Copy mode bits from src to dst. 2927db96d56Sopenharmony_ci 2937db96d56Sopenharmony_ci If follow_symlinks is not set, symlinks aren't followed if and only 2947db96d56Sopenharmony_ci if both `src` and `dst` are symlinks. If `lchmod` isn't available 2957db96d56Sopenharmony_ci (e.g. Linux) this method does nothing. 2967db96d56Sopenharmony_ci 2977db96d56Sopenharmony_ci """ 2987db96d56Sopenharmony_ci sys.audit("shutil.copymode", src, dst) 2997db96d56Sopenharmony_ci 3007db96d56Sopenharmony_ci if not follow_symlinks and _islink(src) and os.path.islink(dst): 3017db96d56Sopenharmony_ci if hasattr(os, 'lchmod'): 3027db96d56Sopenharmony_ci stat_func, chmod_func = os.lstat, os.lchmod 3037db96d56Sopenharmony_ci else: 3047db96d56Sopenharmony_ci return 3057db96d56Sopenharmony_ci else: 3067db96d56Sopenharmony_ci stat_func, chmod_func = _stat, os.chmod 3077db96d56Sopenharmony_ci 3087db96d56Sopenharmony_ci st = stat_func(src) 3097db96d56Sopenharmony_ci chmod_func(dst, stat.S_IMODE(st.st_mode)) 3107db96d56Sopenharmony_ci 3117db96d56Sopenharmony_ciif hasattr(os, 'listxattr'): 3127db96d56Sopenharmony_ci def _copyxattr(src, dst, *, follow_symlinks=True): 3137db96d56Sopenharmony_ci """Copy extended filesystem attributes from `src` to `dst`. 3147db96d56Sopenharmony_ci 3157db96d56Sopenharmony_ci Overwrite existing attributes. 3167db96d56Sopenharmony_ci 3177db96d56Sopenharmony_ci If `follow_symlinks` is false, symlinks won't be followed. 
3187db96d56Sopenharmony_ci 3197db96d56Sopenharmony_ci """ 3207db96d56Sopenharmony_ci 3217db96d56Sopenharmony_ci try: 3227db96d56Sopenharmony_ci names = os.listxattr(src, follow_symlinks=follow_symlinks) 3237db96d56Sopenharmony_ci except OSError as e: 3247db96d56Sopenharmony_ci if e.errno not in (errno.ENOTSUP, errno.ENODATA, errno.EINVAL): 3257db96d56Sopenharmony_ci raise 3267db96d56Sopenharmony_ci return 3277db96d56Sopenharmony_ci for name in names: 3287db96d56Sopenharmony_ci try: 3297db96d56Sopenharmony_ci value = os.getxattr(src, name, follow_symlinks=follow_symlinks) 3307db96d56Sopenharmony_ci os.setxattr(dst, name, value, follow_symlinks=follow_symlinks) 3317db96d56Sopenharmony_ci except OSError as e: 3327db96d56Sopenharmony_ci if e.errno not in (errno.EPERM, errno.ENOTSUP, errno.ENODATA, 3337db96d56Sopenharmony_ci errno.EINVAL): 3347db96d56Sopenharmony_ci raise 3357db96d56Sopenharmony_cielse: 3367db96d56Sopenharmony_ci def _copyxattr(*args, **kwargs): 3377db96d56Sopenharmony_ci pass 3387db96d56Sopenharmony_ci 3397db96d56Sopenharmony_cidef copystat(src, dst, *, follow_symlinks=True): 3407db96d56Sopenharmony_ci """Copy file metadata 3417db96d56Sopenharmony_ci 3427db96d56Sopenharmony_ci Copy the permission bits, last access time, last modification time, and 3437db96d56Sopenharmony_ci flags from `src` to `dst`. On Linux, copystat() also copies the "extended 3447db96d56Sopenharmony_ci attributes" where possible. The file contents, owner, and group are 3457db96d56Sopenharmony_ci unaffected. `src` and `dst` are path-like objects or path names given as 3467db96d56Sopenharmony_ci strings. 3477db96d56Sopenharmony_ci 3487db96d56Sopenharmony_ci If the optional flag `follow_symlinks` is not set, symlinks aren't 3497db96d56Sopenharmony_ci followed if and only if both `src` and `dst` are symlinks. 
3507db96d56Sopenharmony_ci """ 3517db96d56Sopenharmony_ci sys.audit("shutil.copystat", src, dst) 3527db96d56Sopenharmony_ci 3537db96d56Sopenharmony_ci def _nop(*args, ns=None, follow_symlinks=None): 3547db96d56Sopenharmony_ci pass 3557db96d56Sopenharmony_ci 3567db96d56Sopenharmony_ci # follow symlinks (aka don't not follow symlinks) 3577db96d56Sopenharmony_ci follow = follow_symlinks or not (_islink(src) and os.path.islink(dst)) 3587db96d56Sopenharmony_ci if follow: 3597db96d56Sopenharmony_ci # use the real function if it exists 3607db96d56Sopenharmony_ci def lookup(name): 3617db96d56Sopenharmony_ci return getattr(os, name, _nop) 3627db96d56Sopenharmony_ci else: 3637db96d56Sopenharmony_ci # use the real function only if it exists 3647db96d56Sopenharmony_ci # *and* it supports follow_symlinks 3657db96d56Sopenharmony_ci def lookup(name): 3667db96d56Sopenharmony_ci fn = getattr(os, name, _nop) 3677db96d56Sopenharmony_ci if fn in os.supports_follow_symlinks: 3687db96d56Sopenharmony_ci return fn 3697db96d56Sopenharmony_ci return _nop 3707db96d56Sopenharmony_ci 3717db96d56Sopenharmony_ci if isinstance(src, os.DirEntry): 3727db96d56Sopenharmony_ci st = src.stat(follow_symlinks=follow) 3737db96d56Sopenharmony_ci else: 3747db96d56Sopenharmony_ci st = lookup("stat")(src, follow_symlinks=follow) 3757db96d56Sopenharmony_ci mode = stat.S_IMODE(st.st_mode) 3767db96d56Sopenharmony_ci lookup("utime")(dst, ns=(st.st_atime_ns, st.st_mtime_ns), 3777db96d56Sopenharmony_ci follow_symlinks=follow) 3787db96d56Sopenharmony_ci # We must copy extended attributes before the file is (potentially) 3797db96d56Sopenharmony_ci # chmod()'ed read-only, otherwise setxattr() will error with -EACCES. 
3807db96d56Sopenharmony_ci _copyxattr(src, dst, follow_symlinks=follow) 3817db96d56Sopenharmony_ci try: 3827db96d56Sopenharmony_ci lookup("chmod")(dst, mode, follow_symlinks=follow) 3837db96d56Sopenharmony_ci except NotImplementedError: 3847db96d56Sopenharmony_ci # if we got a NotImplementedError, it's because 3857db96d56Sopenharmony_ci # * follow_symlinks=False, 3867db96d56Sopenharmony_ci # * lchown() is unavailable, and 3877db96d56Sopenharmony_ci # * either 3887db96d56Sopenharmony_ci # * fchownat() is unavailable or 3897db96d56Sopenharmony_ci # * fchownat() doesn't implement AT_SYMLINK_NOFOLLOW. 3907db96d56Sopenharmony_ci # (it returned ENOSUP.) 3917db96d56Sopenharmony_ci # therefore we're out of options--we simply cannot chown the 3927db96d56Sopenharmony_ci # symlink. give up, suppress the error. 3937db96d56Sopenharmony_ci # (which is what shutil always did in this circumstance.) 3947db96d56Sopenharmony_ci pass 3957db96d56Sopenharmony_ci if hasattr(st, 'st_flags'): 3967db96d56Sopenharmony_ci try: 3977db96d56Sopenharmony_ci lookup("chflags")(dst, st.st_flags, follow_symlinks=follow) 3987db96d56Sopenharmony_ci except OSError as why: 3997db96d56Sopenharmony_ci for err in 'EOPNOTSUPP', 'ENOTSUP': 4007db96d56Sopenharmony_ci if hasattr(errno, err) and why.errno == getattr(errno, err): 4017db96d56Sopenharmony_ci break 4027db96d56Sopenharmony_ci else: 4037db96d56Sopenharmony_ci raise 4047db96d56Sopenharmony_ci 4057db96d56Sopenharmony_cidef copy(src, dst, *, follow_symlinks=True): 4067db96d56Sopenharmony_ci """Copy data and mode bits ("cp src dst"). Return the file's destination. 4077db96d56Sopenharmony_ci 4087db96d56Sopenharmony_ci The destination may be a directory. 4097db96d56Sopenharmony_ci 4107db96d56Sopenharmony_ci If follow_symlinks is false, symlinks won't be followed. This 4117db96d56Sopenharmony_ci resembles GNU's "cp -P src dst". 
4127db96d56Sopenharmony_ci 4137db96d56Sopenharmony_ci If source and destination are the same file, a SameFileError will be 4147db96d56Sopenharmony_ci raised. 4157db96d56Sopenharmony_ci 4167db96d56Sopenharmony_ci """ 4177db96d56Sopenharmony_ci if os.path.isdir(dst): 4187db96d56Sopenharmony_ci dst = os.path.join(dst, os.path.basename(src)) 4197db96d56Sopenharmony_ci copyfile(src, dst, follow_symlinks=follow_symlinks) 4207db96d56Sopenharmony_ci copymode(src, dst, follow_symlinks=follow_symlinks) 4217db96d56Sopenharmony_ci return dst 4227db96d56Sopenharmony_ci 4237db96d56Sopenharmony_cidef copy2(src, dst, *, follow_symlinks=True): 4247db96d56Sopenharmony_ci """Copy data and metadata. Return the file's destination. 4257db96d56Sopenharmony_ci 4267db96d56Sopenharmony_ci Metadata is copied with copystat(). Please see the copystat function 4277db96d56Sopenharmony_ci for more information. 4287db96d56Sopenharmony_ci 4297db96d56Sopenharmony_ci The destination may be a directory. 4307db96d56Sopenharmony_ci 4317db96d56Sopenharmony_ci If follow_symlinks is false, symlinks won't be followed. This 4327db96d56Sopenharmony_ci resembles GNU's "cp -P src dst". 4337db96d56Sopenharmony_ci """ 4347db96d56Sopenharmony_ci if os.path.isdir(dst): 4357db96d56Sopenharmony_ci dst = os.path.join(dst, os.path.basename(src)) 4367db96d56Sopenharmony_ci copyfile(src, dst, follow_symlinks=follow_symlinks) 4377db96d56Sopenharmony_ci copystat(src, dst, follow_symlinks=follow_symlinks) 4387db96d56Sopenharmony_ci return dst 4397db96d56Sopenharmony_ci 4407db96d56Sopenharmony_cidef ignore_patterns(*patterns): 4417db96d56Sopenharmony_ci """Function that can be used as copytree() ignore parameter. 
4427db96d56Sopenharmony_ci 4437db96d56Sopenharmony_ci Patterns is a sequence of glob-style patterns 4447db96d56Sopenharmony_ci that are used to exclude files""" 4457db96d56Sopenharmony_ci def _ignore_patterns(path, names): 4467db96d56Sopenharmony_ci ignored_names = [] 4477db96d56Sopenharmony_ci for pattern in patterns: 4487db96d56Sopenharmony_ci ignored_names.extend(fnmatch.filter(names, pattern)) 4497db96d56Sopenharmony_ci return set(ignored_names) 4507db96d56Sopenharmony_ci return _ignore_patterns 4517db96d56Sopenharmony_ci 4527db96d56Sopenharmony_cidef _copytree(entries, src, dst, symlinks, ignore, copy_function, 4537db96d56Sopenharmony_ci ignore_dangling_symlinks, dirs_exist_ok=False): 4547db96d56Sopenharmony_ci if ignore is not None: 4557db96d56Sopenharmony_ci ignored_names = ignore(os.fspath(src), [x.name for x in entries]) 4567db96d56Sopenharmony_ci else: 4577db96d56Sopenharmony_ci ignored_names = set() 4587db96d56Sopenharmony_ci 4597db96d56Sopenharmony_ci os.makedirs(dst, exist_ok=dirs_exist_ok) 4607db96d56Sopenharmony_ci errors = [] 4617db96d56Sopenharmony_ci use_srcentry = copy_function is copy2 or copy_function is copy 4627db96d56Sopenharmony_ci 4637db96d56Sopenharmony_ci for srcentry in entries: 4647db96d56Sopenharmony_ci if srcentry.name in ignored_names: 4657db96d56Sopenharmony_ci continue 4667db96d56Sopenharmony_ci srcname = os.path.join(src, srcentry.name) 4677db96d56Sopenharmony_ci dstname = os.path.join(dst, srcentry.name) 4687db96d56Sopenharmony_ci srcobj = srcentry if use_srcentry else srcname 4697db96d56Sopenharmony_ci try: 4707db96d56Sopenharmony_ci is_symlink = srcentry.is_symlink() 4717db96d56Sopenharmony_ci if is_symlink and os.name == 'nt': 4727db96d56Sopenharmony_ci # Special check for directory junctions, which appear as 4737db96d56Sopenharmony_ci # symlinks but we want to recurse. 
4747db96d56Sopenharmony_ci lstat = srcentry.stat(follow_symlinks=False) 4757db96d56Sopenharmony_ci if lstat.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT: 4767db96d56Sopenharmony_ci is_symlink = False 4777db96d56Sopenharmony_ci if is_symlink: 4787db96d56Sopenharmony_ci linkto = os.readlink(srcname) 4797db96d56Sopenharmony_ci if symlinks: 4807db96d56Sopenharmony_ci # We can't just leave it to `copy_function` because legacy 4817db96d56Sopenharmony_ci # code with a custom `copy_function` may rely on copytree 4827db96d56Sopenharmony_ci # doing the right thing. 4837db96d56Sopenharmony_ci os.symlink(linkto, dstname) 4847db96d56Sopenharmony_ci copystat(srcobj, dstname, follow_symlinks=not symlinks) 4857db96d56Sopenharmony_ci else: 4867db96d56Sopenharmony_ci # ignore dangling symlink if the flag is on 4877db96d56Sopenharmony_ci if not os.path.exists(linkto) and ignore_dangling_symlinks: 4887db96d56Sopenharmony_ci continue 4897db96d56Sopenharmony_ci # otherwise let the copy occur. copy2 will raise an error 4907db96d56Sopenharmony_ci if srcentry.is_dir(): 4917db96d56Sopenharmony_ci copytree(srcobj, dstname, symlinks, ignore, 4927db96d56Sopenharmony_ci copy_function, ignore_dangling_symlinks, 4937db96d56Sopenharmony_ci dirs_exist_ok) 4947db96d56Sopenharmony_ci else: 4957db96d56Sopenharmony_ci copy_function(srcobj, dstname) 4967db96d56Sopenharmony_ci elif srcentry.is_dir(): 4977db96d56Sopenharmony_ci copytree(srcobj, dstname, symlinks, ignore, copy_function, 4987db96d56Sopenharmony_ci ignore_dangling_symlinks, dirs_exist_ok) 4997db96d56Sopenharmony_ci else: 5007db96d56Sopenharmony_ci # Will raise a SpecialFileError for unsupported file types 5017db96d56Sopenharmony_ci copy_function(srcobj, dstname) 5027db96d56Sopenharmony_ci # catch the Error from the recursive copytree so that we can 5037db96d56Sopenharmony_ci # continue with other files 5047db96d56Sopenharmony_ci except Error as err: 5057db96d56Sopenharmony_ci errors.extend(err.args[0]) 5067db96d56Sopenharmony_ci except 
OSError as why: 5077db96d56Sopenharmony_ci errors.append((srcname, dstname, str(why))) 5087db96d56Sopenharmony_ci try: 5097db96d56Sopenharmony_ci copystat(src, dst) 5107db96d56Sopenharmony_ci except OSError as why: 5117db96d56Sopenharmony_ci # Copying file access times may fail on Windows 5127db96d56Sopenharmony_ci if getattr(why, 'winerror', None) is None: 5137db96d56Sopenharmony_ci errors.append((src, dst, str(why))) 5147db96d56Sopenharmony_ci if errors: 5157db96d56Sopenharmony_ci raise Error(errors) 5167db96d56Sopenharmony_ci return dst 5177db96d56Sopenharmony_ci 5187db96d56Sopenharmony_cidef copytree(src, dst, symlinks=False, ignore=None, copy_function=copy2, 5197db96d56Sopenharmony_ci ignore_dangling_symlinks=False, dirs_exist_ok=False): 5207db96d56Sopenharmony_ci """Recursively copy a directory tree and return the destination directory. 5217db96d56Sopenharmony_ci 5227db96d56Sopenharmony_ci If exception(s) occur, an Error is raised with a list of reasons. 5237db96d56Sopenharmony_ci 5247db96d56Sopenharmony_ci If the optional symlinks flag is true, symbolic links in the 5257db96d56Sopenharmony_ci source tree result in symbolic links in the destination tree; if 5267db96d56Sopenharmony_ci it is false, the contents of the files pointed to by symbolic 5277db96d56Sopenharmony_ci links are copied. If the file pointed by the symlink doesn't 5287db96d56Sopenharmony_ci exist, an exception will be added in the list of errors raised in 5297db96d56Sopenharmony_ci an Error exception at the end of the copy process. 5307db96d56Sopenharmony_ci 5317db96d56Sopenharmony_ci You can set the optional ignore_dangling_symlinks flag to true if you 5327db96d56Sopenharmony_ci want to silence this exception. Notice that this has no effect on 5337db96d56Sopenharmony_ci platforms that don't support os.symlink. 5347db96d56Sopenharmony_ci 5357db96d56Sopenharmony_ci The optional ignore argument is a callable. 
If given, it 5367db96d56Sopenharmony_ci is called with the `src` parameter, which is the directory 5377db96d56Sopenharmony_ci being visited by copytree(), and `names` which is the list of 5387db96d56Sopenharmony_ci `src` contents, as returned by os.listdir(): 5397db96d56Sopenharmony_ci 5407db96d56Sopenharmony_ci callable(src, names) -> ignored_names 5417db96d56Sopenharmony_ci 5427db96d56Sopenharmony_ci Since copytree() is called recursively, the callable will be 5437db96d56Sopenharmony_ci called once for each directory that is copied. It returns a 5447db96d56Sopenharmony_ci list of names relative to the `src` directory that should 5457db96d56Sopenharmony_ci not be copied. 5467db96d56Sopenharmony_ci 5477db96d56Sopenharmony_ci The optional copy_function argument is a callable that will be used 5487db96d56Sopenharmony_ci to copy each file. It will be called with the source path and the 5497db96d56Sopenharmony_ci destination path as arguments. By default, copy2() is used, but any 5507db96d56Sopenharmony_ci function that supports the same signature (like copy()) can be used. 5517db96d56Sopenharmony_ci 5527db96d56Sopenharmony_ci If dirs_exist_ok is false (the default) and `dst` already exists, a 5537db96d56Sopenharmony_ci `FileExistsError` is raised. If `dirs_exist_ok` is true, the copying 5547db96d56Sopenharmony_ci operation will continue if it encounters existing directories, and files 5557db96d56Sopenharmony_ci within the `dst` tree will be overwritten by corresponding files from the 5567db96d56Sopenharmony_ci `src` tree. 
if hasattr(os.stat_result, 'st_file_attributes'):
    # Special handling for directory junctions to make them behave like
    # symlinks for shutil.rmtree, since in general they do not appear as
    # regular links.
    def _rmtree_isdir(entry):
        """Return True if the scandir `entry` is a real directory.

        Directory junctions (mount-point reparse points) are reported as
        not being directories.  Any OSError while stat'ing yields False.
        """
        try:
            st = entry.stat(follow_symlinks=False)
            return (stat.S_ISDIR(st.st_mode) and not
                (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
                 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
        except OSError:
            return False

    def _rmtree_islink(path):
        """Return True if `path` is a symlink or a directory junction.

        Any OSError while lstat'ing yields False.
        """
        try:
            st = os.lstat(path)
            return (stat.S_ISLNK(st.st_mode) or
                (st.st_file_attributes & stat.FILE_ATTRIBUTE_REPARSE_POINT
                 and st.st_reparse_tag == stat.IO_REPARSE_TAG_MOUNT_POINT))
        except OSError:
            return False
else:
    def _rmtree_isdir(entry):
        """Return True if the scandir `entry` is a directory (not a symlink).

        Any OSError while querying the entry yields False.
        """
        try:
            return entry.is_dir(follow_symlinks=False)
        except OSError:
            return False

    def _rmtree_islink(path):
        """Return True if `path` is a symbolic link."""
        return os.path.islink(path)

# version vulnerable to race conditions
def _rmtree_unsafe(path, onerror):
    """Recursively delete `path` using path-based calls.

    Every failure is reported through onerror(func, path, exc_info); the
    walk continues with the next entry afterwards.
    """
    try:
        with os.scandir(path) as scandir_it:
            entries = list(scandir_it)
    except OSError:
        onerror(os.scandir, path, sys.exc_info())
        entries = []
    for entry in entries:
        fullname = entry.path
        if _rmtree_isdir(entry):
            try:
                if entry.is_symlink():
                    # This can only happen if someone replaces
                    # a directory with a symlink after the call to
                    # os.scandir or entry.is_dir above.
                    raise OSError("Cannot call rmtree on a symbolic link")
            except OSError:
                onerror(os.path.islink, fullname, sys.exc_info())
                continue
            _rmtree_unsafe(fullname, onerror)
        else:
            try:
                os.unlink(fullname)
            except OSError:
                onerror(os.unlink, fullname, sys.exc_info())
    try:
        os.rmdir(path)
    except OSError:
        onerror(os.rmdir, path, sys.exc_info())

# Version using fd-based APIs to protect against races
def _rmtree_safe_fd(topfd, path, onerror):
    """Delete the contents of the directory open as descriptor `topfd`.

    `path` is only used to build the names reported to `onerror`; all
    filesystem calls are made relative to `topfd`.  The directory itself
    is not removed here -- the caller does that after closing `topfd`.
    """
    try:
        with os.scandir(topfd) as scandir_it:
            entries = list(scandir_it)
    except OSError as err:
        err.filename = path
        onerror(os.scandir, path, sys.exc_info())
        return
    for entry in entries:
        fullname = os.path.join(path, entry.name)
        try:
            is_dir = entry.is_dir(follow_symlinks=False)
        except OSError:
            is_dir = False
        else:
            if is_dir:
                try:
                    # Remember the stat result so it can be compared with
                    # the fstat() of the descriptor opened below.
                    orig_st = entry.stat(follow_symlinks=False)
                    is_dir = stat.S_ISDIR(orig_st.st_mode)
                except OSError:
                    onerror(os.lstat, fullname, sys.exc_info())
                    continue
        if is_dir:
            try:
                dirfd = os.open(entry.name, os.O_RDONLY, dir_fd=topfd)
                dirfd_closed = False
            except OSError:
                onerror(os.open, fullname, sys.exc_info())
            else:
                try:
                    if os.path.samestat(orig_st, os.fstat(dirfd)):
                        _rmtree_safe_fd(dirfd, fullname, onerror)
                        try:
                            # Close before rmdir; the flag keeps the
                            # finally clause from closing it twice.
                            os.close(dirfd)
                            dirfd_closed = True
                            os.rmdir(entry.name, dir_fd=topfd)
                        except OSError:
                            onerror(os.rmdir, fullname, sys.exc_info())
                    else:
                        try:
                            # This can only happen if someone replaces
                            # a directory with a symlink after the call to
                            # os.scandir or stat.S_ISDIR above.
                            raise OSError("Cannot call rmtree on a symbolic "
                                          "link")
                        except OSError:
                            onerror(os.path.islink, fullname, sys.exc_info())
                finally:
                    if not dirfd_closed:
                        os.close(dirfd)
        else:
            try:
                os.unlink(entry.name, dir_fd=topfd)
            except OSError:
                onerror(os.unlink, fullname, sys.exc_info())

_use_fd_functions = ({os.open, os.stat, os.unlink, os.rmdir} <=
                     os.supports_dir_fd and
                     os.scandir in os.supports_fd and
                     os.stat in os.supports_follow_symlinks)

def rmtree(path, ignore_errors=False, onerror=None, *, dir_fd=None):
    """Recursively delete a directory tree.

    If dir_fd is not None, it should be a file descriptor open to a directory;
    path will then be relative to that directory.
    dir_fd may not be implemented on your platform.
    If it is unavailable, using it will raise a NotImplementedError.

    If ignore_errors is set, errors are ignored; otherwise, if onerror
    is set, it is called to handle the error with arguments (func,
    path, exc_info) where func is platform and implementation dependent;
    path is the argument to that function that caused it to fail; and
    exc_info is a tuple returned by sys.exc_info().  If ignore_errors
    is false and onerror is None, an exception is raised.

    """
    sys.audit("shutil.rmtree", path, dir_fd)
    if ignore_errors:
        def onerror(*args):
            pass
    elif onerror is None:
        def onerror(*args):
            # Always invoked from inside an ``except`` clause, so a bare
            # raise re-raises the exception being handled.
            raise
    if _use_fd_functions:
        # While the unsafe rmtree works fine on bytes, the fd based does not.
        if isinstance(path, bytes):
            path = os.fsdecode(path)
        # Note: To guard against symlink races, we use the standard
        # lstat()/open()/fstat() trick.
        try:
            orig_st = os.lstat(path, dir_fd=dir_fd)
        except Exception:
            onerror(os.lstat, path, sys.exc_info())
            return
        try:
            fd = os.open(path, os.O_RDONLY, dir_fd=dir_fd)
            fd_closed = False
        except Exception:
            onerror(os.open, path, sys.exc_info())
            return
        try:
            if os.path.samestat(orig_st, os.fstat(fd)):
                _rmtree_safe_fd(fd, path, onerror)
                try:
                    os.close(fd)
                    fd_closed = True
                    os.rmdir(path, dir_fd=dir_fd)
                except OSError:
                    onerror(os.rmdir, path, sys.exc_info())
            else:
                try:
                    # symlinks to directories are forbidden, see bug #1669
                    raise OSError("Cannot call rmtree on a symbolic link")
                except OSError:
                    onerror(os.path.islink, path, sys.exc_info())
        finally:
            if not fd_closed:
                os.close(fd)
    else:
        if dir_fd is not None:
            raise NotImplementedError("dir_fd unavailable on this platform")
        try:
            if _rmtree_islink(path):
                # symlinks to directories are forbidden, see bug #1669
                raise OSError("Cannot call rmtree on a symbolic link")
        except OSError:
            onerror(os.path.islink, path, sys.exc_info())
            # can't continue even if onerror hook returns
            return
        return _rmtree_unsafe(path, onerror)

# Allow introspection of whether or not the hardening against symlink
# attacks is supported on the current platform
rmtree.avoids_symlink_attacks = _use_fd_functions

def _basename(path):
    """A basename() variant which first strips the trailing slash, if present.
    Thus we always get the last component of the path, even for directories.

    path: Union[PathLike, str]

    e.g.
    >>> os.path.basename('/bar/foo')
    'foo'
    >>> os.path.basename('/bar/foo/')
    ''
    >>> _basename('/bar/foo/')
    'foo'
    """
    path = os.fspath(path)
    sep = os.path.sep + (os.path.altsep or '')
    return os.path.basename(path.rstrip(sep))

def move(src, dst, copy_function=copy2):
    """Recursively move a file or directory to another location. This is
    similar to the Unix "mv" command. Return the file or directory's
    destination.

    If the destination is a directory or a symlink to a directory, the source
    is moved inside the directory. The destination path must not already
    exist.

    If the destination already exists but is not a directory, it may be
    overwritten depending on os.rename() semantics.

    If the destination is on our current filesystem, then rename() is used.
    Otherwise, src is copied to the destination and then removed. Symlinks are
    recreated under the new name if os.rename() fails because of cross
    filesystem renames.

    The optional `copy_function` argument is a callable that will be used
    to copy the source or it will be delegated to `copytree`.
    By default, copy2() is used, but any function that supports the same
    signature (like copy()) can be used.

    A lot more could be done here...  A look at a mv.c shows a lot of
    the issues this implementation glosses over.

    """
    sys.audit("shutil.move", src, dst)
    real_dst = dst
    if os.path.isdir(dst):
        # A symlink pointing at dst also compares as "the same file", but
        # it must be moved *into* dst like any other entry rather than
        # renamed onto the directory it points to.
        if _samefile(src, dst) and not os.path.islink(src):
            # We might be on a case insensitive filesystem,
            # perform the rename anyway.
            os.rename(src, dst)
            # Honour the documented contract: return the destination.
            return real_dst

        # Using _basename instead of os.path.basename is important, as we must
        # ignore any trailing slash to avoid the basename returning ''
        real_dst = os.path.join(dst, _basename(src))

        if os.path.exists(real_dst):
            raise Error("Destination path '%s' already exists" % real_dst)
    try:
        os.rename(src, real_dst)
    except OSError:
        if os.path.islink(src):
            linkto = os.readlink(src)
            os.symlink(linkto, real_dst)
            os.unlink(src)
        elif os.path.isdir(src):
            if _destinsrc(src, dst):
                raise Error("Cannot move a directory '%s' into itself"
                            " '%s'." % (src, dst))
            if (_is_immutable(src)
                    or (not os.access(src, os.W_OK) and os.listdir(src)
                        and sys.platform == 'darwin')):
                raise PermissionError("Cannot move the non-empty directory "
                                      "'%s': Lacking write permission to '%s'."
                                      % (src, src))
            copytree(src, real_dst, copy_function=copy_function,
                     symlinks=True)
            rmtree(src)
        else:
            copy_function(src, real_dst)
            os.unlink(src)
    return real_dst

def _destinsrc(src, dst):
    """Return True if `dst` is `src` itself or a path located inside it.

    Both paths are made absolute and given a trailing separator first, so
    that '/a/bc' is not mistaken for a child of '/a/b'.
    """
    src = os.path.abspath(src)
    dst = os.path.abspath(dst)
    if not src.endswith(os.path.sep):
        src += os.path.sep
    if not dst.endswith(os.path.sep):
        dst += os.path.sep
    return dst.startswith(src)

def _is_immutable(src):
    """Return True if `src` carries the user or system immutable flag.

    Returns False when the stat result has no `st_flags` attribute.
    `_stat` is a helper defined elsewhere in this module.
    """
    st = _stat(src)
    if not hasattr(st, 'st_flags'):
        return False
    # st_flags is a bit field: test the bits instead of comparing for
    # equality, otherwise any additional flag hides the immutable one.
    return bool(st.st_flags & (stat.UF_IMMUTABLE | stat.SF_IMMUTABLE))

def _get_gid(name):
    """Returns a gid, given a group name.

    Returns None if `name` is None, if the grp module is unavailable, or
    if no group with that name exists.
    """
    if name is None:
        return None

    try:
        from grp import getgrnam
    except ImportError:
        return None

    try:
        result = getgrnam(name)
    except KeyError:
        result = None
    if result is not None:
        return result[2]
    return None

def _get_uid(name):
    """Returns an uid, given a user name.

    Returns None if `name` is None, if the pwd module is unavailable, or
    if no user with that name exists.
    """
    if name is None:
        return None

    try:
        from pwd import getpwnam
    except ImportError:
        return None

    try:
        result = getpwnam(name)
    except KeyError:
        result = None
    if result is not None:
        return result[2]
    return None

def _make_tarball(base_name, base_dir, compress="gzip", verbose=0, dry_run=0,
                  owner=None, group=None, logger=None, root_dir=None):
    """Create a (possibly compressed) tar file from all the files under
    'base_dir'.

    'compress' must be "gzip" (the default), "bzip2", "xz", or None.

    'owner' and 'group' can be used to define an owner and a group for the
    archive that is being built. If not provided, the current owner and group
    will be used.

    The output tar file will be named 'base_name' +  ".tar", possibly plus
    the appropriate compression extension (".gz", ".bz2", or ".xz").

    'verbose' is accepted but not used by this function.  If 'dry_run' is
    true, nothing is written to disk.  If 'root_dir' is given, 'base_dir'
    is read relative to it while member names stay relative to 'base_dir'.

    Raises ValueError if 'compress' is unknown or its module is missing.

    Returns the output filename.
    """
    if compress is None:
        tar_compression = ''
    elif _ZLIB_SUPPORTED and compress == 'gzip':
        tar_compression = 'gz'
    elif _BZ2_SUPPORTED and compress == 'bzip2':
        tar_compression = 'bz2'
    elif _LZMA_SUPPORTED and compress == 'xz':
        tar_compression = 'xz'
    else:
        raise ValueError("bad value for 'compress', or compression format not "
                         "supported : {0}".format(compress))

    import tarfile  # late import for breaking circular dependency

    compress_ext = '.' + tar_compression if compress else ''
    archive_name = base_name + '.tar' + compress_ext
    archive_dir = os.path.dirname(archive_name)

    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    # creating the tarball
    if logger is not None:
        logger.info('Creating tar archive')

    uid = _get_uid(owner)
    gid = _get_gid(group)

    def _set_uid_gid(tarinfo):
        # tarfile filter: override ownership only for the ids that resolved.
        if gid is not None:
            tarinfo.gid = gid
            tarinfo.gname = group
        if uid is not None:
            tarinfo.uid = uid
            tarinfo.uname = owner
        return tarinfo

    if not dry_run:
        tar = tarfile.open(archive_name, 'w|%s' % tar_compression)
        # Keep the member name relative even when reading from root_dir.
        arcname = base_dir
        if root_dir is not None:
            base_dir = os.path.join(root_dir, base_dir)
        try:
            tar.add(base_dir, arcname, filter=_set_uid_gid)
        finally:
            tar.close()

    if root_dir is not None:
        archive_name = os.path.abspath(archive_name)
    return archive_name

def _make_zipfile(base_name, base_dir, verbose=0, dry_run=0,
                  logger=None, owner=None, group=None, root_dir=None):
    """Create a zip file from all the files under 'base_dir'.

    The output zip file will be named 'base_name' + ".zip".  Returns the
    name of the output zip file.

    'verbose', 'owner' and 'group' are accepted but not used by this
    function.  If 'dry_run' is true, nothing is written to disk.  If
    'root_dir' is given, 'base_dir' is read relative to it and member
    names are made relative to 'root_dir'.
    """
    import zipfile  # late import for breaking circular dependency

    zip_filename = base_name + ".zip"
    archive_dir = os.path.dirname(base_name)

    if archive_dir and not os.path.exists(archive_dir):
        if logger is not None:
            logger.info("creating %s", archive_dir)
        if not dry_run:
            os.makedirs(archive_dir)

    if logger is not None:
        logger.info("creating '%s' and adding '%s' to it",
                    zip_filename, base_dir)

    if not dry_run:
        with zipfile.ZipFile(zip_filename, "w",
                             compression=zipfile.ZIP_DEFLATED) as zf:
            arcname = os.path.normpath(base_dir)
            if root_dir is not None:
                base_dir = os.path.join(root_dir, base_dir)
            base_dir = os.path.normpath(base_dir)
            # No entry is written for the top directory when it is '.'.
            if arcname != os.curdir:
                zf.write(base_dir, arcname)
                if logger is not None:
                    logger.info("adding '%s'", base_dir)
            for dirpath, dirnames, filenames in os.walk(base_dir):
                arcdirpath = dirpath
                if root_dir is not None:
                    arcdirpath = os.path.relpath(arcdirpath, root_dir)
                arcdirpath = os.path.normpath(arcdirpath)
                for name in sorted(dirnames):
                    path = os.path.join(dirpath, name)
                    arcname = os.path.join(arcdirpath, name)
                    zf.write(path, arcname)
                    if logger is not None:
                        logger.info("adding '%s'", path)
                for name in filenames:
                    path = os.path.join(dirpath, name)
                    path = os.path.normpath(path)
                    if os.path.isfile(path):
                        arcname = os.path.join(arcdirpath, name)
                        zf.write(path, arcname)
                        if logger is not None:
                            logger.info("adding '%s'", path)

    if root_dir is not None:
        zip_filename = os.path.abspath(zip_filename)
    return zip_filename

# Maps the name of the archive format to a tuple containing:
# * the archiving function
# * extra keyword arguments
# * description
# * does it support the root_dir argument?
_ARCHIVE_FORMATS = {
    'tar':   (_make_tarball, [('compress', None)],
              "uncompressed tar file", True),
}

if _ZLIB_SUPPORTED:
    _ARCHIVE_FORMATS['gztar'] = (_make_tarball, [('compress', 'gzip')],
                                "gzip'ed tar-file", True)
    _ARCHIVE_FORMATS['zip'] = (_make_zipfile, [], "ZIP file", True)

if _BZ2_SUPPORTED:
    _ARCHIVE_FORMATS['bztar'] = (_make_tarball, [('compress', 'bzip2')],
                                "bzip2'ed tar-file", True)

if _LZMA_SUPPORTED:
    _ARCHIVE_FORMATS['xztar'] = (_make_tarball, [('compress', 'xz')],
                                "xz'ed tar-file", True)

def get_archive_formats():
    """Returns a list of supported formats for archiving.

    Each element of the returned sequence is a tuple (name, description).
    The list is sorted.
    """
    formats = [(name, registry[2]) for name, registry in
               _ARCHIVE_FORMATS.items()]
    formats.sort()
    return formats

def register_archive_format(name, function, extra_args=None, description=''):
    """Registers an archive format.

    name is the name of the format. function is the callable that will be
    used to create archives. If provided, extra_args is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_archive_formats() function.

    Raises TypeError if function is not callable, if extra_args is not a
    tuple or list, or if one of its elements is not a 2-item tuple or list.
    """
    if extra_args is None:
        extra_args = []
    if not callable(function):
        raise TypeError('The %s object is not callable' % function)
    if not isinstance(extra_args, (tuple, list)):
        raise TypeError('extra_args needs to be a sequence')
    for element in extra_args:
        if not isinstance(element, (tuple, list)) or len(element) != 2:
            raise TypeError('extra_args elements are : (arg_name, value)')

    # Formats registered through this function are recorded as not
    # supporting the root_dir argument (last tuple item).
    _ARCHIVE_FORMATS[name] = (function, extra_args, description, False)

def unregister_archive_format(name):
    """Removes the archive format `name` from the registry.

    Raises KeyError if no such format is registered.
    """
    del _ARCHIVE_FORMATS[name]

def make_archive(base_name, format, root_dir=None, base_dir=None, verbose=0,
                 dry_run=0, owner=None, group=None, logger=None):
    """Create an archive file (eg. zip or tar).

    'base_name' is the name of the file to create, minus any format-specific
    extension; 'format' is the archive format: one of "zip", "tar", "gztar",
    "bztar", or "xztar".  Or any other registered format.

    'root_dir' is a directory that will be the root directory of the
    archive; ie. we typically chdir into 'root_dir' before creating the
    archive.  'base_dir' is the directory where we start archiving from;
    ie. 'base_dir' will be the common prefix of all files and
    directories in the archive.  'root_dir' and 'base_dir' both default
    to the current directory.  Returns the name of the archive file.

    'owner' and 'group' are used when creating a tar archive. By default,
    uses the current owner and group.

    'verbose' is accepted but is not forwarded to the archiving function.
    Raises ValueError if 'format' is not a registered archive format.
    """
    sys.audit("shutil.make_archive", base_name, format, root_dir, base_dir)
    try:
        format_info = _ARCHIVE_FORMATS[format]
    except KeyError:
        raise ValueError("unknown archive format '%s'" % format) from None

    kwargs = {'dry_run': dry_run, 'logger': logger,
              'owner': owner, 'group': group}

    func = format_info[0]
    for arg, val in format_info[1]:
        kwargs[arg] = val

    if base_dir is None:
        base_dir = os.curdir

    support_root_dir = format_info[3]
    save_cwd = None
    if root_dir is not None:
        if support_root_dir:
            # Support path-like base_name here for backwards-compatibility.
            base_name = os.fspath(base_name)
            kwargs['root_dir'] = root_dir
        else:
            # The archiver cannot take root_dir itself: change directory
            # for the duration of the call and restore it afterwards.
            save_cwd = os.getcwd()
            if logger is not None:
                logger.debug("changing into '%s'", root_dir)
            base_name = os.path.abspath(base_name)
            if not dry_run:
                os.chdir(root_dir)

    try:
        filename = func(base_name, base_dir, **kwargs)
    finally:
        if save_cwd is not None:
            if logger is not None:
                logger.debug("changing back to '%s'", save_cwd)
            os.chdir(save_cwd)

    return filename


def get_unpack_formats():
    """Returns a list of supported formats for unpacking.

    Each element of the returned sequence is a tuple
    (name, extensions, description)
    """
    formats = [(name, info[0], info[3]) for name, info in
               _UNPACK_FORMATS.items()]
    formats.sort()
    return formats

def _check_unpack_options(extensions, function, extra_args):
    """Checks what gets registered as an unpacker.

    Raises RegistryError if one of `extensions` is already registered for
    another format, and TypeError if `function` is not callable.
    `extra_args` is not inspected.
    """
    # first make sure no other unpacker is registered for this extension
    existing_extensions = {}
    for name, info in _UNPACK_FORMATS.items():
        for ext in info[0]:
            existing_extensions[ext] = name

    for extension in extensions:
        if extension in existing_extensions:
            msg = '%s is already registered for "%s"'
            raise RegistryError(msg % (extension,
                                       existing_extensions[extension]))

    if not callable(function):
        raise TypeError('The registered function must be a callable')


def register_unpack_format(name, extensions, function, extra_args=None,
                           description=''):
    """Registers an unpack format.

    `name` is the name of the format. `extensions` is a list of extensions
    corresponding to the format.

    `function` is the callable that will be
    used to unpack archives. The callable will receive archives to unpack.
    If it's unable to handle an archive, it needs to raise a ReadError
    exception.

    If provided, `extra_args` is a sequence of
    (name, value) tuples that will be passed as arguments to the callable.
    description can be provided to describe the format, and will be returned
    by the get_unpack_formats() function.
    """
    if extra_args is None:
        extra_args = []
    _check_unpack_options(extensions, function, extra_args)
    _UNPACK_FORMATS[name] = extensions, function, extra_args, description

def unregister_unpack_format(name):
    """Removes the pack format from the registry."""
    del _UNPACK_FORMATS[name]

def _ensure_directory(path):
    """Ensure that the parent directory of `path` exists"""
    dirname = os.path.dirname(path)
    # A bare file name has no parent component to create.  exist_ok avoids
    # failing when the directory appears between a check and the creation.
    if dirname:
        os.makedirs(dirname, exist_ok=True)
a zip file" % filename) 12117db96d56Sopenharmony_ci 12127db96d56Sopenharmony_ci zip = zipfile.ZipFile(filename) 12137db96d56Sopenharmony_ci try: 12147db96d56Sopenharmony_ci for info in zip.infolist(): 12157db96d56Sopenharmony_ci name = info.filename 12167db96d56Sopenharmony_ci 12177db96d56Sopenharmony_ci # don't extract absolute paths or ones with .. in them 12187db96d56Sopenharmony_ci if name.startswith('/') or '..' in name: 12197db96d56Sopenharmony_ci continue 12207db96d56Sopenharmony_ci 12217db96d56Sopenharmony_ci targetpath = os.path.join(extract_dir, *name.split('/')) 12227db96d56Sopenharmony_ci if not targetpath: 12237db96d56Sopenharmony_ci continue 12247db96d56Sopenharmony_ci 12257db96d56Sopenharmony_ci _ensure_directory(targetpath) 12267db96d56Sopenharmony_ci if not name.endswith('/'): 12277db96d56Sopenharmony_ci # file 12287db96d56Sopenharmony_ci with zip.open(name, 'r') as source, \ 12297db96d56Sopenharmony_ci open(targetpath, 'wb') as target: 12307db96d56Sopenharmony_ci copyfileobj(source, target) 12317db96d56Sopenharmony_ci finally: 12327db96d56Sopenharmony_ci zip.close() 12337db96d56Sopenharmony_ci 12347db96d56Sopenharmony_cidef _unpack_tarfile(filename, extract_dir, *, filter=None): 12357db96d56Sopenharmony_ci """Unpack tar/tar.gz/tar.bz2/tar.xz `filename` to `extract_dir` 12367db96d56Sopenharmony_ci """ 12377db96d56Sopenharmony_ci import tarfile # late import for breaking circular dependency 12387db96d56Sopenharmony_ci try: 12397db96d56Sopenharmony_ci tarobj = tarfile.open(filename) 12407db96d56Sopenharmony_ci except tarfile.TarError: 12417db96d56Sopenharmony_ci raise ReadError( 12427db96d56Sopenharmony_ci "%s is not a compressed or uncompressed tar file" % filename) 12437db96d56Sopenharmony_ci try: 12447db96d56Sopenharmony_ci tarobj.extractall(extract_dir, filter=filter) 12457db96d56Sopenharmony_ci finally: 12467db96d56Sopenharmony_ci tarobj.close() 12477db96d56Sopenharmony_ci 12487db96d56Sopenharmony_ci# Maps the name of the unpack format to a 
tuple containing: 12497db96d56Sopenharmony_ci# * extensions 12507db96d56Sopenharmony_ci# * the unpacking function 12517db96d56Sopenharmony_ci# * extra keyword arguments 12527db96d56Sopenharmony_ci# * description 12537db96d56Sopenharmony_ci_UNPACK_FORMATS = { 12547db96d56Sopenharmony_ci 'tar': (['.tar'], _unpack_tarfile, [], "uncompressed tar file"), 12557db96d56Sopenharmony_ci 'zip': (['.zip'], _unpack_zipfile, [], "ZIP file"), 12567db96d56Sopenharmony_ci} 12577db96d56Sopenharmony_ci 12587db96d56Sopenharmony_ciif _ZLIB_SUPPORTED: 12597db96d56Sopenharmony_ci _UNPACK_FORMATS['gztar'] = (['.tar.gz', '.tgz'], _unpack_tarfile, [], 12607db96d56Sopenharmony_ci "gzip'ed tar-file") 12617db96d56Sopenharmony_ci 12627db96d56Sopenharmony_ciif _BZ2_SUPPORTED: 12637db96d56Sopenharmony_ci _UNPACK_FORMATS['bztar'] = (['.tar.bz2', '.tbz2'], _unpack_tarfile, [], 12647db96d56Sopenharmony_ci "bzip2'ed tar-file") 12657db96d56Sopenharmony_ci 12667db96d56Sopenharmony_ciif _LZMA_SUPPORTED: 12677db96d56Sopenharmony_ci _UNPACK_FORMATS['xztar'] = (['.tar.xz', '.txz'], _unpack_tarfile, [], 12687db96d56Sopenharmony_ci "xz'ed tar-file") 12697db96d56Sopenharmony_ci 12707db96d56Sopenharmony_cidef _find_unpack_format(filename): 12717db96d56Sopenharmony_ci for name, info in _UNPACK_FORMATS.items(): 12727db96d56Sopenharmony_ci for extension in info[0]: 12737db96d56Sopenharmony_ci if filename.endswith(extension): 12747db96d56Sopenharmony_ci return name 12757db96d56Sopenharmony_ci return None 12767db96d56Sopenharmony_ci 12777db96d56Sopenharmony_cidef unpack_archive(filename, extract_dir=None, format=None, *, filter=None): 12787db96d56Sopenharmony_ci """Unpack an archive. 12797db96d56Sopenharmony_ci 12807db96d56Sopenharmony_ci `filename` is the name of the archive. 12817db96d56Sopenharmony_ci 12827db96d56Sopenharmony_ci `extract_dir` is the name of the target directory, where the archive 12837db96d56Sopenharmony_ci is unpacked. If not provided, the current working directory is used. 
12847db96d56Sopenharmony_ci 12857db96d56Sopenharmony_ci `format` is the archive format: one of "zip", "tar", "gztar", "bztar", 12867db96d56Sopenharmony_ci or "xztar". Or any other registered format. If not provided, 12877db96d56Sopenharmony_ci unpack_archive will use the filename extension and see if an unpacker 12887db96d56Sopenharmony_ci was registered for that extension. 12897db96d56Sopenharmony_ci 12907db96d56Sopenharmony_ci In case none is found, a ValueError is raised. 12917db96d56Sopenharmony_ci 12927db96d56Sopenharmony_ci If `filter` is given, it is passed to the underlying 12937db96d56Sopenharmony_ci extraction function. 12947db96d56Sopenharmony_ci """ 12957db96d56Sopenharmony_ci sys.audit("shutil.unpack_archive", filename, extract_dir, format) 12967db96d56Sopenharmony_ci 12977db96d56Sopenharmony_ci if extract_dir is None: 12987db96d56Sopenharmony_ci extract_dir = os.getcwd() 12997db96d56Sopenharmony_ci 13007db96d56Sopenharmony_ci extract_dir = os.fspath(extract_dir) 13017db96d56Sopenharmony_ci filename = os.fspath(filename) 13027db96d56Sopenharmony_ci 13037db96d56Sopenharmony_ci if filter is None: 13047db96d56Sopenharmony_ci filter_kwargs = {} 13057db96d56Sopenharmony_ci else: 13067db96d56Sopenharmony_ci filter_kwargs = {'filter': filter} 13077db96d56Sopenharmony_ci if format is not None: 13087db96d56Sopenharmony_ci try: 13097db96d56Sopenharmony_ci format_info = _UNPACK_FORMATS[format] 13107db96d56Sopenharmony_ci except KeyError: 13117db96d56Sopenharmony_ci raise ValueError("Unknown unpack format '{0}'".format(format)) from None 13127db96d56Sopenharmony_ci 13137db96d56Sopenharmony_ci func = format_info[1] 13147db96d56Sopenharmony_ci func(filename, extract_dir, **dict(format_info[2]), **filter_kwargs) 13157db96d56Sopenharmony_ci else: 13167db96d56Sopenharmony_ci # we need to look at the registered unpackers supported extensions 13177db96d56Sopenharmony_ci format = _find_unpack_format(filename) 13187db96d56Sopenharmony_ci if format is None: 
13197db96d56Sopenharmony_ci raise ReadError("Unknown archive format '{0}'".format(filename)) 13207db96d56Sopenharmony_ci 13217db96d56Sopenharmony_ci func = _UNPACK_FORMATS[format][1] 13227db96d56Sopenharmony_ci kwargs = dict(_UNPACK_FORMATS[format][2]) | filter_kwargs 13237db96d56Sopenharmony_ci func(filename, extract_dir, **kwargs) 13247db96d56Sopenharmony_ci 13257db96d56Sopenharmony_ci 13267db96d56Sopenharmony_ciif hasattr(os, 'statvfs'): 13277db96d56Sopenharmony_ci 13287db96d56Sopenharmony_ci __all__.append('disk_usage') 13297db96d56Sopenharmony_ci _ntuple_diskusage = collections.namedtuple('usage', 'total used free') 13307db96d56Sopenharmony_ci _ntuple_diskusage.total.__doc__ = 'Total space in bytes' 13317db96d56Sopenharmony_ci _ntuple_diskusage.used.__doc__ = 'Used space in bytes' 13327db96d56Sopenharmony_ci _ntuple_diskusage.free.__doc__ = 'Free space in bytes' 13337db96d56Sopenharmony_ci 13347db96d56Sopenharmony_ci def disk_usage(path): 13357db96d56Sopenharmony_ci """Return disk usage statistics about the given path. 13367db96d56Sopenharmony_ci 13377db96d56Sopenharmony_ci Returned value is a named tuple with attributes 'total', 'used' and 13387db96d56Sopenharmony_ci 'free', which are the amount of total, used and free space, in bytes. 
13397db96d56Sopenharmony_ci """ 13407db96d56Sopenharmony_ci st = os.statvfs(path) 13417db96d56Sopenharmony_ci free = st.f_bavail * st.f_frsize 13427db96d56Sopenharmony_ci total = st.f_blocks * st.f_frsize 13437db96d56Sopenharmony_ci used = (st.f_blocks - st.f_bfree) * st.f_frsize 13447db96d56Sopenharmony_ci return _ntuple_diskusage(total, used, free) 13457db96d56Sopenharmony_ci 13467db96d56Sopenharmony_cielif _WINDOWS: 13477db96d56Sopenharmony_ci 13487db96d56Sopenharmony_ci __all__.append('disk_usage') 13497db96d56Sopenharmony_ci _ntuple_diskusage = collections.namedtuple('usage', 'total used free') 13507db96d56Sopenharmony_ci 13517db96d56Sopenharmony_ci def disk_usage(path): 13527db96d56Sopenharmony_ci """Return disk usage statistics about the given path. 13537db96d56Sopenharmony_ci 13547db96d56Sopenharmony_ci Returned values is a named tuple with attributes 'total', 'used' and 13557db96d56Sopenharmony_ci 'free', which are the amount of total, used and free space, in bytes. 13567db96d56Sopenharmony_ci """ 13577db96d56Sopenharmony_ci total, free = nt._getdiskusage(path) 13587db96d56Sopenharmony_ci used = total - free 13597db96d56Sopenharmony_ci return _ntuple_diskusage(total, used, free) 13607db96d56Sopenharmony_ci 13617db96d56Sopenharmony_ci 13627db96d56Sopenharmony_cidef chown(path, user=None, group=None): 13637db96d56Sopenharmony_ci """Change owner user and group of the given path. 13647db96d56Sopenharmony_ci 13657db96d56Sopenharmony_ci user and group can be the uid/gid or the user/group names, and in that case, 13667db96d56Sopenharmony_ci they are converted to their respective uid/gid. 
13677db96d56Sopenharmony_ci """ 13687db96d56Sopenharmony_ci sys.audit('shutil.chown', path, user, group) 13697db96d56Sopenharmony_ci 13707db96d56Sopenharmony_ci if user is None and group is None: 13717db96d56Sopenharmony_ci raise ValueError("user and/or group must be set") 13727db96d56Sopenharmony_ci 13737db96d56Sopenharmony_ci _user = user 13747db96d56Sopenharmony_ci _group = group 13757db96d56Sopenharmony_ci 13767db96d56Sopenharmony_ci # -1 means don't change it 13777db96d56Sopenharmony_ci if user is None: 13787db96d56Sopenharmony_ci _user = -1 13797db96d56Sopenharmony_ci # user can either be an int (the uid) or a string (the system username) 13807db96d56Sopenharmony_ci elif isinstance(user, str): 13817db96d56Sopenharmony_ci _user = _get_uid(user) 13827db96d56Sopenharmony_ci if _user is None: 13837db96d56Sopenharmony_ci raise LookupError("no such user: {!r}".format(user)) 13847db96d56Sopenharmony_ci 13857db96d56Sopenharmony_ci if group is None: 13867db96d56Sopenharmony_ci _group = -1 13877db96d56Sopenharmony_ci elif not isinstance(group, int): 13887db96d56Sopenharmony_ci _group = _get_gid(group) 13897db96d56Sopenharmony_ci if _group is None: 13907db96d56Sopenharmony_ci raise LookupError("no such group: {!r}".format(group)) 13917db96d56Sopenharmony_ci 13927db96d56Sopenharmony_ci os.chown(path, _user, _group) 13937db96d56Sopenharmony_ci 13947db96d56Sopenharmony_cidef get_terminal_size(fallback=(80, 24)): 13957db96d56Sopenharmony_ci """Get the size of the terminal window. 13967db96d56Sopenharmony_ci 13977db96d56Sopenharmony_ci For each of the two dimensions, the environment variable, COLUMNS 13987db96d56Sopenharmony_ci and LINES respectively, is checked. If the variable is defined and 13997db96d56Sopenharmony_ci the value is a positive integer, it is used. 
14007db96d56Sopenharmony_ci 14017db96d56Sopenharmony_ci When COLUMNS or LINES is not defined, which is the common case, 14027db96d56Sopenharmony_ci the terminal connected to sys.__stdout__ is queried 14037db96d56Sopenharmony_ci by invoking os.get_terminal_size. 14047db96d56Sopenharmony_ci 14057db96d56Sopenharmony_ci If the terminal size cannot be successfully queried, either because 14067db96d56Sopenharmony_ci the system doesn't support querying, or because we are not 14077db96d56Sopenharmony_ci connected to a terminal, the value given in fallback parameter 14087db96d56Sopenharmony_ci is used. Fallback defaults to (80, 24) which is the default 14097db96d56Sopenharmony_ci size used by many terminal emulators. 14107db96d56Sopenharmony_ci 14117db96d56Sopenharmony_ci The value returned is a named tuple of type os.terminal_size. 14127db96d56Sopenharmony_ci """ 14137db96d56Sopenharmony_ci # columns, lines are the working values 14147db96d56Sopenharmony_ci try: 14157db96d56Sopenharmony_ci columns = int(os.environ['COLUMNS']) 14167db96d56Sopenharmony_ci except (KeyError, ValueError): 14177db96d56Sopenharmony_ci columns = 0 14187db96d56Sopenharmony_ci 14197db96d56Sopenharmony_ci try: 14207db96d56Sopenharmony_ci lines = int(os.environ['LINES']) 14217db96d56Sopenharmony_ci except (KeyError, ValueError): 14227db96d56Sopenharmony_ci lines = 0 14237db96d56Sopenharmony_ci 14247db96d56Sopenharmony_ci # only query if necessary 14257db96d56Sopenharmony_ci if columns <= 0 or lines <= 0: 14267db96d56Sopenharmony_ci try: 14277db96d56Sopenharmony_ci size = os.get_terminal_size(sys.__stdout__.fileno()) 14287db96d56Sopenharmony_ci except (AttributeError, ValueError, OSError): 14297db96d56Sopenharmony_ci # stdout is None, closed, detached, or not a terminal, or 14307db96d56Sopenharmony_ci # os.get_terminal_size() is unsupported 14317db96d56Sopenharmony_ci size = os.terminal_size(fallback) 14327db96d56Sopenharmony_ci if columns <= 0: 14337db96d56Sopenharmony_ci columns = size.columns or 
fallback[0] 14347db96d56Sopenharmony_ci if lines <= 0: 14357db96d56Sopenharmony_ci lines = size.lines or fallback[1] 14367db96d56Sopenharmony_ci 14377db96d56Sopenharmony_ci return os.terminal_size((columns, lines)) 14387db96d56Sopenharmony_ci 14397db96d56Sopenharmony_ci 14407db96d56Sopenharmony_ci# Check that a given file can be accessed with the correct mode. 14417db96d56Sopenharmony_ci# Additionally check that `file` is not a directory, as on Windows 14427db96d56Sopenharmony_ci# directories pass the os.access check. 14437db96d56Sopenharmony_cidef _access_check(fn, mode): 14447db96d56Sopenharmony_ci return (os.path.exists(fn) and os.access(fn, mode) 14457db96d56Sopenharmony_ci and not os.path.isdir(fn)) 14467db96d56Sopenharmony_ci 14477db96d56Sopenharmony_ci 14487db96d56Sopenharmony_cidef which(cmd, mode=os.F_OK | os.X_OK, path=None): 14497db96d56Sopenharmony_ci """Given a command, mode, and a PATH string, return the path which 14507db96d56Sopenharmony_ci conforms to the given mode on the PATH, or None if there is no such 14517db96d56Sopenharmony_ci file. 14527db96d56Sopenharmony_ci 14537db96d56Sopenharmony_ci `mode` defaults to os.F_OK | os.X_OK. `path` defaults to the result 14547db96d56Sopenharmony_ci of os.environ.get("PATH"), or can be overridden with a custom search 14557db96d56Sopenharmony_ci path. 14567db96d56Sopenharmony_ci 14577db96d56Sopenharmony_ci """ 14587db96d56Sopenharmony_ci # If we're given a path with a directory part, look it up directly rather 14597db96d56Sopenharmony_ci # than referring to PATH directories. This includes checking relative to the 14607db96d56Sopenharmony_ci # current directory, e.g. 
./script 14617db96d56Sopenharmony_ci if os.path.dirname(cmd): 14627db96d56Sopenharmony_ci if _access_check(cmd, mode): 14637db96d56Sopenharmony_ci return cmd 14647db96d56Sopenharmony_ci return None 14657db96d56Sopenharmony_ci 14667db96d56Sopenharmony_ci use_bytes = isinstance(cmd, bytes) 14677db96d56Sopenharmony_ci 14687db96d56Sopenharmony_ci if path is None: 14697db96d56Sopenharmony_ci path = os.environ.get("PATH", None) 14707db96d56Sopenharmony_ci if path is None: 14717db96d56Sopenharmony_ci try: 14727db96d56Sopenharmony_ci path = os.confstr("CS_PATH") 14737db96d56Sopenharmony_ci except (AttributeError, ValueError): 14747db96d56Sopenharmony_ci # os.confstr() or CS_PATH is not available 14757db96d56Sopenharmony_ci path = os.defpath 14767db96d56Sopenharmony_ci # bpo-35755: Don't use os.defpath if the PATH environment variable is 14777db96d56Sopenharmony_ci # set to an empty string 14787db96d56Sopenharmony_ci 14797db96d56Sopenharmony_ci # PATH='' doesn't match, whereas PATH=':' looks in the current directory 14807db96d56Sopenharmony_ci if not path: 14817db96d56Sopenharmony_ci return None 14827db96d56Sopenharmony_ci 14837db96d56Sopenharmony_ci if use_bytes: 14847db96d56Sopenharmony_ci path = os.fsencode(path) 14857db96d56Sopenharmony_ci path = path.split(os.fsencode(os.pathsep)) 14867db96d56Sopenharmony_ci else: 14877db96d56Sopenharmony_ci path = os.fsdecode(path) 14887db96d56Sopenharmony_ci path = path.split(os.pathsep) 14897db96d56Sopenharmony_ci 14907db96d56Sopenharmony_ci if sys.platform == "win32": 14917db96d56Sopenharmony_ci # The current directory takes precedence on Windows. 14927db96d56Sopenharmony_ci curdir = os.curdir 14937db96d56Sopenharmony_ci if use_bytes: 14947db96d56Sopenharmony_ci curdir = os.fsencode(curdir) 14957db96d56Sopenharmony_ci if curdir not in path: 14967db96d56Sopenharmony_ci path.insert(0, curdir) 14977db96d56Sopenharmony_ci 14987db96d56Sopenharmony_ci # PATHEXT is necessary to check on Windows. 
14997db96d56Sopenharmony_ci pathext_source = os.getenv("PATHEXT") or _WIN_DEFAULT_PATHEXT 15007db96d56Sopenharmony_ci pathext = [ext for ext in pathext_source.split(os.pathsep) if ext] 15017db96d56Sopenharmony_ci 15027db96d56Sopenharmony_ci if use_bytes: 15037db96d56Sopenharmony_ci pathext = [os.fsencode(ext) for ext in pathext] 15047db96d56Sopenharmony_ci # See if the given file matches any of the expected path extensions. 15057db96d56Sopenharmony_ci # This will allow us to short circuit when given "python.exe". 15067db96d56Sopenharmony_ci # If it does match, only test that one, otherwise we have to try 15077db96d56Sopenharmony_ci # others. 15087db96d56Sopenharmony_ci if any(cmd.lower().endswith(ext.lower()) for ext in pathext): 15097db96d56Sopenharmony_ci files = [cmd] 15107db96d56Sopenharmony_ci else: 15117db96d56Sopenharmony_ci files = [cmd + ext for ext in pathext] 15127db96d56Sopenharmony_ci else: 15137db96d56Sopenharmony_ci # On other platforms you don't have things like PATHEXT to tell you 15147db96d56Sopenharmony_ci # what file suffixes are executable, so just pass on cmd as-is. 15157db96d56Sopenharmony_ci files = [cmd] 15167db96d56Sopenharmony_ci 15177db96d56Sopenharmony_ci seen = set() 15187db96d56Sopenharmony_ci for dir in path: 15197db96d56Sopenharmony_ci normdir = os.path.normcase(dir) 15207db96d56Sopenharmony_ci if not normdir in seen: 15217db96d56Sopenharmony_ci seen.add(normdir) 15227db96d56Sopenharmony_ci for thefile in files: 15237db96d56Sopenharmony_ci name = os.path.join(dir, thefile) 15247db96d56Sopenharmony_ci if _access_check(name, mode): 15257db96d56Sopenharmony_ci return name 15267db96d56Sopenharmony_ci return None 1527