176e6818aSopenharmony_ci#!/usr/bin/env python3 276e6818aSopenharmony_ci# coding=utf-8 376e6818aSopenharmony_ci 476e6818aSopenharmony_ci# 576e6818aSopenharmony_ci# Copyright (c) 2022 Huawei Device Co., Ltd. 676e6818aSopenharmony_ci# Licensed under the Apache License, Version 2.0 (the "License"); 776e6818aSopenharmony_ci# you may not use this file except in compliance with the License. 876e6818aSopenharmony_ci# You may obtain a copy of the License at 976e6818aSopenharmony_ci# 1076e6818aSopenharmony_ci# http://www.apache.org/licenses/LICENSE-2.0 1176e6818aSopenharmony_ci# 1276e6818aSopenharmony_ci# Unless required by applicable law or agreed to in writing, software 1376e6818aSopenharmony_ci# distributed under the License is distributed on an "AS IS" BASIS, 1476e6818aSopenharmony_ci# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1576e6818aSopenharmony_ci# See the License for the specific language governing permissions and 1676e6818aSopenharmony_ci# limitations under the License. 1776e6818aSopenharmony_ci# 1876e6818aSopenharmony_ci 1976e6818aSopenharmony_ciimport os 2076e6818aSopenharmony_ciimport platform 2176e6818aSopenharmony_ciimport shutil 2276e6818aSopenharmony_ciimport stat 2376e6818aSopenharmony_ciimport sys 2476e6818aSopenharmony_ciimport time 2576e6818aSopenharmony_ciimport zipfile 2676e6818aSopenharmony_ciimport hashlib 2776e6818aSopenharmony_ci 2876e6818aSopenharmony_cifrom devicetest.core.exception import DeviceTestError 2976e6818aSopenharmony_cifrom devicetest.core.variables import get_project_path 3076e6818aSopenharmony_cifrom devicetest.error import ErrorMessage 3176e6818aSopenharmony_cifrom devicetest.error import ErrorCategory 3276e6818aSopenharmony_cifrom devicetest.log.logger import DeviceTestLog as log 3376e6818aSopenharmony_ci 3476e6818aSopenharmony_ci 3576e6818aSopenharmony_cidef get_template_path(template_file_path, isdir=None): 3676e6818aSopenharmony_ci """ 3776e6818aSopenharmony_ci @param template_file_path: Obtains the absolute path of the template screen cap path. 3876e6818aSopenharmony_ci @param isdir: Obtain the directory: True; Obtain the file: False; 3976e6818aSopenharmony_ci None: Ignore the file type 4076e6818aSopenharmony_ci """ 4176e6818aSopenharmony_ci template_file_path = template_file_path.replace("\\", "/") 4276e6818aSopenharmony_ci if os.path.isabs(template_file_path) \ 4376e6818aSopenharmony_ci and (not isdir and os.path.isfile(template_file_path)): 4476e6818aSopenharmony_ci return os.path.abspath(template_file_path) 4576e6818aSopenharmony_ci 4676e6818aSopenharmony_ci # remove first str '/' 4776e6818aSopenharmony_ci if not os.path.isfile(template_file_path) and template_file_path.startswith("/"): 4876e6818aSopenharmony_ci template_file_path = template_file_path[1:] 4976e6818aSopenharmony_ci 5076e6818aSopenharmony_ci _fol = None 5176e6818aSopenharmony_ci if template_file_path.startswith("resource"): 5276e6818aSopenharmony_ci path = template_file_path[9:] 5376e6818aSopenharmony_ci from xdevice import EnvPool 5476e6818aSopenharmony_ci if EnvPool.resource_path is not None: 5576e6818aSopenharmony_ci folder = os.path.abspath(EnvPool.resource_path) 5676e6818aSopenharmony_ci _fol = travesal_folder(folder, path, isdir) 5776e6818aSopenharmony_ci if _fol is None: 5876e6818aSopenharmony_ci log.debug("Not found [%s] in env pool path %s, " 5976e6818aSopenharmony_ci "continue to find template in resource path." % ( 6076e6818aSopenharmony_ci path, folder)) 6176e6818aSopenharmony_ci if _fol is None: 6276e6818aSopenharmony_ci ecotest_resource_path = getattr(sys, "ecotest_resource_path", "") 6376e6818aSopenharmony_ci if ecotest_resource_path is not None: 6476e6818aSopenharmony_ci folder = os.path.abspath(ecotest_resource_path) 6576e6818aSopenharmony_ci _fol = travesal_folder(folder, path, isdir) 6676e6818aSopenharmony_ci if _fol is None: 6776e6818aSopenharmony_ci log.debug("Not found [%s] in resource path %s, " 6876e6818aSopenharmony_ci "continue to find template in other path." % ( 6976e6818aSopenharmony_ci path, folder)) 7076e6818aSopenharmony_ci else: 7176e6818aSopenharmony_ci _fol = get_resource_path(template_file_path) 7276e6818aSopenharmony_ci log.debug("get template path:{}".format(_fol)) 7376e6818aSopenharmony_ci return _fol 7476e6818aSopenharmony_ci 7576e6818aSopenharmony_ci 7676e6818aSopenharmony_cidef get_resource_path(resource_file_path, isdir=None): 7776e6818aSopenharmony_ci """ 7876e6818aSopenharmony_ci @param resource_file_path: Obtains the absolute path of the resource file. 7976e6818aSopenharmony_ci @param isdir: Obtain the directory: True; Obtain the file: False; 8076e6818aSopenharmony_ci None: Ignore the file type 8176e6818aSopenharmony_ci """ 8276e6818aSopenharmony_ci resource_file_path = resource_file_path.replace("\\", "/") 8376e6818aSopenharmony_ci if os.path.isabs(resource_file_path) \ 8476e6818aSopenharmony_ci and ((isdir is None and os.path.exists(resource_file_path)) 8576e6818aSopenharmony_ci or (not isdir and os.path.isfile(resource_file_path)) 8676e6818aSopenharmony_ci or (isdir and os.path.isdir(resource_file_path))): 8776e6818aSopenharmony_ci return os.path.abspath(resource_file_path) 8876e6818aSopenharmony_ci 8976e6818aSopenharmony_ci _fol = None 9076e6818aSopenharmony_ci from xdevice import EnvPool 9176e6818aSopenharmony_ci if EnvPool.resource_path is not None: 9276e6818aSopenharmony_ci folder = os.path.abspath(EnvPool.resource_path) 9376e6818aSopenharmony_ci _fol = travesal_folder(folder, resource_file_path, isdir) 9476e6818aSopenharmony_ci if _fol is None: 9576e6818aSopenharmony_ci log.debug("Not found [%s] in env pool path %s, " 9676e6818aSopenharmony_ci "continue to find in project resource path." % ( 9776e6818aSopenharmony_ci resource_file_path, folder)) 9876e6818aSopenharmony_ci 9976e6818aSopenharmony_ci if _fol is None: 10076e6818aSopenharmony_ci ecotest_resource_path = getattr(sys, "ecotest_resource_path", "") 10176e6818aSopenharmony_ci if ecotest_resource_path is not None: 10276e6818aSopenharmony_ci folder = os.path.abspath(ecotest_resource_path) 10376e6818aSopenharmony_ci _fol = travesal_folder(folder, resource_file_path, isdir) 10476e6818aSopenharmony_ci if _fol is None: 10576e6818aSopenharmony_ci log.debug("Not found [%s] in ecotest path %s, " 10676e6818aSopenharmony_ci "continue to find in suit path." % ( 10776e6818aSopenharmony_ci resource_file_path, folder)) 10876e6818aSopenharmony_ci 10976e6818aSopenharmony_ci from devicetest.core.variables import DeccVariable 11076e6818aSopenharmony_ci if _fol is None: 11176e6818aSopenharmony_ci folder = os.path.abspath(DeccVariable.project.resource_path) 11276e6818aSopenharmony_ci _fol = travesal_folder(folder, resource_file_path, isdir) 11376e6818aSopenharmony_ci if _fol is None: 11476e6818aSopenharmony_ci log.debug("Not found [%s] in product path %s, " 11576e6818aSopenharmony_ci "continue to find in project resource path." % ( 11676e6818aSopenharmony_ci resource_file_path, folder)) 11776e6818aSopenharmony_ci 11876e6818aSopenharmony_ci if _fol is None: 11976e6818aSopenharmony_ci folder = os.path.abspath(DeccVariable.project.test_suite_path) 12076e6818aSopenharmony_ci _fol = travesal_folder(folder, resource_file_path, isdir) 12176e6818aSopenharmony_ci if _fol is None: 12276e6818aSopenharmony_ci log.debug("Not found [%s] in product path %s, " 12376e6818aSopenharmony_ci "continue to find in suit resource path." % ( 12476e6818aSopenharmony_ci resource_file_path, folder)) 12576e6818aSopenharmony_ci 12676e6818aSopenharmony_ci if _fol is None: 12776e6818aSopenharmony_ci folder = os.path.abspath(get_project_path()) 12876e6818aSopenharmony_ci _fol = travesal_folder(folder, resource_file_path, isdir) 12976e6818aSopenharmony_ci if _fol is None: 13076e6818aSopenharmony_ci log.debug("Not found [%s] in product path %s, " 13176e6818aSopenharmony_ci "continue to find in project path." % ( 13276e6818aSopenharmony_ci resource_file_path, folder)) 13376e6818aSopenharmony_ci 13476e6818aSopenharmony_ci if _fol is None: 13576e6818aSopenharmony_ci err_msg = ErrorMessage.Common.Code_0201008.format(resource_file_path) 13676e6818aSopenharmony_ci log.error(err_msg) 13776e6818aSopenharmony_ci raise DeviceTestError(err_msg) 13876e6818aSopenharmony_ci log.debug("get resource path:{}".format(_fol)) 13976e6818aSopenharmony_ci return _fol 14076e6818aSopenharmony_ci 14176e6818aSopenharmony_ci 14276e6818aSopenharmony_cidef travesal_folder(folder, folder_file_path, isdir=False): 14376e6818aSopenharmony_ci folder_file = os.path.join(folder, folder_file_path) 14476e6818aSopenharmony_ci if (isdir is None and os.path.exists(folder_file)) \ 14576e6818aSopenharmony_ci or (not isdir and os.path.isfile(folder_file)) \ 14676e6818aSopenharmony_ci or (isdir and os.path.isdir(folder_file)): 14776e6818aSopenharmony_ci return os.path.abspath(folder_file) 14876e6818aSopenharmony_ci 14976e6818aSopenharmony_ci if not os.path.exists(folder): 15076e6818aSopenharmony_ci return None 15176e6818aSopenharmony_ci 15276e6818aSopenharmony_ci for child in os.listdir(folder): 15376e6818aSopenharmony_ci if child == ".svn": 15476e6818aSopenharmony_ci continue 15576e6818aSopenharmony_ci 15676e6818aSopenharmony_ci folder_file = os.path.join(folder, child) 15776e6818aSopenharmony_ci if os.path.isdir(folder_file): 15876e6818aSopenharmony_ci if (isdir is None or isdir) \ 15976e6818aSopenharmony_ci and folder_file.endswith(os.sep + folder_file_path): 16076e6818aSopenharmony_ci return folder_file 16176e6818aSopenharmony_ci else: 16276e6818aSopenharmony_ci folder_ret = travesal_folder(folder_file, 16376e6818aSopenharmony_ci folder_file_path, isdir) 16476e6818aSopenharmony_ci if folder_ret is not None: 16576e6818aSopenharmony_ci return folder_ret 16676e6818aSopenharmony_ci elif os.path.isfile(folder_file) \ 16776e6818aSopenharmony_ci and folder_file.endswith(os.sep + folder_file_path) \ 16876e6818aSopenharmony_ci and (isdir is None or not isdir): 16976e6818aSopenharmony_ci return folder_file 17076e6818aSopenharmony_ci 17176e6818aSopenharmony_ci return None 17276e6818aSopenharmony_ci 17376e6818aSopenharmony_ci 17476e6818aSopenharmony_cidef os_open_file_write(file_path, content, mode="w"): 17576e6818aSopenharmony_ci try: 17676e6818aSopenharmony_ci flags = os.O_WRONLY | os.O_CREAT 17776e6818aSopenharmony_ci modes = stat.S_IWUSR | stat.S_IRUSR 17876e6818aSopenharmony_ci dir_path = os.path.dirname(file_path) 17976e6818aSopenharmony_ci if not os.path.isdir(dir_path): 18076e6818aSopenharmony_ci os.makedirs(dir_path) 18176e6818aSopenharmony_ci with os.fdopen(os.open(file_path, flags, modes), mode) as fout: 18276e6818aSopenharmony_ci fout.write(content) 18376e6818aSopenharmony_ci except Exception as error: 18476e6818aSopenharmony_ci err_msg = ErrorMessage.Common.Code_0201009 18576e6818aSopenharmony_ci log.error(err_msg, is_traceback=True) 18676e6818aSopenharmony_ci raise DeviceTestError(err_msg) from error 18776e6818aSopenharmony_ci 18876e6818aSopenharmony_ci 18976e6818aSopenharmony_cidef os_open_file_read(file_path, mode="r"): 19076e6818aSopenharmony_ci try: 19176e6818aSopenharmony_ci flags = os.O_RDONLY 19276e6818aSopenharmony_ci modes = stat.S_IWUSR | stat.S_IRUSR 19376e6818aSopenharmony_ci with os.fdopen(os.open(file_path, flags, modes), mode) as fout: 19476e6818aSopenharmony_ci return fout.read() 19576e6818aSopenharmony_ci except FileNotFoundError as error: 19676e6818aSopenharmony_ci err_msg = ErrorMessage.Common.Code_0201001.format(ErrorCategory.Environment, file_path) 19776e6818aSopenharmony_ci log.error(err_msg, is_traceback=True) 19876e6818aSopenharmony_ci raise DeviceTestError(err_msg) from error 19976e6818aSopenharmony_ci except Exception as error: 20076e6818aSopenharmony_ci err_msg = ErrorMessage.Common.Code_0201010 20176e6818aSopenharmony_ci log.error(err_msg, is_traceback=True) 20276e6818aSopenharmony_ci raise DeviceTestError(err_msg) from error 20376e6818aSopenharmony_ci 20476e6818aSopenharmony_ci 20576e6818aSopenharmony_cidef save_file(file_path, content): 20676e6818aSopenharmony_ci os_open_file_write(file_path, content, "wb") 20776e6818aSopenharmony_ci 20876e6818aSopenharmony_ci 20976e6818aSopenharmony_cidef create_dir(create_path): 21076e6818aSopenharmony_ci """ 21176e6818aSopenharmony_ci Creates a directory if it does not exist already. 21276e6818aSopenharmony_ci Args: 21376e6818aSopenharmony_ci create_path: The path of the directory to create. 21476e6818aSopenharmony_ci """ 21576e6818aSopenharmony_ci full_path = os.path.abspath(os.path.expanduser(create_path)) 21676e6818aSopenharmony_ci if not os.path.exists(full_path): 21776e6818aSopenharmony_ci os.makedirs(full_path, exist_ok=True) # exist_ok=True 21876e6818aSopenharmony_ci 21976e6818aSopenharmony_ci 22076e6818aSopenharmony_cidef to_file(filename, content): 22176e6818aSopenharmony_ci """ 22276e6818aSopenharmony_ci generate files 22376e6818aSopenharmony_ci """ 22476e6818aSopenharmony_ci dirname = os.path.dirname(filename) 22576e6818aSopenharmony_ci if not os.path.isdir(dirname): 22676e6818aSopenharmony_ci os.makedirs(dirname) 22776e6818aSopenharmony_ci os_open_file_write(filename, content, "wb") 22876e6818aSopenharmony_ci 22976e6818aSopenharmony_ci 23076e6818aSopenharmony_cidef delfile(filename): 23176e6818aSopenharmony_ci try: 23276e6818aSopenharmony_ci os.remove(filename) 23376e6818aSopenharmony_ci except Exception as exception: 23476e6818aSopenharmony_ci log.error(exception) 23576e6818aSopenharmony_ci if os.path.isfile(filename): 23676e6818aSopenharmony_ci if "nt" in sys.builtin_module_names: 23776e6818aSopenharmony_ci os.remove(filename) 23876e6818aSopenharmony_ci else: 23976e6818aSopenharmony_ci shutil.rmtree(filename) 24076e6818aSopenharmony_ci 24176e6818aSopenharmony_ci for _ in range(5): 24276e6818aSopenharmony_ci if os.path.isfile(filename): 24376e6818aSopenharmony_ci time.sleep(0.1) 24476e6818aSopenharmony_ci continue 24576e6818aSopenharmony_ci else: 24676e6818aSopenharmony_ci break 24776e6818aSopenharmony_ci 24876e6818aSopenharmony_ci if os.path.isfile(filename): 24976e6818aSopenharmony_ci log.error("Delete file %s failed." % filename) 25076e6818aSopenharmony_ci 25176e6818aSopenharmony_ci 25276e6818aSopenharmony_cidef delfolder(dirname): 25376e6818aSopenharmony_ci try: 25476e6818aSopenharmony_ci shutil.rmtree(dirname) 25576e6818aSopenharmony_ci except Exception as _: 25676e6818aSopenharmony_ci if os.path.isdir(dirname): 25776e6818aSopenharmony_ci shutil.rmtree(dirname) 25876e6818aSopenharmony_ci 25976e6818aSopenharmony_ci for _ in range(5): 26076e6818aSopenharmony_ci if os.path.isdir(dirname): 26176e6818aSopenharmony_ci time.sleep(0.1) 26276e6818aSopenharmony_ci continue 26376e6818aSopenharmony_ci else: 26476e6818aSopenharmony_ci break 26576e6818aSopenharmony_ci 26676e6818aSopenharmony_ci if os.path.isdir(dirname): 26776e6818aSopenharmony_ci log.error("Delete folder %s failed." % dirname) 26876e6818aSopenharmony_ci 26976e6818aSopenharmony_ci 27076e6818aSopenharmony_cidef copy_to_folder(src, des): 27176e6818aSopenharmony_ci """Copy a folder and its children or a file to another folder. 27276e6818aSopenharmony_ci """ 27376e6818aSopenharmony_ci src = os.path.normpath(src) 27476e6818aSopenharmony_ci des = os.path.normpath(des) 27576e6818aSopenharmony_ci if not os.path.exists(src): 27676e6818aSopenharmony_ci log.error("No found [%s]" % src) 27776e6818aSopenharmony_ci return 27876e6818aSopenharmony_ci if not os.path.exists(des): 27976e6818aSopenharmony_ci create_dir(des) 28076e6818aSopenharmony_ci if not os.path.isdir(des): 28176e6818aSopenharmony_ci log.error("[%s] is not a folder." % des) 28276e6818aSopenharmony_ci return 28376e6818aSopenharmony_ci 28476e6818aSopenharmony_ci if not os.path.isdir(src): 28576e6818aSopenharmony_ci shutil.copy(src, des) 28676e6818aSopenharmony_ci return 28776e6818aSopenharmony_ci os.chdir(src) 28876e6818aSopenharmony_ci src_file_list = [os.path.join(src, src_file) 28976e6818aSopenharmony_ci for src_file in os.listdir(des)] 29076e6818aSopenharmony_ci for source in src_file_list: 29176e6818aSopenharmony_ci if os.path.isfile(source): 29276e6818aSopenharmony_ci shutil.copy(source, des) 29376e6818aSopenharmony_ci if os.path.isdir(source): 29476e6818aSopenharmony_ci _, src_name = os.path.split(source) 29576e6818aSopenharmony_ci shutil.copytree(source, os.path.join(des, src_name)) 29676e6818aSopenharmony_ci 29776e6818aSopenharmony_ci 29876e6818aSopenharmony_cidef delete_file_folder(src): 29976e6818aSopenharmony_ci """ 30076e6818aSopenharmony_ci @summary: Delete files or directories. 30176e6818aSopenharmony_ci """ 30276e6818aSopenharmony_ci 30376e6818aSopenharmony_ci if os.path.isfile(src): 30476e6818aSopenharmony_ci delfile(src) 30576e6818aSopenharmony_ci elif os.path.isdir(src): 30676e6818aSopenharmony_ci delfolder(src) 30776e6818aSopenharmony_ci 30876e6818aSopenharmony_ci 30976e6818aSopenharmony_cidef get_file_md5(file_name: str): 31076e6818aSopenharmony_ci """ 31176e6818aSopenharmony_ci @summary: Get MD5 hash of a file. 31276e6818aSopenharmony_ci :param file_name: 31376e6818aSopenharmony_ci :return: 31476e6818aSopenharmony_ci """ 31576e6818aSopenharmony_ci if platform.system() == "Windows": 31676e6818aSopenharmony_ci flags = os.O_RDONLY | os.O_BINARY 31776e6818aSopenharmony_ci else: 31876e6818aSopenharmony_ci flags = os.O_RDONLY 31976e6818aSopenharmony_ci fd = os.open(file_name, flags, 0o644) 32076e6818aSopenharmony_ci m = hashlib.md5() # 创建md5对象 32176e6818aSopenharmony_ci with os.fdopen(fd, mode="rb") as f: 32276e6818aSopenharmony_ci while True: 32376e6818aSopenharmony_ci data = f.read(4096) 32476e6818aSopenharmony_ci if not data: 32576e6818aSopenharmony_ci break 32676e6818aSopenharmony_ci m.update(data) # 更新md5对象 32776e6818aSopenharmony_ci return m.hexdigest() 32876e6818aSopenharmony_ci 32976e6818aSopenharmony_ci 33076e6818aSopenharmony_cidef compress_and_remove(folder_path: str, archive_name: str) -> bool: 33176e6818aSopenharmony_ci """ 33276e6818aSopenharmony_ci @summary: Compress and remove a file. 33376e6818aSopenharmony_ci :param folder_path: 33476e6818aSopenharmony_ci :param archive_name: 33576e6818aSopenharmony_ci :return: 33676e6818aSopenharmony_ci """ 33776e6818aSopenharmony_ci try: 33876e6818aSopenharmony_ci shutil.make_archive(archive_name, 'zip', folder_path) 33976e6818aSopenharmony_ci shutil.rmtree(folder_path) 34076e6818aSopenharmony_ci return True 34176e6818aSopenharmony_ci except Exception as e: 34276e6818aSopenharmony_ci log.error(e) 34376e6818aSopenharmony_ci return False 34476e6818aSopenharmony_ci 34576e6818aSopenharmony_ci 34676e6818aSopenharmony_cidef unzip_file(zip_path, extract_to): 34776e6818aSopenharmony_ci """ 34876e6818aSopenharmony_ci 解压指定的 ZIP 文件到目标目录 34976e6818aSopenharmony_ci 35076e6818aSopenharmony_ci :param zip_path: ZIP 文件路径 35176e6818aSopenharmony_ci :param extract_to: 解压到的目标目录 35276e6818aSopenharmony_ci """ 35376e6818aSopenharmony_ci # 确保目标目录存在 35476e6818aSopenharmony_ci if not os.path.exists(extract_to): 35576e6818aSopenharmony_ci os.makedirs(extract_to) 35676e6818aSopenharmony_ci 35776e6818aSopenharmony_ci # 打开 ZIP 文件 35876e6818aSopenharmony_ci with zipfile.ZipFile(zip_path, 'r') as zip_ref: 35976e6818aSopenharmony_ci # 解压所有文件到目标目录 36076e6818aSopenharmony_ci zip_ref.extractall(extract_to) 361