#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import importlib
import hashlib
import os
import platform
import re
import secrets
import site
import socket
import subprocess
import sys

from xdevice import get_decode
from xdevice import ParamError
from xdevice import DeviceConnectorType

from devicetest.core.exception import DeviceTestError
from devicetest.core.exception import ModuleNotAttributeError
from devicetest.error import ErrorCategory
from devicetest.error import ErrorMessage
from devicetest.log.logger import DeviceTestLog as log


def clean_sys_resource(file_path=None, file_base_name=None):
    """
    Remove a path from sys.path and a module from sys.modules, if present.
    :param file_path: entry to remove from sys.path
    :param file_base_name: module name to remove from sys.modules
    :return: None
    """
    if file_path in sys.path:
        sys.path.remove(file_path)

    if file_base_name in sys.modules:
        del sys.modules[file_base_name]


def get_base_name(file_abs_path, is_abs_name=False):
    """
    Strip the extension from a file path.
    Args:
        file_abs_path: str, file path
        is_abs_name : bool, keep the directory part when True
    Returns:
        the path without its extension, or None when file_abs_path is not a str
    Example:
        input: D:/xdevice/decc.py
        if is_abs_name return: D:/xdevice/decc, else return: decc
    """
    if isinstance(file_abs_path, str):
        base_name = file_abs_path if is_abs_name else os.path.basename(
            file_abs_path)
        file_base_name, _ = os.path.splitext(base_name)
        return file_base_name
    return None


def get_dir_path(file_path):
    """
    Return the directory part of file_path.
    :param file_path: path to inspect
    :return: os.path.dirname(file_path) when file_path is a str that exists
             on disk, otherwise None
    """
    if isinstance(file_path, str):
        if os.path.exists(file_path):
            return os.path.dirname(file_path)
    return None


def import_from_file(file_path, file_base_name):
    """
    Import module ``file_base_name`` from directory ``file_path`` and return
    the attribute of the module that has the same name as the module.
    :param file_path: directory that is moved to the front of sys.path
    :param file_base_name: module name; also the attribute looked up in it
    :return: getattr(module, file_base_name)
    :raises ImportError: when importing the module fails for any reason
    :raises ModuleNotAttributeError: when the module has no attribute named
            like the module itself
    """
    # Put the directory first on sys.path so this file wins the lookup.
    if file_path in sys.path:
        sys.path.remove(file_path)

    sys.path.insert(0, file_path)
    # Drop any cached module of the same name so the import is fresh.
    if file_base_name in sys.modules:
        del sys.modules[file_base_name]

    try:
        importlib.import_module(file_base_name)
    except Exception as exception:
        file_abs_path = os.path.join(file_path, file_base_name)
        error_msg = ErrorMessage.TestCase.Code_0203001.format(file_abs_path, exception)
        raise ImportError(error_msg) from exception
    module = sys.modules.get(file_base_name)
    if not hasattr(module, file_base_name):
        raise ModuleNotAttributeError(ErrorMessage.TestCase.Code_0203016)
    return getattr(module, file_base_name)


def get_forward_ports(self=None):
    """
    List the local ports reported by the connector command ``fport ls``.

    Lines containing "Reverse" are not collected; the rule they describe is
    removed with ``fport rm`` instead.
    :param self: object providing ``connector_command``
    :return: list of int ports; an empty list when anything goes wrong
             (the failure is logged, not raised)
    """
    try:
        ports_list = []
        cmd = "fport ls"
        out = get_decode(self.connector_command(cmd)).strip()
        for line_text in out.split('\n'):
            # clear reverse port first  Example: 'tcp:8011 tcp:9963'    [Reverse]
            if "Reverse" in line_text:
                connector_tokens = line_text.split()
                self.connector_command(["fport", "rm",
                                        connector_tokens[0].replace("'", ""),
                                        connector_tokens[1].replace("'", "")])
                continue
            # A usable line contains exactly two "tcp:" markers, which yields
            # three pieces; the middle piece is the first port of the pair.
            connector_tokens = line_text.split("tcp:")
            if len(connector_tokens) != 3:
                continue
            ports_list.append(int(connector_tokens[1]))
        return ports_list
    except Exception:
        log.error(ErrorMessage.Common.Code_0201005)
        return []


def is_port_idle(host: str = "127.0.0.1", port: int = None) -> bool:
    """
    Check whether a TCP port is idle by trying to connect to it.
    :param host: host to probe; None falls back to "127.0.0.1"
    :param port: port to probe
    :return: False when the connection succeeds (something is listening),
             True when the attempt raises
    """
    if host is None:
        # connect((None, port)) raises TypeError, which used to be reported
        # as "idle" for every port; probe the default host instead.
        host = "127.0.0.1"
    s = None
    is_idle = False
    try:
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.settimeout(3)
        s.connect((host, port))
    except Exception:
        # Known to raise ConnectionRefusedError and TimeoutError
        is_idle = True
    finally:
        if s is not None:
            s.close()
    return is_idle


def get_forward_port(self, host: str = None, port: int = None, filter_ports: list = None):
    """
    Pick a local port that can be used for a new forward rule.

    The search starts at a random port in [9901, 9999] and walks downwards
    while the port is above 1024, returning the first port that is not in
    ``filter_ports``, not already forwarded and idle on ``host``.
    :param self: object providing ``connector_command``
    :param host: host used for the idle check
    :param port: ignored; the start port is always chosen randomly
    :param filter_ports: ports that must not be returned
    :return: int, the selected port
    :raises DeviceTestError: when the lookup fails or no port is available
    """
    if filter_ports is None:
        filter_ports = []
    try:
        ports_list = get_forward_ports(self)

        candidate = 9999 - secrets.randbelow(99)
        while candidate > 1024:
            if candidate not in filter_ports and candidate not in ports_list \
                    and is_port_idle(host, candidate):
                return candidate
            candidate -= 1
        # Never hand back a port that was not verified.
        raise ValueError("no idle forward port available above 1024")
    except Exception as error:
        err_msg = ErrorMessage.Common.Code_0201005
        log.error(err_msg)
        raise DeviceTestError(err_msg) from error


def get_local_ip_address():
    """
    Query the IP address of the local machine.
    :return: ip, currently always "127.0.0.1"
    """
    ip = "127.0.0.1"
    return ip


def compare_version(version, base_version: tuple, rex: str):
    """Compare two version numbers; return True when version >= base_version.
    Args:
        version: str, dot separated version
        rex: regular expression the stripped version must match (re.match)
        base_version: version components (ints or numeric strings), or a
            dot separated str
    Returns:
        True when version is greater than or equal to base_version.
        True as well when version does not match rex.
        False otherwise.
    Example:
        version: "4.0.0.1" base_version: (4, 0, 0, 0) -> True
    """
    version = version.strip()
    if not re.match(rex, version):
        # A version that does not follow the expected pattern is accepted.
        return True

    current_parts = version.split(".")
    if isinstance(base_version, str):
        base_parts = base_version.split(".")
    else:
        base_parts = list(base_version)
    try:
        # Compare numerically so that e.g. "10" ranks above "9".
        current_key = tuple(int(part) for part in current_parts)
        base_key = tuple(int(part) for part in base_parts)
    except (TypeError, ValueError):
        # Non-numeric components: fall back to comparing them as text.
        current_key = tuple(current_parts)
        base_key = tuple(str(part) for part in base_parts)
    return current_key >= base_key


def extract_version(version_str: str):
    """
    Extract the four-part version number from a version description.
    :param version_str: version info, e.g. ALN-AL00 5.0.0.26(SP1C00E25R4P5log)
    :return: e.g. 5.0.0.26, or None when no four-part number is found
    """
    match = re.search(r'(\d+\.\d+\.\d+\.\d+)', version_str)
    if match:
        return match.group(1)
    return None


def compare_versions_by_product(version1: str, version2: str):
    """
    Compare the four-part version numbers found in two version descriptions.
    :param version1: e.g. 5.0.0.26
    :param version2: e.g. 5.0.0.23
    :return: True when version1 is strictly newer than version2, else False
    :raises AttributeError: when either input contains no four-part version
    """
    version1 = extract_version(version1)
    version2 = extract_version(version2)
    v1 = tuple(map(int, version1.split('.')))
    v2 = tuple(map(int, version2.split('.')))
    return v1 > v2


class DeviceFileUtils:
    """Helpers that check for files and folders on a device through its shell."""

    @staticmethod
    def _echoed_zero(ret):
        # True when the first whitespace separated token of the output is "0".
        tokens = str(ret).split()
        return ret != "" and len(tokens) != 0 and tokens[0] == "0"

    @staticmethod
    def check_remote_file_is_exist(_ad, remote_file):
        """
        Check whether a regular file exists on the device.
        :param _ad: device object providing ``execute_shell_command``
        :param remote_file: path on the device
        :return: True when the shell echoes 0, else False
        """
        # "test -f <path> && echo 0" prints 0 only when the file exists.
        ret = _ad.execute_shell_command("test -f %s && echo 0" % remote_file)
        return DeviceFileUtils._echoed_zero(ret)

    @staticmethod
    def check_remote_dict_is_exist(_ad, remote_file):
        """
        Check whether a folder exists on the device.
        :param _ad: device object providing ``execute_shell_command``
        :param remote_file: folder path on the device
        :return: True when the shell echoes 0, else False
        """
        # "test -d <path> && echo 0" prints 0 only when the folder exists.
        ret = _ad.execute_shell_command(
            "test -d {} && echo 0".format(remote_file))
        return DeviceFileUtils._echoed_zero(ret)


def compare_text(text, expect_text, fuzzy):
    """
    Match text against expect_text using the selected matching mode.
    :param text: actual text
    :param expect_text: expected text (a pattern for re.search in "regexp" mode)
    :param fuzzy: None or a value starting with "equal" for equality,
                  "starts_with", "ends_with", "contains" or "regexp"
    :return: bool, whether the text matches
    :raises ParamError: when fuzzy is none of the supported modes
    """
    if fuzzy is None or fuzzy.startswith("equal"):
        result = (expect_text == text)
    elif fuzzy == "starts_with":
        result = text.startswith(expect_text)
    elif fuzzy == "ends_with":
        result = text.endswith(expect_text)
    elif fuzzy == "contains":
        result = expect_text in text
    elif fuzzy == "regexp":
        result = re.search(expect_text, text) is not None
    else:
        raise ParamError(
            "expected [equal, starts_with, ends_with, contains, regexp], get [{}]".format(fuzzy))
    return result


def get_process_pid(device, process_name):
    """
    Look up a process id on the device with ``ps -ef | grep``.
    :param device: device object providing ``execute_shell_command``
    :param process_name: text passed to grep
    :return: second column of the first output line that does not contain
             "grep", or None when there is no such line
    """
    cmd = "ps -ef | grep '{}'".format(process_name)
    ret = device.execute_shell_command(cmd)
    for line in ret.strip().split("\n"):
        if "grep" in line:
            # Skip the grep process itself.
            continue
        fields = line.split()
        if len(fields) < 2:
            # Empty or malformed line: there is no pid column to read.
            continue
        return fields[1]
    return None


def check_port_state(port: int = None) -> None:
    """
    Log which process is using a port (debug aid, never raises).

    Uses ``lsof`` on Linux/Darwin. On other systems it uses ``netstat`` and,
    for every listening TCP entry, ``tasklist`` filtered by the owning pid.
    :param port: port to inspect
    """
    try:
        log.debug("##########port state##########")
        sys_type = platform.system()
        if sys_type == "Linux" or sys_type == "Darwin":
            cmd = "lsof -i:{}".format(port)
            out = shell_command(cmd)
            log.debug(out)
        else:
            out = shell_command("netstat -aon", "findstr :{}".format(port))
            log.debug(out)
            for result in out.splitlines():
                items = result.split()
                if len(items) < 2:
                    continue
                # The last column is the pid, the one before it the state.
                if items[0] == "TCP" and items[-2] == "LISTENING":
                    task_out = shell_command("tasklist", "findstr {}".format(items[-1]))
                    log.debug(task_out)
        log.debug("##########port state##########")
    except Exception as e:
        log.error("check port state error, reason: {}".format(e))


def shell_command(cmd: str, findstr: str = "") -> str:
    """
    Run a command without a shell and return its standard output as text.
    :param cmd: command line, split on single spaces
    :param findstr: optional second command line (split on single spaces)
                    that receives the output of cmd on its standard input
    :return: standard output of the last command in the chain, decoded as
             UTF-8 (undecodable bytes are replaced)
    :raises subprocess.TimeoutExpired: when no result arrives within 10 s;
            the started processes are killed first
    """
    timeout = 10
    command = cmd.split(" ")
    if not findstr:
        with subprocess.Popen(command, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, shell=False) as process:
            try:
                out, _ = process.communicate(timeout=timeout)
            except subprocess.TimeoutExpired:
                process.kill()
                raise
        return out.decode("utf-8", errors="replace")

    # The producer's stderr is never read in a pipeline, so discard it rather
    # than risk the producer blocking on a full pipe.
    producer = subprocess.Popen(command, stdout=subprocess.PIPE,
                                stderr=subprocess.DEVNULL, shell=False)
    try:
        with subprocess.Popen(findstr.split(" "), stdin=producer.stdout,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              shell=False) as consumer:
            # Only the consumer should hold the read end of the pipe, so the
            # producer is notified if the consumer exits early.
            producer.stdout.close()
            try:
                out, _ = consumer.communicate(timeout=timeout)
            except subprocess.TimeoutExpired:
                consumer.kill()
                raise
    finally:
        producer.stdout.close()
        if producer.poll() is None:
            producer.kill()
        # Reap the producer so it does not linger as a zombie.
        producer.wait()
    return out.decode("utf-8", errors="replace")


def check_device_file_md5(device, pc_file: str, device_file: str) -> bool:
    """
    Compare the md5 of a local file with the md5 of a file on the device.
    :param device: device object providing ``execute_shell_command`` and ``log``
    :param pc_file: path of the local file
    :param device_file: path of the file on the device
    :return: True when both md5 values are equal, else False
    :raises FileNotFoundError: when pc_file is not an existing file
    """
    if not os.path.isfile(pc_file):
        raise FileNotFoundError(ErrorMessage.Common.Code_0201001.format(ErrorCategory.Environment, pc_file))
    _, local_file_name = os.path.split(pc_file)
    md5_fields = device.execute_shell_command(
        "md5sum {}".format(device_file)).split()
    # Empty output means no digest was produced; it can never match.
    device_md5 = md5_fields[0].strip() if md5_fields else ""
    device.log.debug("device {} md5: {}".format(local_file_name, device_md5))
    md5hash = hashlib.md5()
    with open(pc_file, "rb") as f:
        # Hash in chunks so large files are not loaded into memory at once.
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            md5hash.update(chunk)
    local_md5 = md5hash.hexdigest()
    device.log.debug("local {} md5: {}".format(local_file_name, local_md5))
    return device_md5 == local_md5


def is_standard_lib(file_path):
    """Check if the file path starts with <sys.base_prefix>/lib."""
    std_lib_path = os.path.join(sys.base_prefix, 'lib')
    return file_path.startswith(std_lib_path)


def is_third_party_lib(file_path):
    """Check if the file path starts with one of the known site-packages folders."""
    site_packages = [os.path.join(sys.prefix, 'lib', 'site-packages')]
    if hasattr(sys, 'real_prefix'):  # This means we are in a virtual environment
        site_packages.append(os.path.join(sys.real_prefix, 'lib', 'site-packages'))
    site_packages.append(site.getusersitepackages())
    return any(file_path.startswith(site_package) for site_package in site_packages)


def extract_file_method_code(stack_line):
    """
    Extract the file name, the method after 'in' and the code line from one
    stack trace entry.
    :param stack_line: text like 'File "<path>", line <n>, in <method>'
                       followed by a newline, the code line and a newline
    :return: (file_name, method_name, code_line); code_line is "" when the
             entry holds a single line without newline; (None, None, None)
             when the entry does not match the expected format
    """
    match = re.search(r'File "(.*?)", line \d+, in (.+)', stack_line)
    if match:
        file_name = match.group(1).split(os.sep)[-1]
        method_name = match.group(2).strip()
        lines = stack_line.split('\n')
        # With a trailing newline the code is the second to last piece.
        code_line = lines[-2].strip() if len(lines) >= 2 else ""
        return file_name, method_name, code_line
    return None, None, None