100600bfbSopenharmony_ci#!/usr/bin/env python3
200600bfbSopenharmony_ci# -*- coding: utf-8 -*-
300600bfbSopenharmony_ci# Copyright (C) 2024 Huawei Device Co., Ltd.
400600bfbSopenharmony_ci# Licensed under the Apache License, Version 2.0 (the "License");
500600bfbSopenharmony_ci# you may not use this file except in compliance with the License.
600600bfbSopenharmony_ci# You may obtain a copy of the License at
700600bfbSopenharmony_ci#
800600bfbSopenharmony_ci#     http://www.apache.org/licenses/LICENSE-2.0
900600bfbSopenharmony_ci#
1000600bfbSopenharmony_ci# Unless required by applicable law or agreed to in writing, software
1100600bfbSopenharmony_ci# distributed under the License is distributed on an "AS IS" BASIS,
1200600bfbSopenharmony_ci# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1300600bfbSopenharmony_ci# See the License for the specific language governing permissions and
1400600bfbSopenharmony_ci# limitations under the License.
1500600bfbSopenharmony_ci
1600600bfbSopenharmony_ciimport os
1700600bfbSopenharmony_ciimport zipfile
1800600bfbSopenharmony_ciimport subprocess
1900600bfbSopenharmony_ciimport re
2000600bfbSopenharmony_ciimport time
2100600bfbSopenharmony_ci
2200600bfbSopenharmony_ciOUTPUT_PATH = "testModule/output"
2300600bfbSopenharmony_ci
2400600bfbSopenharmony_cidef checkFile(filePath, checkFunction):
2500600bfbSopenharmony_ci    """
2600600bfbSopenharmony_ci    description: 首先检查文件是否存在,然后检查文件内容是否符合要求
2700600bfbSopenharmony_ci    file_path: str, the path of the file to be checked
2800600bfbSopenharmony_ci    check_function: 校验函数,校验file_path文件内容是否符合要求,是则返回True,否则返回False
2900600bfbSopenharmony_ci    """
3000600bfbSopenharmony_ci    assert os.path.exists(filePath), f"The file {filePath} does not exist."
3100600bfbSopenharmony_ci    with open(filePath, "r", encoding="utf-8") as f:
3200600bfbSopenharmony_ci        output = f.read()
3300600bfbSopenharmony_ci    return checkFunction(output)
3400600bfbSopenharmony_ci
3500600bfbSopenharmony_cidef checkZipFile(zipFilePath, checkFunction):
3600600bfbSopenharmony_ci    """
3700600bfbSopenharmony_ci    description: 首先检查zip文件是否存在,然后解压文件,并检查解压后的log.txt文件内容是否符合要求
3800600bfbSopenharmony_ci    check_function: 校验函数,校验解压后的log.txt文件内容是否符合要求,是则返回True,否则返回False
3900600bfbSopenharmony_ci    """
4000600bfbSopenharmony_ci    assert os.path.exists(zipFilePath), f"The file {zipFilePath} does not exist."
4100600bfbSopenharmony_ci    with zipfile.ZipFile(zipFilePath, 'r') as zip_ref:
4200600bfbSopenharmony_ci        # 解压所有文件到指定目录
4300600bfbSopenharmony_ci        dirname = os.path.dirname(zipFilePath)
4400600bfbSopenharmony_ci        zip_ref.extractall(dirname)
4500600bfbSopenharmony_ci        with open(f"{dirname}/log.txt", "r", encoding="utf-8") as f:
4600600bfbSopenharmony_ci            output = f.read()
4700600bfbSopenharmony_ci    return checkFunction(output)
4800600bfbSopenharmony_ci
4900600bfbSopenharmony_cidef print_check_result(func):
5000600bfbSopenharmony_ci    def wrapper(*args, **kwargs):
5100600bfbSopenharmony_ci        ret = func(*args, **kwargs)
5200600bfbSopenharmony_ci        if (ret):
5300600bfbSopenharmony_ci            print(f"func {func.__name__} success")
5400600bfbSopenharmony_ci        else:
5500600bfbSopenharmony_ci            print(f"func {func.__name__} failed")
5600600bfbSopenharmony_ci        return ret
5700600bfbSopenharmony_ci    return wrapper
5800600bfbSopenharmony_ci
5900600bfbSopenharmony_cidef GetPidByProcessName(processName):
6000600bfbSopenharmony_ci    pid = None
6100600bfbSopenharmony_ci    cmd = f"hdc shell \"pidof {processName}\""
6200600bfbSopenharmony_ci    try:
6300600bfbSopenharmony_ci        pid = subprocess.check_output(cmd, shell=True, encoding="utf-8", text=True)
6400600bfbSopenharmony_ci        pid = int(pid.strip().split()[0])
6500600bfbSopenharmony_ci    except subprocess.CalledProcessError as e:
6600600bfbSopenharmony_ci        print(f"Command failed: {cmd}\nError: {e}")
6700600bfbSopenharmony_ci    except Exception as e:
6800600bfbSopenharmony_ci        print(f"Unexpected error: {e}")
6900600bfbSopenharmony_ci    return pid
7000600bfbSopenharmony_ci
7100600bfbSopenharmony_cidef convert_string_to_matrix(data : str) -> list:
7200600bfbSopenharmony_ci    """
7300600bfbSopenharmony_ci    description: 将字符串转换为矩阵
7400600bfbSopenharmony_ci    string: str, 字符串
7500600bfbSopenharmony_ci    """
7600600bfbSopenharmony_ci    data = data.strip("-\n")
7700600bfbSopenharmony_ci    lines = data.split('\n')
7800600bfbSopenharmony_ci    matrix = []
7900600bfbSopenharmony_ci    # 遍历每一行
8000600bfbSopenharmony_ci    for line in lines:
8100600bfbSopenharmony_ci        # 如果行是空的,跳过
8200600bfbSopenharmony_ci        if not line.strip():
8300600bfbSopenharmony_ci            continue
8400600bfbSopenharmony_ci        # 分割每一列,去除空格,并转换为整数
8500600bfbSopenharmony_ci        row = [int(col.strip()) for col in line.split()]
8600600bfbSopenharmony_ci        matrix.append(row)
8700600bfbSopenharmony_ci    return matrix
8800600bfbSopenharmony_ci
8900600bfbSopenharmony_cidef CheckCmd(command, checkFunction):
9000600bfbSopenharmony_ci    output = subprocess.check_output(f"hdc shell \"{command}\"", shell=True, text=True, encoding="utf-8")
9100600bfbSopenharmony_ci    assert checkFunction(output)
9200600bfbSopenharmony_ci
9300600bfbSopenharmony_cidef CheckCmdRedirect(command, checkFunction, filePath = None):
9400600bfbSopenharmony_ci    filePath = f"{OUTPUT_PATH}/hidumper_redirect.txt" if filePath is None else filePath
9500600bfbSopenharmony_ci    subprocess.check_output(f"hdc shell \"{command}\" > {filePath}", shell=True, text=True, encoding="utf-8")
9600600bfbSopenharmony_ci    assert checkFile(filePath, checkFunction = checkFunction)
9700600bfbSopenharmony_ci
9800600bfbSopenharmony_cidef CheckCmdZip(command, checkFunction):
9900600bfbSopenharmony_ci    output = subprocess.check_output(f"hdc shell \"{command} --zip\"", shell=True, text=True, encoding="utf-8")
10000600bfbSopenharmony_ci    zipSourceFile = re.search("The result is:(.+)", output).group(1).strip()
10100600bfbSopenharmony_ci    zipTargetFile = f"{OUTPUT_PATH}/" + os.path.basename(zipSourceFile)
10200600bfbSopenharmony_ci    subprocess.check_output(f"hdc file recv {zipSourceFile} {zipTargetFile}", shell=True, text=True, encoding="utf-8")
10300600bfbSopenharmony_ci    assert checkZipFile(zipTargetFile, checkFunction = checkFunction)
10400600bfbSopenharmony_ci
10500600bfbSopenharmony_cidef IsLogVersion():
10600600bfbSopenharmony_ci    output = subprocess.check_output("hdc shell param get const.product.software.version", shell=True, text=True, encoding="utf-8").strip()
10700600bfbSopenharmony_ci    return "log" in output
10800600bfbSopenharmony_ci
10900600bfbSopenharmony_cidef IsRootVersion():
11000600bfbSopenharmony_ci    output = subprocess.check_output("hdc shell param get const.debuggable", shell=True, text=True, encoding="utf-8").strip()
11100600bfbSopenharmony_ci    return output == "1"
11200600bfbSopenharmony_ci
11300600bfbSopenharmony_cidef IsOpenHarmonyVersion():
11400600bfbSopenharmony_ci    output = subprocess.check_output("hdc shell param get const.product.software.version", shell=True, text=True, encoding="utf-8").strip()
11500600bfbSopenharmony_ci    return "OpenHarmony" in output
11600600bfbSopenharmony_ci
11700600bfbSopenharmony_cidef WaitUntillLogAppear(command,targetLog, second):
11800600bfbSopenharmony_ci    process = subprocess.Popen(
11900600bfbSopenharmony_ci        command,
12000600bfbSopenharmony_ci        stdout=subprocess.PIPE,
12100600bfbSopenharmony_ci        stderr=subprocess.PIPE,
12200600bfbSopenharmony_ci        shell=True,
12300600bfbSopenharmony_ci        text=True
12400600bfbSopenharmony_ci    )
12500600bfbSopenharmony_ci    start = time.time()
12600600bfbSopenharmony_ci    while True:
12700600bfbSopenharmony_ci        output = process.stdout.readline()
12800600bfbSopenharmony_ci        if targetLog in output:
12900600bfbSopenharmony_ci            process.kill()
13000600bfbSopenharmony_ci            return True
13100600bfbSopenharmony_ci        now = time.time()
13200600bfbSopenharmony_ci        if now - start > second:
13300600bfbSopenharmony_ci            process.kill()
13400600bfbSopenharmony_ci            return False