176e6818aSopenharmony_ci#!/usr/bin/env python3 276e6818aSopenharmony_ci# coding=utf-8 376e6818aSopenharmony_ci 476e6818aSopenharmony_ci# 576e6818aSopenharmony_ci# Copyright (c) 2022 Huawei Device Co., Ltd. 676e6818aSopenharmony_ci# Licensed under the Apache License, Version 2.0 (the "License"); 776e6818aSopenharmony_ci# you may not use this file except in compliance with the License. 876e6818aSopenharmony_ci# You may obtain a copy of the License at 976e6818aSopenharmony_ci# 1076e6818aSopenharmony_ci# http://www.apache.org/licenses/LICENSE-2.0 1176e6818aSopenharmony_ci# 1276e6818aSopenharmony_ci# Unless required by applicable law or agreed to in writing, software 1376e6818aSopenharmony_ci# distributed under the License is distributed on an "AS IS" BASIS, 1476e6818aSopenharmony_ci# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1576e6818aSopenharmony_ci# See the License for the specific language governing permissions and 1676e6818aSopenharmony_ci# limitations under the License. 1776e6818aSopenharmony_ci# 1876e6818aSopenharmony_ci 1976e6818aSopenharmony_ciimport importlib 2076e6818aSopenharmony_ciimport hashlib 2176e6818aSopenharmony_ciimport os 2276e6818aSopenharmony_ciimport platform 2376e6818aSopenharmony_ciimport re 2476e6818aSopenharmony_ciimport secrets 2576e6818aSopenharmony_ciimport site 2676e6818aSopenharmony_ciimport socket 2776e6818aSopenharmony_ciimport subprocess 2876e6818aSopenharmony_ciimport sys 2976e6818aSopenharmony_ci 3076e6818aSopenharmony_cifrom xdevice import get_decode 3176e6818aSopenharmony_cifrom xdevice import ParamError 3276e6818aSopenharmony_cifrom xdevice import DeviceConnectorType 3376e6818aSopenharmony_ci 3476e6818aSopenharmony_cifrom devicetest.core.exception import DeviceTestError 3576e6818aSopenharmony_cifrom devicetest.core.exception import ModuleNotAttributeError 3676e6818aSopenharmony_cifrom devicetest.error import ErrorCategory 3776e6818aSopenharmony_cifrom devicetest.error import ErrorMessage 3876e6818aSopenharmony_cifrom devicetest.log.logger import DeviceTestLog as log 3976e6818aSopenharmony_ci 4076e6818aSopenharmony_ci 4176e6818aSopenharmony_cidef clean_sys_resource(file_path=None, file_base_name=None): 4276e6818aSopenharmony_ci """ 4376e6818aSopenharmony_ci clean sys.path/sys.modules resource 4476e6818aSopenharmony_ci :param file_path: sys path 4576e6818aSopenharmony_ci :param file_base_name: module name 4676e6818aSopenharmony_ci :return: None 4776e6818aSopenharmony_ci """ 4876e6818aSopenharmony_ci if file_path in sys.path: 4976e6818aSopenharmony_ci sys.path.remove(file_path) 5076e6818aSopenharmony_ci 5176e6818aSopenharmony_ci if file_base_name in sys.modules: 5276e6818aSopenharmony_ci del sys.modules[file_base_name] 5376e6818aSopenharmony_ci 5476e6818aSopenharmony_ci 5576e6818aSopenharmony_cidef get_base_name(file_abs_path, is_abs_name=False): 5676e6818aSopenharmony_ci """ 5776e6818aSopenharmony_ci Args: 5876e6818aSopenharmony_ci file_abs_path: str , file path 5976e6818aSopenharmony_ci is_abs_name : bool, 6076e6818aSopenharmony_ci Returns: 6176e6818aSopenharmony_ci file base name 6276e6818aSopenharmony_ci Example: 6376e6818aSopenharmony_ci input: D:/xdevice/decc.py 6476e6818aSopenharmony_ci if is_abs_name return: D:/xdevice/decc, else return: decc 6576e6818aSopenharmony_ci """ 6676e6818aSopenharmony_ci if isinstance(file_abs_path, str): 6776e6818aSopenharmony_ci base_name = file_abs_path if is_abs_name else os.path.basename( 6876e6818aSopenharmony_ci file_abs_path) 6976e6818aSopenharmony_ci file_base_name, _ = os.path.splitext(base_name) 7076e6818aSopenharmony_ci return file_base_name 7176e6818aSopenharmony_ci return None 7276e6818aSopenharmony_ci 7376e6818aSopenharmony_ci 7476e6818aSopenharmony_cidef get_dir_path(file_path): 7576e6818aSopenharmony_ci if isinstance(file_path, str): 7676e6818aSopenharmony_ci if os.path.exists(file_path): 7776e6818aSopenharmony_ci return os.path.dirname(file_path) 7876e6818aSopenharmony_ci return None 7976e6818aSopenharmony_ci 8076e6818aSopenharmony_ci 8176e6818aSopenharmony_cidef import_from_file(file_path, file_base_name): 8276e6818aSopenharmony_ci if file_path in sys.path: 8376e6818aSopenharmony_ci sys.path.remove(file_path) 8476e6818aSopenharmony_ci 8576e6818aSopenharmony_ci sys.path.insert(0, file_path) 8676e6818aSopenharmony_ci if file_base_name in sys.modules: 8776e6818aSopenharmony_ci del sys.modules[file_base_name] 8876e6818aSopenharmony_ci 8976e6818aSopenharmony_ci try: 9076e6818aSopenharmony_ci importlib.import_module(file_base_name) 9176e6818aSopenharmony_ci except Exception as exception: 9276e6818aSopenharmony_ci file_abs_path = os.path.join(file_path, file_base_name) 9376e6818aSopenharmony_ci error_msg = ErrorMessage.TestCase.Code_0203001.format(file_abs_path, exception) 9476e6818aSopenharmony_ci raise ImportError(error_msg) from exception 9576e6818aSopenharmony_ci if not hasattr(sys.modules.get(file_base_name), file_base_name): 9676e6818aSopenharmony_ci raise ModuleNotAttributeError(ErrorMessage.TestCase.Code_0203016) 9776e6818aSopenharmony_ci return getattr(sys.modules[file_base_name], file_base_name) 9876e6818aSopenharmony_ci 9976e6818aSopenharmony_ci 10076e6818aSopenharmony_cidef get_forward_ports(self=None): 10176e6818aSopenharmony_ci try: 10276e6818aSopenharmony_ci ports_list = [] 10376e6818aSopenharmony_ci cmd = "fport ls" 10476e6818aSopenharmony_ci out = get_decode(self.connector_command(cmd)).strip() 10576e6818aSopenharmony_ci clean_lines = out.split('\n') 10676e6818aSopenharmony_ci for line_text in clean_lines: 10776e6818aSopenharmony_ci # clear reverse port first Example: 'tcp:8011 tcp:9963' [Reverse] 10876e6818aSopenharmony_ci if "Reverse" in line_text and "fport" in cmd: 10976e6818aSopenharmony_ci connector_tokens = line_text.split() 11076e6818aSopenharmony_ci self.connector_command(["fport", "rm", 11176e6818aSopenharmony_ci connector_tokens[0].replace("'", ""), 11276e6818aSopenharmony_ci connector_tokens[1].replace("'", "")]) 11376e6818aSopenharmony_ci continue 11476e6818aSopenharmony_ci connector_tokens = line_text.split("tcp:") 11576e6818aSopenharmony_ci if len(connector_tokens) != 3: 11676e6818aSopenharmony_ci continue 11776e6818aSopenharmony_ci ports_list.append(int(connector_tokens[1])) 11876e6818aSopenharmony_ci return ports_list 11976e6818aSopenharmony_ci except Exception: 12076e6818aSopenharmony_ci log.error(ErrorMessage.Common.Code_0201005) 12176e6818aSopenharmony_ci return [] 12276e6818aSopenharmony_ci 12376e6818aSopenharmony_ci 12476e6818aSopenharmony_cidef is_port_idle(host: str = "127.0.0.1", port: int = None) -> bool: 12576e6818aSopenharmony_ci """端口是否空闲""" 12676e6818aSopenharmony_ci s = None 12776e6818aSopenharmony_ci is_idle = False 12876e6818aSopenharmony_ci try: 12976e6818aSopenharmony_ci s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 13076e6818aSopenharmony_ci s.settimeout(3) 13176e6818aSopenharmony_ci s.connect((host, port)) 13276e6818aSopenharmony_ci except Exception: 13376e6818aSopenharmony_ci # 已知会抛出ConnectionRefusedError和TimeoutError两种 13476e6818aSopenharmony_ci is_idle = True 13576e6818aSopenharmony_ci finally: 13676e6818aSopenharmony_ci if s is not None: 13776e6818aSopenharmony_ci s.close() 13876e6818aSopenharmony_ci return is_idle 13976e6818aSopenharmony_ci 14076e6818aSopenharmony_ci 14176e6818aSopenharmony_cidef get_forward_port(self, host: str = None, port: int = None, filter_ports: list = None): 14276e6818aSopenharmony_ci if filter_ports is None: 14376e6818aSopenharmony_ci filter_ports = [] 14476e6818aSopenharmony_ci try: 14576e6818aSopenharmony_ci ports_list = get_forward_ports(self) 14676e6818aSopenharmony_ci 14776e6818aSopenharmony_ci port = 9999 - secrets.randbelow(99) 14876e6818aSopenharmony_ci cnt = 0 14976e6818aSopenharmony_ci while cnt < 10 and port > 1024: 15076e6818aSopenharmony_ci if port not in filter_ports and port not in ports_list and is_port_idle(host, port): 15176e6818aSopenharmony_ci cnt += 1 15276e6818aSopenharmony_ci break 15376e6818aSopenharmony_ci 15476e6818aSopenharmony_ci port -= 1 15576e6818aSopenharmony_ci return port 15676e6818aSopenharmony_ci except Exception as error: 15776e6818aSopenharmony_ci err_msg = ErrorMessage.Common.Code_0201005 15876e6818aSopenharmony_ci log.error(err_msg) 15976e6818aSopenharmony_ci raise DeviceTestError(err_msg) from error 16076e6818aSopenharmony_ci 16176e6818aSopenharmony_ci 16276e6818aSopenharmony_cidef get_local_ip_address(): 16376e6818aSopenharmony_ci """ 16476e6818aSopenharmony_ci 查询本机ip地址 16576e6818aSopenharmony_ci :return: ip 16676e6818aSopenharmony_ci """ 16776e6818aSopenharmony_ci ip = "127.0.0.1" 16876e6818aSopenharmony_ci return ip 16976e6818aSopenharmony_ci 17076e6818aSopenharmony_ci 17176e6818aSopenharmony_cidef compare_version(version, base_version: tuple, rex: str): 17276e6818aSopenharmony_ci """比较两个版本号的大小,若version版本大于等于base_version,返回True 17376e6818aSopenharmony_ci Args: 17476e6818aSopenharmony_ci version: str, version 17576e6818aSopenharmony_ci rex: version style rex 17676e6818aSopenharmony_ci base_version: list, base_version 17776e6818aSopenharmony_ci Example: 17876e6818aSopenharmony_ci version: "4.0.0.1" base_version:[4.0.0.0] 17976e6818aSopenharmony_ci if version bigger than base_version or equal to base_version, return True, else return False 18076e6818aSopenharmony_ci """ 18176e6818aSopenharmony_ci version = version.strip() 18276e6818aSopenharmony_ci if re.match(rex, version): 18376e6818aSopenharmony_ci version = tuple(version.split(".")) 18476e6818aSopenharmony_ci if version > base_version: 18576e6818aSopenharmony_ci return True 18676e6818aSopenharmony_ci else: 18776e6818aSopenharmony_ci return True 18876e6818aSopenharmony_ci return False 18976e6818aSopenharmony_ci 19076e6818aSopenharmony_ci 19176e6818aSopenharmony_cidef extract_version(version_str: str): 19276e6818aSopenharmony_ci """ 19376e6818aSopenharmony_ci 获取版本 19476e6818aSopenharmony_ci :param version_str: 版本信息 ALN-AL00 5.0.0.26(SP1C00E25R4P5log) 19576e6818aSopenharmony_ci :return: 5.0.0.26 19676e6818aSopenharmony_ci """ 19776e6818aSopenharmony_ci match = re.search(r'(\d+\.\d+\.\d+\.\d+)', version_str) 19876e6818aSopenharmony_ci if match: 19976e6818aSopenharmony_ci return match.group(1) 20076e6818aSopenharmony_ci return None 20176e6818aSopenharmony_ci 20276e6818aSopenharmony_ci 20376e6818aSopenharmony_cidef compare_versions_by_product(version1: str, version2: str): 20476e6818aSopenharmony_ci """ 20576e6818aSopenharmony_ci 比较两个版本号 20676e6818aSopenharmony_ci :param version1: 5.0.0.26 20776e6818aSopenharmony_ci :param version2: 5.0.0.23 20876e6818aSopenharmony_ci :return: 20976e6818aSopenharmony_ci """ 21076e6818aSopenharmony_ci version1 = extract_version(version1) 21176e6818aSopenharmony_ci version2 = extract_version(version2) 21276e6818aSopenharmony_ci v1 = tuple(map(int, version1.split('.'))) 21376e6818aSopenharmony_ci v2 = tuple(map(int, version2.split('.'))) 21476e6818aSopenharmony_ci if v1 > v2: 21576e6818aSopenharmony_ci return True 21676e6818aSopenharmony_ci else: 21776e6818aSopenharmony_ci return False 21876e6818aSopenharmony_ci 21976e6818aSopenharmony_ci 22076e6818aSopenharmony_ciclass DeviceFileUtils: 22176e6818aSopenharmony_ci @staticmethod 22276e6818aSopenharmony_ci def check_remote_file_is_exist(_ad, remote_file): 22376e6818aSopenharmony_ci # test -f remotepath judge file exists. 22476e6818aSopenharmony_ci # if exist,return 0,else return None 22576e6818aSopenharmony_ci # 判断设备中文件是否存在 22676e6818aSopenharmony_ci ret = _ad.execute_shell_command("test -f %s && echo 0" % remote_file) 22776e6818aSopenharmony_ci if ret != "" \ 22876e6818aSopenharmony_ci and len(str(ret).split()) \ 22976e6818aSopenharmony_ci != 0 and str(ret).split()[0] == "0": 23076e6818aSopenharmony_ci return True 23176e6818aSopenharmony_ci return False 23276e6818aSopenharmony_ci 23376e6818aSopenharmony_ci @staticmethod 23476e6818aSopenharmony_ci def check_remote_dict_is_exist(_ad, remote_file): 23576e6818aSopenharmony_ci # test -f remotepath judge folder exists. 23676e6818aSopenharmony_ci # if exist,return 0,else return None 23776e6818aSopenharmony_ci # 判断设备中文件夹是否存在 23876e6818aSopenharmony_ci ret = _ad.execute_shell_command( 23976e6818aSopenharmony_ci "test -d {} && echo 0".format(remote_file)) 24076e6818aSopenharmony_ci if ret != "" \ 24176e6818aSopenharmony_ci and len(str(ret).split()) != 0 and str(ret).split()[0] == "0": 24276e6818aSopenharmony_ci return True 24376e6818aSopenharmony_ci return False 24476e6818aSopenharmony_ci 24576e6818aSopenharmony_ci 24676e6818aSopenharmony_cidef compare_text(text, expect_text, fuzzy): 24776e6818aSopenharmony_ci """支持多种匹配方式的文本匹配""" 24876e6818aSopenharmony_ci if fuzzy is None or fuzzy.startswith("equal"): 24976e6818aSopenharmony_ci result = (expect_text == text) 25076e6818aSopenharmony_ci elif fuzzy == "starts_with": 25176e6818aSopenharmony_ci result = text.startswith(expect_text) 25276e6818aSopenharmony_ci elif fuzzy == "ends_with": 25376e6818aSopenharmony_ci result = text.endswith(expect_text) 25476e6818aSopenharmony_ci elif fuzzy == "contains": 25576e6818aSopenharmony_ci result = expect_text in text 25676e6818aSopenharmony_ci elif fuzzy == "regexp": 25776e6818aSopenharmony_ci result = re.search(expect_text, text) 25876e6818aSopenharmony_ci result = False if result is None else True 25976e6818aSopenharmony_ci else: 26076e6818aSopenharmony_ci raise ParamError("expected [equal, starts_with, ends_with, contains], get [{}]".format(fuzzy)) 26176e6818aSopenharmony_ci return result 26276e6818aSopenharmony_ci 26376e6818aSopenharmony_ci 26476e6818aSopenharmony_cidef get_process_pid(device, process_name): 26576e6818aSopenharmony_ci cmd = "ps -ef | grep '{}'".format(process_name) 26676e6818aSopenharmony_ci ret = device.execute_shell_command(cmd) 26776e6818aSopenharmony_ci ret = ret.strip() 26876e6818aSopenharmony_ci pids = ret.split("\n") 26976e6818aSopenharmony_ci for pid in pids: 27076e6818aSopenharmony_ci if "grep" not in pid: 27176e6818aSopenharmony_ci pid = pid.split() 27276e6818aSopenharmony_ci return pid[1] 27376e6818aSopenharmony_ci return None 27476e6818aSopenharmony_ci 27576e6818aSopenharmony_ci 27676e6818aSopenharmony_cidef check_port_state(port: int = None) -> None: 27776e6818aSopenharmony_ci """查看端口状态""" 27876e6818aSopenharmony_ci try: 27976e6818aSopenharmony_ci log.debug("##########port state##########") 28076e6818aSopenharmony_ci sys_type = platform.system() 28176e6818aSopenharmony_ci if sys_type == "Linux" or sys_type == "Darwin": 28276e6818aSopenharmony_ci cmd = "lsof -i:{}".format(port) 28376e6818aSopenharmony_ci out = shell_command(cmd) 28476e6818aSopenharmony_ci log.debug(out) 28576e6818aSopenharmony_ci else: 28676e6818aSopenharmony_ci out = shell_command("netstat -aon", "findstr :{}".format(port)) 28776e6818aSopenharmony_ci log.debug(out) 28876e6818aSopenharmony_ci results = out.strip("\r\n") 28976e6818aSopenharmony_ci if results: 29076e6818aSopenharmony_ci results = results.split("\r\n") 29176e6818aSopenharmony_ci for result in results: 29276e6818aSopenharmony_ci items = result.split() 29376e6818aSopenharmony_ci if items[0] == "TCP" and items[-2] == "LISTENING": 29476e6818aSopenharmony_ci out = shell_command("tasklist", "findstr {}".format(items[-1])) 29576e6818aSopenharmony_ci log.debug(out) 29676e6818aSopenharmony_ci log.debug("##########port state##########") 29776e6818aSopenharmony_ci except Exception as e: 29876e6818aSopenharmony_ci log.error("check port state error, reason: {}".format(e)) 29976e6818aSopenharmony_ci 30076e6818aSopenharmony_ci 30176e6818aSopenharmony_cidef shell_command(cmd: str, findstr: str = "") -> str: 30276e6818aSopenharmony_ci command = cmd.split(" ") 30376e6818aSopenharmony_ci first_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False) 30476e6818aSopenharmony_ci if findstr: 30576e6818aSopenharmony_ci findstr_command = findstr.split(" ") 30676e6818aSopenharmony_ci findstr_process = subprocess.Popen(findstr_command, stdin=first_process.stdout, 30776e6818aSopenharmony_ci stderr=subprocess.PIPE, 30876e6818aSopenharmony_ci stdout=subprocess.PIPE, 30976e6818aSopenharmony_ci shell=False) 31076e6818aSopenharmony_ci out, _ = findstr_process.communicate(timeout=10) 31176e6818aSopenharmony_ci else: 31276e6818aSopenharmony_ci out, _ = first_process.communicate(timeout=10) 31376e6818aSopenharmony_ci out = out.decode("utf-8") 31476e6818aSopenharmony_ci return out 31576e6818aSopenharmony_ci 31676e6818aSopenharmony_ci 31776e6818aSopenharmony_cidef check_device_file_md5(device, pc_file: str, device_file: str) -> bool: 31876e6818aSopenharmony_ci if not os.path.isfile(pc_file): 31976e6818aSopenharmony_ci raise FileNotFoundError(ErrorMessage.Common.Code_0201001.format(ErrorCategory.Environment, pc_file)) 32076e6818aSopenharmony_ci _, local_file_name = os.path.split(pc_file) 32176e6818aSopenharmony_ci device_md5 = device.execute_shell_command( 32276e6818aSopenharmony_ci "md5sum {}".format(device_file)).split()[0].strip() 32376e6818aSopenharmony_ci device.log.debug("device {} md5: {}".format(local_file_name, device_md5)) 32476e6818aSopenharmony_ci with open(pc_file, "rb") as f: 32576e6818aSopenharmony_ci data = f.read() 32676e6818aSopenharmony_ci md5hash = hashlib.md5(data) 32776e6818aSopenharmony_ci local_md5 = md5hash.hexdigest() 32876e6818aSopenharmony_ci device.log.debug("local {} md5: {}".format(local_file_name, local_md5)) 32976e6818aSopenharmony_ci return True if device_md5 == local_md5 else False 33076e6818aSopenharmony_ci 33176e6818aSopenharmony_ci 33276e6818aSopenharmony_cidef is_standard_lib(file_path): 33376e6818aSopenharmony_ci """Check if the file is part of the standard library.""" 33476e6818aSopenharmony_ci std_lib_path = os.path.join(sys.base_prefix, 'lib') 33576e6818aSopenharmony_ci return file_path.startswith(std_lib_path) 33676e6818aSopenharmony_ci 33776e6818aSopenharmony_ci 33876e6818aSopenharmony_cidef is_third_party_lib(file_path): 33976e6818aSopenharmony_ci """Check if the file is part of a third-party library.""" 34076e6818aSopenharmony_ci site_packages = [os.path.join(sys.prefix, 'lib', 'site-packages')] 34176e6818aSopenharmony_ci if hasattr(sys, 'real_prefix'): # This means we are in a virtual environment 34276e6818aSopenharmony_ci site_packages.append(os.path.join(sys.real_prefix, 'lib', 'site-packages')) 34376e6818aSopenharmony_ci site_packages.append(site.getusersitepackages()) 34476e6818aSopenharmony_ci return any(file_path.startswith(site_package) for site_package in site_packages) 34576e6818aSopenharmony_ci 34676e6818aSopenharmony_ci 34776e6818aSopenharmony_cidef extract_file_method_code(stack_line): 34876e6818aSopenharmony_ci """Extract the Python file name and the method after 'in' from a stack trace line.""" 34976e6818aSopenharmony_ci match = re.search(r'File "(.*?)", line \d+, in (.+)', stack_line) 35076e6818aSopenharmony_ci if match: 35176e6818aSopenharmony_ci file_name = match.group(1).split(os.sep)[-1] 35276e6818aSopenharmony_ci method_name = match.group(2).strip() 35376e6818aSopenharmony_ci code_line = stack_line.split('\n')[-2].strip() 35476e6818aSopenharmony_ci return file_name, method_name, code_line 35576e6818aSopenharmony_ci return None, None, None 356