1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2024 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import os
17import zipfile
18import subprocess
19import re
20import time
21
# Relative host-side directory used by the helpers in this file as the default
# location for redirected command output and for zip archives fetched with hdc.
OUTPUT_PATH = "testModule/output"
23
def checkFile(filePath, checkFunction):
    """
    Verify that a file exists and hand its text content to a validator.

    filePath: str, path of the file to check; it is read as UTF-8 text.
    checkFunction: callable that receives the whole file content as a string
        and reports whether the content is acceptable.

    Returns whatever checkFunction returns.
    Raises AssertionError when filePath does not exist.
    """
    fileExists = os.path.exists(filePath)
    assert fileExists, f"The file {filePath} does not exist."
    with open(filePath, "r", encoding="utf-8") as handle:
        content = handle.read()
    verdict = checkFunction(content)
    return verdict
34
def checkZipFile(zipFilePath, checkFunction):
    """
    Verify that a zip archive exists, unpack it next to itself and validate
    the extracted log.txt.

    zipFilePath: str, path of the zip archive to check.
    checkFunction: callable that receives the content of the extracted
        log.txt (read as UTF-8 text) and reports whether it is acceptable.

    Returns whatever checkFunction returns.
    Raises AssertionError when zipFilePath does not exist; errors from
    opening the archive or a missing log.txt propagate unchanged.
    """
    assert os.path.exists(zipFilePath), f"The file {zipFilePath} does not exist."
    # os.path.dirname() yields "" for a bare file name. Building the log path
    # with os.path.join keeps it relative to the archive instead of turning it
    # into "/log.txt", and the extraction directory falls back to the current
    # directory explicitly.
    dirname = os.path.dirname(zipFilePath)
    with zipfile.ZipFile(zipFilePath, 'r') as zip_ref:
        # Extract every member into the directory that holds the archive.
        zip_ref.extractall(dirname or os.curdir)
    with open(os.path.join(dirname, "log.txt"), "r", encoding="utf-8") as f:
        output = f.read()
    return checkFunction(output)
48
def print_check_result(func):
    """
    Decorator that reports on stdout whether the wrapped call succeeded.

    After calling func, prints "func <name> success" when the return value is
    truthy and "func <name> failed" otherwise, then returns that value
    unchanged. Exceptions raised by func propagate without any message.

    func: the callable to wrap.
    Returns the wrapping callable, which keeps func's name and docstring.
    """
    # Imported here so the decorator does not depend on a module-level import.
    import functools

    # functools.wraps keeps __name__/__doc__ of the decorated function, so
    # tools that look at those attributes still see the original function.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        ret = func(*args, **kwargs)
        status = "success" if ret else "failed"
        print(f"func {func.__name__} {status}")
        return ret
    return wrapper
58
def GetPidByProcessName(processName):
    """
    Look up the pid of a process by running `pidof` through `hdc shell`.

    processName: str, name passed to `pidof`.

    Returns the first pid printed by the command as an int, or None when the
    command fails or its output cannot be parsed as a pid. Failures are
    reported on stdout and never raised.
    """
    cmd = f"hdc shell \"pidof {processName}\""
    try:
        # Keep the raw output in its own variable: if parsing fails, the
        # function must return None rather than the unparsed text.
        output = subprocess.check_output(cmd, shell=True, encoding="utf-8", text=True)
        return int(output.strip().split()[0])
    except subprocess.CalledProcessError as e:
        print(f"Command failed: {cmd}\nError: {e}")
    except Exception as e:
        # Best-effort helper: e.g. empty or non-numeric output ends up here.
        print(f"Unexpected error: {e}")
    return None
70
def convert_string_to_matrix(data: str) -> list:
    """
    Convert whitespace-separated integer text into a list of integer rows.

    Leading and trailing '-' and newline characters are stripped from data
    first. Each remaining non-blank line becomes one row, with its
    whitespace-separated columns converted to int.

    data: str, the text to convert.
    Returns a list of lists of int (empty when there are no non-blank lines).
    Raises ValueError when a column is not a valid integer.
    """
    trimmed = data.strip("-\n")
    # split() without arguments already discards surrounding whitespace and
    # yields an empty list for blank lines, which are then filtered out.
    tokenised = (raw.split() for raw in trimmed.split('\n'))
    return [[int(cell) for cell in cells] for cells in tokenised if cells]
88
def CheckCmd(command, checkFunction):
    """
    Run a command through `hdc shell` and assert that its output is valid.

    command: str, the command line placed inside `hdc shell "..."`.
    checkFunction: callable that receives the captured stdout text and
        reports whether it is acceptable.

    Raises AssertionError when checkFunction returns a falsy value and
    subprocess.CalledProcessError when the command exits with a non-zero code.
    """
    shellCommand = f"hdc shell \"{command}\""
    captured = subprocess.check_output(shellCommand, shell=True, text=True, encoding="utf-8")
    assert checkFunction(captured)
92
def CheckCmdRedirect(command, checkFunction, filePath = None):
    """
    Run a command through `hdc shell`, redirect its output to a file with the
    host shell's `>` operator, and assert that the file content is valid.

    command: str, the command line placed inside `hdc shell "..."`.
    checkFunction: callable that receives the file content and reports
        whether it is acceptable.
    filePath: str or None, target file of the redirection; defaults to
        hidumper_redirect.txt under OUTPUT_PATH when None.

    Raises AssertionError when the file is missing or checkFunction returns a
    falsy value, and subprocess.CalledProcessError when the command fails.
    """
    if filePath is None:
        filePath = f"{OUTPUT_PATH}/hidumper_redirect.txt"
    redirected = f"hdc shell \"{command}\" > {filePath}"
    subprocess.check_output(redirected, shell=True, text=True, encoding="utf-8")
    assert checkFile(filePath, checkFunction=checkFunction)
97
def CheckCmdZip(command, checkFunction):
    """
    Run a command with `--zip` through `hdc shell`, fetch the archive it
    reports, and assert that the log.txt inside the archive is valid.

    The archive path is taken from the text after "The result is:" in the
    command output, copied into OUTPUT_PATH with `hdc file recv`, and then
    checked with checkZipFile.

    command: str, the command line; " --zip" is appended to it.
    checkFunction: callable that receives the content of the extracted
        log.txt and reports whether it is acceptable.

    Raises AssertionError when the output does not report an archive path or
    the archive check fails, and subprocess.CalledProcessError when one of
    the hdc commands exits with a non-zero code.
    """
    output = subprocess.check_output(f"hdc shell \"{command} --zip\"", shell=True, text=True, encoding="utf-8")
    match = re.search("The result is:(.+)", output)
    # Fail with the captured output instead of an AttributeError on None when
    # the command did not report where it stored the archive.
    assert match is not None, f"No zip path found in the output of: {command} --zip\nOutput: {output}"
    zipSourceFile = match.group(1).strip()
    zipTargetFile = f"{OUTPUT_PATH}/" + os.path.basename(zipSourceFile)
    subprocess.check_output(f"hdc file recv {zipSourceFile} {zipTargetFile}", shell=True, text=True, encoding="utf-8")
    assert checkZipFile(zipTargetFile, checkFunction = checkFunction)
104
def IsLogVersion():
    """
    Report whether the value of const.product.software.version, read with
    `hdc shell param get`, contains the substring "log".

    Raises subprocess.CalledProcessError when the command fails.
    """
    version = subprocess.check_output(
        "hdc shell param get const.product.software.version",
        shell=True,
        text=True,
        encoding="utf-8",
    )
    return "log" in version.strip()
108
def IsRootVersion():
    """
    Report whether the value of const.debuggable, read with
    `hdc shell param get`, is exactly "1" after stripping whitespace.

    Raises subprocess.CalledProcessError when the command fails.
    """
    debuggable = subprocess.check_output(
        "hdc shell param get const.debuggable",
        shell=True,
        text=True,
        encoding="utf-8",
    )
    return debuggable.strip() == "1"
112
def IsOpenHarmonyVersion():
    """
    Report whether the value of const.product.software.version, read with
    `hdc shell param get`, contains the substring "OpenHarmony".

    Raises subprocess.CalledProcessError when the command fails.
    """
    version = subprocess.check_output(
        "hdc shell param get const.product.software.version",
        shell=True,
        text=True,
        encoding="utf-8",
    )
    return "OpenHarmony" in version.strip()
116
def WaitUntillLogAppear(command,targetLog, second):
    """
    Start a shell command and wait until a line of its stdout contains a
    given text, giving up after a time limit.

    command: str, command line started through the shell.
    targetLog: str, text searched for in each stdout line.
    second: number of seconds to wait before giving up.

    Returns True as soon as a line containing targetLog is read; returns
    False when the time limit passes or the command's output ends first.
    The started process is killed before returning in every case.
    """
    # Imported here so the function does not depend on module-level imports.
    import queue
    import threading

    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        # stderr is never inspected; discarding it keeps the child from
        # stalling on a pipe that nobody drains.
        stderr=subprocess.DEVNULL,
        shell=True,
        text=True
    )
    lines = queue.Queue()

    def _pump():
        # readline() blocks while the command is silent, so it runs in a
        # helper thread and the caller can still honour the time limit.
        try:
            for line in iter(process.stdout.readline, ""):
                lines.put(line)
        finally:
            # An empty string marks end of output, as readline() does.
            lines.put("")

    reader = threading.Thread(target=_pump, daemon=True)
    reader.start()
    deadline = time.monotonic() + second
    try:
        while True:
            try:
                line = lines.get(timeout=max(deadline - time.monotonic(), 0))
            except queue.Empty:
                return False
            if targetLog in line:
                return True
            # Stop at end of output instead of spinning until the deadline.
            if not line or time.monotonic() > deadline:
                return False
    finally:
        process.kill()
        process.wait()
        # Close the pipe only when the reader is done with it; closing while
        # it is still blocked in readline() could hang.
        reader.join(timeout=0.5)
        if not reader.is_alive():
            process.stdout.close()