#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2021 Huawei Device Co., Ltd. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # 0. 运行环境: python 3.10+, pytest # 1. 修改 GP 中字段 # 2. pytest [-k case_name_pattern] # eg. pytest -k file 执行方法名含 file 的用例 # 打包 # pip install pyinstaller # prepare assert source dir includes your data files # pyi-makespec -D --add-data assert:assert dev_hdc_test.py # pyinstaller dev_hdc_test.spec # 执行 dev_hdc_test.exe import csv import hashlib import json import logging import os import random import re import stat import shutil import subprocess import sys import threading import time from multiprocessing import Process import pytest import pkg_resources class GP(object): """ Global Parameters customize here !!! """ hdc_exe = "hdc" local_path = "resource" remote_path = "data/local/tmp" remote_dir_path = "data/local/tmp/it_send_dir" remote_ip = "auto" remote_port = 8710 hdc_head = "hdc" device_name = "" targets = [] tmode = "usb" changed_testcase = "n" testcase_path = "ts_windows.csv" loaded_testcase = 0 hdcd_rom = "not checked" @classmethod def init(cls): logging.basicConfig(level=logging.INFO, format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s', datefmt='%d %b %Y %H:%M:%S', ) logging.basicConfig(level=logging.WARNING, format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s', datefmt='%d %b %Y %H:%M:%S', ) if os.path.exists(".hdctester.conf"): cls.load() cls.start_host() cls.list_targets() else: cls.set_options() cls.print_options() cls.start_host() cls.dump() return @classmethod def start_host(cls): cmd = f"{cls.hdc_exe} start" res = subprocess.call(cmd.split()) return res @classmethod def list_targets(cls): try: targets = subprocess.check_output(f"{cls.hdc_exe} list targets".split()).split() except (OSError, IndexError): targets = [b"failed to auto detect device"] cls.targets = [targets[0].decode()] return False cls.targets = [t.decode() for t in targets] return True @classmethod def get_device(cls): cls.start_host() cls.list_targets() if len(cls.targets) > 1: print("Multiple device detected, please select one:") for i, t in enumerate(cls.targets): print(f"{i+1}. {t}") print("input the nums of the device above:") cls.device_name = cls.targets[int(input()) - 1] else: cls.device_name = cls.targets[0] if cls.device_name == "failed to auto detect device": print("No device detected, please check your device connection") return False elif cls.device_name == "[empty]": print("No hdc device detected.") return False cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}" return True @classmethod def dump(cls): try: os.remove(".hdctester.conf") except OSError: pass content = filter( lambda k: not k[0].startswith("__") and not isinstance(k[1], classmethod), cls.__dict__.items()) json_str = json.dumps(dict(content)) fd = os.open(".hdctester.conf", os.O_WRONLY | os.O_CREAT, 0o755) os.write(fd, json_str.encode()) os.close(fd) return True @classmethod def load(cls): with open(".hdctester.conf") as f: content = json.load(f) cls.hdc_exe = content.get("hdc_exe") cls.local_path = content.get("local_path") cls.remote_path = content.get("remote_path") cls.remote_ip = content.get("remote_ip") cls.hdc_head = content.get("hdc_head") cls.tmode = content.get("tmode") cls.device_name = content.get("device_name") cls.changed_testcase = content.get("changed_testcase") cls.testcase_path = content.get("testcase_path") cls.loaded_testcase = content.get("load_testcase") return True @classmethod def print_options(cls): info = "HDC Tester Default Options: \n\n" \ + f"{'hdc execution'.rjust(20, ' ')}: {cls.hdc_exe}\n" \ + f"{'local storage path'.rjust(20, ' ')}: {cls.local_path}\n" \ + f"{'remote storage path'.rjust(20, ' ')}: {cls.remote_path}\n" \ + f"{'remote ip'.rjust(20, ' ')}: {cls.remote_ip}\n" \ + f"{'remote port'.rjust(20, ' ')}: {cls.remote_port}\n" \ + f"{'device name'.rjust(20, ' ')}: {cls.device_name}\n" \ + f"{'connect type'.rjust(20, ' ')}: {cls.tmode}\n" \ + f"{'hdc head'.rjust(20, ' ')}: {cls.hdc_head}\n" \ + f"{'changed testcase'.rjust(20, ' ')}: {cls.changed_testcase}\n" \ + f"{'testcase path'.rjust(20, ' ')}: {cls.testcase_path}\n" \ + f"{'loaded testcase'.rjust(20, ' ')}: {cls.loaded_testcase}\n" print(info) @classmethod def tconn_tcp(cls): res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode() if "Connect OK" in res: return True else: return False @classmethod def set_options(cls): if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip(): cls.hdc_exe = opt if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip(): cls.local_path = opt if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip(): cls.remote_path = opt if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip(): cls.remote_ip = opt if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip(): cls.remote_port = int(opt) if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip(): cls.device_name = opt if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip(): cls.tmode = opt if cls.tmode == "usb": ret = cls.get_device() if ret: print("USB device detected.") elif cls.tconn_tcp(): cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}" else: print(f"tconn {cls.remote_ip}:{cls.remote_port} failed") return False return True @classmethod def change_testcase(cls): if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip(): cls.changed_testcase = opt if opt == "n": return False if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip(): cls.testcase_path = os.path.join(opt) cls.print_options() return True @classmethod def load_testcase(cls): print("this fuction will coming soon.") return False @classmethod def get_version(cls): version = f"v1.0.4a" return version def pytest_run(args): file_list = [] file_list.append("entry-default-signed-debug.hap") file_list.append("libA_v10001.hsp") file_list.append("libB_v10001.hsp") for file in file_list: if not os.path.exists(os.path.join(GP.local_path, file)): print(f"No {file} File!") print("请将package.zip中的安装包文件解压到当前脚本resource目录中,操作完成该步骤后重新执行脚本。") print("Please unzip package.zip to resource directory, please rerun after operation.") input("[ENTER]") return gen_package_dir() start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) if args.count is not None: for i in range(args.count): print(f"------------The {i}/{args.count} Test-------------") timestamp = time.time() pytest_args = [ '--verbose', args.verbose, '--report=report.html', '--title=HDC Test Report 'f"{GP.get_version()}", '--tester=tester001', '--template=1', '--desc='f"{args.verbose}:{args.desc}" ] pytest.main(pytest_args) end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) report_time = time.strftime('%Y-%m-%d_%H_%M_%S', time.localtime(time.time())) report_dir = os.path.join(os.getcwd(), "reports") report_file = os.path.join(report_dir, f"{report_time}report.html") print(f"Test over, the script version is {GP.get_version()}," f" start at {start_time}, end at {end_time} \n" f"=======>The device hdcd ROM size is {GP.hdcd_rom}.\n" f"=======>{report_file} is saved. \n" ) input("=======>press [Enter] key to Show logs.") def rmdir(path): try: if sys.platform == "win32": if os.path.isfile(path): os.remove(path) else: shutil.rmtree(path) else: subprocess.call(f"rm -rf {path}".split()) except OSError: print(f"Error: {path} : cannot remove") pass def get_local_path(path): return os.path.join(GP.local_path, path) def get_remote_path(path): return f"{GP.remote_path}/{path}" def get_local_md5(local): md5_hash = hashlib.md5() with open(local, "rb") as f: for byte_block in iter(lambda: f.read(4096), b""): md5_hash.update(byte_block) return md5_hash.hexdigest() def check_shell(cmd, pattern=None, fetch=False): cmd = f"{GP.hdc_head} {cmd}" print(f"\nexecuting command: {cmd}") if pattern: # check output valid output = subprocess.check_output(cmd.split()).decode() res = pattern in output print(f"--> output: {output}") print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output") return res elif fetch: output = subprocess.check_output(cmd.split()).decode() print(f"--> output: {output}") return output else: # check cmd run successfully return subprocess.check_call(cmd.split()) == 0 def get_shell_result(cmd, pattern=None, fetch=False): cmd = f"{GP.hdc_head} {cmd}" print(f"\nexecuting command: {cmd}") return subprocess.check_output(cmd.split()).decode() def check_rate(cmd, expected_rate): send_result = get_shell_result(cmd) rate = float(send_result.split("rate:")[1].split("kB/s")[0]) return rate > expected_rate def check_dir(local, remote, is_single_dir=False): def _get_md5sum(remote, is_single_dir=False): if is_single_dir: cmd = f"{GP.hdc_head} shell md5sum {remote}/*" else: cmd = f'{GP.hdc_head} shell find {remote} -type f -exec md5sum {{}} \;' result = subprocess.check_output(cmd.split()).decode() return result def _calculate_md5(file_path): md5 = hashlib.md5() try: with open(file_path, 'rb') as f: for chunk in iter(lambda: f.read(4096), b""): md5.update(chunk) return md5.hexdigest() except PermissionError: return "PermissionError" except FileNotFoundError: return "FileNotFoundError" print("remote:" + remote) output = _get_md5sum(remote) print(output) result = 1 for line in output.splitlines(): if len(line) < 32: # length of MD5 continue expected_md5, file_name = line.split()[:2] if is_single_dir: file_name = file_name.replace(f"{remote}", "") elif GP.remote_path in remote: file_name = file_name.split(GP.remote_dir_path)[1].replace("/", "\\") else: file_name = file_name.split(remote)[1].replace("/", "\\") file_path = os.path.join(os.getcwd(), GP.local_path) + file_name # 构建完整的文件路径 if is_single_dir: file_path = os.path.join(os.getcwd(), local) + file_name print(file_path) actual_md5 = _calculate_md5(file_path) print(f"Expected: {expected_md5}") print(f"Actual: {actual_md5}") print(f"MD5 matched {file_name}") if actual_md5 != expected_md5: print(f"[Fail]MD5 mismatch for {file_name}") result *= 0 return (result == 1) def _check_file(local, remote): if remote.startswith("/proc"): local_size = os.path.getsize(local) if local_size > 0: return True else: return False else: cmd = f"shell md5sum {remote}" local_md5 = get_local_md5(local) return check_shell(cmd, local_md5) def _check_app_installed(bundle, is_shared=False): dump = "dump-shared" if is_shared else "dump" cmd = f"shell bm {dump} -a" return check_shell(cmd, bundle) def check_hdc_targets(): cmd = f"{GP.hdc_head} list targets" print(GP.device_name) return check_shell(cmd, GP.device_name) def check_file_send(local, remote): local_path = os.path.join(GP.local_path, local) remote_path = f"{GP.remote_path}/{remote}" cmd = f"file send {local_path} {remote_path}" return check_shell(cmd) and _check_file(local_path, remote_path) def check_file_recv(remote, local): local_path = os.path.join(GP.local_path, local) remote_path = f"{GP.remote_path}/{remote}" cmd = f"file recv {remote_path} {local_path}" return check_shell(cmd) and _check_file(local_path, remote_path) def check_app_install(app, bundle, args=""): app = os.path.join(GP.local_path, app) install_cmd = f"install {args} {app}" if (args == "-s" and app.endswith(".hap")) or (args == "" and app.endswith(".hsp")): return check_shell(install_cmd, "failed to install bundle") else: return check_shell(install_cmd, "successfully") and _check_app_installed(bundle, "s" in args) def check_app_uninstall(bundle, args=""): uninstall_cmd = f"uninstall {args} {bundle}" return check_shell(uninstall_cmd, "successfully") and not _check_app_installed(bundle, "s" in args) def check_app_install_multi(tables, args=""): apps = [] bundles = [] for app, bundle in tables.items() : app = os.path.join(GP.local_path, app) apps.append(app) bundles.append(bundle) apps_str = " ".join(apps) install_cmd = f"install {args} {apps_str}" if ((args == "-s" and re.search(".hap", apps_str)) or (re.search(".hsp", apps_str) and re.search(".hap", apps_str)) or (args == "" and 0 == apps_str.count(".hap"))): if not check_shell(install_cmd, "failed to install bundle"): return False else: if not check_shell(install_cmd, "successfully"): return False for bundle in bundles: if not _check_app_installed(bundle, "s" in args): return False return True def check_app_uninstall_multi(tables, args=""): for app, bundle in tables.items() : if not check_app_uninstall(bundle, args): return False return True def check_hdc_cmd(cmd, pattern=None, **args): if cmd.startswith("file"): if not check_shell(cmd, "FileTransfer finish"): return False if cmd.startswith("file send"): local, remote = cmd.split()[-2:] else: remote, local = cmd.split()[-2:] if os.path.isfile(local): return _check_file(local, remote) else: return check_dir(local, remote) elif cmd.startswith("install"): bundle = args.get("bundle", "invalid") opt = " ".join(cmd.split()[1:-1]) return check_shell(cmd, "successfully") and _check_app_installed(bundle, "s" in opt) elif cmd.startswith("uninstall"): bundle = cmd.split()[-1] opt = " ".join(cmd.split()[1:-1]) return check_shell(cmd, "successfully") and not _check_app_installed(bundle, "s" in opt) else: return check_shell(cmd, pattern, **args) def check_soft_local(local_source, local_soft, remote): cmd = f"file send {local_soft} {remote}" if not check_shell(cmd, "FileTransfer finish"): return False return _check_file(local_source, remote) def check_soft_remote(remote_source, remote_soft, local_recv): check_hdc_cmd(f"shell ln -s {remote_source} {remote_soft}") cmd = f"file recv {remote_soft} {local_recv}" if not check_shell(cmd, "FileTransfer finish"): return False return _check_file(local_recv, get_remote_path(remote_source)) def switch_usb(): res = check_hdc_cmd("tmode usb") time.sleep(3) if res: GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}" return res def copy_file(src, dst): try: shutil.copy2(src, dst) print(f"File copied successfully from {src} to {dst}") except IOError as e: print(f"Unable to copy file. {e}") except Exception as e: print(f"Unexpected error: {e}") def switch_tcp(): if not GP.remote_ip: # skip tcp check print("!!! remote_ip is none, skip tcp check !!!") return True if GP.remote_ip == "auto": ipconf = check_hdc_cmd("shell \"ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6\"", fetch=True) if not ipconf: print("!!! device ip not found, skip tcp check !!!") return True GP.remote_ip = ipconf.split(":")[1].split()[0] print(f"fetch remote ip: {GP.remote_ip}") ret = check_hdc_cmd(f"tmode port {GP.remote_port}") if ret: time.sleep(3) res = check_hdc_cmd(f"tconn {GP.remote_ip}:{GP.remote_port}", "Connect OK") if res: GP.hdc_head = f"{GP.hdc_exe} -t {GP.remote_ip}:{GP.remote_port}" return res def select_cmd(): msg = "1) Proceed tester\n" \ + "2) Customize tester\n" \ + "3) Setup files for transfer\n" \ + "4) Load custom testcase(default unused) \n" \ + "5) Exit\n" \ + ">> " while True: opt = input(msg).strip() if len(opt) == 1 and '1' <= opt <= '5': return opt def gen_file(path, size): index = 0 path = os.path.abspath(path) fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) while index < size: os.write(fd, os.urandom(1024)) index += 1024 os.close(fd) def gen_zero_file(path, size): fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) os.write(fd, b'0' * size) os.close(fd) def create_file_with_size(path, size): fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0o755) os.write(fd, b'\0' * size) os.close(fd) def gen_file_set(): print("generating empty file ...") gen_file(os.path.join(GP.local_path, "empty"), 0) print("generating small file ...") gen_file(os.path.join(GP.local_path, "small"), 102400) print("generating medium file ...") gen_file(os.path.join(GP.local_path, "medium"), 200 * 1024 ** 2) print("generating large file ...") gen_file(os.path.join(GP.local_path, "large"), 2 * 1024 ** 3) print("generating soft link ...") try: os.symlink("small", os.path.join(GP.local_path, "soft_small")) except FileExistsError: print("soft_small already exists") print("generating text file ...") gen_zero_file(os.path.join(GP.local_path, "word_100M.txt"), 100 * 1024 ** 2) print("generating package dir ...") if not os.path.exists(os.path.join(GP.local_path, "package")): os.mkdir(os.path.join(GP.local_path, "package")) for i in range(1, 6): gen_file(os.path.join(GP.local_path, "package", f"fake.hap.{i}"), 20 * 1024 ** 2) print("generating deep dir ...") deepth = 4 deep_path = os.path.join(GP.local_path, "deep_dir") if not os.path.exists(deep_path): os.mkdir(deep_path) for deep in range(deepth): deep_path = os.path.join(deep_path, f"deep_dir{deep}") if not os.path.exists(deep_path): os.mkdir(deep_path) gen_file(os.path.join(deep_path, "deep"), 102400) print("generating dir with file ...") dir_path = os.path.join(GP.local_path, "problem_dir") rmdir(dir_path) os.mkdir(dir_path) gen_file(os.path.join(dir_path, "small2"), 102400) fuzz_count = 47 # 47 is the count that circulated the file transfer data_unit = 1024 # 1024 is the size that circulated the file transfer data_extra = 936 # 936 is the size that cased the extra file transfer for i in range(fuzz_count): create_file_with_size( os.path.join( dir_path, f"file_{i * data_unit+data_extra}.dat" ), i * data_unit + data_extra ) print("generating empty dir ...") dir_path = os.path.join(GP.local_path, "empty_dir") rmdir(dir_path) os.mkdir(dir_path) print("generating version file ...") gen_file(os.path.join(GP.local_path, GP.get_version()), 0) def gen_package_dir(): print("generating app dir ...") dir_path = os.path.join(GP.local_path, "app_dir") rmdir(dir_path) os.mkdir(dir_path) app = os.path.join(GP.local_path, "entry-default-signed-debug.hap") dst_dir = os.path.join(GP.local_path, "app_dir") if not os.path.exists(app): print(f"Source file {app} does not exist.") else: copy_file(app, dst_dir) def prepare_source(): if os.path.exists(os.path.join(GP.local_path, GP.get_version())): print(f"hdc test version is {GP.get_version}, check ok, skip prepare.") return print(f"in prepare {GP.local_path},wait for 2 mins.") current_path = os.getcwd() if os.path.exists(GP.local_path): #打开local_path遍历其中的文件,删除hap hsp以外的所有文件 for file in os.listdir(GP.local_path): if file.endswith(".hap") or file.endswith(".hsp"): continue file_path = os.path.join(GP.local_path, file) rmdir(file_path) else: os.mkdir(GP.local_path) gen_file_set() def add_prepare_source(): deep_path = os.path.join(GP.local_path, "deep_test_dir") print("generating deep test dir ...") absolute_path = os.path.abspath(__file__) deepth = (255 - 9 - len(absolute_path)) % 14 os.mkdir(deep_path) for deep in range(deepth): deep_path = os.path.join(deep_path, f"deep_test_dir{deep}") os.mkdir(deep_path) gen_file(os.path.join(deep_path, "deep_test"), 102400) recv_dir = os.path.join(GP.local_path, "recv_test_dir") print("generating recv test dir ...") os.mkdir(recv_dir) def update_source(): deep_path = os.path.join(GP.local_path, "deep_test_dir") if not os.path.exists(deep_path): print("generating deep test dir ...") absolute_path = os.path.abspath(__file__) deepth = (255 - 9 - len(absolute_path)) % 14 os.mkdir(deep_path) for deep in range(deepth): deep_path = os.path.join(deep_path, f"deep_test_dir{deep}") os.mkdir(deep_path) gen_file(os.path.join(deep_path, "deep_test"), 102400) recv_dir = os.path.join(GP.local_path, "recv_test_dir") if not os.path.exists(recv_dir): print("generating recv test dir ...") os.mkdir(recv_dir) def setup_tester(): while True: GP.print_options() opt = int(select_cmd()) if opt == 1: return True elif opt == 2: if not GP.set_options(): return False GP.dump() elif opt == 3: prepare_source() elif opt == 4: if not GP.load_testcase(): return False elif opt == 5: return False else: return False def load_testcase(): if not GP.load_testcase: print("load testcase failed") return False print("load testcase success") return True def check_library_installation(library_name): try: pkg_resources.get_distribution(library_name) return 0 except pkg_resources.DistributionNotFound: print(f"\n\n{library_name} is not installed.\n\n") print(f"try to use command below:") print(f"pip install {library_name}") return 1 def check_subprocess_cmd(cmd, process_num, timeout): for i in range(process_num): p = subprocess.Popen(cmd.split()) try: p.wait(timeout=5) except subprocess.TimeoutExpired: p.kill() def check_process_mixed(process_num, timeout, local, remote): multi_num = process_num list_send = [] list_recv = [] sizes = {"small", "medium", "empty"} for i in range(multi_num): for size in sizes: cmd_send = f"file send {get_local_path(f'{size}')} {get_remote_path(f'it_{size}_mix_{i}')}" cmd_recv = f"file recv {get_remote_path(f'it_{size}_mix_{i}')} {get_local_path(f'recv_{size}_mix_{i}')}" list_send.append(Process(target=check_hdc_cmd, args=(cmd_send, ))) list_recv.append(Process(target=check_hdc_cmd, args=(cmd_recv, ))) logging.info(f"RESULT:{cmd_send}") # 打印命令的输出 for send in list_send: wait_time = random.uniform(0, 1) send.start() time.sleep(wait_time) for send in list_send: send.join() for recv in list_recv: wait_time = random.uniform(0, 1) recv.start() time.sleep(wait_time) for recv in list_recv: recv.join() wait_time = random.uniform(0, 1) time.sleep(wait_time) def execute_lines_in_file(file_path): if not os.path.exists(file_path): flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL modes = stat.S_IWUSR | stat.S_IRUSR with os.fdopen(os.open(file_path, flags, modes), 'w') as file: file.write("1,hdc shell ls") with open(file_path, 'r') as file: lines = file.readlines() for line in lines: test_time = line.split(',')[0] test_cmd = line.split(',')[1] pattern = r"^hdc" match = re.search(pattern, test_cmd) if match: result = test_cmd.replace(match.group(0), "").lstrip() test_cmd = f"{GP.hdc_head} {result}" for i in range(int(test_time)): logging.info(f"THE {i+1}/{test_time} TEST,COMMAND IS:{test_cmd}") output = subprocess.check_output(test_cmd.split()).decode() logging.info(f"RESULT:{output}") # 打印命令的输出 def make_multiprocess_file(local, remote, mode, num, task_type): if num < 1: return False if task_type == "file": if mode == "send" : file_list = [f"{GP.hdc_head} file send {local} {remote}_{i}" for i in range(num)] elif mode == "recv": file_list = [f"{GP.hdc_head} file recv {remote}_{i} {local}_{i}" for i in range(num)] else: return False if task_type == "dir": if mode == "send" : file_list = [f"{GP.hdc_head} file send {local} {remote}" for _ in range(num)] elif mode == "recv": file_list = [f"{GP.hdc_head} file recv {remote} {local}" for _ in range(num)] else: return False print(file_list[0]) p_list = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in file_list] print(f"{mode} target {num} start") while(len(p_list)): for p in p_list: if p.poll() is not None: stdout, stderr = p.communicate(timeout=512) # timeout wait 512s if stderr: print(f"{stderr.decode()}") if stdout: print(f"{stdout.decode()}") if stdout.decode().find("FileTransfer finish") == -1: return False p_list.remove(p) res = 1 if task_type == "file": for i in range(num): if mode == "send": if _check_file(local, f"{remote}_{i}"): res *= 1 else: res *= 0 elif mode == "recv": if _check_file(f"{local}_{i}", f"{remote}_{i}"): res *= 1 else: res *= 0 if task_type == "dir": for _ in range(num): if mode == "send": end_of_file_name = os.path.basename(local) if check_dir(local, f"{remote}/{end_of_file_name}", is_single_dir=True): res *= 1 else: res *= 0 elif mode == "recv": end_of_file_name = os.path.basename(remote) local = os.path.join(local, end_of_file_name) if check_dir(f"{local}", f"{remote}", is_single_dir=True): res *= 1 else: res *= 0 return res == 1 def hdc_get_key(cmd): test_cmd = f"{GP.hdc_head} {cmd}" result = subprocess.check_output(test_cmd.split()).decode() return result def start_subprocess_cmd(cmd, num, assert_out): if num < 1: return False cmd_list = [f"{GP.hdc_head} {cmd}" for _ in range(num)] p_list = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in cmd_list] logging.info(f"{cmd} target {num} start") while(len(p_list)): for p in p_list: if p.poll() is not None: stdout, stderr = p.communicate(timeout=512) if stderr: logging.error(f"{stderr.decode()}") if stdout: logging.info(f"{stdout.decode()}") if assert_out is not None and stdout.decode().find(assert_out) == -1: return False p_list.remove(p) return True def check_hdc_version(cmd, version): def _convert_version_to_hex(_version): parts = _version.split("Ver: ")[1].split('.') hex_version = ''.join(parts) return int(hex_version, 16) expected_version = _convert_version_to_hex(version) cmd = f"{GP.hdc_head} -v" print(f"\nexecuting command: {cmd}") if version is not None: # check output valid output = subprocess.check_output(cmd.split()).decode().replace("\r", "").replace("\n", "") real_version = _convert_version_to_hex(output) print(f"--> output: {output}") print(f"--> your local [{version}] is" f" {'' if expected_version <= real_version else 'too old to'} fit the version [{output}]" ) return expected_version <= real_version def check_cmd_time(cmd, pattern, duration, times): if times < 1: print("times should be bigger than 0.") return False if pattern is None: fetchable = True else: fetchable = False start_time = time.time() * 1000 print(f"{cmd} start {start_time}") res = [] for i in range(times): start_in = time.time() * 1000 if pattern is None: subprocess.check_output(f"{GP.hdc_head} {cmd}".split()) else: assert check_shell(cmd, pattern, fetch=fetchable) start_out = time.time() * 1000 res.append(start_out - start_in) # 计算最大值、最小值和中位数 max_value = max(res) min_value = min(res) median_value = sorted(res)[len(res) // 2] print(f"{GP.hdc_head} {cmd}耗时最大值:{max_value}") print(f"{GP.hdc_head} {cmd}耗时最小值:{min_value}") print(f"{GP.hdc_head} {cmd}耗时中位数:{median_value}") end_time = time.time() * 1000 try: timecost = int(end_time - start_time) / times print(f"{GP.hdc_head} {cmd}耗时平均值 {timecost}") except ZeroDivisionError: print(f"除数为0") if duration is None: duration = 150 * 1.2 # 150ms is baseline timecost for hdc shell xxx cmd, 20% can be upper maybe system status return timecost < duration def check_rom(baseline): def _try_get_size(message): try: size = int(message.split('\t')[0]) except ValueError: size = -9999 * 1024 # error size print(f"try get size value error, from {message}") return size if baseline is None: baseline = 2200 # 2200KB is the baseline of hdcd and libhdc.dylib.so size all together cmd_hdcd = f"{GP.hdc_head} shell du system/bin/hdcd" result_hdcd = subprocess.check_output(cmd_hdcd.split()).decode() hdcd_size = _try_get_size(result_hdcd) cmd_libhdc = f"{GP.hdc_head} shell du system/lib/libhdc.dylib.so" result_libhdc = subprocess.check_output(cmd_libhdc.split()).decode() if "directory" in result_libhdc: cmd_libhdc64 = f"{GP.hdc_head} shell du system/lib64/libhdc.dylib.so" result_libhdc64 = subprocess.check_output(cmd_libhdc64.split()).decode() if "directory" in result_libhdc64: libhdc_size = 0 else: libhdc_size = _try_get_size(result_libhdc64) else: libhdc_size = _try_get_size(result_libhdc) all_size = hdcd_size + libhdc_size GP.hdcd_rom = all_size if all_size < 0: GP.hdcd_rom = "error" return False else: GP.hdcd_rom = f"{all_size} KB" if all_size > baseline: print(f"rom size is {all_size}, overlimit baseline {baseline}") return False else: print(f"rom size is {all_size}, underlimit baseline {baseline}") return True def run_command_with_timeout(command, timeout): try: result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout) return result.stdout.decode(), result.stderr.decode() except subprocess.TimeoutExpired: return None, "Command timed out" except subprocess.CalledProcessError as e: return None, e.stderr.decode()