xref: /developtools/hdc/scripts/dev_hdc_test.py (revision cc290419)
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2021 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16# 0. 运行环境: python 3.10+, pytest
17# 1. 修改 GP 中字段
18# 2. pytest [-k case_name_pattern]
19#    eg. pytest -k file 执行方法名含 file 的用例
20
21# 打包
22# pip install pyinstaller
23# prepare assert source dir includes your data files
24# pyi-makespec  -D --add-data assert:assert dev_hdc_test.py
25# pyinstaller dev_hdc_test.spec
26# 执行 dev_hdc_test.exe
27
28import csv
29import hashlib
30import json
31import logging
32import os
33import random
34import re
35import stat
36import shutil
37import subprocess
38import sys
39import threading
40import time
41from multiprocessing import Process
42
43import pytest
44import pkg_resources
45
46
47class GP(object):
48    """ Global Parameters
49
50    customize here !!!
51    """
52    hdc_exe = "hdc"
53    local_path = "resource"
54    remote_path = "data/local/tmp"
55    remote_dir_path = "data/local/tmp/it_send_dir"
56    remote_ip = "auto"
57    remote_port = 8710
58    hdc_head = "hdc"
59    device_name = ""
60    targets = []
61    tmode = "usb"
62    changed_testcase = "n"
63    testcase_path = "ts_windows.csv"
64    loaded_testcase = 0
65    hdcd_rom = "not checked"
66
67    @classmethod
68    def init(cls):
69        logging.basicConfig(level=logging.INFO,
70                            format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s',
71                            datefmt='%d %b %Y %H:%M:%S',
72            )
73        logging.basicConfig(level=logging.WARNING,
74                            format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s',
75                            datefmt='%d %b %Y %H:%M:%S',
76                            )
77
78        if os.path.exists(".hdctester.conf"):
79            cls.load()
80            cls.start_host()
81            cls.list_targets()
82        else:
83            cls.set_options()
84            cls.print_options()
85            cls.start_host()
86            cls.dump()
87        return
88
89
90    @classmethod
91    def start_host(cls):
92        cmd = f"{cls.hdc_exe} start"
93        res = subprocess.call(cmd.split())
94        return res
95
96    @classmethod
97    def list_targets(cls):
98        try:
99            targets = subprocess.check_output(f"{cls.hdc_exe} list targets".split()).split()
100        except (OSError, IndexError):
101            targets = [b"failed to auto detect device"]
102            cls.targets = [targets[0].decode()]
103            return False
104        cls.targets = [t.decode() for t in targets]
105        return True
106
107
108    @classmethod
109    def get_device(cls):
110        cls.start_host()
111        cls.list_targets()
112        if len(cls.targets) > 1:
113            print("Multiple device detected, please select one:")
114            for i, t in enumerate(cls.targets):
115                print(f"{i+1}. {t}")
116            print("input the nums of the device above:")
117            cls.device_name = cls.targets[int(input()) - 1]
118        else:
119            cls.device_name = cls.targets[0]
120        if cls.device_name == "failed to auto detect device":
121            print("No device detected, please check your device connection")
122            return False
123        elif cls.device_name == "[empty]":
124            print("No hdc device detected.")
125            return False
126        cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}"
127        return True
128
129
130    @classmethod
131    def dump(cls):
132        try:
133            os.remove(".hdctester.conf")
134        except OSError:
135            pass
136        content = filter(
137            lambda k: not k[0].startswith("__") and not isinstance(k[1], classmethod), cls.__dict__.items())
138        json_str = json.dumps(dict(content))
139        fd = os.open(".hdctester.conf", os.O_WRONLY | os.O_CREAT, 0o755)
140        os.write(fd, json_str.encode())
141        os.close(fd)
142        return True
143
144
145    @classmethod
146    def load(cls):
147        with open(".hdctester.conf") as f:
148            content = json.load(f)
149            cls.hdc_exe = content.get("hdc_exe")
150            cls.local_path = content.get("local_path")
151            cls.remote_path = content.get("remote_path")
152            cls.remote_ip = content.get("remote_ip")
153            cls.hdc_head = content.get("hdc_head")
154            cls.tmode = content.get("tmode")
155            cls.device_name = content.get("device_name")
156            cls.changed_testcase = content.get("changed_testcase")
157            cls.testcase_path = content.get("testcase_path")
158            cls.loaded_testcase = content.get("load_testcase")
159        return True
160
161
162    @classmethod
163    def print_options(cls):
164        info = "HDC Tester Default Options: \n\n" \
165        + f"{'hdc execution'.rjust(20, ' ')}: {cls.hdc_exe}\n" \
166        + f"{'local storage path'.rjust(20, ' ')}: {cls.local_path}\n" \
167        + f"{'remote storage path'.rjust(20, ' ')}: {cls.remote_path}\n" \
168        + f"{'remote ip'.rjust(20, ' ')}: {cls.remote_ip}\n" \
169        + f"{'remote port'.rjust(20, ' ')}: {cls.remote_port}\n" \
170        + f"{'device name'.rjust(20, ' ')}: {cls.device_name}\n" \
171        + f"{'connect type'.rjust(20, ' ')}: {cls.tmode}\n" \
172        + f"{'hdc head'.rjust(20, ' ')}: {cls.hdc_head}\n" \
173        + f"{'changed testcase'.rjust(20, ' ')}: {cls.changed_testcase}\n" \
174        + f"{'testcase path'.rjust(20, ' ')}: {cls.testcase_path}\n" \
175        + f"{'loaded testcase'.rjust(20, ' ')}: {cls.loaded_testcase}\n"
176        print(info)
177
178
179    @classmethod
180    def tconn_tcp(cls):
181        res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode()
182        if "Connect OK" in res:
183            return True
184        else:
185            return False
186
187
188    @classmethod
189    def set_options(cls):
190        if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip():
191            cls.hdc_exe = opt
192        if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip():
193            cls.local_path = opt
194        if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip():
195            cls.remote_path = opt
196        if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip():
197            cls.remote_ip = opt
198        if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip():
199            cls.remote_port = int(opt)
200        if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip():
201            cls.device_name = opt
202        if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip():
203            cls.tmode = opt
204        if cls.tmode == "usb":
205            ret = cls.get_device()
206            if ret:
207                print("USB device detected.")
208        elif cls.tconn_tcp():
209            cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}"
210        else:
211            print(f"tconn {cls.remote_ip}:{cls.remote_port} failed")
212            return False
213        return True
214
215
216    @classmethod
217    def change_testcase(cls):
218        if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip():
219            cls.changed_testcase = opt
220            if opt == "n":
221                return False
222        if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip():
223            cls.testcase_path = os.path.join(opt)
224        cls.print_options()
225        return True
226
227
228    @classmethod
229    def load_testcase(cls):
230        print("this fuction will coming soon.")
231        return False
232
233    @classmethod
234    def get_version(cls):
235        version = f"v1.0.4a"
236        return version
237
238
239def pytest_run(args):
240    file_list = []
241    file_list.append("entry-default-signed-debug.hap")
242    file_list.append("libA_v10001.hsp")
243    file_list.append("libB_v10001.hsp")
244    for file in file_list:
245        if not os.path.exists(os.path.join(GP.local_path, file)):
246            print(f"No {file} File!")
247            print("请将package.zip中的安装包文件解压到当前脚本resource目录中,操作完成该步骤后重新执行脚本。")
248            print("Please unzip package.zip to resource directory, please rerun after operation.")
249            input("[ENTER]")
250            return
251    gen_package_dir()
252    start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
253    if args.count is not None:
254        for i in range(args.count):
255            print(f"------------The {i}/{args.count} Test-------------")
256            timestamp = time.time()
257            pytest_args = [
258                '--verbose', args.verbose,
259                '--report=report.html',
260                '--title=HDC Test Report 'f"{GP.get_version()}",
261                '--tester=tester001',
262                '--template=1',
263                '--desc='f"{args.verbose}:{args.desc}"
264            ]
265            pytest.main(pytest_args)
266    end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
267    report_time = time.strftime('%Y-%m-%d_%H_%M_%S', time.localtime(time.time()))
268    report_dir = os.path.join(os.getcwd(), "reports")
269    report_file = os.path.join(report_dir, f"{report_time}report.html")
270    print(f"Test over, the script version is {GP.get_version()},"
271        f" start at {start_time}, end at {end_time} \n"
272        f"=======>The device hdcd ROM size is {GP.hdcd_rom}.\n"
273        f"=======>{report_file} is saved. \n"
274    )
275    input("=======>press [Enter] key to Show logs.")
276
277
278def rmdir(path):
279    try:
280        if sys.platform == "win32":
281            if os.path.isfile(path):
282                os.remove(path)
283            else:
284                shutil.rmtree(path)
285        else:
286            subprocess.call(f"rm -rf {path}".split())
287    except OSError:
288        print(f"Error: {path} : cannot remove")
289        pass
290
291
292def get_local_path(path):
293    return os.path.join(GP.local_path, path)
294
295
296def get_remote_path(path):
297    return f"{GP.remote_path}/{path}"
298
299
300def get_local_md5(local):
301    md5_hash = hashlib.md5()
302    with open(local, "rb") as f:
303        for byte_block in iter(lambda: f.read(4096), b""):
304            md5_hash.update(byte_block)
305    return md5_hash.hexdigest()
306
307
308def check_shell(cmd, pattern=None, fetch=False):
309    cmd = f"{GP.hdc_head} {cmd}"
310    print(f"\nexecuting command: {cmd}")
311    if pattern: # check output valid
312        output = subprocess.check_output(cmd.split()).decode()
313        res = pattern in output
314        print(f"--> output: {output}")
315        print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output")
316        return res
317    elif fetch:
318        output = subprocess.check_output(cmd.split()).decode()
319        print(f"--> output: {output}")
320        return output
321    else: # check cmd run successfully
322        return subprocess.check_call(cmd.split()) == 0
323
324
325def get_shell_result(cmd, pattern=None, fetch=False):
326    cmd = f"{GP.hdc_head} {cmd}"
327    print(f"\nexecuting command: {cmd}")
328    return subprocess.check_output(cmd.split()).decode()
329
330
331def check_rate(cmd, expected_rate):
332    send_result = get_shell_result(cmd)
333    rate = float(send_result.split("rate:")[1].split("kB/s")[0])
334    return rate > expected_rate
335
336
337def check_dir(local, remote, is_single_dir=False):
338    def _get_md5sum(remote, is_single_dir=False):
339        if is_single_dir:
340            cmd = f"{GP.hdc_head} shell md5sum {remote}/*"
341        else:
342            cmd = f'{GP.hdc_head} shell find {remote} -type f -exec md5sum {{}} \;'
343        result = subprocess.check_output(cmd.split()).decode()
344        return result
345
346    def _calculate_md5(file_path):
347        md5 = hashlib.md5()
348        try:
349            with open(file_path, 'rb') as f:
350                for chunk in iter(lambda: f.read(4096), b""):
351                    md5.update(chunk)
352            return md5.hexdigest()
353        except PermissionError:
354            return "PermissionError"
355        except FileNotFoundError:
356            return "FileNotFoundError"
357    print("remote:" + remote)
358    output = _get_md5sum(remote)
359    print(output)
360
361    result = 1
362    for line in output.splitlines():
363        if len(line) < 32: # length of MD5
364            continue
365        expected_md5, file_name = line.split()[:2]
366        if is_single_dir:
367            file_name = file_name.replace(f"{remote}", "")
368        elif GP.remote_path in remote:
369            file_name = file_name.split(GP.remote_dir_path)[1].replace("/", "\\")
370        else:
371            file_name = file_name.split(remote)[1].replace("/", "\\")
372        file_path = os.path.join(os.getcwd(), GP.local_path) + file_name  # 构建完整的文件路径
373        if is_single_dir:
374            file_path = os.path.join(os.getcwd(), local) + file_name
375        print(file_path)
376        actual_md5 = _calculate_md5(file_path)
377        print(f"Expected: {expected_md5}")
378        print(f"Actual: {actual_md5}")
379        print(f"MD5 matched {file_name}")
380        if actual_md5 != expected_md5:
381            print(f"[Fail]MD5 mismatch for {file_name}")
382            result *= 0
383
384    return (result == 1)
385
386
387def _check_file(local, remote):
388    if remote.startswith("/proc"):
389        local_size = os.path.getsize(local)
390        if local_size > 0:
391            return True
392        else:
393            return False
394    else:
395        cmd = f"shell md5sum {remote}"
396        local_md5 = get_local_md5(local)
397        return check_shell(cmd, local_md5)
398
399
400def _check_app_installed(bundle, is_shared=False):
401    dump = "dump-shared" if is_shared else "dump"
402    cmd = f"shell bm {dump} -a"
403    return check_shell(cmd, bundle)
404
405
406def check_hdc_targets():
407    cmd = f"{GP.hdc_head} list targets"
408    print(GP.device_name)
409    return check_shell(cmd, GP.device_name)
410
411
412def check_file_send(local, remote):
413    local_path = os.path.join(GP.local_path, local)
414    remote_path = f"{GP.remote_path}/{remote}"
415    cmd = f"file send {local_path} {remote_path}"
416    return check_shell(cmd) and _check_file(local_path, remote_path)
417
418
419def check_file_recv(remote, local):
420    local_path = os.path.join(GP.local_path, local)
421    remote_path = f"{GP.remote_path}/{remote}"
422    cmd = f"file recv {remote_path} {local_path}"
423    return check_shell(cmd) and _check_file(local_path, remote_path)
424
425
426def check_app_install(app, bundle, args=""):
427    app = os.path.join(GP.local_path, app)
428    install_cmd = f"install {args} {app}"
429    if (args == "-s" and app.endswith(".hap")) or (args == "" and app.endswith(".hsp")):
430        return check_shell(install_cmd, "failed to install bundle")
431    else:
432        return check_shell(install_cmd, "successfully") and _check_app_installed(bundle, "s" in args)
433
434
435def check_app_uninstall(bundle, args=""):
436    uninstall_cmd = f"uninstall {args} {bundle}"
437    return check_shell(uninstall_cmd, "successfully") and not _check_app_installed(bundle, "s" in args)
438
439
440def check_app_install_multi(tables, args=""):
441    apps = []
442    bundles = []
443    for app, bundle in tables.items() :
444        app = os.path.join(GP.local_path, app)
445        apps.append(app)
446        bundles.append(bundle)
447
448    apps_str = " ".join(apps)
449    install_cmd = f"install {args} {apps_str}"
450
451    if ((args == "-s" and re.search(".hap", apps_str)) or (re.search(".hsp", apps_str) and re.search(".hap", apps_str))
452        or (args == "" and 0 == apps_str.count(".hap"))):
453        if not check_shell(install_cmd, "failed to install bundle"):
454            return False
455    else:
456        if not check_shell(install_cmd, "successfully"):
457            return False
458
459        for bundle in bundles:
460            if not _check_app_installed(bundle, "s" in args):
461                return False
462
463    return True
464
465
466def check_app_uninstall_multi(tables, args=""):
467    for app, bundle in tables.items() :
468        if not check_app_uninstall(bundle, args):
469            return False
470
471    return True
472
473
474def check_hdc_cmd(cmd, pattern=None, **args):
475    if cmd.startswith("file"):
476        if not check_shell(cmd, "FileTransfer finish"):
477            return False
478        if cmd.startswith("file send"):
479            local, remote = cmd.split()[-2:]
480        else:
481            remote, local = cmd.split()[-2:]
482        if os.path.isfile(local):
483            return _check_file(local, remote)
484        else:
485            return check_dir(local, remote)
486    elif cmd.startswith("install"):
487        bundle = args.get("bundle", "invalid")
488        opt = " ".join(cmd.split()[1:-1])
489        return check_shell(cmd, "successfully") and _check_app_installed(bundle, "s" in opt)
490
491    elif cmd.startswith("uninstall"):
492        bundle = cmd.split()[-1]
493        opt = " ".join(cmd.split()[1:-1])
494        return check_shell(cmd, "successfully") and not _check_app_installed(bundle, "s" in opt)
495
496    else:
497        return check_shell(cmd, pattern, **args)
498
499
500def check_soft_local(local_source, local_soft, remote):
501    cmd = f"file send {local_soft} {remote}"
502    if not check_shell(cmd, "FileTransfer finish"):
503        return False
504    return _check_file(local_source, remote)
505
506
507def check_soft_remote(remote_source, remote_soft, local_recv):
508    check_hdc_cmd(f"shell ln -s {remote_source} {remote_soft}")
509    cmd = f"file recv {remote_soft} {local_recv}"
510    if not check_shell(cmd, "FileTransfer finish"):
511        return False
512    return _check_file(local_recv, get_remote_path(remote_source))
513
514
515def switch_usb():
516    res = check_hdc_cmd("tmode usb")
517    time.sleep(3)
518    if res:
519        GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}"
520    return res
521
522
523def copy_file(src, dst):
524    try:
525        shutil.copy2(src, dst)
526        print(f"File copied successfully from {src} to {dst}")
527    except IOError as e:
528        print(f"Unable to copy file. {e}")
529    except Exception as e:
530        print(f"Unexpected error: {e}")
531
532
533def switch_tcp():
534    if not GP.remote_ip: # skip tcp check
535        print("!!! remote_ip is none, skip tcp check !!!")
536        return True
537    if GP.remote_ip == "auto":
538        ipconf = check_hdc_cmd("shell \"ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6\"", fetch=True)
539        if not ipconf:
540            print("!!! device ip not found, skip tcp check !!!")
541            return True
542        GP.remote_ip = ipconf.split(":")[1].split()[0]
543        print(f"fetch remote ip: {GP.remote_ip}")
544    ret = check_hdc_cmd(f"tmode port {GP.remote_port}")
545    if ret:
546        time.sleep(3)
547    res = check_hdc_cmd(f"tconn {GP.remote_ip}:{GP.remote_port}", "Connect OK")
548    if res:
549        GP.hdc_head = f"{GP.hdc_exe} -t {GP.remote_ip}:{GP.remote_port}"
550    return res
551
552
553def select_cmd():
554    msg = "1) Proceed tester\n" \
555        + "2) Customize tester\n" \
556        + "3) Setup files for transfer\n" \
557        + "4) Load custom testcase(default unused) \n" \
558        + "5) Exit\n" \
559        + ">> "
560
561    while True:
562        opt = input(msg).strip()
563        if len(opt) == 1 and '1' <= opt <= '5':
564            return opt
565
566
567def gen_file(path, size):
568    index = 0
569    path = os.path.abspath(path)
570    fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755)
571
572    while index < size:
573        os.write(fd, os.urandom(1024))
574        index += 1024
575    os.close(fd)
576
577
578def gen_zero_file(path, size):
579    fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755)
580    os.write(fd, b'0' * size)
581    os.close(fd)
582
583
584def create_file_with_size(path, size):
585    fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755)
586    os.write(fd, b'\0' * size)
587    os.close(fd)
588
589
590def gen_file_set():
591    print("generating empty file ...")
592    gen_file(os.path.join(GP.local_path, "empty"), 0)
593
594    print("generating small file ...")
595    gen_file(os.path.join(GP.local_path, "small"), 102400)
596
597    print("generating medium file ...")
598    gen_file(os.path.join(GP.local_path, "medium"), 200 * 1024 ** 2)
599
600    print("generating large file ...")
601    gen_file(os.path.join(GP.local_path, "large"), 2 * 1024 ** 3)
602
603    print("generating soft link ...")
604    try:
605        os.symlink("small", os.path.join(GP.local_path, "soft_small"))
606    except FileExistsError:
607        print("soft_small already exists")
608
609    print("generating text file ...")
610    gen_zero_file(os.path.join(GP.local_path, "word_100M.txt"), 100 * 1024 ** 2)
611
612    print("generating package dir ...")
613    if not os.path.exists(os.path.join(GP.local_path, "package")):
614        os.mkdir(os.path.join(GP.local_path, "package"))
615    for i in range(1, 6):
616        gen_file(os.path.join(GP.local_path, "package", f"fake.hap.{i}"), 20 * 1024 ** 2)
617
618    print("generating deep dir ...")
619    deepth = 4
620    deep_path = os.path.join(GP.local_path, "deep_dir")
621    if not os.path.exists(deep_path):
622        os.mkdir(deep_path)
623    for deep in range(deepth):
624        deep_path = os.path.join(deep_path, f"deep_dir{deep}")
625        if not os.path.exists(deep_path):
626            os.mkdir(deep_path)
627    gen_file(os.path.join(deep_path, "deep"), 102400)
628
629    print("generating dir with file ...")
630    dir_path = os.path.join(GP.local_path, "problem_dir")
631    rmdir(dir_path)
632    os.mkdir(dir_path)
633    gen_file(os.path.join(dir_path, "small2"), 102400)
634
635    fuzz_count = 47 # 47 is the count that circulated the file transfer
636    data_unit = 1024 # 1024 is the size that circulated the file transfer
637    data_extra = 936 # 936 is the size that cased the extra file transfer
638    for i in range(fuzz_count):
639        create_file_with_size(
640            os.path.join(
641                dir_path, f"file_{i * data_unit+data_extra}.dat"
642                ), i * data_unit + data_extra
643            )
644
645    print("generating empty dir ...")
646    dir_path = os.path.join(GP.local_path, "empty_dir")
647    rmdir(dir_path)
648    os.mkdir(dir_path)
649
650    print("generating version file ...")
651    gen_file(os.path.join(GP.local_path, GP.get_version()), 0)
652
653
654def gen_package_dir():
655    print("generating app dir ...")
656    dir_path = os.path.join(GP.local_path, "app_dir")
657    rmdir(dir_path)
658    os.mkdir(dir_path)
659    app = os.path.join(GP.local_path, "entry-default-signed-debug.hap")
660    dst_dir = os.path.join(GP.local_path, "app_dir")
661    if not os.path.exists(app):
662        print(f"Source file {app} does not exist.")
663    else:
664        copy_file(app, dst_dir)
665
666
667def prepare_source():
668    if os.path.exists(os.path.join(GP.local_path, GP.get_version())):
669        print(f"hdc test version is {GP.get_version}, check ok, skip prepare.")
670        return
671    print(f"in prepare {GP.local_path},wait for 2 mins.")
672    current_path = os.getcwd()
673
674    if os.path.exists(GP.local_path):
675        #打开local_path遍历其中的文件,删除hap hsp以外的所有文件
676        for file in os.listdir(GP.local_path):
677            if file.endswith(".hap") or file.endswith(".hsp"):
678                continue
679            file_path = os.path.join(GP.local_path, file)
680            rmdir(file_path)
681    else:
682        os.mkdir(GP.local_path)
683
684    gen_file_set()
685
686
687def add_prepare_source():
688    deep_path = os.path.join(GP.local_path, "deep_test_dir")
689    print("generating deep test dir ...")
690    absolute_path = os.path.abspath(__file__)
691    deepth = (255 - 9 - len(absolute_path)) % 14
692    os.mkdir(deep_path)
693    for deep in range(deepth):
694        deep_path = os.path.join(deep_path, f"deep_test_dir{deep}")
695        os.mkdir(deep_path)
696    gen_file(os.path.join(deep_path, "deep_test"), 102400)
697
698    recv_dir = os.path.join(GP.local_path, "recv_test_dir")
699    print("generating recv test dir ...")
700    os.mkdir(recv_dir)
701
702
703def update_source():
704    deep_path = os.path.join(GP.local_path, "deep_test_dir")
705    if not os.path.exists(deep_path):
706        print("generating deep test dir ...")
707        absolute_path = os.path.abspath(__file__)
708        deepth = (255 - 9 - len(absolute_path)) % 14
709        os.mkdir(deep_path)
710        for deep in range(deepth):
711            deep_path = os.path.join(deep_path, f"deep_test_dir{deep}")
712            os.mkdir(deep_path)
713        gen_file(os.path.join(deep_path, "deep_test"), 102400)
714
715    recv_dir = os.path.join(GP.local_path, "recv_test_dir")
716    if not os.path.exists(recv_dir):
717        print("generating recv test dir ...")
718        os.mkdir(recv_dir)
719
720
721def setup_tester():
722    while True:
723        GP.print_options()
724        opt = int(select_cmd())
725        if opt == 1:
726            return True
727        elif opt == 2:
728            if not GP.set_options():
729                return False
730            GP.dump()
731        elif opt == 3:
732            prepare_source()
733        elif opt == 4:
734            if not GP.load_testcase():
735                return False
736        elif opt == 5:
737            return False
738        else:
739            return False
740
741
742def load_testcase():
743    if not GP.load_testcase:
744        print("load testcase failed")
745        return False
746    print("load testcase success")
747    return True
748
749
750def check_library_installation(library_name):
751    try:
752        pkg_resources.get_distribution(library_name)
753        return 0
754    except pkg_resources.DistributionNotFound:
755        print(f"\n\n{library_name} is not installed.\n\n")
756        print(f"try to use command below:")
757        print(f"pip install {library_name}")
758        return 1
759
760
761def check_subprocess_cmd(cmd, process_num, timeout):
762
763    for i in range(process_num):
764        p = subprocess.Popen(cmd.split())
765    try:
766        p.wait(timeout=5)
767    except subprocess.TimeoutExpired:
768        p.kill()
769
770
771def check_process_mixed(process_num, timeout, local, remote):
772    multi_num = process_num
773    list_send = []
774    list_recv = []
775    sizes = {"small", "medium", "empty"}
776    for i in range(multi_num):
777        for size in sizes:
778            cmd_send = f"file send {get_local_path(f'{size}')} {get_remote_path(f'it_{size}_mix_{i}')}"
779            cmd_recv = f"file recv {get_remote_path(f'it_{size}_mix_{i}')} {get_local_path(f'recv_{size}_mix_{i}')}"
780            list_send.append(Process(target=check_hdc_cmd, args=(cmd_send, )))
781            list_recv.append(Process(target=check_hdc_cmd, args=(cmd_recv, )))
782            logging.info(f"RESULT:{cmd_send}")  # 打印命令的输出
783    for send in list_send:
784        wait_time = random.uniform(0, 1)
785        send.start()
786        time.sleep(wait_time)
787    for send in list_send:
788        send.join()
789
790    for recv in list_recv:
791        wait_time = random.uniform(0, 1)
792        recv.start()
793        time.sleep(wait_time)
794    for recv in list_recv:
795        recv.join()
796        wait_time = random.uniform(0, 1)
797        time.sleep(wait_time)
798
799
800def execute_lines_in_file(file_path):
801    if not os.path.exists(file_path):
802        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
803        modes = stat.S_IWUSR | stat.S_IRUSR
804        with os.fdopen(os.open(file_path, flags, modes), 'w') as file:
805            file.write("1,hdc shell ls")
806    with open(file_path, 'r') as file:
807        lines = file.readlines()
808        for line in lines:
809            test_time = line.split(',')[0]
810            test_cmd = line.split(',')[1]
811            pattern = r"^hdc"
812            match = re.search(pattern, test_cmd)
813            if match:
814                result = test_cmd.replace(match.group(0), "").lstrip()
815                test_cmd = f"{GP.hdc_head} {result}"
816
817            for i in range(int(test_time)):
818                logging.info(f"THE {i+1}/{test_time} TEST,COMMAND IS:{test_cmd}")
819                output = subprocess.check_output(test_cmd.split()).decode()
820                logging.info(f"RESULT:{output}")  # 打印命令的输出
821
822
823def make_multiprocess_file(local, remote, mode, num, task_type):
824    if num < 1:
825        return False
826    if task_type == "file":
827        if mode == "send" :
828            file_list = [f"{GP.hdc_head} file send {local} {remote}_{i}" for i in range(num)]
829        elif mode == "recv":
830            file_list = [f"{GP.hdc_head} file recv {remote}_{i} {local}_{i}" for i in range(num)]
831        else:
832            return False
833    if task_type == "dir":
834        if mode == "send" :
835            file_list = [f"{GP.hdc_head} file send {local} {remote}" for _ in range(num)]
836        elif mode == "recv":
837            file_list = [f"{GP.hdc_head} file recv {remote} {local}" for _ in range(num)]
838        else:
839            return False
840    print(file_list[0])
841    p_list = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in file_list]
842    print(f"{mode} target {num} start")
843    while(len(p_list)):
844        for p in p_list:
845            if p.poll() is not None:
846                stdout, stderr = p.communicate(timeout=512) # timeout wait 512s
847                if stderr:
848                    print(f"{stderr.decode()}")
849                if stdout:
850                    print(f"{stdout.decode()}")
851                if stdout.decode().find("FileTransfer finish") == -1:
852                    return False
853                p_list.remove(p)
854    res = 1
855    if task_type == "file":
856        for i in range(num):
857            if mode == "send":
858                if _check_file(local, f"{remote}_{i}"):
859                    res *= 1
860                else:
861                    res *= 0
862            elif mode == "recv":
863                if _check_file(f"{local}_{i}", f"{remote}_{i}"):
864                    res *= 1
865                else:
866                    res *= 0
867    if task_type == "dir":
868        for _ in range(num):
869            if mode == "send":
870                end_of_file_name = os.path.basename(local)
871                if check_dir(local, f"{remote}/{end_of_file_name}", is_single_dir=True):
872                    res *= 1
873                else:
874                    res *= 0
875            elif mode == "recv":
876                end_of_file_name = os.path.basename(remote)
877                local = os.path.join(local, end_of_file_name)
878                if check_dir(f"{local}", f"{remote}", is_single_dir=True):
879                    res *= 1
880                else:
881                    res *= 0
882    return res == 1
883
884
885def hdc_get_key(cmd):
886    test_cmd = f"{GP.hdc_head} {cmd}"
887    result = subprocess.check_output(test_cmd.split()).decode()
888    return result
889
890
891def start_subprocess_cmd(cmd, num, assert_out):
892    if num < 1:
893        return False
894    cmd_list = [f"{GP.hdc_head} {cmd}" for _ in range(num)]
895    p_list = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in cmd_list]
896    logging.info(f"{cmd} target {num} start")
897    while(len(p_list)):
898        for p in p_list:
899            if p.poll() is not None:
900                stdout, stderr = p.communicate(timeout=512)
901                if stderr:
902                    logging.error(f"{stderr.decode()}")
903                if stdout:
904                    logging.info(f"{stdout.decode()}")
905                if assert_out is not None and stdout.decode().find(assert_out) == -1:
906                    return False
907                p_list.remove(p)
908    return True
909
910
911def check_hdc_version(cmd, version):
912
913    def _convert_version_to_hex(_version):
914        parts = _version.split("Ver: ")[1].split('.')
915        hex_version = ''.join(parts)
916        return int(hex_version, 16)
917
918    expected_version = _convert_version_to_hex(version)
919    cmd = f"{GP.hdc_head} -v"
920    print(f"\nexecuting command: {cmd}")
921    if version is not None: # check output valid
922        output = subprocess.check_output(cmd.split()).decode().replace("\r", "").replace("\n", "")
923        real_version = _convert_version_to_hex(output)
924        print(f"--> output: {output}")
925        print(f"--> your local [{version}] is"
926            f" {'' if expected_version <= real_version else 'too old to'} fit the version [{output}]"
927        )
928        return expected_version <= real_version
929
930
931def check_cmd_time(cmd, pattern, duration, times):
932    if times < 1:
933        print("times should be bigger than 0.")
934        return False
935    if pattern is None:
936        fetchable = True
937    else:
938        fetchable = False
939    start_time = time.time() * 1000
940    print(f"{cmd} start {start_time}")
941    res = []
942    for i in range(times):
943        start_in = time.time() * 1000
944        if pattern is None:
945            subprocess.check_output(f"{GP.hdc_head} {cmd}".split())
946        else:
947            assert check_shell(cmd, pattern, fetch=fetchable)
948        start_out = time.time() * 1000
949        res.append(start_out - start_in)
950
951    # 计算最大值、最小值和中位数
952    max_value = max(res)
953    min_value = min(res)
954    median_value = sorted(res)[len(res) // 2]
955
956    print(f"{GP.hdc_head} {cmd}耗时最大值:{max_value}")
957    print(f"{GP.hdc_head} {cmd}耗时最小值:{min_value}")
958    print(f"{GP.hdc_head} {cmd}耗时中位数:{median_value}")
959
960    end_time = time.time() * 1000
961
962    try:
963        timecost = int(end_time - start_time) / times
964        print(f"{GP.hdc_head} {cmd}耗时平均值 {timecost}")
965    except ZeroDivisionError:
966        print(f"除数为0")
967
968    if duration is None:
969        duration = 150 * 1.2
970    # 150ms is baseline timecost for hdc shell xxx cmd, 20% can be upper maybe system status
971    return timecost < duration
972
973
974def check_rom(baseline):
975
976    def _try_get_size(message):
977        try:
978            size = int(message.split('\t')[0])
979        except ValueError:
980            size = -9999 * 1024 # error size
981            print(f"try get size value error, from {message}")
982        return size
983
984    if baseline is None:
985        baseline = 2200
986    # 2200KB is the baseline of hdcd and libhdc.dylib.so size all together
987    cmd_hdcd = f"{GP.hdc_head} shell du system/bin/hdcd"
988    result_hdcd = subprocess.check_output(cmd_hdcd.split()).decode()
989    hdcd_size = _try_get_size(result_hdcd)
990    cmd_libhdc = f"{GP.hdc_head} shell du system/lib/libhdc.dylib.so"
991    result_libhdc = subprocess.check_output(cmd_libhdc.split()).decode()
992    if "directory" in result_libhdc:
993        cmd_libhdc64 = f"{GP.hdc_head} shell du system/lib64/libhdc.dylib.so"
994        result_libhdc64 = subprocess.check_output(cmd_libhdc64.split()).decode()
995        if "directory" in result_libhdc64:
996            libhdc_size = 0
997        else:
998            libhdc_size = _try_get_size(result_libhdc64)
999    else:
1000        libhdc_size = _try_get_size(result_libhdc)
1001    all_size = hdcd_size + libhdc_size
1002    GP.hdcd_rom = all_size
1003    if all_size < 0:
1004        GP.hdcd_rom = "error"
1005        return False
1006    else:
1007        GP.hdcd_rom = f"{all_size} KB"
1008    if all_size > baseline:
1009        print(f"rom size is {all_size}, overlimit baseline {baseline}")
1010        return False
1011    else:
1012        print(f"rom size is {all_size}, underlimit baseline {baseline}")
1013        return True
1014
1015
1016def run_command_with_timeout(command, timeout):
1017    try:
1018        result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout)
1019        return result.stdout.decode(), result.stderr.decode()
1020    except subprocess.TimeoutExpired:
1021        return None, "Command timed out"
1022    except subprocess.CalledProcessError as e:
1023        return None, e.stderr.decode()
1024