1cc290419Sopenharmony_ci#!/usr/bin/env python3
2cc290419Sopenharmony_ci# -*- coding: utf-8 -*-
3cc290419Sopenharmony_ci# Copyright (C) 2021 Huawei Device Co., Ltd.
4cc290419Sopenharmony_ci# Licensed under the Apache License, Version 2.0 (the "License");
5cc290419Sopenharmony_ci# you may not use this file except in compliance with the License.
6cc290419Sopenharmony_ci# You may obtain a copy of the License at
7cc290419Sopenharmony_ci#
8cc290419Sopenharmony_ci#     http://www.apache.org/licenses/LICENSE-2.0
9cc290419Sopenharmony_ci#
10cc290419Sopenharmony_ci# Unless required by applicable law or agreed to in writing, software
11cc290419Sopenharmony_ci# distributed under the License is distributed on an "AS IS" BASIS,
12cc290419Sopenharmony_ci# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13cc290419Sopenharmony_ci# See the License for the specific language governing permissions and
14cc290419Sopenharmony_ci# limitations under the License.
15cc290419Sopenharmony_ci
16cc290419Sopenharmony_ci# 0. 运行环境: python 3.10+, pytest
17cc290419Sopenharmony_ci# 1. 修改 GP 中字段
18cc290419Sopenharmony_ci# 2. pytest [-k case_name_pattern]
19cc290419Sopenharmony_ci#    eg. pytest -k file 执行方法名含 file 的用例
20cc290419Sopenharmony_ci
21cc290419Sopenharmony_ci# 打包
22cc290419Sopenharmony_ci# pip install pyinstaller
# Prepare the `assert` source directory so that it contains your data files.
24cc290419Sopenharmony_ci# pyi-makespec  -D --add-data assert:assert dev_hdc_test.py
25cc290419Sopenharmony_ci# pyinstaller dev_hdc_test.spec
26cc290419Sopenharmony_ci# 执行 dev_hdc_test.exe
27cc290419Sopenharmony_ci
28cc290419Sopenharmony_ciimport csv
29cc290419Sopenharmony_ciimport hashlib
30cc290419Sopenharmony_ciimport json
31cc290419Sopenharmony_ciimport logging
32cc290419Sopenharmony_ciimport os
33cc290419Sopenharmony_ciimport random
34cc290419Sopenharmony_ciimport re
35cc290419Sopenharmony_ciimport stat
36cc290419Sopenharmony_ciimport shutil
37cc290419Sopenharmony_ciimport subprocess
38cc290419Sopenharmony_ciimport sys
39cc290419Sopenharmony_ciimport threading
40cc290419Sopenharmony_ciimport time
41cc290419Sopenharmony_cifrom multiprocessing import Process
42cc290419Sopenharmony_ci
43cc290419Sopenharmony_ciimport pytest
44cc290419Sopenharmony_ciimport pkg_resources
45cc290419Sopenharmony_ci
46cc290419Sopenharmony_ci
class GP(object):
    """Global parameters shared by the hdc test helpers in this file.

    Customize the class attributes below, or answer the prompts issued by
    ``set_options``.  The values are persisted to ``.hdctester.conf`` in the
    current working directory by ``dump`` and restored by ``load``.
    """
    hdc_exe = "hdc"  # executable used to build every hdc command line
    local_path = "resource"  # host-side directory holding the test files
    remote_path = "data/local/tmp"  # device-side directory for transfers
    remote_dir_path = "data/local/tmp/it_send_dir"  # device dir used by check_dir
    remote_ip = "auto"  # "auto" lets switch_tcp look the address up on the device
    remote_port = 8710  # port used for "tmode port" / "tconn"
    hdc_head = "hdc"  # prefix prepended to every command by check_shell
    device_name = ""  # target chosen by get_device (or typed by the user)
    targets = []  # rebound (never mutated in place) by list_targets
    tmode = "usb"  # connection type: "usb" or "tcp"
    changed_testcase = "n"  # answer given to the change_testcase prompt
    testcase_path = "ts_windows.csv"  # path entered in change_testcase
    loaded_testcase = 0  # presumably a count of loaded custom cases; TODO confirm
    hdcd_rom = "not checked"  # reported in the pytest_run summary

    @classmethod
    def init(cls):
        """Configure logging, then restore the saved options or ask for them."""
        # logging.basicConfig() does nothing once the root logger has a
        # handler, so one call is enough (a second WARNING-level call that
        # used to follow this one never took effect).
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s',
                            datefmt='%d %b %Y %H:%M:%S',
                            )

        if os.path.exists(".hdctester.conf"):
            cls.load()
            cls.start_host()
            cls.list_targets()
        else:
            cls.set_options()
            cls.print_options()
            cls.start_host()
            cls.dump()
        return

    @classmethod
    def start_host(cls):
        """Run ``<hdc_exe> start`` and return its exit code."""
        cmd = f"{cls.hdc_exe} start"
        res = subprocess.call(cmd.split())
        return res

    @classmethod
    def list_targets(cls):
        """Refresh ``cls.targets`` from the output of ``<hdc_exe> list targets``.

        Returns:
            True when the command ran; False when it could not be started or
            exited with an error, in which case ``cls.targets`` holds the
            single marker string "failed to auto detect device".
        """
        try:
            raw = subprocess.check_output(f"{cls.hdc_exe} list targets".split()).split()
        except (OSError, IndexError, subprocess.SubprocessError):
            # SubprocessError covers a non-zero exit status, which used to
            # escape this handler and abort the caller.
            cls.targets = ["failed to auto detect device"]
            return False
        cls.targets = [t.decode() for t in raw]
        return True

    @classmethod
    def get_device(cls):
        """Pick the device to test and point ``cls.hdc_head`` at it.

        When several targets are listed the user is asked to choose one by
        its 1-based number.

        Returns:
            True when a usable device was selected, False otherwise.
        """
        cls.start_host()
        cls.list_targets()
        if not cls.targets:
            # Empty command output: nothing to index into.
            print("No hdc device detected.")
            return False
        if len(cls.targets) > 1:
            print("Multiple device detected, please select one:")
            for i, t in enumerate(cls.targets):
                print(f"{i+1}. {t}")
            print("input the nums of the device above:")
            while True:
                choice = input().strip()
                # Reject non-numbers and out-of-range values (0 would
                # otherwise silently select the last entry via index -1).
                if choice.isdigit() and 1 <= int(choice) <= len(cls.targets):
                    break
                print(f"please input a number between 1 and {len(cls.targets)}:")
            cls.device_name = cls.targets[int(choice) - 1]
        else:
            cls.device_name = cls.targets[0]
        if cls.device_name == "failed to auto detect device":
            print("No device detected, please check your device connection")
            return False
        elif cls.device_name.lower() == "[empty]":
            # Compared case-insensitively; NOTE(review): hdc appears to print
            # "[Empty]" with a capital E - confirm against the hdc in use.
            print("No hdc device detected.")
            return False
        cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}"
        return True

    @classmethod
    def dump(cls):
        """Write every non-dunder, non-classmethod class attribute to
        ``.hdctester.conf`` as JSON.

        Returns:
            True once the file has been written.
        """
        content = {
            key: value for key, value in cls.__dict__.items()
            if not key.startswith("__") and not isinstance(value, classmethod)
        }
        # Serialize before touching the old file so a serialization error
        # cannot destroy an existing configuration.
        json_str = json.dumps(content)
        try:
            os.remove(".hdctester.conf")
        except OSError:
            pass
        # O_TRUNC guards against stale trailing bytes if the removal failed.
        fd = os.open(".hdctester.conf", os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o755)
        try:
            os.write(fd, json_str.encode())
        finally:
            os.close(fd)
        return True

    @classmethod
    def load(cls):
        """Restore the options stored in ``.hdctester.conf``.

        Keys missing from the file leave the current class value untouched
        instead of replacing it with None.

        Returns:
            True once the file has been read.
        """
        with open(".hdctester.conf") as f:
            content = json.load(f)
        keys = (
            "hdc_exe", "local_path", "remote_path", "remote_ip", "remote_port",
            "hdc_head", "tmode", "device_name", "changed_testcase",
            "testcase_path",
        )
        for key in keys:
            if key in content:
                setattr(cls, key, content[key])
        # dump() stores this value under "loaded_testcase"; "load_testcase" is
        # the key this method used to read and is kept as a fallback.
        if "loaded_testcase" in content:
            cls.loaded_testcase = content["loaded_testcase"]
        elif "load_testcase" in content:
            cls.loaded_testcase = content["load_testcase"]
        return True

    @classmethod
    def print_options(cls):
        """Print the current option values, one right-aligned label per line."""
        rows = (
            ("hdc execution", cls.hdc_exe),
            ("local storage path", cls.local_path),
            ("remote storage path", cls.remote_path),
            ("remote ip", cls.remote_ip),
            ("remote port", cls.remote_port),
            ("device name", cls.device_name),
            ("connect type", cls.tmode),
            ("hdc head", cls.hdc_head),
            ("changed testcase", cls.changed_testcase),
            ("testcase path", cls.testcase_path),
            ("loaded testcase", cls.loaded_testcase),
        )
        info = "HDC Tester Default Options: \n\n" + "".join(
            f"{label.rjust(20, ' ')}: {value}\n" for label, value in rows
        )
        print(info)

    @classmethod
    def tconn_tcp(cls):
        """Run ``tconn <remote_ip>:<remote_port>``; True if it reports "Connect OK"."""
        res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode()
        return "Connect OK" in res

    @classmethod
    def set_options(cls):
        """Interactively override the defaults; an empty answer keeps a value.

        Returns:
            False when tcp mode was chosen and the connection failed,
            True otherwise.
        """
        if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip():
            cls.hdc_exe = opt
        if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip():
            cls.local_path = opt
        if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip():
            cls.remote_path = opt
        if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip():
            cls.remote_ip = opt
        if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip():
            try:
                cls.remote_port = int(opt)
            except ValueError:
                # Keep the previous port rather than aborting the whole setup.
                print(f"invalid port [{opt}], keep {cls.remote_port}")
        if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip():
            cls.device_name = opt
        if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip():
            cls.tmode = opt
        if cls.tmode == "usb":
            ret = cls.get_device()
            if ret:
                print("USB device detected.")
        elif cls.tconn_tcp():
            cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}"
        else:
            print(f"tconn {cls.remote_ip}:{cls.remote_port} failed")
            return False
        return True

    @classmethod
    def change_testcase(cls):
        """Ask whether to change the testcase file and, if so, for its path.

        Returns:
            False when the user answers "n" to the first prompt, True otherwise.
        """
        if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip():
            cls.changed_testcase = opt
            if opt == "n":
                return False
        if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip():
            cls.testcase_path = opt
        cls.print_options()
        return True

    @classmethod
    def load_testcase(cls):
        """Placeholder: prints a notice and returns False."""
        print("this fuction will coming soon.")
        return False

    @classmethod
    def get_version(cls):
        """Return the version string of this test script."""
        return "v1.0.4a"
237cc290419Sopenharmony_ci
238cc290419Sopenharmony_ci
def pytest_run(args):
    """Run the pytest suite ``args.count`` times and print a summary.

    The run is aborted early (after a prompt) when one of the required
    package files is missing from ``GP.local_path``.

    Args:
        args: parsed command-line namespace.  This function reads
            ``args.count`` (number of rounds; None runs no round),
            ``args.verbose`` (passed to pytest right after ``--verbose``)
            and ``args.desc`` (text placed in the report description).
    """
    required_files = (
        "entry-default-signed-debug.hap",
        "libA_v10001.hsp",
        "libB_v10001.hsp",
    )
    for file in required_files:
        if not os.path.exists(os.path.join(GP.local_path, file)):
            print(f"No {file} File!")
            print("请将package.zip中的安装包文件解压到当前脚本resource目录中,操作完成该步骤后重新执行脚本。")
            print("Please unzip package.zip to resource directory, please rerun after operation.")
            input("[ENTER]")
            return
    gen_package_dir()
    start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    if args.count is not None:
        for i in range(args.count):
            # Rounds are reported 1-based ("1/3" .. "3/3"), not "0/3" .. "2/3".
            print(f"------------The {i + 1}/{args.count} Test-------------")
            # A fresh argument list is built for every round.
            pytest_args = [
                '--verbose', args.verbose,
                '--report=report.html',
                f"--title=HDC Test Report {GP.get_version()}",
                '--tester=tester001',
                '--template=1',
                f"--desc={args.verbose}:{args.desc}",
            ]
            pytest.main(pytest_args)
    end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    report_time = time.strftime('%Y-%m-%d_%H_%M_%S', time.localtime(time.time()))
    report_dir = os.path.join(os.getcwd(), "reports")
    report_file = os.path.join(report_dir, f"{report_time}report.html")
    print(f"Test over, the script version is {GP.get_version()},"
        f" start at {start_time}, end at {end_time} \n"
        f"=======>The device hdcd ROM size is {GP.hdcd_rom}.\n"
        f"=======>{report_file} is saved. \n"
    )
    input("=======>press [Enter] key to Show logs.")
276cc290419Sopenharmony_ci
277cc290419Sopenharmony_ci
def rmdir(path):
    """Remove a file or a directory tree on a best-effort basis.

    On Windows a regular file is deleted with ``os.remove`` and anything else
    with ``shutil.rmtree``; on other platforms ``rm -rf`` is invoked.  An
    ``OSError`` is reported on stdout instead of being raised.

    Args:
        path: file or directory to delete.
    """
    try:
        if sys.platform == "win32":
            if os.path.isfile(path):
                os.remove(path)
            else:
                shutil.rmtree(path)
        else:
            # Pass the path as its own argv entry: splitting a formatted
            # command string would cut a path containing whitespace into
            # several arguments and delete the wrong things.
            subprocess.call(["rm", "-rf", path])
    except OSError:
        print(f"Error: {path} : cannot remove")
290cc290419Sopenharmony_ci
291cc290419Sopenharmony_ci
def get_local_path(path):
    """Return ``path`` joined onto the host-side directory ``GP.local_path``."""
    base_dir = GP.local_path
    return os.path.join(base_dir, path)
294cc290419Sopenharmony_ci
295cc290419Sopenharmony_ci
def get_remote_path(path):
    """Return ``path`` appended to ``GP.remote_path`` with a "/" separator."""
    return "/".join((f"{GP.remote_path}", f"{path}"))
298cc290419Sopenharmony_ci
299cc290419Sopenharmony_ci
def get_local_md5(local):
    """Return the hexadecimal MD5 digest of the file at ``local``.

    The file is read in 4096-byte chunks so it never has to fit in memory.
    """
    digest = hashlib.md5()
    with open(local, "rb") as stream:
        while chunk := stream.read(4096):
            digest.update(chunk)
    return digest.hexdigest()
306cc290419Sopenharmony_ci
307cc290419Sopenharmony_ci
def check_shell(cmd, pattern=None, fetch=False):
    """Run ``cmd`` through hdc (prefixed with ``GP.hdc_head``).

    Args:
        cmd: hdc sub-command; it is split on whitespace to build argv.
        pattern: when truthy, the result is whether this text occurs in the
            command output.
        fetch: when truthy (and no pattern is given), the decoded output
            itself is returned.

    Returns:
        bool for the pattern mode, str for the fetch mode, otherwise True
        when the command exits with status 0 (``subprocess.check_call``
        raises on any other status).
    """
    full_cmd = f"{GP.hdc_head} {cmd}"
    print(f"\nexecuting command: {full_cmd}")
    argv = full_cmd.split()

    if not pattern and not fetch:
        # Only the exit status matters.
        return subprocess.check_call(argv) == 0

    output = subprocess.check_output(argv).decode()
    print(f"--> output: {output}")
    if not pattern:
        return output

    # Validate the output against the expected text.
    found = pattern in output
    print(f"--> pattern [{pattern}] {'FOUND' if found else 'NOT FOUND'} in output")
    return found
323cc290419Sopenharmony_ci
324cc290419Sopenharmony_ci
def get_shell_result(cmd, pattern=None, fetch=False):
    """Run ``cmd`` through hdc and return its decoded output.

    ``pattern`` and ``fetch`` are accepted but not used by this function.
    """
    full_cmd = f"{GP.hdc_head} {cmd}"
    print(f"\nexecuting command: {full_cmd}")
    raw_output = subprocess.check_output(full_cmd.split())
    return raw_output.decode()
329cc290419Sopenharmony_ci
330cc290419Sopenharmony_ci
def check_rate(cmd, expected_rate):
    """Run a transfer command and check the rate it reports.

    The number is taken from the text between the first "rate:" and the
    following "kB/s" in the command output.

    Args:
        cmd: hdc sub-command whose output contains the rate.
        expected_rate: threshold the reported rate must exceed.

    Returns:
        True when the reported rate is greater than ``expected_rate``; False
        when it is not, or when no rate can be parsed from the output.
    """
    send_result = get_shell_result(cmd)
    _, marker, tail = send_result.partition("rate:")
    if not marker:
        # No rate field at all: report a failed check instead of raising
        # IndexError.
        print(f"--> no rate found in output: {send_result}")
        return False
    try:
        rate = float(tail.split("kB/s")[0])
    except ValueError:
        print(f"--> unparsable rate in output: {send_result}")
        return False
    return rate > expected_rate
335cc290419Sopenharmony_ci
336cc290419Sopenharmony_ci
def check_dir(local, remote, is_single_dir=False):
    """Compare the MD5 of every file under a device directory with its local copy.

    The device-side digests come from ``md5sum`` run through hdc; each listed
    file is mapped to a path on the host and hashed locally.

    Args:
        local: host directory, used to locate files when ``is_single_dir``.
        remote: device directory to inspect.
        is_single_dir: when True the remote prefix is stripped from each
            file name and the file is looked up under ``local``; otherwise it
            is looked up under ``GP.local_path``.

    Returns:
        True when every listed file matches, False on any mismatch (an
        unreadable or missing local file counts as a mismatch).
    """
    def _get_md5sum(remote, is_single_dir=False):
        if is_single_dir:
            cmd = f"{GP.hdc_head} shell md5sum {remote}/*"
        else:
            # "\\;" spells the same backslash-semicolon as before without
            # relying on an invalid escape sequence.
            cmd = f'{GP.hdc_head} shell find {remote} -type f -exec md5sum {{}} \\;'
        return subprocess.check_output(cmd.split()).decode()

    def _calculate_md5(file_path):
        md5 = hashlib.md5()
        try:
            with open(file_path, 'rb') as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    md5.update(chunk)
            return md5.hexdigest()
        except PermissionError:
            return "PermissionError"
        except FileNotFoundError:
            return "FileNotFoundError"

    print("remote:" + remote)
    # NOTE(review): is_single_dir is not forwarded here, so the "find" form
    # is always used; kept as-is because callers currently rely on it.
    output = _get_md5sum(remote)
    print(output)

    all_matched = True
    for line in output.splitlines():
        if len(line) < 32:  # length of MD5
            continue
        fields = line.split()
        if len(fields) < 2:
            # Not a "<md5> <file>" line.
            continue
        expected_md5, file_name = fields[:2]
        if is_single_dir:
            file_name = file_name.replace(f"{remote}", "")
            file_path = os.path.join(os.getcwd(), local) + file_name
        else:
            anchor = GP.remote_dir_path if GP.remote_path in remote else remote
            # os.sep keeps the backslash form on Windows and yields a usable
            # path on other hosts.
            file_name = file_name.split(anchor)[1].replace("/", os.sep)
            # Build the full local file path.
            file_path = os.path.join(os.getcwd(), GP.local_path) + file_name
        print(file_path)
        actual_md5 = _calculate_md5(file_path)
        print(f"Expected: {expected_md5}")
        print(f"Actual: {actual_md5}")
        if actual_md5 == expected_md5:
            print(f"MD5 matched {file_name}")
        else:
            print(f"[Fail]MD5 mismatch for {file_name}")
            all_matched = False

    return all_matched
385cc290419Sopenharmony_ci
386cc290419Sopenharmony_ci
387cc290419Sopenharmony_cidef _check_file(local, remote):
388cc290419Sopenharmony_ci    if remote.startswith("/proc"):
389cc290419Sopenharmony_ci        local_size = os.path.getsize(local)
390cc290419Sopenharmony_ci        if local_size > 0:
391cc290419Sopenharmony_ci            return True
392cc290419Sopenharmony_ci        else:
393cc290419Sopenharmony_ci            return False
394cc290419Sopenharmony_ci    else:
395cc290419Sopenharmony_ci        cmd = f"shell md5sum {remote}"
396cc290419Sopenharmony_ci        local_md5 = get_local_md5(local)
397cc290419Sopenharmony_ci        return check_shell(cmd, local_md5)
398cc290419Sopenharmony_ci
399cc290419Sopenharmony_ci
def _check_app_installed(bundle, is_shared=False):
    """Return whether ``bundle`` shows up in the ``bm dump``/``bm dump-shared`` listing."""
    if is_shared:
        sub_cmd = "dump-shared"
    else:
        sub_cmd = "dump"
    return check_shell(f"shell bm {sub_cmd} -a", bundle)
404cc290419Sopenharmony_ci
405cc290419Sopenharmony_ci
def check_hdc_targets():
    """Return whether ``GP.device_name`` appears in the ``list targets`` output."""
    # check_shell() already prepends GP.hdc_head; adding it here as well
    # produced "<head> <head> list targets".
    cmd = "list targets"
    print(GP.device_name)
    return check_shell(cmd, GP.device_name)
410cc290419Sopenharmony_ci
411cc290419Sopenharmony_ci
def check_file_send(local, remote):
    """Send a file to the device and verify the copy.

    Args:
        local: file name relative to ``GP.local_path``.
        remote: file name relative to ``GP.remote_path``.

    Returns:
        The result of the send command combined with the file check.
    """
    source = get_local_path(local)
    target = get_remote_path(remote)
    if not check_shell(f"file send {source} {target}"):
        return False
    return _check_file(source, target)
417cc290419Sopenharmony_ci
418cc290419Sopenharmony_ci
def check_file_recv(remote, local):
    """Receive a file from the device and verify the copy.

    Args:
        remote: file name relative to ``GP.remote_path``.
        local: file name relative to ``GP.local_path``.

    Returns:
        The result of the recv command combined with the file check.
    """
    target = get_local_path(local)
    source = get_remote_path(remote)
    if not check_shell(f"file recv {source} {target}"):
        return False
    return _check_file(target, source)
424cc290419Sopenharmony_ci
425cc290419Sopenharmony_ci
def check_app_install(app, bundle, args=""):
    """Install a package and verify the outcome.

    A ``.hap`` installed with ``-s`` or a ``.hsp`` installed without options
    is expected to be rejected; every other combination must succeed and the
    bundle must then be listed as installed.

    Args:
        app: package file name relative to ``GP.local_path``.
        bundle: bundle name expected in the installed list.
        args: options placed between ``install`` and the package path.
    """
    package = os.path.join(GP.local_path, app)
    install_cmd = f"install {args} {package}"

    hap_as_shared = args == "-s" and package.endswith(".hap")
    hsp_as_plain = args == "" and package.endswith(".hsp")
    if hap_as_shared or hsp_as_plain:
        return check_shell(install_cmd, "failed to install bundle")

    if not check_shell(install_cmd, "successfully"):
        return False
    return _check_app_installed(bundle, "s" in args)
433cc290419Sopenharmony_ci
434cc290419Sopenharmony_ci
def check_app_uninstall(bundle, args=""):
    """Uninstall ``bundle`` and verify it is no longer listed as installed."""
    if not check_shell(f"uninstall {args} {bundle}", "successfully"):
        return False
    still_installed = _check_app_installed(bundle, "s" in args)
    return not still_installed
438cc290419Sopenharmony_ci
439cc290419Sopenharmony_ci
def check_app_install_multi(tables, args=""):
    """Install several packages with one command and verify the outcome.

    The install is expected to fail when a ``.hap`` is installed with ``-s``,
    when ``.hap`` and ``.hsp`` packages are mixed, or when no ``.hap`` is
    given without options.  Otherwise it must succeed and every bundle must
    be listed as installed.

    Args:
        tables: mapping of package file name (relative to ``GP.local_path``)
            to bundle name.
        args: options placed between ``install`` and the package paths.

    Returns:
        True when the observed outcome matches the expected one.
    """
    apps = [os.path.join(GP.local_path, app) for app in tables]
    bundles = list(tables.values())

    apps_str = " ".join(apps)
    install_cmd = f"install {args} {apps_str}"

    # Decide by file extension, like check_app_install(); the previous regex
    # ".hap" had an unescaped dot and matched anywhere in the joined paths.
    has_hap = any(app.endswith(".hap") for app in apps)
    has_hsp = any(app.endswith(".hsp") for app in apps)
    expect_failure = (
        (args == "-s" and has_hap)
        or (has_hsp and has_hap)
        or (args == "" and not has_hap)
    )

    if expect_failure:
        return bool(check_shell(install_cmd, "failed to install bundle"))

    if not check_shell(install_cmd, "successfully"):
        return False
    return all(_check_app_installed(bundle, "s" in args) for bundle in bundles)
464cc290419Sopenharmony_ci
465cc290419Sopenharmony_ci
def check_app_uninstall_multi(tables, args=""):
    """Uninstall every bundle named in ``tables``; stop at the first failure.

    Args:
        tables: mapping whose values are the bundle names to uninstall.
        args: options forwarded to ``check_app_uninstall``.

    Returns:
        True when every uninstall check passed (also for an empty mapping).
    """
    bundle_names = (bundle for _, bundle in tables.items())
    return all(check_app_uninstall(name, args) for name in bundle_names)
472cc290419Sopenharmony_ci
473cc290419Sopenharmony_ci
def check_hdc_cmd(cmd, pattern=None, **args):
    """Run an hdc command and apply the check that fits its kind.

    * ``file ...``: the transfer must report "FileTransfer finish"; the last
      two words of the command are then compared as a file or a directory.
    * ``install ...``: must succeed and the bundle given as ``bundle=...``
      must be listed as installed.
    * ``uninstall ...``: must succeed and the last word of the command (the
      bundle) must no longer be listed.
    * anything else is passed to ``check_shell`` with ``pattern`` and the
      extra keyword arguments.
    """
    words = cmd.split()

    if cmd.startswith("file"):
        if not check_shell(cmd, "FileTransfer finish"):
            return False
        first, second = words[-2:]
        # "file send" ends with <local> <remote>; other file commands with
        # <remote> <local>.
        if cmd.startswith("file send"):
            local, remote = first, second
        else:
            remote, local = first, second
        verify = _check_file if os.path.isfile(local) else check_dir
        return verify(local, remote)

    if cmd.startswith("install"):
        bundle = args.get("bundle", "invalid")
        options = " ".join(words[1:-1])
        if not check_shell(cmd, "successfully"):
            return False
        return _check_app_installed(bundle, "s" in options)

    if cmd.startswith("uninstall"):
        bundle = words[-1]
        options = " ".join(words[1:-1])
        if not check_shell(cmd, "successfully"):
            return False
        return not _check_app_installed(bundle, "s" in options)

    return check_shell(cmd, pattern, **args)
498cc290419Sopenharmony_ci
499cc290419Sopenharmony_ci
def check_soft_local(local_source, local_soft, remote):
    """Send ``local_soft`` to ``remote`` and compare the result with ``local_source``.

    Returns:
        False when the transfer does not report "FileTransfer finish",
        otherwise the result of the file check.
    """
    transferred = check_shell(f"file send {local_soft} {remote}", "FileTransfer finish")
    return _check_file(local_source, remote) if transferred else False
505cc290419Sopenharmony_ci
506cc290419Sopenharmony_ci
def check_soft_remote(remote_source, remote_soft, local_recv):
    """Create a symlink on the device, receive through it and verify the file.

    The link ``remote_soft`` -> ``remote_source`` is created with ``ln -s``;
    the received file is then compared with
    ``get_remote_path(remote_source)``.

    Returns:
        False when the transfer does not report "FileTransfer finish",
        otherwise the result of the file check.
    """
    check_hdc_cmd(f"shell ln -s {remote_source} {remote_soft}")
    received = check_shell(f"file recv {remote_soft} {local_recv}", "FileTransfer finish")
    if received:
        return _check_file(local_recv, get_remote_path(remote_source))
    return False
513cc290419Sopenharmony_ci
514cc290419Sopenharmony_ci
def switch_usb():
    """Switch the device to usb mode and re-target ``GP.hdc_head`` at it.

    Returns:
        The result of the ``tmode usb`` command check.
    """
    switched = check_hdc_cmd("tmode usb")
    # Fixed 3 second pause after the mode switch, whatever its result.
    time.sleep(3)
    if not switched:
        return switched
    GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}"
    return switched
521cc290419Sopenharmony_ci
522cc290419Sopenharmony_ci
def copy_file(src, dst):
    """Copy ``src`` to ``dst`` with ``shutil.copy2``, reporting the outcome on stdout.

    Errors are printed rather than raised; the function always returns None.
    """
    try:
        shutil.copy2(src, dst)
        print(f"File copied successfully from {src} to {dst}")
    except OSError as io_err:  # IOError is an alias of OSError
        print(f"Unable to copy file. {io_err}")
    except Exception as other_err:
        print(f"Unexpected error: {other_err}")
531cc290419Sopenharmony_ci
532cc290419Sopenharmony_ci
def switch_tcp():
    """Switch the device to tcp mode and connect to it.

    With ``GP.remote_ip == "auto"`` the address is read from the device's
    ``ifconfig`` output first.  The check is skipped (returning True) when no
    address is configured or none can be found.

    Returns:
        True when skipped, otherwise the result of the ``tconn`` check; on
        success ``GP.hdc_head`` targets ``<remote_ip>:<remote_port>``.
    """
    if not GP.remote_ip:  # skip tcp check
        print("!!! remote_ip is none, skip tcp check !!!")
        return True

    if GP.remote_ip == "auto":
        query = "shell \"ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6\""
        net_info = check_hdc_cmd(query, fetch=True)
        if not net_info:
            print("!!! device ip not found, skip tcp check !!!")
            return True
        # Take the first whitespace-delimited word after the first ":".
        after_colon = net_info.split(":")[1]
        GP.remote_ip = after_colon.split()[0]
        print(f"fetch remote ip: {GP.remote_ip}")

    endpoint = f"{GP.remote_ip}:{GP.remote_port}"
    if check_hdc_cmd(f"tmode port {GP.remote_port}"):
        time.sleep(3)
    connected = check_hdc_cmd(f"tconn {endpoint}", "Connect OK")
    if connected:
        GP.hdc_head = f"{GP.hdc_exe} -t {endpoint}"
    return connected
551cc290419Sopenharmony_ci
552cc290419Sopenharmony_ci
def select_cmd():
    """Show the tester menu and read a choice from stdin.

    Keeps prompting until the stripped input is exactly one of the
    characters "1" to "5", then returns that character as a string.
    """
    menu = "".join((
        "1) Proceed tester\n",
        "2) Customize tester\n",
        "3) Setup files for transfer\n",
        "4) Load custom testcase(default unused) \n",
        "5) Exit\n",
        ">> ",
    ))
    valid_choices = ("1", "2", "3", "4", "5")

    choice = input(menu).strip()
    while choice not in valid_choices:
        choice = input(menu).strip()
    return choice
565cc290419Sopenharmony_ci
566cc290419Sopenharmony_ci
def gen_file(path, size):
    """Create (or overwrite) ``path`` with exactly ``size`` random bytes.

    The file is opened with mode 0o755 and truncated first, so regenerating
    over an existing, larger file does not leave stale trailing data behind.
    Data comes from ``os.urandom`` and is written in chunks of at most
    1024 bytes. A ``size`` of 0 yields an empty file.

    Args:
        path: target file path; converted to an absolute path before use.
        size: number of bytes to write (integer).
    """
    chunk_size = 1024
    path = os.path.abspath(path)
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    # The file object owns the descriptor, so it is closed even if a write fails.
    with os.fdopen(os.open(path, flags, 0o755), "wb") as out:
        remaining = size
        while remaining > 0:
            step = min(chunk_size, remaining)
            out.write(os.urandom(step))
            remaining -= step
576cc290419Sopenharmony_ci
577cc290419Sopenharmony_ci
def gen_zero_file(path, size):
    """Create (or overwrite) ``path`` with ``size`` ASCII ``'0'`` characters.

    The content is the digit character ``b'0'`` repeated, not NUL bytes.
    The file is opened with mode 0o755 and truncated first so an existing,
    larger file does not keep stale trailing data.
    """
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    # The buffered file object writes the whole payload and closes the
    # descriptor even when the write raises.
    with os.fdopen(os.open(path, flags, 0o755), "wb") as out:
        out.write(b'0' * size)
582cc290419Sopenharmony_ci
583cc290419Sopenharmony_ci
def create_file_with_size(path, size):
    """Create (or overwrite) ``path`` with ``size`` NUL (``\\0``) bytes.

    The file is opened with mode 0o755 and truncated first so an existing,
    larger file does not keep stale trailing data.
    """
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    # The buffered file object writes the whole payload and closes the
    # descriptor even when the write raises.
    with os.fdopen(os.open(path, flags, 0o755), "wb") as out:
        out.write(b'\0' * size)
588cc290419Sopenharmony_ci
589cc290419Sopenharmony_ci
def gen_file_set():
    """Generate the local fixture tree under ``GP.local_path``.

    In order, this creates: the random-data files ``empty``, ``small``,
    ``medium`` and ``large``; the symlink ``soft_small`` -> ``small``; the
    text file ``word_100M.txt``; the ``package`` directory with five
    ``fake.hap.N`` files; a four-level ``deep_dir`` chain holding ``deep``;
    ``problem_dir`` with ``small2`` plus 47 ``file_<size>.dat`` files;
    an empty ``empty_dir``; and finally an empty marker file named after
    ``GP.get_version()``.
    """
    root = GP.local_path

    def _under(*parts):
        # Path of a fixture relative to the local test root.
        return os.path.join(root, *parts)

    def _ensure_dir(directory):
        # Create the directory only when nothing exists at that path yet.
        if not os.path.exists(directory):
            os.mkdir(directory)

    plain_files = (
        ("empty", 0),
        ("small", 102400),
        ("medium", 200 * 1024 ** 2),
        ("large", 2 * 1024 ** 3),
    )
    for file_name, byte_count in plain_files:
        print(f"generating {file_name} file ...")
        gen_file(_under(file_name), byte_count)

    print("generating soft link ...")
    try:
        os.symlink("small", _under("soft_small"))
    except FileExistsError:
        print("soft_small already exists")

    print("generating text file ...")
    gen_zero_file(_under("word_100M.txt"), 100 * 1024 ** 2)

    print("generating package dir ...")
    package_dir = _under("package")
    _ensure_dir(package_dir)
    for index in range(1, 6):
        gen_file(os.path.join(package_dir, f"fake.hap.{index}"), 20 * 1024 ** 2)

    print("generating deep dir ...")
    level_count = 4
    nested_dir = _under("deep_dir")
    _ensure_dir(nested_dir)
    for level in range(level_count):
        nested_dir = os.path.join(nested_dir, f"deep_dir{level}")
        _ensure_dir(nested_dir)
    gen_file(os.path.join(nested_dir, "deep"), 102400)

    print("generating dir with file ...")
    problem_dir = _under("problem_dir")
    rmdir(problem_dir)
    os.mkdir(problem_dir)
    gen_file(os.path.join(problem_dir, "small2"), 102400)

    fuzz_count = 47  # number of size-stepped files to create
    data_unit = 1024  # size step between consecutive files, in bytes
    data_extra = 936  # constant extra bytes added to every file size
    for step in range(fuzz_count):
        byte_count = step * data_unit + data_extra
        create_file_with_size(os.path.join(problem_dir, f"file_{byte_count}.dat"), byte_count)

    print("generating empty dir ...")
    empty_dir = _under("empty_dir")
    rmdir(empty_dir)
    os.mkdir(empty_dir)

    print("generating version file ...")
    gen_file(_under(GP.get_version()), 0)
652cc290419Sopenharmony_ci
653cc290419Sopenharmony_ci
def gen_package_dir():
    """Recreate ``app_dir`` under ``GP.local_path`` and copy the sample hap into it.

    The directory is removed and created again on every call. When
    ``entry-default-signed-debug.hap`` is missing from ``GP.local_path`` a
    message is printed and the directory is left empty.
    """
    print("generating app dir ...")
    app_dir = os.path.join(GP.local_path, "app_dir")
    rmdir(app_dir)
    os.mkdir(app_dir)

    hap_file = os.path.join(GP.local_path, "entry-default-signed-debug.hap")
    if os.path.exists(hap_file):
        copy_file(hap_file, app_dir)
        return
    print(f"Source file {hap_file} does not exist.")
665cc290419Sopenharmony_ci
666cc290419Sopenharmony_ci
def prepare_source():
    """Prepare the local fixture directory unless it is already up to date.

    A marker file named after ``GP.get_version()`` inside ``GP.local_path``
    indicates that fixtures for the current test version exist; in that
    case nothing is done. Otherwise every entry in ``GP.local_path`` except
    ``*.hap`` / ``*.hsp`` files is removed (or the directory is created when
    missing) and ``gen_file_set()`` regenerates the fixtures.
    """
    version = GP.get_version()
    if os.path.exists(os.path.join(GP.local_path, version)):
        # Print the version string itself; the method must be called, not
        # interpolated as a bound-method object.
        print(f"hdc test version is {version}, check ok, skip prepare.")
        return
    print(f"in prepare {GP.local_path},wait for 2 mins.")

    if os.path.exists(GP.local_path):
        # Walk local_path and delete everything except .hap / .hsp files.
        for file in os.listdir(GP.local_path):
            if file.endswith((".hap", ".hsp")):
                continue
            # rmdir is the file's own removal helper; it is applied to every
            # entry here, files included.
            rmdir(os.path.join(GP.local_path, file))
    else:
        os.mkdir(GP.local_path)

    gen_file_set()
685cc290419Sopenharmony_ci
686cc290419Sopenharmony_ci
def add_prepare_source():
    """Create the ``deep_test_dir`` chain and ``recv_test_dir`` under ``GP.local_path``.

    Unlike an idempotent setup, every ``os.mkdir`` here is unconditional,
    so the call raises if any of the directories already exists.
    """
    nested_dir = os.path.join(GP.local_path, "deep_test_dir")
    print("generating deep test dir ...")
    script_path = os.path.abspath(__file__)
    # NOTE(review): the nesting depth is a remainder modulo 14 (0..13) derived
    # from this script's path length; presumably meant to approach a 255-char
    # path limit — confirm whether "%" is intended.
    level_count = (255 - 9 - len(script_path)) % 14
    os.mkdir(nested_dir)
    for level_name in (f"deep_test_dir{level}" for level in range(level_count)):
        nested_dir = os.path.join(nested_dir, level_name)
        os.mkdir(nested_dir)
    gen_file(os.path.join(nested_dir, "deep_test"), 102400)

    print("generating recv test dir ...")
    os.mkdir(os.path.join(GP.local_path, "recv_test_dir"))
701cc290419Sopenharmony_ci
702cc290419Sopenharmony_ci
def update_source():
    """Create ``deep_test_dir`` and ``recv_test_dir`` under ``GP.local_path`` if missing.

    Each of the two top-level directories is only generated when it does
    not exist yet, so the function can be called repeatedly.
    """

    def _build_deep_tree(top_dir):
        # Build the nested directory chain and drop a 100 KiB file at the bottom.
        print("generating deep test dir ...")
        script_path = os.path.abspath(__file__)
        # NOTE(review): depth is a remainder modulo 14 (0..13) derived from
        # this script's path length — confirm whether "%" is intended.
        level_count = (255 - 9 - len(script_path)) % 14
        os.mkdir(top_dir)
        current = top_dir
        for level in range(level_count):
            current = os.path.join(current, f"deep_test_dir{level}")
            os.mkdir(current)
        gen_file(os.path.join(current, "deep_test"), 102400)

    deep_root = os.path.join(GP.local_path, "deep_test_dir")
    if not os.path.exists(deep_root):
        _build_deep_tree(deep_root)

    recv_root = os.path.join(GP.local_path, "recv_test_dir")
    if os.path.exists(recv_root):
        return
    print("generating recv test dir ...")
    os.mkdir(recv_root)
719cc290419Sopenharmony_ci
720cc290419Sopenharmony_ci
def setup_tester():
    """Run the interactive setup menu until the user proceeds or exits.

    Menu handling:
      1 - return True (proceed with the tests);
      2 - customise options via ``GP.set_options()`` and dump them;
      3 - prepare the transfer files;
      4 - load custom test cases via ``GP.load_testcase()``;
      5 (or anything else) - return False.

    A failing ``set_options`` or ``load_testcase`` also returns False.
    """
    while True:
        GP.print_options()
        choice = int(select_cmd())
        if choice == 1:
            return True
        if choice == 2:
            if not GP.set_options():
                return False
            GP.dump()
            continue
        if choice == 3:
            prepare_source()
            continue
        if choice == 4 and GP.load_testcase():
            continue
        # Exit request, unknown option, or a failed testcase load.
        return False
740cc290419Sopenharmony_ci
741cc290419Sopenharmony_ci
def load_testcase():
    """Load the custom test cases through ``GP.load_testcase()`` and report the result.

    Returns True and prints a success message when ``GP.load_testcase()``
    returns a truthy value, otherwise prints a failure message and returns
    False.
    """
    # The loader has to be called: testing the attribute itself is always
    # truthy, which made the failure branch unreachable.
    if not GP.load_testcase():
        print("load testcase failed")
        return False
    print("load testcase success")
    return True
748cc290419Sopenharmony_ci
749cc290419Sopenharmony_ci
def check_library_installation(library_name):
    """Check through ``pkg_resources`` whether ``library_name`` is installed.

    Returns 0 when the distribution is found. When it is not found, an
    install hint is printed and 1 is returned.
    """
    try:
        pkg_resources.get_distribution(library_name)
    except pkg_resources.DistributionNotFound:
        print(f"\n\n{library_name} is not installed.\n\n")
        print("try to use command below:")
        print(f"pip install {library_name}")
        return 1
    return 0
759cc290419Sopenharmony_ci
760cc290419Sopenharmony_ci
def check_subprocess_cmd(cmd, process_num, timeout):
    """Start ``process_num`` copies of ``cmd`` and wait for all of them.

    Args:
        cmd: command line; split on whitespace to build the argument list.
        process_num: number of processes to start (0 starts none).
        timeout: overall time budget in seconds shared by all processes,
            or None to wait without a limit.

    Every process still running when the budget is used up is killed and
    reaped. Returns None.
    """
    argv = cmd.split()
    procs = [subprocess.Popen(argv) for _ in range(process_num)]
    # One shared deadline, so the total wait does not grow with process_num.
    deadline = None if timeout is None else time.monotonic() + timeout
    for proc in procs:
        remaining = None if deadline is None else max(0.0, deadline - time.monotonic())
        try:
            proc.wait(timeout=remaining)
        except subprocess.TimeoutExpired:
            proc.kill()
            proc.wait()  # reap the killed child so it does not linger as a zombie
769cc290419Sopenharmony_ci
770cc290419Sopenharmony_ci
def check_process_mixed(process_num, timeout, local, remote):
    """Run staggered concurrent sends, then staggered concurrent receives.

    For each of ``process_num`` rounds and each of the fixtures ``small``,
    ``medium`` and ``empty`` one send process and one receive process is
    prepared. All sends are started (with a random 0-1 s gap between
    starts) and joined, then the receives are handled the same way.

    ``timeout``, ``local`` and ``remote`` are accepted but not used by this
    function. Returns None; the outcome of the individual transfers is not
    collected.
    """
    senders = []
    receivers = []
    sizes = {"small", "medium", "empty"}
    for round_index in range(process_num):
        for size in sizes:
            send_line = (
                f"file send {get_local_path(size)} "
                f"{get_remote_path(f'it_{size}_mix_{round_index}')}"
            )
            recv_line = (
                f"file recv {get_remote_path(f'it_{size}_mix_{round_index}')} "
                f"{get_local_path(f'recv_{size}_mix_{round_index}')}"
            )
            senders.append(Process(target=check_hdc_cmd, args=(send_line, )))
            receivers.append(Process(target=check_hdc_cmd, args=(recv_line, )))
            logging.info(f"RESULT:{send_line}")  # logs the prepared send command

    def _start_staggered(workers):
        # Start each worker, pausing a random 0-1 s after every start.
        for worker in workers:
            pause = random.uniform(0, 1)
            worker.start()
            time.sleep(pause)

    _start_staggered(senders)
    for worker in senders:
        worker.join()

    _start_staggered(receivers)
    for worker in receivers:
        worker.join()
        time.sleep(random.uniform(0, 1))
798cc290419Sopenharmony_ci
799cc290419Sopenharmony_ci
def execute_lines_in_file(file_path):
    """Execute the commands listed in a ``count,command`` text file.

    Each non-blank line has the form ``<repeat count>,<command>``. Only the
    first comma separates the two fields, so the command itself may contain
    commas. A command starting with ``hdc`` has that leading token replaced
    by ``GP.hdc_head``; later occurrences of "hdc" inside the command are
    left untouched. Every command is split on whitespace, run
    ``<repeat count>`` times, and its output is logged.

    When ``file_path`` does not exist it is first created with the single
    default line ``1,hdc shell ls``.

    Raises:
        IndexError: a non-blank line contains no comma.
        ValueError: the repeat count is not an integer.
        subprocess.CalledProcessError: a command exits with a non-zero status.
    """
    if not os.path.exists(file_path):
        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        modes = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open(file_path, flags, modes), 'w') as file:
            file.write("1,hdc shell ls")
    with open(file_path, 'r') as file:
        lines = file.readlines()

    for line in lines:
        if not line.strip():
            continue  # tolerate blank lines, e.g. a trailing newline at EOF
        fields = line.split(',', 1)  # keep commas that belong to the command
        test_time = fields[0]
        test_cmd = fields[1]
        match = re.search(r"^hdc", test_cmd)
        if match:
            # Strip only the leading "hdc"; str.replace would also remove
            # every later occurrence (paths, "hdcd", ...).
            test_cmd = f"{GP.hdc_head} {test_cmd[match.end():].lstrip()}"

        for i in range(int(test_time)):
            logging.info(f"THE {i+1}/{test_time} TEST,COMMAND IS:{test_cmd}")
            output = subprocess.check_output(test_cmd.split()).decode()
            logging.info(f"RESULT:{output}")  # log the command output
821cc290419Sopenharmony_ci
822cc290419Sopenharmony_ci
def make_multiprocess_file(local, remote, mode, num, task_type):
    """Run ``num`` concurrent hdc file transfers and verify the results.

    Args:
        local: local file or directory path.
        remote: remote file or directory path.
        mode: "send" or "recv".
        num: number of concurrent transfers; must be at least 1.
        task_type: "file" or "dir". File transfers use the per-transfer
            suffix ``_<i>`` on the remote name (and on the local name when
            receiving); directory transfers all use the same paths.

    Returns:
        True when every transfer printed "FileTransfer finish" and every
        content check passed. False for invalid arguments, as soon as a
        finished transfer lacks the finish marker, or when any check fails.
    """
    if num < 1:
        return False
    # Reject unknown combinations up front; an unknown task_type previously
    # fell through to an undefined command list.
    if mode not in ("send", "recv") or task_type not in ("file", "dir"):
        return False

    if task_type == "file":
        if mode == "send":
            file_list = [f"{GP.hdc_head} file send {local} {remote}_{i}" for i in range(num)]
        else:
            file_list = [f"{GP.hdc_head} file recv {remote}_{i} {local}_{i}" for i in range(num)]
    elif mode == "send":
        file_list = [f"{GP.hdc_head} file send {local} {remote}" for _ in range(num)]
    else:
        file_list = [f"{GP.hdc_head} file recv {remote} {local}" for _ in range(num)]

    print(file_list[0])
    p_list = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in file_list]
    print(f"{mode} target {num} start")

    # Poll until every process has exited. The pending list is rebuilt on
    # each pass instead of being mutated while it is iterated.
    pending = p_list
    while pending:
        still_running = []
        for p in pending:
            if p.poll() is None:
                still_running.append(p)
                continue
            stdout, stderr = p.communicate(timeout=512)  # timeout wait 512s
            out_text = stdout.decode()
            if stderr:
                print(f"{stderr.decode()}")
            if stdout:
                print(f"{out_text}")
            if "FileTransfer finish" not in out_text:
                return False
        pending = still_running
        if pending:
            time.sleep(0.01)  # avoid spinning at full CPU while transfers run

    all_ok = True
    if task_type == "file":
        for i in range(num):
            local_file = local if mode == "send" else f"{local}_{i}"
            if not _check_file(local_file, f"{remote}_{i}"):
                all_ok = False
    else:
        # Resolve the compared directories once. Re-joining the basename on
        # every iteration made later receive checks look at
        # local/<name>/<name>/... instead of local/<name>.
        if mode == "send":
            local_dir = local
            remote_dir = f"{remote}/{os.path.basename(local)}"
        else:
            local_dir = os.path.join(local, os.path.basename(remote))
            remote_dir = f"{remote}"
        # All transfers share the same paths; the check is still repeated
        # once per transfer, as before.
        for _ in range(num):
            if not check_dir(f"{local_dir}", remote_dir, is_single_dir=True):
                all_ok = False
    return all_ok
883cc290419Sopenharmony_ci
884cc290419Sopenharmony_ci
def hdc_get_key(cmd):
    """Run ``cmd`` behind the current ``GP.hdc_head`` prefix and return its decoded stdout.

    The full command line is split on whitespace. A non-zero exit status
    raises ``subprocess.CalledProcessError``.
    """
    argv = f"{GP.hdc_head} {cmd}".split()
    raw_output = subprocess.check_output(argv)
    return raw_output.decode()
889cc290419Sopenharmony_ci
890cc290419Sopenharmony_ci
def start_subprocess_cmd(cmd, num, assert_out):
    """Run ``num`` concurrent copies of an hdc command and check their output.

    Args:
        cmd: hdc sub-command; ``GP.hdc_head`` is prepended and the line is
            split on whitespace.
        num: number of concurrent processes; must be at least 1.
        assert_out: text that must appear in each process's stdout, or None
            to skip the output check.

    Returns:
        False when ``num`` is below 1 or as soon as a finished process lacks
        ``assert_out`` in its stdout; True once all processes have finished
        and passed.
    """
    if num < 1:
        return False
    argv = f"{GP.hdc_head} {cmd}".split()
    p_list = [subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE) for _ in range(num)]
    logging.info(f"{cmd} target {num} start")

    # Poll until every process has exited. The pending list is rebuilt on
    # each pass instead of being mutated while it is iterated.
    pending = p_list
    while pending:
        still_running = []
        for p in pending:
            if p.poll() is None:
                still_running.append(p)
                continue
            stdout, stderr = p.communicate(timeout=512)
            out_text = stdout.decode()
            if stderr:
                logging.error(f"{stderr.decode()}")
            if stdout:
                logging.info(f"{out_text}")
            if assert_out is not None and assert_out not in out_text:
                return False
        pending = still_running
        if pending:
            time.sleep(0.01)  # avoid spinning at full CPU while commands run
    return True
909cc290419Sopenharmony_ci
910cc290419Sopenharmony_ci
def check_hdc_version(cmd, version):
    """Check that the hdc binary is at least as new as ``version``.

    Args:
        cmd: accepted for interface compatibility; the command actually run
            is always ``<GP.hdc_head> -v``.
        version: expected version text containing ``"Ver: a.b.c"``, or None.

    Version strings are compared by concatenating the dot-separated parts
    after "Ver: " and reading the result as one hexadecimal number.

    Returns:
        True when the expected version is less than or equal to the version
        reported by hdc, False otherwise. With ``version`` None there is
        nothing to compare against, so nothing is executed and False is
        returned.
    """

    def _convert_version_to_hex(_version):
        parts = _version.split("Ver: ")[1].split('.')
        return int(''.join(parts), 16)

    # Guard before parsing: parsing None raised AttributeError previously.
    if version is None:
        return False

    expected_version = _convert_version_to_hex(version)
    cmd = f"{GP.hdc_head} -v"
    print(f"\nexecuting command: {cmd}")
    output = subprocess.check_output(cmd.split()).decode().replace("\r", "").replace("\n", "")
    real_version = _convert_version_to_hex(output)
    print(f"--> output: {output}")
    print(f"--> your local [{version}] is"
        f" {'' if expected_version <= real_version else 'too old to'} fit the version [{output}]"
    )
    return expected_version <= real_version
929cc290419Sopenharmony_ci
930cc290419Sopenharmony_ci
def check_cmd_time(cmd, pattern, duration, times):
    """Measure how long an hdc command takes over ``times`` runs.

    Args:
        cmd: hdc sub-command to run.
        pattern: when None the command is run directly via
            ``subprocess.check_output``; otherwise each run goes through
            ``check_shell(cmd, pattern, ...)`` and must succeed (assert).
        duration: allowed average time in milliseconds; None selects the
            default of 150 ms plus 20 %.
        times: number of runs; must be at least 1.

    Prints the maximum, minimum, median and average run time.

    Returns:
        False when ``times`` is below 1, otherwise whether the average time
        per run is below ``duration``.
    """
    if times < 1:
        print("times should be bigger than 0.")
        return False
    fetchable = pattern is None
    start_time = time.time() * 1000
    print(f"{cmd} start {start_time}")
    samples = []
    for _ in range(times):
        run_started = time.time() * 1000
        if pattern is not None:
            assert check_shell(cmd, pattern, fetch=fetchable)
        else:
            subprocess.check_output(f"{GP.hdc_head} {cmd}".split())
        run_finished = time.time() * 1000
        samples.append(run_finished - run_started)

    # Compute the maximum, minimum and median of the per-run timings.
    ordered = sorted(samples)
    max_value = ordered[-1]
    min_value = ordered[0]
    median_value = ordered[len(ordered) // 2]

    print(f"{GP.hdc_head} {cmd}耗时最大值:{max_value}")
    print(f"{GP.hdc_head} {cmd}耗时最小值:{min_value}")
    print(f"{GP.hdc_head} {cmd}耗时中位数:{median_value}")

    end_time = time.time() * 1000

    try:
        timecost = int(end_time - start_time) / times
        print(f"{GP.hdc_head} {cmd}耗时平均值 {timecost}")
    except ZeroDivisionError:
        print("除数为0")

    # 150ms is baseline timecost for hdc shell xxx cmd, 20% can be upper maybe system status
    limit = 150 * 1.2 if duration is None else duration
    return timecost < limit
972cc290419Sopenharmony_ci
973cc290419Sopenharmony_ci
def check_rom(baseline):
    """Check the combined on-device size of hdcd and libhdc against a baseline.

    Sizes are taken from ``du`` on ``system/bin/hdcd`` and
    ``system/lib/libhdc.dylib.so``; when the latter's output contains
    "directory" the ``system/lib64`` copy is tried, and if that also
    reports "directory" the library counts as size 0.

    ``GP.hdcd_rom`` is set to ``"<size> KB"`` on a successful measurement
    or to ``"error"`` when the total is negative (a size could not be
    parsed).

    Args:
        baseline: size limit; None selects the default of 2200.

    Returns:
        True when the total size does not exceed the baseline, else False.
    """

    def _try_get_size(message):
        # The size is the first tab-separated field of the du output.
        try:
            return int(message.split('\t')[0])
        except ValueError:
            print(f"try get size value error, from {message}")
            return -9999 * 1024  # error size

    def _du(device_path):
        # Run "du" on the device for one path and return the decoded output.
        return subprocess.check_output(f"{GP.hdc_head} shell du {device_path}".split()).decode()

    # 2200KB is the baseline of hdcd and libhdc.dylib.so size all together
    limit = 2200 if baseline is None else baseline

    hdcd_size = _try_get_size(_du("system/bin/hdcd"))
    lib_report = _du("system/lib/libhdc.dylib.so")
    if "directory" not in lib_report:
        libhdc_size = _try_get_size(lib_report)
    else:
        # presumably a "No such file or directory" message — fall back to lib64
        lib64_report = _du("system/lib64/libhdc.dylib.so")
        libhdc_size = 0 if "directory" in lib64_report else _try_get_size(lib64_report)

    all_size = hdcd_size + libhdc_size
    GP.hdcd_rom = all_size
    if all_size < 0:
        GP.hdcd_rom = "error"
        return False
    GP.hdcd_rom = f"{all_size} KB"

    within_limit = not all_size > limit
    if within_limit:
        print(f"rom size is {all_size}, underlimit baseline {limit}")
    else:
        print(f"rom size is {all_size}, overlimit baseline {limit}")
    return within_limit
1014cc290419Sopenharmony_ci
1015cc290419Sopenharmony_ci
def run_command_with_timeout(command, timeout):
    """Run ``command`` with a time limit and return ``(stdout, stderr)`` as text.

    Returns:
        ``(stdout, stderr)`` decoded when the command exits with status 0;
        ``(None, "Command timed out")`` when it exceeds ``timeout`` seconds;
        ``(None, <decoded stderr>)`` when it exits with a non-zero status.
    """
    try:
        completed = subprocess.run(
            command,
            check=True,
            capture_output=True,  # pipes both stdout and stderr
            timeout=timeout,
        )
    except subprocess.TimeoutExpired:
        return None, "Command timed out"
    except subprocess.CalledProcessError as failure:
        return None, failure.stderr.decode()
    return completed.stdout.decode(), completed.stderr.decode()
1024