1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import importlib
20import hashlib
21import os
22import platform
23import re
24import secrets
25import site
26import socket
27import subprocess
28import sys
29
30from xdevice import get_decode
31from xdevice import ParamError
32from xdevice import DeviceConnectorType
33
34from devicetest.core.exception import DeviceTestError
35from devicetest.core.exception import ModuleNotAttributeError
36from devicetest.error import ErrorCategory
37from devicetest.error import ErrorMessage
38from devicetest.log.logger import DeviceTestLog as log
39
40
41def clean_sys_resource(file_path=None, file_base_name=None):
42    """
43    clean sys.path/sys.modules resource
44    :param file_path: sys path
45    :param file_base_name: module name
46    :return: None
47    """
48    if file_path in sys.path:
49        sys.path.remove(file_path)
50
51    if file_base_name in sys.modules:
52        del sys.modules[file_base_name]
53
54
55def get_base_name(file_abs_path, is_abs_name=False):
56    """
57    Args:
58        file_abs_path: str , file path
59        is_abs_name  : bool,
60    Returns:
61        file base name
62    Example:
63        input: D:/xdevice/decc.py
64        if is_abs_name return: D:/xdevice/decc, else return: decc
65    """
66    if isinstance(file_abs_path, str):
67        base_name = file_abs_path if is_abs_name else os.path.basename(
68            file_abs_path)
69        file_base_name, _ = os.path.splitext(base_name)
70        return file_base_name
71    return None
72
73
74def get_dir_path(file_path):
75    if isinstance(file_path, str):
76        if os.path.exists(file_path):
77            return os.path.dirname(file_path)
78    return None
79
80
81def import_from_file(file_path, file_base_name):
82    if file_path in sys.path:
83        sys.path.remove(file_path)
84
85    sys.path.insert(0, file_path)
86    if file_base_name in sys.modules:
87        del sys.modules[file_base_name]
88
89    try:
90        importlib.import_module(file_base_name)
91    except Exception as exception:
92        file_abs_path = os.path.join(file_path, file_base_name)
93        error_msg = ErrorMessage.TestCase.Code_0203001.format(file_abs_path, exception)
94        raise ImportError(error_msg) from exception
95    if not hasattr(sys.modules.get(file_base_name), file_base_name):
96        raise ModuleNotAttributeError(ErrorMessage.TestCase.Code_0203016)
97    return getattr(sys.modules[file_base_name], file_base_name)
98
99
100def get_forward_ports(self=None):
101    try:
102        ports_list = []
103        cmd = "fport ls"
104        out = get_decode(self.connector_command(cmd)).strip()
105        clean_lines = out.split('\n')
106        for line_text in clean_lines:
107            # clear reverse port first  Example: 'tcp:8011 tcp:9963'     [Reverse]
108            if "Reverse" in line_text and "fport" in cmd:
109                connector_tokens = line_text.split()
110                self.connector_command(["fport", "rm",
111                                        connector_tokens[0].replace("'", ""),
112                                        connector_tokens[1].replace("'", "")])
113                continue
114            connector_tokens = line_text.split("tcp:")
115            if len(connector_tokens) != 3:
116                continue
117            ports_list.append(int(connector_tokens[1]))
118        return ports_list
119    except Exception:
120        log.error(ErrorMessage.Common.Code_0201005)
121        return []
122
123
124def is_port_idle(host: str = "127.0.0.1", port: int = None) -> bool:
125    """端口是否空闲"""
126    s = None
127    is_idle = False
128    try:
129        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
130        s.settimeout(3)
131        s.connect((host, port))
132    except Exception:
133        # 已知会抛出ConnectionRefusedError和TimeoutError两种
134        is_idle = True
135    finally:
136        if s is not None:
137            s.close()
138    return is_idle
139
140
141def get_forward_port(self, host: str = None, port: int = None, filter_ports: list = None):
142    if filter_ports is None:
143        filter_ports = []
144    try:
145        ports_list = get_forward_ports(self)
146
147        port = 9999 - secrets.randbelow(99)
148        cnt = 0
149        while cnt < 10 and port > 1024:
150            if port not in filter_ports and port not in ports_list and is_port_idle(host, port):
151                cnt += 1
152                break
153
154            port -= 1
155        return port
156    except Exception as error:
157        err_msg = ErrorMessage.Common.Code_0201005
158        log.error(err_msg)
159        raise DeviceTestError(err_msg) from error
160
161
162def get_local_ip_address():
163    """
164    查询本机ip地址
165    :return: ip
166    """
167    ip = "127.0.0.1"
168    return ip
169
170
171def compare_version(version, base_version: tuple, rex: str):
172    """比较两个版本号的大小,若version版本大于等于base_version,返回True
173    Args:
174        version: str, version
175        rex: version style rex
176        base_version: list, base_version
177    Example:
178        version: "4.0.0.1" base_version:[4.0.0.0]
179        if version bigger than base_version or equal to base_version, return True, else return False
180    """
181    version = version.strip()
182    if re.match(rex, version):
183        version = tuple(version.split("."))
184        if version > base_version:
185            return True
186    else:
187        return True
188    return False
189
190
191def extract_version(version_str: str):
192    """
193    获取版本
194    :param version_str: 版本信息 ALN-AL00 5.0.0.26(SP1C00E25R4P5log)
195    :return: 5.0.0.26
196    """
197    match = re.search(r'(\d+\.\d+\.\d+\.\d+)', version_str)
198    if match:
199        return match.group(1)
200    return None
201
202
203def compare_versions_by_product(version1: str, version2: str):
204    """
205    比较两个版本号
206    :param version1: 5.0.0.26
207    :param version2: 5.0.0.23
208    :return:
209    """
210    version1 = extract_version(version1)
211    version2 = extract_version(version2)
212    v1 = tuple(map(int, version1.split('.')))
213    v2 = tuple(map(int, version2.split('.')))
214    if v1 > v2:
215        return True
216    else:
217        return False
218
219
220class DeviceFileUtils:
221    @staticmethod
222    def check_remote_file_is_exist(_ad, remote_file):
223        # test -f remotepath judge file exists.
224        # if exist,return 0,else return None
225        # 判断设备中文件是否存在
226        ret = _ad.execute_shell_command("test -f %s && echo 0" % remote_file)
227        if ret != "" \
228                and len(str(ret).split()) \
229                != 0 and str(ret).split()[0] == "0":
230            return True
231        return False
232
233    @staticmethod
234    def check_remote_dict_is_exist(_ad, remote_file):
235        # test -f remotepath judge folder exists.
236        # if exist,return 0,else return None
237        # 判断设备中文件夹是否存在
238        ret = _ad.execute_shell_command(
239            "test -d {} && echo 0".format(remote_file))
240        if ret != "" \
241                and len(str(ret).split()) != 0 and str(ret).split()[0] == "0":
242            return True
243        return False
244
245
246def compare_text(text, expect_text, fuzzy):
247    """支持多种匹配方式的文本匹配"""
248    if fuzzy is None or fuzzy.startswith("equal"):
249        result = (expect_text == text)
250    elif fuzzy == "starts_with":
251        result = text.startswith(expect_text)
252    elif fuzzy == "ends_with":
253        result = text.endswith(expect_text)
254    elif fuzzy == "contains":
255        result = expect_text in text
256    elif fuzzy == "regexp":
257        result = re.search(expect_text, text)
258        result = False if result is None else True
259    else:
260        raise ParamError("expected [equal, starts_with, ends_with, contains], get [{}]".format(fuzzy))
261    return result
262
263
264def get_process_pid(device, process_name):
265    cmd = "ps -ef | grep '{}'".format(process_name)
266    ret = device.execute_shell_command(cmd)
267    ret = ret.strip()
268    pids = ret.split("\n")
269    for pid in pids:
270        if "grep" not in pid:
271            pid = pid.split()
272            return pid[1]
273    return None
274
275
276def check_port_state(port: int = None) -> None:
277    """查看端口状态"""
278    try:
279        log.debug("##########port state##########")
280        sys_type = platform.system()
281        if sys_type == "Linux" or sys_type == "Darwin":
282            cmd = "lsof -i:{}".format(port)
283            out = shell_command(cmd)
284            log.debug(out)
285        else:
286            out = shell_command("netstat -aon", "findstr :{}".format(port))
287            log.debug(out)
288            results = out.strip("\r\n")
289            if results:
290                results = results.split("\r\n")
291            for result in results:
292                items = result.split()
293                if items[0] == "TCP" and items[-2] == "LISTENING":
294                    out = shell_command("tasklist", "findstr {}".format(items[-1]))
295                    log.debug(out)
296        log.debug("##########port state##########")
297    except Exception as e:
298        log.error("check port state error, reason: {}".format(e))
299
300
301def shell_command(cmd: str, findstr: str = "") -> str:
302    command = cmd.split(" ")
303    first_process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False)
304    if findstr:
305        findstr_command = findstr.split(" ")
306        findstr_process = subprocess.Popen(findstr_command, stdin=first_process.stdout,
307                                           stderr=subprocess.PIPE,
308                                           stdout=subprocess.PIPE,
309                                           shell=False)
310        out, _ = findstr_process.communicate(timeout=10)
311    else:
312        out, _ = first_process.communicate(timeout=10)
313    out = out.decode("utf-8")
314    return out
315
316
317def check_device_file_md5(device, pc_file: str, device_file: str) -> bool:
318    if not os.path.isfile(pc_file):
319        raise FileNotFoundError(ErrorMessage.Common.Code_0201001.format(ErrorCategory.Environment, pc_file))
320    _, local_file_name = os.path.split(pc_file)
321    device_md5 = device.execute_shell_command(
322        "md5sum {}".format(device_file)).split()[0].strip()
323    device.log.debug("device {} md5: {}".format(local_file_name, device_md5))
324    with open(pc_file, "rb") as f:
325        data = f.read()
326        md5hash = hashlib.md5(data)
327        local_md5 = md5hash.hexdigest()
328    device.log.debug("local {} md5: {}".format(local_file_name, local_md5))
329    return True if device_md5 == local_md5 else False
330
331
332def is_standard_lib(file_path):
333    """Check if the file is part of the standard library."""
334    std_lib_path = os.path.join(sys.base_prefix, 'lib')
335    return file_path.startswith(std_lib_path)
336
337
338def is_third_party_lib(file_path):
339    """Check if the file is part of a third-party library."""
340    site_packages = [os.path.join(sys.prefix, 'lib', 'site-packages')]
341    if hasattr(sys, 'real_prefix'):  # This means we are in a virtual environment
342        site_packages.append(os.path.join(sys.real_prefix, 'lib', 'site-packages'))
343    site_packages.append(site.getusersitepackages())
344    return any(file_path.startswith(site_package) for site_package in site_packages)
345
346
347def extract_file_method_code(stack_line):
348    """Extract the Python file name and the method after 'in' from a stack trace line."""
349    match = re.search(r'File "(.*?)", line \d+, in (.+)', stack_line)
350    if match:
351        file_name = match.group(1).split(os.sep)[-1]
352        method_name = match.group(2).strip()
353        code_line = stack_line.split('\n')[-2].strip()
354        return file_name, method_name, code_line
355    return None, None, None
356