100600bfbSopenharmony_ci#!/usr/bin/env python3 200600bfbSopenharmony_ci# -*- coding: utf-8 -*- 300600bfbSopenharmony_ci# Copyright (C) 2024 Huawei Device Co., Ltd. 400600bfbSopenharmony_ci# Licensed under the Apache License, Version 2.0 (the "License"); 500600bfbSopenharmony_ci# you may not use this file except in compliance with the License. 600600bfbSopenharmony_ci# You may obtain a copy of the License at 700600bfbSopenharmony_ci# 800600bfbSopenharmony_ci# http://www.apache.org/licenses/LICENSE-2.0 900600bfbSopenharmony_ci# 1000600bfbSopenharmony_ci# Unless required by applicable law or agreed to in writing, software 1100600bfbSopenharmony_ci# distributed under the License is distributed on an "AS IS" BASIS, 1200600bfbSopenharmony_ci# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 1300600bfbSopenharmony_ci# See the License for the specific language governing permissions and 1400600bfbSopenharmony_ci# limitations under the License. 1500600bfbSopenharmony_ci 1600600bfbSopenharmony_ciimport os 1700600bfbSopenharmony_ciimport zipfile 1800600bfbSopenharmony_ciimport subprocess 1900600bfbSopenharmony_ciimport re 2000600bfbSopenharmony_ciimport time 2100600bfbSopenharmony_ci 2200600bfbSopenharmony_ciOUTPUT_PATH = "testModule/output" 2300600bfbSopenharmony_ci 2400600bfbSopenharmony_cidef checkFile(filePath, checkFunction): 2500600bfbSopenharmony_ci """ 2600600bfbSopenharmony_ci description: 首先检查文件是否存在,然后检查文件内容是否符合要求 2700600bfbSopenharmony_ci file_path: str, the path of the file to be checked 2800600bfbSopenharmony_ci check_function: 校验函数,校验file_path文件内容是否符合要求,是则返回True,否则返回False 2900600bfbSopenharmony_ci """ 3000600bfbSopenharmony_ci assert os.path.exists(filePath), f"The file {filePath} does not exist." 3100600bfbSopenharmony_ci with open(filePath, "r", encoding="utf-8") as f: 3200600bfbSopenharmony_ci output = f.read() 3300600bfbSopenharmony_ci return checkFunction(output) 3400600bfbSopenharmony_ci 3500600bfbSopenharmony_cidef checkZipFile(zipFilePath, checkFunction): 3600600bfbSopenharmony_ci """ 3700600bfbSopenharmony_ci description: 首先检查zip文件是否存在,然后解压文件,并检查解压后的log.txt文件内容是否符合要求 3800600bfbSopenharmony_ci check_function: 校验函数,校验解压后的log.txt文件内容是否符合要求,是则返回True,否则返回False 3900600bfbSopenharmony_ci """ 4000600bfbSopenharmony_ci assert os.path.exists(zipFilePath), f"The file {zipFilePath} does not exist." 4100600bfbSopenharmony_ci with zipfile.ZipFile(zipFilePath, 'r') as zip_ref: 4200600bfbSopenharmony_ci # 解压所有文件到指定目录 4300600bfbSopenharmony_ci dirname = os.path.dirname(zipFilePath) 4400600bfbSopenharmony_ci zip_ref.extractall(dirname) 4500600bfbSopenharmony_ci with open(f"{dirname}/log.txt", "r", encoding="utf-8") as f: 4600600bfbSopenharmony_ci output = f.read() 4700600bfbSopenharmony_ci return checkFunction(output) 4800600bfbSopenharmony_ci 4900600bfbSopenharmony_cidef print_check_result(func): 5000600bfbSopenharmony_ci def wrapper(*args, **kwargs): 5100600bfbSopenharmony_ci ret = func(*args, **kwargs) 5200600bfbSopenharmony_ci if (ret): 5300600bfbSopenharmony_ci print(f"func {func.__name__} success") 5400600bfbSopenharmony_ci else: 5500600bfbSopenharmony_ci print(f"func {func.__name__} failed") 5600600bfbSopenharmony_ci return ret 5700600bfbSopenharmony_ci return wrapper 5800600bfbSopenharmony_ci 5900600bfbSopenharmony_cidef GetPidByProcessName(processName): 6000600bfbSopenharmony_ci pid = None 6100600bfbSopenharmony_ci cmd = f"hdc shell \"pidof {processName}\"" 6200600bfbSopenharmony_ci try: 6300600bfbSopenharmony_ci pid = subprocess.check_output(cmd, shell=True, encoding="utf-8", text=True) 6400600bfbSopenharmony_ci pid = int(pid.strip().split()[0]) 6500600bfbSopenharmony_ci except subprocess.CalledProcessError as e: 6600600bfbSopenharmony_ci print(f"Command failed: {cmd}\nError: {e}") 6700600bfbSopenharmony_ci except Exception as e: 6800600bfbSopenharmony_ci print(f"Unexpected error: {e}") 6900600bfbSopenharmony_ci return pid 7000600bfbSopenharmony_ci 7100600bfbSopenharmony_cidef convert_string_to_matrix(data : str) -> list: 7200600bfbSopenharmony_ci """ 7300600bfbSopenharmony_ci description: 将字符串转换为矩阵 7400600bfbSopenharmony_ci string: str, 字符串 7500600bfbSopenharmony_ci """ 7600600bfbSopenharmony_ci data = data.strip("-\n") 7700600bfbSopenharmony_ci lines = data.split('\n') 7800600bfbSopenharmony_ci matrix = [] 7900600bfbSopenharmony_ci # 遍历每一行 8000600bfbSopenharmony_ci for line in lines: 8100600bfbSopenharmony_ci # 如果行是空的,跳过 8200600bfbSopenharmony_ci if not line.strip(): 8300600bfbSopenharmony_ci continue 8400600bfbSopenharmony_ci # 分割每一列,去除空格,并转换为整数 8500600bfbSopenharmony_ci row = [int(col.strip()) for col in line.split()] 8600600bfbSopenharmony_ci matrix.append(row) 8700600bfbSopenharmony_ci return matrix 8800600bfbSopenharmony_ci 8900600bfbSopenharmony_cidef CheckCmd(command, checkFunction): 9000600bfbSopenharmony_ci output = subprocess.check_output(f"hdc shell \"{command}\"", shell=True, text=True, encoding="utf-8") 9100600bfbSopenharmony_ci assert checkFunction(output) 9200600bfbSopenharmony_ci 9300600bfbSopenharmony_cidef CheckCmdRedirect(command, checkFunction, filePath = None): 9400600bfbSopenharmony_ci filePath = f"{OUTPUT_PATH}/hidumper_redirect.txt" if filePath is None else filePath 9500600bfbSopenharmony_ci subprocess.check_output(f"hdc shell \"{command}\" > {filePath}", shell=True, text=True, encoding="utf-8") 9600600bfbSopenharmony_ci assert checkFile(filePath, checkFunction = checkFunction) 9700600bfbSopenharmony_ci 9800600bfbSopenharmony_cidef CheckCmdZip(command, checkFunction): 9900600bfbSopenharmony_ci output = subprocess.check_output(f"hdc shell \"{command} --zip\"", shell=True, text=True, encoding="utf-8") 10000600bfbSopenharmony_ci zipSourceFile = re.search("The result is:(.+)", output).group(1).strip() 10100600bfbSopenharmony_ci zipTargetFile = f"{OUTPUT_PATH}/" + os.path.basename(zipSourceFile) 10200600bfbSopenharmony_ci subprocess.check_output(f"hdc file recv {zipSourceFile} {zipTargetFile}", shell=True, text=True, encoding="utf-8") 10300600bfbSopenharmony_ci assert checkZipFile(zipTargetFile, checkFunction = checkFunction) 10400600bfbSopenharmony_ci 10500600bfbSopenharmony_cidef IsLogVersion(): 10600600bfbSopenharmony_ci output = subprocess.check_output("hdc shell param get const.product.software.version", shell=True, text=True, encoding="utf-8").strip() 10700600bfbSopenharmony_ci return "log" in output 10800600bfbSopenharmony_ci 10900600bfbSopenharmony_cidef IsRootVersion(): 11000600bfbSopenharmony_ci output = subprocess.check_output("hdc shell param get const.debuggable", shell=True, text=True, encoding="utf-8").strip() 11100600bfbSopenharmony_ci return output == "1" 11200600bfbSopenharmony_ci 11300600bfbSopenharmony_cidef IsOpenHarmonyVersion(): 11400600bfbSopenharmony_ci output = subprocess.check_output("hdc shell param get const.product.software.version", shell=True, text=True, encoding="utf-8").strip() 11500600bfbSopenharmony_ci return "OpenHarmony" in output 11600600bfbSopenharmony_ci 11700600bfbSopenharmony_cidef WaitUntillLogAppear(command,targetLog, second): 11800600bfbSopenharmony_ci process = subprocess.Popen( 11900600bfbSopenharmony_ci command, 12000600bfbSopenharmony_ci stdout=subprocess.PIPE, 12100600bfbSopenharmony_ci stderr=subprocess.PIPE, 12200600bfbSopenharmony_ci shell=True, 12300600bfbSopenharmony_ci text=True 12400600bfbSopenharmony_ci ) 12500600bfbSopenharmony_ci start = time.time() 12600600bfbSopenharmony_ci while True: 12700600bfbSopenharmony_ci output = process.stdout.readline() 12800600bfbSopenharmony_ci if targetLog in output: 12900600bfbSopenharmony_ci process.kill() 13000600bfbSopenharmony_ci return True 13100600bfbSopenharmony_ci now = time.time() 13200600bfbSopenharmony_ci if now - start > second: 13300600bfbSopenharmony_ci process.kill() 13400600bfbSopenharmony_ci return False