#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import os
import queue
import re
import subprocess
import threading
import time
import zipfile

OUTPUT_PATH = "testModule/output"


def _run_host_command(cmd):
    """Run *cmd* through the host shell and return its stdout as UTF-8 text.

    Raises:
        subprocess.CalledProcessError: if the command exits with a non-zero
            status.
    """
    return subprocess.check_output(cmd, shell=True, text=True, encoding="utf-8")


def _get_device_param(name):
    """Return the stripped output of ``hdc shell param get <name>``."""
    return _run_host_command(f"hdc shell param get {name}").strip()


def checkFile(filePath, checkFunction):
    """Check that a file exists and that its content passes a validator.

    Args:
        filePath: path of the file to check.
        checkFunction: callable receiving the whole file content (decoded as
            UTF-8); it should return True when the content is acceptable and
            False otherwise.

    Returns:
        Whatever ``checkFunction`` returns for the file content.

    Raises:
        AssertionError: if ``filePath`` does not exist.
    """
    assert os.path.exists(filePath), f"The file {filePath} does not exist."
    with open(filePath, "r", encoding="utf-8") as f:
        output = f.read()
    return checkFunction(output)


def checkZipFile(zipFilePath, checkFunction):
    """Extract a zip archive next to itself and validate the extracted log.txt.

    All members are extracted into the directory that contains the archive,
    then ``log.txt`` from that directory is read and handed to the validator.

    Args:
        zipFilePath: path of the zip archive to check.
        checkFunction: callable receiving the content of the extracted
            ``log.txt`` (decoded as UTF-8); it should return True when the
            content is acceptable and False otherwise.

    Returns:
        Whatever ``checkFunction`` returns for the log content.

    Raises:
        AssertionError: if ``zipFilePath`` does not exist.
    """
    assert os.path.exists(zipFilePath), f"The file {zipFilePath} does not exist."
    # A bare file name has an empty dirname; fall back to the current
    # directory so the extraction target and the log path stay in agreement
    # (a plain "{dirname}/log.txt" would point at the filesystem root).
    dirname = os.path.dirname(zipFilePath) or os.curdir
    with zipfile.ZipFile(zipFilePath, 'r') as zip_ref:
        zip_ref.extractall(dirname)
    with open(os.path.join(dirname, "log.txt"), "r", encoding="utf-8") as f:
        output = f.read()
    return checkFunction(output)


def print_check_result(func):
    """Decorator that prints whether the wrapped check returned a truthy value.

    The wrapped function's return value is passed through unchanged.
    """
    @functools.wraps(func)  # keep __name__/__doc__ of the wrapped check
    def wrapper(*args, **kwargs):
        ret = func(*args, **kwargs)
        if ret:
            print(f"func {func.__name__} success")
        else:
            print(f"func {func.__name__} failed")
        return ret
    return wrapper


def GetPidByProcessName(processName):
    """Return the first pid reported by ``pidof`` on the device, or None.

    Runs ``hdc shell "pidof <processName>"`` and parses the first
    whitespace-separated token of the output as an integer.

    Args:
        processName: name of the process to look up.

    Returns:
        The pid as an int, or None when the command fails or its output
        cannot be parsed (a diagnostic is printed in that case).
    """
    pid = None
    cmd = f"hdc shell \"pidof {processName}\""
    try:
        raw_output = subprocess.check_output(cmd, shell=True, encoding="utf-8", text=True)
        # Keep the raw text in its own variable so that a parse failure
        # leaves ``pid`` as None instead of leaking the unparsed string.
        pid = int(raw_output.strip().split()[0])
    except subprocess.CalledProcessError as e:
        print(f"Command failed: {cmd}\nError: {e}")
    except (OSError, ValueError, IndexError) as e:
        print(f"Unexpected error: {e}")
    return pid


def convert_string_to_matrix(data: str) -> list:
    """Convert whitespace-separated integer text into a list of integer rows.

    Leading and trailing ``-`` and newline characters of the whole text are
    stripped first (so separator lines made of dashes at either end are
    dropped); blank lines are skipped.

    NOTE(review): because ``-`` is stripped from the ends of the text, a
    minus sign on the very first value would be removed as well — confirm
    inputs never start with a negative number.

    Args:
        data: the text to parse.

    Returns:
        A list with one ``list[int]`` per non-blank line.

    Raises:
        ValueError: if a column is not a valid integer.
    """
    data = data.strip("-\n")
    return [
        [int(col) for col in line.split()]
        for line in data.split('\n')
        if line.strip()
    ]


def CheckCmd(command, checkFunction):
    """Run ``hdc shell "<command>"`` and assert its output passes the validator.

    Args:
        command: command line to execute through ``hdc shell``.
        checkFunction: callable receiving the command's stdout; must return
            a truthy value for the check to pass.

    Raises:
        AssertionError: if ``checkFunction`` returns a falsy value.
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    output = _run_host_command(f"hdc shell \"{command}\"")
    assert checkFunction(output)


def CheckCmdRedirect(command, checkFunction, filePath=None):
    """Run a command with stdout redirected to a host file and validate it.

    Args:
        command: command line to execute through ``hdc shell``.
        checkFunction: callable receiving the redirected file's content;
            must return a truthy value for the check to pass.
        filePath: host file receiving the output; defaults to
            ``<OUTPUT_PATH>/hidumper_redirect.txt``.

    Raises:
        AssertionError: if the file is missing or ``checkFunction`` returns
            a falsy value.
        subprocess.CalledProcessError: if the command exits non-zero.
    """
    filePath = f"{OUTPUT_PATH}/hidumper_redirect.txt" if filePath is None else filePath
    # The redirection is performed by the host shell, not on the device.
    _run_host_command(f"hdc shell \"{command}\" > {filePath}")
    assert checkFile(filePath, checkFunction=checkFunction)


def CheckCmdZip(command, checkFunction):
    """Run a command with ``--zip``, fetch the archive and validate its log.

    The archive path is taken from the ``The result is:<path>`` line of the
    command output, the file is copied to ``OUTPUT_PATH`` with
    ``hdc file recv`` and then checked with :func:`checkZipFile`.

    Args:
        command: command line to execute through ``hdc shell``; ``--zip`` is
            appended to it.
        checkFunction: callable receiving the content of the archive's
            ``log.txt``; must return a truthy value for the check to pass.

    Raises:
        AssertionError: if the output does not name a result file, the
            received archive is missing, or ``checkFunction`` returns a
            falsy value.
        subprocess.CalledProcessError: if a command exits non-zero.
    """
    output = _run_host_command(f"hdc shell \"{command} --zip\"")
    match = re.search("The result is:(.+)", output)
    # Fail with the actual output instead of an opaque AttributeError when
    # the command did not report an archive path.
    assert match is not None, f"No zip result path found in output: {output!r}"
    zipSourceFile = match.group(1).strip()
    zipTargetFile = f"{OUTPUT_PATH}/" + os.path.basename(zipSourceFile)
    _run_host_command(f"hdc file recv {zipSourceFile} {zipTargetFile}")
    assert checkZipFile(zipTargetFile, checkFunction=checkFunction)


def IsLogVersion():
    """Return True if ``const.product.software.version`` contains "log"."""
    return "log" in _get_device_param("const.product.software.version")


def IsRootVersion():
    """Return True if the ``const.debuggable`` parameter equals "1"."""
    return _get_device_param("const.debuggable") == "1"


def IsOpenHarmonyVersion():
    """Return True if ``const.product.software.version`` contains "OpenHarmony"."""
    return "OpenHarmony" in _get_device_param("const.product.software.version")


def WaitUntillLogAppear(command, targetLog, second):
    """Run *command* and wait until a stdout line contains *targetLog*.

    The command is started through the host shell and its stdout is read
    line by line on a background thread, so the timeout is honoured even
    while the command produces no output.

    Args:
        command: shell command line to run.
        targetLog: text to look for in each stdout line.
        second: maximum time to wait, in seconds.

    Returns:
        True if a line containing ``targetLog`` was seen before the timeout;
        False if the timeout expired or the command's output ended first.
    """
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        # stderr is never inspected; discard it so a chatty command cannot
        # stall on a full, unread pipe.
        stderr=subprocess.DEVNULL,
        shell=True,
        text=True
    )
    lines = queue.Queue()

    def pump_stdout():
        # Forward every stdout line; None marks end of output.
        try:
            for line in process.stdout:
                lines.put(line)
        finally:
            lines.put(None)

    reader = threading.Thread(target=pump_stdout, daemon=True)
    reader.start()

    deadline = time.monotonic() + second
    found = False
    try:
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            try:
                line = lines.get(timeout=remaining)
            except queue.Empty:
                break
            if line is None:
                # Output ended without the target: stop instead of spinning
                # until the timeout.
                break
            if targetLog in line:
                found = True
                break
    finally:
        process.kill()
        process.wait()  # reap the child so it does not linger as a zombie
        reader.join(timeout=1)
        # Only close the pipe once the reader is done with it; closing a
        # stream another thread is blocked on could hang.
        if not reader.is_alive():
            process.stdout.close()
    return found