#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
1876e6818aSopenharmony_ci
1976e6818aSopenharmony_ciimport importlib
2076e6818aSopenharmony_ciimport hashlib
2176e6818aSopenharmony_ciimport os
2276e6818aSopenharmony_ciimport platform
2376e6818aSopenharmony_ciimport re
2476e6818aSopenharmony_ciimport secrets
2576e6818aSopenharmony_ciimport site
2676e6818aSopenharmony_ciimport socket
2776e6818aSopenharmony_ciimport subprocess
2876e6818aSopenharmony_ciimport sys
2976e6818aSopenharmony_ci
3076e6818aSopenharmony_cifrom xdevice import get_decode
3176e6818aSopenharmony_cifrom xdevice import ParamError
3276e6818aSopenharmony_cifrom xdevice import DeviceConnectorType
3376e6818aSopenharmony_ci
3476e6818aSopenharmony_cifrom devicetest.core.exception import DeviceTestError
3576e6818aSopenharmony_cifrom devicetest.core.exception import ModuleNotAttributeError
3676e6818aSopenharmony_cifrom devicetest.error import ErrorCategory
3776e6818aSopenharmony_cifrom devicetest.error import ErrorMessage
3876e6818aSopenharmony_cifrom devicetest.log.logger import DeviceTestLog as log
3976e6818aSopenharmony_ci
4076e6818aSopenharmony_ci
def clean_sys_resource(file_path=None, file_base_name=None):
    """
    Remove an entry from sys.path and a module from sys.modules.

    Both removals are no-ops when the given value is not present.

    :param file_path: entry to drop from sys.path (first occurrence only)
    :param file_base_name: module name to drop from sys.modules
    :return: None
    """
    try:
        sys.path.remove(file_path)
    except ValueError:
        # the entry was not on sys.path; nothing to undo
        pass

    sys.modules.pop(file_base_name, None)
5376e6818aSopenharmony_ci
5476e6818aSopenharmony_ci
def get_base_name(file_abs_path, is_abs_name=False):
    """
    Strip the extension from a file path.

    Args:
        file_abs_path: str, file path
        is_abs_name  : bool, keep the directory part when True
    Returns:
        the path without its extension, or None when file_abs_path
        is not a str
    Example:
        input: D:/xdevice/decc.py
        if is_abs_name return: D:/xdevice/decc, else return: decc
    """
    if not isinstance(file_abs_path, str):
        return None

    target = file_abs_path if is_abs_name else os.path.basename(file_abs_path)
    return os.path.splitext(target)[0]
7276e6818aSopenharmony_ci
7376e6818aSopenharmony_ci
def get_dir_path(file_path):
    """
    Return the directory part of file_path.

    :param file_path: path to inspect
    :return: os.path.dirname(file_path) when file_path is a str naming
             an existing path, otherwise None
    """
    if isinstance(file_path, str) and os.path.exists(file_path):
        return os.path.dirname(file_path)
    return None
7976e6818aSopenharmony_ci
8076e6818aSopenharmony_ci
def import_from_file(file_path, file_base_name):
    """
    Freshly import module file_base_name from directory file_path and
    return the attribute of that module that has the same name as the
    module itself.

    Side effects: file_path is moved to the front of sys.path and stays
    there; any previously loaded module of the same name is dropped from
    sys.modules before importing.

    :param file_path: directory that contains the module
    :param file_base_name: module name (also the attribute name looked up)
    :return: the same-named attribute of the imported module
    :raises ImportError: when importing the module fails for any reason
    :raises ModuleNotAttributeError: when the module has no attribute
            named like the module
    """
    # make sure file_path is the first, and only, occurrence on sys.path
    while_present = file_path in sys.path
    if while_present:
        sys.path.remove(file_path)
    sys.path.insert(0, file_path)

    # force a fresh import instead of reusing a cached module
    sys.modules.pop(file_base_name, None)

    try:
        importlib.import_module(file_base_name)
    except Exception as exception:
        file_abs_path = os.path.join(file_path, file_base_name)
        error_msg = ErrorMessage.TestCase.Code_0203001.format(file_abs_path, exception)
        raise ImportError(error_msg) from exception

    loaded_module = sys.modules.get(file_base_name)
    if not hasattr(loaded_module, file_base_name):
        raise ModuleNotAttributeError(ErrorMessage.TestCase.Code_0203016)
    return getattr(loaded_module, file_base_name)
9876e6818aSopenharmony_ci
9976e6818aSopenharmony_ci
def get_forward_ports(self=None):
    """
    Collect the ports listed by the connector's "fport ls" command.

    Lines containing "Reverse" are not collected; instead the rule they
    describe is removed with "fport rm" using the first two whitespace
    separated tokens of the line (single quotes stripped).
    For every other line that splits into exactly three pieces around
    "tcp:", the first port is converted to int and collected.

    :param self: object providing connector_command()
    :return: list of int ports; an empty list when anything goes wrong
             (the failure is logged, not raised)
    """
    try:
        listing = get_decode(self.connector_command("fport ls")).strip()
        forwarded = []
        for entry in listing.split('\n'):
            # clear reverse port first  Example: 'tcp:8011 tcp:9963'     [Reverse]
            if "Reverse" in entry:
                fields = entry.split()
                first_spec = fields[0].replace("'", "")
                second_spec = fields[1].replace("'", "")
                self.connector_command(["fport", "rm", first_spec, second_spec])
                continue

            pieces = entry.split("tcp:")
            if len(pieces) == 3:
                forwarded.append(int(pieces[1]))
        return forwarded
    except Exception:
        log.error(ErrorMessage.Common.Code_0201005)
        return []
12276e6818aSopenharmony_ci
12376e6818aSopenharmony_ci
def is_port_idle(host: str = "127.0.0.1", port: int = None) -> bool:
    """
    Check whether a TCP port is idle by trying to connect to it.

    :param host: address to probe; None falls back to "127.0.0.1"
    :param port: TCP port to probe
    :return: False when the connection succeeds (something is listening),
             True when the attempt fails for any reason
    """
    # socket.connect() rejects None as host with a TypeError, which the
    # handler below would turn into a bogus "idle" answer.
    target_host = "127.0.0.1" if host is None else host
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.settimeout(3)
            probe.connect((target_host, port))
    except Exception:
        # Known to raise ConnectionRefusedError and TimeoutError; every
        # failure is deliberately treated as "nobody is listening".
        return True
    return False
13976e6818aSopenharmony_ci
14076e6818aSopenharmony_ci
def get_forward_port(self, host: str = None, port: int = None, filter_ports: list = None):
    """
    Pick a local port that can be used for a new forward rule.

    The search starts at a random port in 9901..9999 and walks downwards
    until a port is found that is not in filter_ports, is not already
    listed by get_forward_ports() and is reported idle by is_port_idle().

    :param self: object providing connector_command()
    :param host: address used for the idle probe; None means "127.0.0.1"
    :param port: accepted for interface compatibility; its value is not
                 used, the start port is always chosen randomly
    :param filter_ports: ports that must not be returned
    :return: the selected port. NOTE: when every candidate above 1024 is
             rejected, 1024 is returned without having been checked.
    :raises DeviceTestError: when any step of the search raises
    """
    if filter_ports is None:
        filter_ports = []
    # Resolve the default here so the idle probe always targets a real
    # address; socket.connect() cannot take None as host.
    probe_host = "127.0.0.1" if host is None else host
    try:
        forwarded_ports = get_forward_ports(self)

        candidate = 9999 - secrets.randbelow(99)
        while candidate > 1024:
            if (candidate not in filter_ports
                    and candidate not in forwarded_ports
                    and is_port_idle(probe_host, candidate)):
                break
            candidate -= 1
        return candidate
    except Exception as error:
        err_msg = ErrorMessage.Common.Code_0201005
        log.error(err_msg)
        raise DeviceTestError(err_msg) from error
16076e6818aSopenharmony_ci
16176e6818aSopenharmony_ci
def get_local_ip_address():
    """
    Query the IP address of the local machine.

    :return: ip, always the loopback address "127.0.0.1"
    """
    return "127.0.0.1"
16976e6818aSopenharmony_ci
17076e6818aSopenharmony_ci
def compare_version(version, base_version: tuple, rex: str):
    """
    Compare two version numbers; return True when version is greater
    than or equal to base_version.

    Components are compared numerically, so "4.0.0.10" is newer than
    "4.0.0.9". When a component is not an integer the comparison falls
    back to comparing the components as strings.

    Args:
        version: str, version
        rex: version style rex, matched against the stripped version
        base_version: sequence of components (str or int), or a dotted
                      version string
    Returns:
        True when version >= base_version, and also True when version
        does not match rex (an unrecognised format is not rejected);
        False otherwise
    Example:
        version: "4.0.0.1" base_version: ("4", "0", "0", "0") -> True
    """
    version = version.strip()
    if not re.match(rex, version):
        return True

    if isinstance(base_version, str):
        base_version = base_version.strip().split(".")
    version_parts = version.split(".")
    try:
        current = tuple(int(part) for part in version_parts)
        baseline = tuple(int(part) for part in base_version)
    except (TypeError, ValueError):
        # non-numeric component somewhere: compare as text instead
        current = tuple(version_parts)
        baseline = tuple(str(part) for part in base_version)
    return current >= baseline
18976e6818aSopenharmony_ci
19076e6818aSopenharmony_ci
def extract_version(version_str: str):
    """
    Get the four-part dotted version contained in a string.

    :param version_str: version information, e.g.
                        ALN-AL00 5.0.0.26(SP1C00E25R4P5log)
    :return: the first four-part version found, e.g. 5.0.0.26, or None
             when the string contains none
    """
    found = re.search(r'(\d+\.\d+\.\d+\.\d+)', version_str)
    return found.group(1) if found else None
20176e6818aSopenharmony_ci
20276e6818aSopenharmony_ci
def compare_versions_by_product(version1: str, version2: str):
    """
    Compare two version numbers numerically.

    Each argument is first reduced to its four-part version with
    extract_version(). An argument without such a version makes the
    function fail with AttributeError, because None has no split().

    :param version1: e.g. 5.0.0.26
    :param version2: e.g. 5.0.0.23
    :return: True when version1 is strictly newer than version2,
             otherwise False
    """
    first_text = extract_version(version1)
    second_text = extract_version(version2)
    first_parts = tuple(int(part) for part in first_text.split('.'))
    second_parts = tuple(int(part) for part in second_text.split('.'))
    return first_parts > second_parts
21876e6818aSopenharmony_ci
21976e6818aSopenharmony_ci
class DeviceFileUtils:
    """Existence checks for paths on a device, done through its shell."""

    @staticmethod
    def check_remote_file_is_exist(_ad, remote_file):
        """
        Check whether a regular file exists on the device.

        Runs "test -f <path> && echo 0"; the file exists when the first
        whitespace separated token of the output is "0".

        :param _ad: device object providing execute_shell_command()
        :param remote_file: path on the device
        :return: True when the file exists, otherwise False
        """
        output = _ad.execute_shell_command("test -f %s && echo 0" % remote_file)
        if output == "":
            return False
        fields = str(output).split()
        return bool(fields) and fields[0] == "0"

    @staticmethod
    def check_remote_dict_is_exist(_ad, remote_file):
        """
        Check whether a directory exists on the device.

        Runs "test -d <path> && echo 0"; the directory exists when the
        first whitespace separated token of the output is "0".

        :param _ad: device object providing execute_shell_command()
        :param remote_file: directory path on the device
        :return: True when the directory exists, otherwise False
        """
        output = _ad.execute_shell_command(
            "test -d {} && echo 0".format(remote_file))
        if output == "":
            return False
        fields = str(output).split()
        return bool(fields) and fields[0] == "0"
24476e6818aSopenharmony_ci
24576e6818aSopenharmony_ci
def compare_text(text, expect_text, fuzzy):
    """
    Text matching that supports several matching modes.

    :param text: the actual text
    :param expect_text: the expected text (a pattern for "regexp")
    :param fuzzy: matching mode:
                  None or any value starting with "equal" - exact match;
                  "starts_with" - text starts with expect_text;
                  "ends_with"   - text ends with expect_text;
                  "contains"    - expect_text occurs in text;
                  "regexp"      - re.search(expect_text, text) succeeds
    :return: bool, whether text matches
    :raises ParamError: when fuzzy is none of the modes above
    """
    if fuzzy is None or fuzzy.startswith("equal"):
        return expect_text == text
    if fuzzy == "starts_with":
        return text.startswith(expect_text)
    if fuzzy == "ends_with":
        return text.endswith(expect_text)
    if fuzzy == "contains":
        return expect_text in text
    if fuzzy == "regexp":
        return re.search(expect_text, text) is not None
    # the message lists every supported mode, including regexp
    raise ParamError(
        "expected [equal, starts_with, ends_with, contains, regexp], get [{}]".format(fuzzy))
26276e6818aSopenharmony_ci
26376e6818aSopenharmony_ci
def get_process_pid(device, process_name):
    """
    Look up the pid of a process on the device.

    Runs "ps -ef | grep '<process_name>'" on the device and returns the
    second whitespace separated column of the first output line that
    does not contain "grep".

    :param device: device object providing execute_shell_command()
    :param process_name: text to grep for in the process list
    :return: the pid as str, or None when no matching line is found
    """
    cmd = "ps -ef | grep '{}'".format(process_name)
    listing = device.execute_shell_command(cmd).strip()
    for line in listing.split("\n"):
        if "grep" in line:
            # the grep command itself shows up in the process list
            continue
        columns = line.split()
        # empty output yields one blank line; skip anything without a
        # pid column instead of failing with IndexError
        if len(columns) > 1:
            return columns[1]
    return None
27476e6818aSopenharmony_ci
27576e6818aSopenharmony_ci
def check_port_state(port: int = None) -> None:
    """
    Write the state of a local port to the debug log.

    On Linux and Darwin the output of "lsof -i:<port>" is logged. On any
    other platform (the commands used are the Windows ones) the
    "netstat -aon" lines containing ":<port>" are logged, and for every
    TCP line in LISTENING state the tasklist entry of the owning pid is
    logged as well.
    Failures are logged as errors and never raised.

    :param port: port to inspect
    :return: None
    """
    banner = "##########port state##########"
    try:
        log.debug(banner)
        if platform.system() in ("Linux", "Darwin"):
            log.debug(shell_command("lsof -i:{}".format(port)))
        else:
            netstat_out = shell_command("netstat -aon", "findstr :{}".format(port))
            log.debug(netstat_out)
            trimmed = netstat_out.strip("\r\n")
            rows = trimmed.split("\r\n") if trimmed else []
            for row in rows:
                columns = row.split()
                # last column is the pid, the one before it the state
                if columns[0] == "TCP" and columns[-2] == "LISTENING":
                    log.debug(shell_command("tasklist", "findstr {}".format(columns[-1])))
        log.debug(banner)
    except Exception as e:
        log.error("check port state error, reason: {}".format(e))
29976e6818aSopenharmony_ci
30076e6818aSopenharmony_ci
def shell_command(cmd: str, findstr: str = "") -> str:
    """
    Run a command without a shell and return its standard output.

    When findstr is given it is started as a second command whose stdin
    is connected to the stdout of cmd (the equivalent of "cmd | findstr"),
    and the output of that second command is returned.
    Both command strings are split on single spaces, so arguments cannot
    contain spaces. Standard error of the commands is discarded.

    :param cmd: command line of the first command
    :param findstr: optional command line of the filter command
    :return: standard output decoded as UTF-8; undecodable bytes are
             replaced instead of raising
    :raises subprocess.TimeoutExpired: when the output is not complete
            within 10 seconds; the started processes are killed first
    """
    # stderr goes to DEVNULL rather than an unread pipe, so a command
    # that writes a lot to stderr cannot block on a full pipe
    producer = subprocess.Popen(cmd.split(" "),
                                stdout=subprocess.PIPE,
                                stderr=subprocess.DEVNULL,
                                shell=False)
    children = [producer]
    try:
        consumer = producer
        if findstr:
            consumer = subprocess.Popen(findstr.split(" "),
                                        stdin=producer.stdout,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.DEVNULL,
                                        shell=False)
            children.append(consumer)
            # drop the parent's copy of the pipe so the producer is
            # notified if the filter exits early
            producer.stdout.close()
        out, _ = consumer.communicate(timeout=10)
    finally:
        # never leave a child running or unreaped, also on timeout or
        # when the filter command could not be started
        for child in children:
            if child.poll() is None:
                child.kill()
            child.wait()
            if child.stdout is not None:
                child.stdout.close()
    return out.decode("utf-8", errors="replace")
31576e6818aSopenharmony_ci
31676e6818aSopenharmony_ci
def check_device_file_md5(device, pc_file: str, device_file: str) -> bool:
    """
    Check that a file on the device has the same MD5 digest as a local
    file.

    The device digest is the first whitespace separated token printed by
    "md5sum <device_file>" on the device; the local digest is computed
    with hashlib. Both digests are written to the device debug log.

    :param device: device object providing execute_shell_command() and
                   log.debug()
    :param pc_file: path of the local file
    :param device_file: path of the file on the device
    :return: True when both digests are equal, otherwise False (also
             when md5sum printed nothing)
    :raises FileNotFoundError: when pc_file is not an existing file
    """
    if not os.path.isfile(pc_file):
        raise FileNotFoundError(ErrorMessage.Common.Code_0201001.format(ErrorCategory.Environment, pc_file))
    local_file_name = os.path.basename(pc_file)

    md5_fields = device.execute_shell_command(
        "md5sum {}".format(device_file)).split()
    # empty output means no digest; it can never equal the local one
    device_md5 = md5_fields[0].strip() if md5_fields else ""
    device.log.debug("device {} md5: {}".format(local_file_name, device_md5))

    # hash in chunks so large files are not loaded into memory at once
    digest = hashlib.md5()
    with open(pc_file, "rb") as local_file:
        for chunk in iter(lambda: local_file.read(1024 * 1024), b""):
            digest.update(chunk)
    local_md5 = digest.hexdigest()
    device.log.debug("local {} md5: {}".format(local_file_name, local_md5))
    return device_md5 == local_md5
33076e6818aSopenharmony_ci
33176e6818aSopenharmony_ci
def is_standard_lib(file_path):
    """
    Check if the file is part of the standard library.

    A file counts as standard library when its path starts with
    <sys.base_prefix>/lib. This is a plain prefix test, so anything
    installed below that directory matches as well.

    :param file_path: path of the file to classify
    :return: True when the path starts with the lib directory of the
             base interpreter, otherwise False
    """
    # normcase makes the test case-insensitive on Windows, where the
    # directory can be spelled "Lib"; it is a no-op on POSIX
    std_lib_path = os.path.normcase(os.path.join(sys.base_prefix, 'lib'))
    return os.path.normcase(file_path).startswith(std_lib_path)
33676e6818aSopenharmony_ci
33776e6818aSopenharmony_ci
def is_third_party_lib(file_path):
    """
    Check if the file is part of a third-party library.

    A file counts as third-party when its path starts with one of:
    <sys.prefix>/lib/site-packages, <sys.real_prefix>/lib/site-packages
    (only when sys.real_prefix exists), a directory reported by
    site.getsitepackages(), or the user site-packages directory.

    :param file_path: path of the file to classify
    :return: True when the path lies below one of those directories,
             otherwise False
    """
    candidates = [os.path.join(sys.prefix, 'lib', 'site-packages')]
    if hasattr(sys, 'real_prefix'):  # This means we are in a virtual environment
        candidates.append(os.path.join(sys.real_prefix, 'lib', 'site-packages'))
    # The fixed "lib/site-packages" layout above does not exist on POSIX
    # (lib/pythonX.Y/site-packages), so also ask the interpreter. Guarded
    # because older virtualenv releases shipped a site module without
    # getsitepackages().
    if hasattr(site, 'getsitepackages'):
        candidates.extend(site.getsitepackages())
    candidates.append(site.getusersitepackages())

    # normcase makes the test case-insensitive on Windows ("Lib" vs "lib")
    normalized_path = os.path.normcase(file_path)
    return any(normalized_path.startswith(os.path.normcase(candidate))
               for candidate in candidates if candidate)
34576e6818aSopenharmony_ci
34676e6818aSopenharmony_ci
def extract_file_method_code(stack_line):
    """
    Extract the Python file name, the method after 'in' and the source
    line from one stack trace entry.

    The entry is expected to look like
        File "<path>", line <n>, in <method>
            <source line>
    Only the line directly after the header is taken as source line, so
    extra lines that follow it (for example the caret markers newer
    Python versions add) are ignored.

    :param stack_line: text of one stack trace entry
    :return: tuple (file_name, method_name, code_line); code_line is ""
             when no line follows the header; (None, None, None) when
             the text contains no 'File "...", line N, in ...' header
    """
    match = re.search(r'File "(.*?)", line \d+, in (.+)', stack_line)
    if not match:
        return None, None, None

    file_name = os.path.basename(match.group(1))
    method_name = match.group(2).strip()
    # the match ends at the end of the header line, so the remainder
    # starts with the line break followed by the source line
    following = stack_line[match.end():].split('\n')
    code_line = following[1].strip() if len(following) > 1 else ""
    return file_name, method_name, code_line
356