#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import platform
import shutil
import stat
import sys
import time
import zipfile
import hashlib

from devicetest.core.exception import DeviceTestError
from devicetest.core.variables import get_project_path
from devicetest.error import ErrorMessage
from devicetest.error import ErrorCategory
from devicetest.log.logger import DeviceTestLog as log


def get_template_path(template_file_path, isdir=None):
    """
    Resolve the absolute path of a template (screen capture) file.

    @param template_file_path: template path; an absolute path, or a path
                               relative to one of the resource folders.
    @param isdir: Obtain the directory: True; Obtain the file: False;
                  None: Ignore the file type
    @return: the resolved path. For a path starting with "resource" this is
             None when nothing matches in the searched folders; any other
             path is delegated to get_resource_path, which raises
             DeviceTestError when nothing matches.
    """
    template_file_path = template_file_path.replace("\\", "/")
    if os.path.isabs(template_file_path) \
            and (not isdir and os.path.isfile(template_file_path)):
        return os.path.abspath(template_file_path)

    # Drop a leading '/' so the path can be joined onto a search folder.
    if not os.path.isfile(template_file_path) and template_file_path.startswith("/"):
        template_file_path = template_file_path[1:]

    _fol = None
    if template_file_path.startswith("resource"):
        # Skip the 9 leading characters: "resource" plus the separator.
        path = template_file_path[9:]
        from xdevice import EnvPool
        if EnvPool.resource_path is not None:
            folder = os.path.abspath(EnvPool.resource_path)
            _fol = travesal_folder(folder, path, isdir)
            if _fol is None:
                log.debug("Not found [%s] in env pool path %s, "
                          "continue to find template in resource path." % (
                              path, folder))
        if _fol is None:
            # NOTE(review): the getattr default is "", so this check passes
            # even when sys.ecotest_resource_path is unset and the search
            # then starts at os.path.abspath(""), i.e. the current working
            # directory. Kept as-is; confirm whether that is intended.
            ecotest_resource_path = getattr(sys, "ecotest_resource_path", "")
            if ecotest_resource_path is not None:
                folder = os.path.abspath(ecotest_resource_path)
                _fol = travesal_folder(folder, path, isdir)
                if _fol is None:
                    log.debug("Not found [%s] in resource path %s, "
                              "continue to find template in other path." % (
                                  path, folder))
    else:
        _fol = get_resource_path(template_file_path)
    log.debug("get template path:{}".format(_fol))
    return _fol


def get_resource_path(resource_file_path, isdir=None):
    """
    Resolve the absolute path of a resource file or directory.

    The search order is: EnvPool.resource_path, sys.ecotest_resource_path,
    DeccVariable.project.resource_path, DeccVariable.project.test_suite_path
    and finally get_project_path(). The first match wins.

    @param resource_file_path: resource path; an absolute path, or a path
                               relative to one of the searched folders.
    @param isdir: Obtain the directory: True; Obtain the file: False;
                  None: Ignore the file type
    @return: the resolved path of the resource.
    @raise DeviceTestError: when the resource is not found anywhere.
    """
    resource_file_path = resource_file_path.replace("\\", "/")
    if os.path.isabs(resource_file_path) \
            and ((isdir is None and os.path.exists(resource_file_path))
                 or (not isdir and os.path.isfile(resource_file_path))
                 or (isdir and os.path.isdir(resource_file_path))):
        return os.path.abspath(resource_file_path)

    _fol = None
    from xdevice import EnvPool
    if EnvPool.resource_path is not None:
        folder = os.path.abspath(EnvPool.resource_path)
        _fol = travesal_folder(folder, resource_file_path, isdir)
        if _fol is None:
            log.debug("Not found [%s] in env pool path %s, "
                      "continue to find in project resource path." % (
                          resource_file_path, folder))

    if _fol is None:
        # NOTE(review): the getattr default is "", so an unset attribute
        # makes this search start at the current working directory.
        ecotest_resource_path = getattr(sys, "ecotest_resource_path", "")
        if ecotest_resource_path is not None:
            folder = os.path.abspath(ecotest_resource_path)
            _fol = travesal_folder(folder, resource_file_path, isdir)
            if _fol is None:
                log.debug("Not found [%s] in ecotest path %s, "
                          "continue to find in suit path." % (
                              resource_file_path, folder))

    from devicetest.core.variables import DeccVariable
    if _fol is None:
        folder = os.path.abspath(DeccVariable.project.resource_path)
        _fol = travesal_folder(folder, resource_file_path, isdir)
        if _fol is None:
            log.debug("Not found [%s] in product path %s, "
                      "continue to find in project resource path." % (
                          resource_file_path, folder))

    if _fol is None:
        folder = os.path.abspath(DeccVariable.project.test_suite_path)
        _fol = travesal_folder(folder, resource_file_path, isdir)
        if _fol is None:
            log.debug("Not found [%s] in product path %s, "
                      "continue to find in suit resource path." % (
                          resource_file_path, folder))

    if _fol is None:
        folder = os.path.abspath(get_project_path())
        _fol = travesal_folder(folder, resource_file_path, isdir)
        if _fol is None:
            log.debug("Not found [%s] in product path %s, "
                      "continue to find in project path." % (
                          resource_file_path, folder))

    if _fol is None:
        err_msg = ErrorMessage.Common.Code_0201008.format(resource_file_path)
        log.error(err_msg)
        raise DeviceTestError(err_msg)
    log.debug("get resource path:{}".format(_fol))
    return _fol


def travesal_folder(folder, folder_file_path, isdir=False):
    """
    Search a folder tree depth-first for a file or directory.

    @param folder: root folder to search.
    @param folder_file_path: relative path (or trailing path fragment) to
                             look for.
    @param isdir: Obtain the directory: True; Obtain the file: False;
                  None: Ignore the file type
    @return: path of the first match, or None when nothing matches or
             folder is not an existing directory.
    """
    # Fast path: the target sits directly under the root.
    folder_file = os.path.join(folder, folder_file_path)
    if (isdir is None and os.path.exists(folder_file)) \
            or (not isdir and os.path.isfile(folder_file)) \
            or (isdir and os.path.isdir(folder_file)):
        return os.path.abspath(folder_file)

    # isdir() rather than exists(): os.listdir raises on a regular file.
    if not os.path.isdir(folder):
        return None

    suffix = os.sep + folder_file_path
    for child in os.listdir(folder):
        if child == ".svn":
            continue

        child_path = os.path.join(folder, child)
        if os.path.isdir(child_path):
            if (isdir is None or isdir) and child_path.endswith(suffix):
                return child_path
            found = travesal_folder(child_path, folder_file_path, isdir)
            if found is not None:
                return found
        elif os.path.isfile(child_path) \
                and child_path.endswith(suffix) \
                and (isdir is None or not isdir):
            return child_path

    return None


def os_open_file_write(file_path, content, mode="w"):
    """
    Write content to a file created with owner read/write permission only.

    @param file_path: target file; missing parent directories are created.
    @param content: data to write; str or bytes, matching mode.
    @param mode: mode passed to os.fdopen, e.g. "w", "wb" or "a".
    @raise DeviceTestError: when the file cannot be created or written.
    """
    try:
        flags = os.O_WRONLY | os.O_CREAT
        if "w" in mode:
            # Wrapping an already-open fd in "w" mode does not truncate it,
            # so truncate at the os.open level; otherwise overwriting with
            # shorter content leaves stale bytes at the end of the file.
            flags |= os.O_TRUNC
        elif "a" in mode:
            flags |= os.O_APPEND
        modes = stat.S_IWUSR | stat.S_IRUSR
        dir_path = os.path.dirname(file_path)
        # dir_path is "" for a bare file name; os.makedirs("") would raise.
        if dir_path and not os.path.isdir(dir_path):
            os.makedirs(dir_path, exist_ok=True)
        with os.fdopen(os.open(file_path, flags, modes), mode) as fout:
            fout.write(content)
    except Exception as error:
        err_msg = ErrorMessage.Common.Code_0201009
        log.error(err_msg, is_traceback=True)
        raise DeviceTestError(err_msg) from error


def os_open_file_read(file_path, mode="r"):
    """
    Read and return the whole content of a file.

    @param file_path: file to read.
    @param mode: mode passed to os.fdopen, e.g. "r" or "rb".
    @return: the file content; str or bytes depending on mode.
    @raise DeviceTestError: when the file is missing or cannot be read.
    """
    try:
        flags = os.O_RDONLY
        modes = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open(file_path, flags, modes), mode) as fin:
            return fin.read()
    except FileNotFoundError as error:
        err_msg = ErrorMessage.Common.Code_0201001.format(ErrorCategory.Environment, file_path)
        log.error(err_msg, is_traceback=True)
        raise DeviceTestError(err_msg) from error
    except Exception as error:
        err_msg = ErrorMessage.Common.Code_0201010
        log.error(err_msg, is_traceback=True)
        raise DeviceTestError(err_msg) from error


def save_file(file_path, content):
    """
    Write binary content to a file.

    @param file_path: target file.
    @param content: bytes to write.
    """
    os_open_file_write(file_path, content, "wb")


def create_dir(create_path):
    """
    Creates a directory if it does not exist already.
    Args:
        create_path: The path of the directory to create.
    """
    full_path = os.path.abspath(os.path.expanduser(create_path))
    if not os.path.exists(full_path):
        # exist_ok tolerates another process creating it in between.
        os.makedirs(full_path, exist_ok=True)


def to_file(filename, content):
    """
    Generate a file with binary content, creating parent directories.

    @param filename: target file.
    @param content: bytes to write.
    """
    dirname = os.path.dirname(filename)
    # dirname is "" for a bare file name; os.makedirs("") would raise.
    if dirname and not os.path.isdir(dirname):
        os.makedirs(dirname, exist_ok=True)
    os_open_file_write(filename, content, "wb")


def delfile(filename):
    """
    Delete a file on a best-effort basis; failures are logged, not raised.

    @param filename: file to delete.
    """
    try:
        os.remove(filename)
    except Exception as exception:
        log.error(exception)
        if os.path.isfile(filename):
            # Retry once with os.remove. shutil.rmtree only works on
            # directories and would always raise for a regular file.
            try:
                os.remove(filename)
            except OSError as retry_error:
                log.error(retry_error)

    # Give the file system a short moment to finish the deletion.
    for _ in range(5):
        if not os.path.isfile(filename):
            break
        time.sleep(0.1)

    if os.path.isfile(filename):
        log.error("Delete file %s failed." % filename)


def delfolder(dirname):
    """
    Delete a folder tree on a best-effort basis; failures are logged.

    @param dirname: folder to delete.
    """
    try:
        shutil.rmtree(dirname)
    except Exception:
        if os.path.isdir(dirname):
            # Retry once; report instead of raising so the final check
            # below can log the failure.
            try:
                shutil.rmtree(dirname)
            except OSError as retry_error:
                log.error(retry_error)

    # Give the file system a short moment to finish the deletion.
    for _ in range(5):
        if not os.path.isdir(dirname):
            break
        time.sleep(0.1)

    if os.path.isdir(dirname):
        log.error("Delete folder %s failed." % dirname)


def copy_to_folder(src, des):
    """Copy a folder and its children or a file to another folder.

    @param src: source file or folder; an error is logged if it is missing.
    @param des: destination folder; created when it does not exist.
    """
    src = os.path.normpath(src)
    des = os.path.normpath(des)
    if not os.path.exists(src):
        log.error("No found [%s]" % src)
        return
    if not os.path.exists(des):
        create_dir(des)
    if not os.path.isdir(des):
        log.error("[%s] is not a folder." % des)
        return

    if not os.path.isdir(src):
        shutil.copy(src, des)
        return
    # Enumerate the SOURCE folder (listing des copied nothing useful). No
    # os.chdir: every path is already joined onto src, and changing the
    # process working directory would break a relative src.
    for child in os.listdir(src):
        source = os.path.join(src, child)
        if os.path.isfile(source):
            shutil.copy(source, des)
        elif os.path.isdir(source):
            shutil.copytree(source, os.path.join(des, child))


def delete_file_folder(src):
    """
    @summary: Delete files or directories.
    @param src: file or folder to delete; other paths are ignored.
    """
    if os.path.isfile(src):
        delfile(src)
    elif os.path.isdir(src):
        delfolder(src)


def get_file_md5(file_name: str):
    """
    @summary: Get MD5 hash of a file.
    :param file_name: path of the file to hash.
    :return: hexadecimal MD5 digest of the file content.
    """
    # os.O_BINARY only exists on Windows; it contributes 0 elsewhere.
    if platform.system() == "Windows":
        flags = os.O_RDONLY | getattr(os, "O_BINARY", 0)
    else:
        flags = os.O_RDONLY
    digest = hashlib.md5()
    with os.fdopen(os.open(file_name, flags, 0o644), mode="rb") as f:
        # Hash in 4 KiB chunks so large files are not loaded at once.
        for data in iter(lambda: f.read(4096), b""):
            digest.update(data)
    return digest.hexdigest()


def compress_and_remove(folder_path: str, archive_name: str) -> bool:
    """
    @summary: Compress a folder into a zip archive, then remove the folder.
    :param folder_path: folder to compress and delete.
    :param archive_name: archive path without the ".zip" extension.
    :return: True on success; False when compressing or removing failed.
    """
    try:
        shutil.make_archive(archive_name, 'zip', folder_path)
        shutil.rmtree(folder_path)
        return True
    except Exception as e:
        log.error(e)
        return False


def unzip_file(zip_path, extract_to):
    """
    Extract a ZIP file into a target directory.

    :param zip_path: path of the ZIP file.
    :param extract_to: directory to extract into; created when missing.
    """
    # Make sure the target directory exists.
    os.makedirs(extract_to, exist_ok=True)

    # Extract every member of the archive into the target directory.
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)