#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2021 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# 0. Runtime environment: python 3.10+, pytest
# 1. Edit the fields in GP
# 2. pytest [-k case_name_pattern]
#    eg. pytest -k file    runs the cases whose method name contains "file"

# Packaging
# pip install pyinstaller
# prepare assert source dir includes your data files
# pyi-makespec -D --add-data assert:assert dev_hdc_test.py
# pyinstaller dev_hdc_test.spec
# Run dev_hdc_test.exe

import csv
import hashlib
import json
import logging
import os
import random
import re
import stat
import shutil
import subprocess
import sys
import threading
import time
from multiprocessing import Process

import pytest
import pkg_resources


# File (relative to the current directory) in which GP persists its options.
_CONF_FILE = ".hdctester.conf"

# GP attributes restored by GP.load(). Runtime-only state (targets, hdcd_rom)
# is written by GP.dump() but deliberately not read back.
_GP_PERSISTED_KEYS = (
    "hdc_exe",
    "local_path",
    "remote_path",
    "remote_dir_path",
    "remote_ip",
    "remote_port",
    "hdc_head",
    "tmode",
    "device_name",
    "changed_testcase",
    "testcase_path",
    "loaded_testcase",
)

# Placeholder stored in GP.targets when device detection fails.
_NO_DEVICE = "failed to auto detect device"


class GP(object):
    """ Global Parameters

    customize here !!!
    """
    hdc_exe = "hdc"
    local_path = "resource"
    remote_path = "data/local/tmp"
    remote_dir_path = "data/local/tmp/it_send_dir"
    remote_ip = "auto"
    remote_port = 8710
    hdc_head = "hdc"
    device_name = ""
    targets = []
    tmode = "usb"
    changed_testcase = "n"
    testcase_path = "ts_windows.csv"
    loaded_testcase = 0
    hdcd_rom = "not checked"

    @classmethod
    def init(cls):
        """Configure logging, then load saved options or ask for them interactively."""
        # A second basicConfig() call would be a no-op once the root logger has
        # handlers, so the logger is configured exactly once.
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s %(filename)s [line:%(lineno)d] %(levelname)s %(message)s',
                            datefmt='%d %b %Y %H:%M:%S',
                            )

        if os.path.exists(_CONF_FILE):
            cls.load()
            cls.start_host()
            cls.list_targets()
        else:
            cls.set_options()
            cls.print_options()
            cls.start_host()
            cls.dump()
        return

    @classmethod
    def start_host(cls):
        """Run ``<hdc_exe> start`` and return its exit code."""
        cmd = f"{cls.hdc_exe} start"
        return subprocess.call(cmd.split())

    @classmethod
    def list_targets(cls):
        """Refresh ``cls.targets`` from ``<hdc_exe> list targets``.

        Returns True when at least one token was reported, False when the
        command could not be run, failed, or printed nothing. On failure
        ``cls.targets`` holds a single placeholder entry so that callers can
        always index ``targets[0]``.
        """
        try:
            raw = subprocess.check_output(f"{cls.hdc_exe} list targets".split()).split()
        except (OSError, subprocess.CalledProcessError):
            raw = []
        if not raw:
            cls.targets = [_NO_DEVICE]
            return False
        cls.targets = [t.decode() for t in raw]
        return True

    @classmethod
    def get_device(cls):
        """Pick the device to test and build ``cls.hdc_head`` for it.

        Prompts the user when several targets are listed. Returns False when
        no usable device is available or the selection is invalid.
        """
        cls.start_host()
        cls.list_targets()
        if len(cls.targets) > 1:
            print("Multiple device detected, please select one:")
            for i, t in enumerate(cls.targets):
                print(f"{i+1}. {t}")
            print("input the nums of the device above:")
            try:
                choice = int(input())
            except ValueError:
                choice = 0
            # Reject 0 / negatives explicitly: a plain "choice - 1" index would
            # silently wrap around to the last device.
            if not 1 <= choice <= len(cls.targets):
                print("Invalid device number.")
                return False
            cls.device_name = cls.targets[choice - 1]
        else:
            cls.device_name = cls.targets[0]
        if cls.device_name == _NO_DEVICE:
            print("No device detected, please check your device connection")
            return False
        elif cls.device_name.lower() == "[empty]":
            print("No hdc device detected.")
            return False
        cls.hdc_head = f"{cls.hdc_exe} -t {cls.device_name}"
        return True

    @classmethod
    def dump(cls):
        """Write every plain (non-dunder, non-classmethod) class attribute to the conf file as JSON."""
        try:
            os.remove(_CONF_FILE)
        except OSError:
            pass
        content = {
            key: value
            for key, value in cls.__dict__.items()
            if not key.startswith("__") and not isinstance(value, classmethod)
        }
        fd = os.open(_CONF_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o755)
        # fdopen + with guarantees the descriptor is closed even if the write fails.
        with os.fdopen(fd, "w") as f:
            f.write(json.dumps(content))
        return True

    @classmethod
    def load(cls):
        """Restore the persisted options from the conf file.

        Keys missing from the file keep their current value instead of being
        overwritten with None.
        """
        with open(_CONF_FILE) as f:
            content = json.load(f)
        for key in _GP_PERSISTED_KEYS:
            if key in content:
                setattr(cls, key, content[key])
        return True

    @classmethod
    def print_options(cls):
        """Print the current option values as an aligned table."""
        rows = (
            ("hdc execution", cls.hdc_exe),
            ("local storage path", cls.local_path),
            ("remote storage path", cls.remote_path),
            ("remote ip", cls.remote_ip),
            ("remote port", cls.remote_port),
            ("device name", cls.device_name),
            ("connect type", cls.tmode),
            ("hdc head", cls.hdc_head),
            ("changed testcase", cls.changed_testcase),
            ("testcase path", cls.testcase_path),
            ("loaded testcase", cls.loaded_testcase),
        )
        info = "HDC Tester Default Options: \n\n" \
            + "".join(f"{label.rjust(20, ' ')}: {value}\n" for label, value in rows)
        print(info)

    @classmethod
    def tconn_tcp(cls):
        """Run ``tconn <ip>:<port>`` and return True when hdc reports "Connect OK"."""
        res = subprocess.check_output(f"{cls.hdc_exe} tconn {cls.remote_ip}:{cls.remote_port}".split()).decode()
        return "Connect OK" in res

    @classmethod
    def set_options(cls):
        """Interactively override the defaults; an empty answer keeps the current value.

        Returns False only when tcp mode is selected and the connection fails.
        """
        if opt := input(f"Default hdc execution? [{cls.hdc_exe}]\n").strip():
            cls.hdc_exe = opt
        if opt := input(f"Default local storage path? [{cls.local_path}]\n").strip():
            cls.local_path = opt
        if opt := input(f"Default remote storage path? [{cls.remote_path}]\n").strip():
            cls.remote_path = opt
        if opt := input(f"Default remote ip? [{cls.remote_ip}]\n").strip():
            cls.remote_ip = opt
        if opt := input(f"Default remote port? [{cls.remote_port}]\n").strip():
            try:
                cls.remote_port = int(opt)
            except ValueError:
                print(f"Invalid port {opt}, keep {cls.remote_port}")
        if opt := input(f"Default device name? [{cls.device_name}], opts: {cls.targets}\n").strip():
            cls.device_name = opt
        if opt := input(f"Default connect type? [{cls.tmode}], opt: [usb, tcp]\n").strip():
            cls.tmode = opt
        if cls.tmode == "usb":
            if cls.get_device():
                print("USB device detected.")
        elif cls.tconn_tcp():
            cls.hdc_head = f"{cls.hdc_exe} -t {cls.remote_ip}:{cls.remote_port}"
        else:
            print(f"tconn {cls.remote_ip}:{cls.remote_port} failed")
            return False
        return True

    @classmethod
    def change_testcase(cls):
        """Ask whether to use a custom testcase file; returns False when the user answers "n"."""
        if opt := input(f"Change default testcase?(Y/n) [{cls.changed_testcase}]\n").strip():
            cls.changed_testcase = opt
            if opt == "n":
                return False
        if opt := input(f"Use default testcase path?(Y/n) [{cls.testcase_path}]\n").strip():
            cls.testcase_path = os.path.join(opt)
        cls.print_options()
        return True

    @classmethod
    def load_testcase(cls):
        """Placeholder for custom testcase loading; always returns False."""
        print("this fuction will coming soon.")
        return False

    @classmethod
    def get_version(cls):
        """Return the version string of this test script."""
        return "v1.0.4a"


def pytest_run(args):
    """Run the pytest suite ``args.count`` times and print a summary.

    ``args`` must provide ``count``, ``verbose`` and ``desc``. The run is
    aborted early when any required install package is missing from
    ``GP.local_path``.
    """
    file_list = [
        "entry-default-signed-debug.hap",
        "libA_v10001.hsp",
        "libB_v10001.hsp",
    ]
    for file in file_list:
        if not os.path.exists(os.path.join(GP.local_path, file)):
            print(f"No {file} File!")
            print("请将package.zip中的安装包文件解压到当前脚本resource目录中,操作完成该步骤后重新执行脚本。")
            print("Please unzip package.zip to resource directory, please rerun after operation.")
            input("[ENTER]")
            return
    gen_package_dir()
    start_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    if args.count is not None:
        for i in range(args.count):
            # 1-based round counter, so the last round reads "count/count".
            print(f"------------The {i + 1}/{args.count} Test-------------")
            pytest_args = [
                '--verbose', args.verbose,
                '--report=report.html',
                '--title=HDC Test Report 'f"{GP.get_version()}",
                '--tester=tester001',
                '--template=1',
                '--desc='f"{args.verbose}:{args.desc}"
            ]
            pytest.main(pytest_args)
    end_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
    report_time = time.strftime('%Y-%m-%d_%H_%M_%S', time.localtime(time.time()))
    report_dir = os.path.join(os.getcwd(), "reports")
    report_file = os.path.join(report_dir, f"{report_time}report.html")
    print(f"Test over, the script version is {GP.get_version()},"
          f" start at {start_time}, end at {end_time} \n"
          f"=======>The device hdcd ROM size is {GP.hdcd_rom}.\n"
          f"=======>{report_file} is saved. \n"
          )
    input("=======>press [Enter] key to Show logs.")


def rmdir(path):
    """Remove a file, symlink or directory tree; a missing path is ignored.

    Failures are reported on stdout instead of being raised (best effort).
    Implemented with the standard library on every platform so that paths
    containing spaces are handled correctly.
    """
    try:
        if os.path.isfile(path) or os.path.islink(path):
            os.remove(path)
        elif os.path.isdir(path):
            shutil.rmtree(path)
    except OSError:
        print(f"Error: {path} : cannot remove")


def get_local_path(path):
    """Return ``path`` located inside the local resource directory."""
    return os.path.join(GP.local_path, path)


def get_remote_path(path):
    """Return ``path`` located inside the remote (device) storage directory."""
    return f"{GP.remote_path}/{path}"


def get_local_md5(local):
    """Return the hex MD5 digest of the local file ``local``."""
    md5_hash = hashlib.md5()
    with open(local, "rb") as f:
        while byte_block := f.read(4096):
            md5_hash.update(byte_block)
    return md5_hash.hexdigest()


def check_shell(cmd, pattern=None, fetch=False):
    """Run ``<hdc_head> <cmd>`` and evaluate the outcome.

    * ``pattern`` given: return True when ``pattern`` occurs in the output.
    * ``fetch`` true (and no pattern): return the decoded output.
    * otherwise: return True when the command exits with status 0.

    Raises subprocess.CalledProcessError when the command exits non-zero.
    """
    cmd = f"{GP.hdc_head} {cmd}"
    print(f"\nexecuting command: {cmd}")
    if pattern:  # check output valid
        output = subprocess.check_output(cmd.split()).decode()
        res = pattern in output
        print(f"--> output: {output}")
        print(f"--> pattern [{pattern}] {'FOUND' if res else 'NOT FOUND'} in output")
        return res
    elif fetch:
        output = subprocess.check_output(cmd.split()).decode()
        print(f"--> output: {output}")
        return output
    else:  # check cmd run successfully
        return subprocess.check_call(cmd.split()) == 0


def get_shell_result(cmd, pattern=None, fetch=False):
    """Run ``<hdc_head> <cmd>`` and return its decoded output.

    ``pattern`` and ``fetch`` are accepted for signature compatibility with
    check_shell() and are not used.
    """
    cmd = f"{GP.hdc_head} {cmd}"
    print(f"\nexecuting command: {cmd}")
    return subprocess.check_output(cmd.split()).decode()


def check_rate(cmd, expected_rate):
    """Return True when the ``rate:<n>kB/s`` value reported by ``cmd`` exceeds ``expected_rate``.

    Returns False when the output carries no parsable rate.
    """
    send_result = get_shell_result(cmd)
    try:
        rate = float(send_result.split("rate:")[1].split("kB/s")[0])
    except (IndexError, ValueError):
        print(f"--> no transfer rate found in output: {send_result}")
        return False
    return rate > expected_rate


def check_dir(local, remote, is_single_dir=False):
    """Compare the MD5 of every file under ``remote`` with its local counterpart.

    The remote digests come from ``find ... -exec md5sum`` on the device. Each
    remote file name is mapped to a local path: relative to ``local`` when
    ``is_single_dir`` is true, otherwise relative to ``GP.local_path``.
    Returns True only when every digest matches.
    """
    def _get_md5sum(remote_dir):
        # "\\;" terminates -exec; it reaches the device shell as the two characters "\;".
        cmd = f"{GP.hdc_head} shell find {remote_dir} -type f -exec md5sum {{}} \\;"
        return subprocess.check_output(cmd.split()).decode()

    def _calculate_md5(file_path):
        md5 = hashlib.md5()
        try:
            with open(file_path, 'rb') as f:
                while chunk := f.read(4096):
                    md5.update(chunk)
            return md5.hexdigest()
        except PermissionError:
            return "PermissionError"
        except FileNotFoundError:
            return "FileNotFoundError"

    print("remote:" + remote)
    output = _get_md5sum(remote)
    print(output)

    all_matched = True
    for line in output.splitlines():
        if len(line) < 32:  # length of MD5
            continue
        fields = line.split()
        if len(fields) < 2:  # not a "<md5> <file>" line
            continue
        expected_md5, file_name = fields[:2]
        if is_single_dir:
            file_name = file_name.replace(f"{remote}", "")
            base_dir = os.path.join(os.getcwd(), local)
        else:
            prefix = GP.remote_dir_path if GP.remote_path in remote else remote
            # Convert the device-side "/" to the host separator (was hard-coded "\\").
            file_name = file_name.split(prefix)[1].replace("/", os.sep)
            base_dir = os.path.join(os.getcwd(), GP.local_path)
        # file_name keeps its leading separator, hence plain concatenation.
        file_path = base_dir + file_name
        print(file_path)
        actual_md5 = _calculate_md5(file_path)
        print(f"Expected: {expected_md5}")
        print(f"Actual: {actual_md5}")
        if actual_md5 == expected_md5:
            print(f"MD5 matched {file_name}")
        else:
            print(f"[Fail]MD5 mismatch for {file_name}")
            all_matched = False

    return all_matched


def _check_file(local, remote):
    """Verify a transferred file.

    Files under /proc are only checked for a non-empty local copy; any other
    file must have the same MD5 on the device as locally.
    """
    if remote.startswith("/proc"):
        return os.path.getsize(local) > 0
    cmd = f"shell md5sum {remote}"
    local_md5 = get_local_md5(local)
    return check_shell(cmd, local_md5)


def _check_app_installed(bundle, is_shared=False):
    """Return True when ``bundle`` is listed by ``bm dump -a`` (``dump-shared`` for shared bundles)."""
    dump = "dump-shared" if is_shared else "dump"
    cmd = f"shell bm {dump} -a"
    return check_shell(cmd, bundle)


def check_hdc_targets():
    """Return True when the selected device shows up in ``list targets``."""
    # check_shell() already prepends GP.hdc_head; adding it here as well would
    # produce "hdc -t X hdc -t X list targets".
    cmd = "list targets"
    print(GP.device_name)
    return check_shell(cmd, GP.device_name)


def check_file_send(local, remote):
    """Send ``local`` (relative to the resource dir) to ``remote`` and verify the copy."""
    local_path = os.path.join(GP.local_path, local)
    remote_path = f"{GP.remote_path}/{remote}"
    cmd = f"file send {local_path} {remote_path}"
    return check_shell(cmd) and _check_file(local_path, remote_path)


def check_file_recv(remote, local):
    """Receive ``remote`` into ``local`` (relative to the resource dir) and verify the copy."""
    local_path = os.path.join(GP.local_path, local)
    remote_path = f"{GP.remote_path}/{remote}"
    cmd = f"file recv {remote_path} {local_path}"
    return check_shell(cmd) and _check_file(local_path, remote_path)


def check_app_install(app, bundle, args=""):
    """Install ``app`` and verify the result.

    Installing a .hap with ``-s`` or a .hsp without options is expected to
    fail; in that case the failure message is what gets checked.
    """
    app = os.path.join(GP.local_path, app)
    install_cmd = f"install {args} {app}"
    if (args == "-s" and app.endswith(".hap")) or (args == "" and app.endswith(".hsp")):
        return check_shell(install_cmd, "failed to install bundle")
    else:
        return check_shell(install_cmd, "successfully") and _check_app_installed(bundle, "s" in args)


def check_app_uninstall(bundle, args=""):
    """Uninstall ``bundle`` and verify it is no longer listed."""
    uninstall_cmd = f"uninstall {args} {bundle}"
    return check_shell(uninstall_cmd, "successfully") and not _check_app_installed(bundle, "s" in args)


def check_app_install_multi(tables, args=""):
    """Install several packages with one command and verify the result.

    ``tables`` maps package file name -> bundle name. Combinations that the
    installer must reject (.hap with ``-s``, .hap mixed with .hsp, only .hsp
    without options) are checked for the failure message; otherwise every
    bundle must be listed as installed afterwards.
    """
    apps = [os.path.join(GP.local_path, app) for app in tables]
    bundles = list(tables.values())

    apps_str = " ".join(apps)
    install_cmd = f"install {args} {apps_str}"

    # Plain substring tests: the former re.search(".hap", ...) treated "." as
    # a regex wildcard.
    has_hap = ".hap" in apps_str
    has_hsp = ".hsp" in apps_str
    expect_failure = (args == "-s" and has_hap) or (has_hsp and has_hap) or (args == "" and not has_hap)

    if expect_failure:
        return check_shell(install_cmd, "failed to install bundle")

    if not check_shell(install_cmd, "successfully"):
        return False
    return all(_check_app_installed(bundle, "s" in args) for bundle in bundles)


def check_app_uninstall_multi(tables, args=""):
    """Uninstall every bundle in ``tables``; stops at the first failure."""
    return all(check_app_uninstall(bundle, args) for bundle in tables.values())


def check_hdc_cmd(cmd, pattern=None, **args):
    """Run an hdc command and apply the verification matching its kind.

    * ``file send/recv``: transfer must finish and the MD5 of the file (or of
      every file in the directory) must match.
    * ``install``: needs ``bundle=<name>``; the bundle must be listed afterwards.
    * ``uninstall``: the bundle (last token) must no longer be listed.
    * anything else: forwarded to check_shell(cmd, pattern, **args).
    """
    if cmd.startswith("file"):
        if not check_shell(cmd, "FileTransfer finish"):
            return False
        if cmd.startswith("file send"):
            local, remote = cmd.split()[-2:]
        else:
            remote, local = cmd.split()[-2:]
        if os.path.isfile(local):
            return _check_file(local, remote)
        else:
            return check_dir(local, remote)
    elif cmd.startswith("install"):
        bundle = args.get("bundle", "invalid")
        opt = " ".join(cmd.split()[1:-1])
        return check_shell(cmd, "successfully") and _check_app_installed(bundle, "s" in opt)

    elif cmd.startswith("uninstall"):
        bundle = cmd.split()[-1]
        opt = " ".join(cmd.split()[1:-1])
        return check_shell(cmd, "successfully") and not _check_app_installed(bundle, "s" in opt)

    else:
        return check_shell(cmd, pattern, **args)


def check_soft_local(local_source, local_soft, remote):
    """Send the local soft link ``local_soft`` and verify ``remote`` equals ``local_source``."""
    cmd = f"file send {local_soft} {remote}"
    if not check_shell(cmd, "FileTransfer finish"):
        return False
    return _check_file(local_source, remote)


def check_soft_remote(remote_source, remote_soft, local_recv):
    """Create a soft link on the device, receive through it and verify the received file."""
    check_hdc_cmd(f"shell ln -s {remote_source} {remote_soft}")
    cmd = f"file recv {remote_soft} {local_recv}"
    if not check_shell(cmd, "FileTransfer finish"):
        return False
    return _check_file(local_recv, get_remote_path(remote_source))

def switch_usb():
    """Switch the device to usb mode and point ``GP.hdc_head`` at the usb device."""
    res = check_hdc_cmd("tmode usb")
    time.sleep(3)
    if res:
        GP.hdc_head = f"{GP.hdc_exe} -t {GP.device_name}"
    return res


def copy_file(src, dst):
    """Copy ``src`` to ``dst`` (with metadata); failures are printed, not raised."""
    try:
        shutil.copy2(src, dst)
        print(f"File copied successfully from {src} to {dst}")
    except IOError as e:
        print(f"Unable to copy file. {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")


def switch_tcp():
    """Switch the device to tcp mode and connect to it.

    Returns True when the check is skipped (no ip configured or discoverable),
    otherwise the result of ``tconn``. With ``GP.remote_ip == "auto"`` the ip
    is taken from the device's ``ifconfig`` output.
    """
    if not GP.remote_ip:  # skip tcp check
        print("!!! remote_ip is none, skip tcp check !!!")
        return True
    if GP.remote_ip == "auto":
        ipconf = check_hdc_cmd("shell \"ifconfig -a | grep inet | grep -v 127.0.0.1 | grep -v inet6\"", fetch=True)
        # Accept both "inet addr:a.b.c.d" and "inet a.b.c.d" layouts.
        found = re.search(r"inet\s+(?:addr:\s*)?(\d+(?:\.\d+){3})", ipconf) if ipconf else None
        if not found:
            print("!!! device ip not found, skip tcp check !!!")
            return True
        GP.remote_ip = found.group(1)
        print(f"fetch remote ip: {GP.remote_ip}")
    ret = check_hdc_cmd(f"tmode port {GP.remote_port}")
    if ret:
        time.sleep(3)
    res = check_hdc_cmd(f"tconn {GP.remote_ip}:{GP.remote_port}", "Connect OK")
    if res:
        GP.hdc_head = f"{GP.hdc_exe} -t {GP.remote_ip}:{GP.remote_port}"
    return res


def select_cmd():
    """Show the main menu until the user enters a digit from 1 to 5; return it as a string."""
    msg = "1) Proceed tester\n" \
        + "2) Customize tester\n" \
        + "3) Setup files for transfer\n" \
        + "4) Load custom testcase(default unused) \n" \
        + "5) Exit\n" \
        + ">> "

    while True:
        opt = input(msg).strip()
        if len(opt) == 1 and '1' <= opt <= '5':
            return opt


# Amount of data written per call when generating test files.
_GEN_CHUNK_SIZE = 1024 * 1024


def _open_new_binary(path):
    """Open ``path`` for binary writing (mode 0o755), discarding any previous content."""
    # O_TRUNC drops stale bytes of an existing, larger file. O_BINARY (Windows
    # only) prevents "\n" -> "\r\n" translation of the written bytes.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    return os.fdopen(os.open(path, flags, 0o755), "wb")


def _fill_file(path, size, make_chunk):
    """Write exactly ``size`` bytes to ``path``; ``make_chunk(n)`` supplies n bytes."""
    with _open_new_binary(path) as f:
        remaining = size
        while remaining > 0:
            step = min(_GEN_CHUNK_SIZE, remaining)
            f.write(make_chunk(step))
            remaining -= step


def gen_file(path, size):
    """Create ``path`` holding exactly ``size`` random bytes."""
    _fill_file(os.path.abspath(path), size, os.urandom)


def gen_zero_file(path, size):
    """Create ``path`` holding ``size`` ASCII "0" characters."""
    _fill_file(path, size, lambda n: b'0' * n)


def create_file_with_size(path, size):
    """Create ``path`` holding ``size`` NUL bytes."""
    _fill_file(path, size, lambda n: b'\0' * n)


def gen_file_set():
    """Generate every file and directory used by the transfer tests under ``GP.local_path``."""
    print("generating empty file ...")
    gen_file(os.path.join(GP.local_path, "empty"), 0)

    print("generating small file ...")
    gen_file(os.path.join(GP.local_path, "small"), 102400)

    print("generating medium file ...")
    gen_file(os.path.join(GP.local_path, "medium"), 200 * 1024 ** 2)

    print("generating large file ...")
    gen_file(os.path.join(GP.local_path, "large"), 2 * 1024 ** 3)

    print("generating soft link ...")
    try:
        os.symlink("small", os.path.join(GP.local_path, "soft_small"))
    except FileExistsError:
        print("soft_small already exists")

    print("generating text file ...")
    gen_zero_file(os.path.join(GP.local_path, "word_100M.txt"), 100 * 1024 ** 2)

    print("generating package dir ...")
    package_dir = os.path.join(GP.local_path, "package")
    if not os.path.exists(package_dir):
        os.mkdir(package_dir)
    for i in range(1, 6):
        gen_file(os.path.join(package_dir, f"fake.hap.{i}"), 20 * 1024 ** 2)

    print("generating deep dir ...")
    deepth = 4
    deep_path = os.path.join(GP.local_path, "deep_dir")
    if not os.path.exists(deep_path):
        os.mkdir(deep_path)
    for deep in range(deepth):
        deep_path = os.path.join(deep_path, f"deep_dir{deep}")
        if not os.path.exists(deep_path):
            os.mkdir(deep_path)
    # The payload file lives in the deepest directory only.
    gen_file(os.path.join(deep_path, "deep"), 102400)

    print("generating dir with file ...")
    dir_path = os.path.join(GP.local_path, "problem_dir")
    rmdir(dir_path)
    os.mkdir(dir_path)
    gen_file(os.path.join(dir_path, "small2"), 102400)

    fuzz_count = 47  # 47 is the count that circulated the file transfer
    data_unit = 1024  # 1024 is the size that circulated the file transfer
    data_extra = 936  # 936 is the size that cased the extra file transfer
    for i in range(fuzz_count):
        fuzz_size = i * data_unit + data_extra
        create_file_with_size(os.path.join(dir_path, f"file_{fuzz_size}.dat"), fuzz_size)

    print("generating empty dir ...")
    dir_path = os.path.join(GP.local_path, "empty_dir")
    rmdir(dir_path)
    os.mkdir(dir_path)

    print("generating version file ...")
    gen_file(os.path.join(GP.local_path, GP.get_version()), 0)


def gen_package_dir():
    """Recreate ``app_dir`` and copy the debug hap into it."""
    print("generating app dir ...")
    dir_path = os.path.join(GP.local_path, "app_dir")
    rmdir(dir_path)
    os.mkdir(dir_path)
    app = os.path.join(GP.local_path, "entry-default-signed-debug.hap")
    if not os.path.exists(app):
        print(f"Source file {app} does not exist.")
    else:
        copy_file(app, dir_path)


def prepare_source():
    """(Re)generate the test resources unless the version marker file already exists.

    Everything in ``GP.local_path`` except .hap/.hsp packages is deleted
    before the file set is generated again.
    """
    if os.path.exists(os.path.join(GP.local_path, GP.get_version())):
        print(f"hdc test version is {GP.get_version()}, check ok, skip prepare.")
        return
    print(f"in prepare {GP.local_path},wait for 2 mins.")

    if os.path.exists(GP.local_path):
        # Walk local_path and delete everything except hap / hsp packages.
        for file in os.listdir(GP.local_path):
            if file.endswith((".hap", ".hsp")):
                continue
            rmdir(os.path.join(GP.local_path, file))
    else:
        os.mkdir(GP.local_path)

    gen_file_set()


def _gen_deep_test_dir(deep_path):
    """Create ``deep_path`` with a nested directory chain and a payload file at the bottom."""
    print("generating deep test dir ...")
    absolute_path = os.path.abspath(__file__)
    # Nesting depth is derived from the script path length.
    # NOTE(review): presumably sized to stay within a 255-character path limit - confirm.
    deepth = (255 - 9 - len(absolute_path)) % 14
    os.mkdir(deep_path)
    for deep in range(deepth):
        deep_path = os.path.join(deep_path, f"deep_test_dir{deep}")
        os.mkdir(deep_path)
    gen_file(os.path.join(deep_path, "deep_test"), 102400)


def add_prepare_source():
    """Create ``deep_test_dir`` and ``recv_test_dir``; raises OSError if either already exists."""
    _gen_deep_test_dir(os.path.join(GP.local_path, "deep_test_dir"))

    recv_dir = os.path.join(GP.local_path, "recv_test_dir")
    print("generating recv test dir ...")
    os.mkdir(recv_dir)


def update_source():
    """Create ``deep_test_dir`` and ``recv_test_dir`` only where they are missing."""
    deep_path = os.path.join(GP.local_path, "deep_test_dir")
    if not os.path.exists(deep_path):
        _gen_deep_test_dir(deep_path)

    recv_dir = os.path.join(GP.local_path, "recv_test_dir")
    if not os.path.exists(recv_dir):
        print("generating recv test dir ...")
        os.mkdir(recv_dir)


def setup_tester():
    """Drive the interactive menu; return True to start testing, False to quit."""
    while True:
        GP.print_options()
        opt = int(select_cmd())
        if opt == 1:
            return True
        elif opt == 2:
            if not GP.set_options():
                return False
            GP.dump()
        elif opt == 3:
            prepare_source()
        elif opt == 4:
            if not GP.load_testcase():
                return False
        elif opt == 5:
            return False
        else:
            return False


def load_testcase():
    """Load the custom testcases through ``GP.load_testcase()`` and report the outcome."""
    # The method has to be called: the bare attribute is always truthy.
    if not GP.load_testcase():
        print("load testcase failed")
        return False
    print("load testcase success")
    return True


def check_library_installation(library_name):
    """Return 0 when the distribution ``library_name`` is installed, else 1 (with an install hint)."""
    # importlib.metadata is the standard-library replacement for the deprecated
    # pkg_resources API; imported locally so this function is self-contained.
    from importlib import metadata

    try:
        metadata.distribution(library_name)
        return 0
    except metadata.PackageNotFoundError:
        print(f"\n\n{library_name} is not installed.\n\n")
        print("try to use command below:")
        print(f"pip install {library_name}")
        return 1


def check_subprocess_cmd(cmd, process_num, timeout):
    """Run ``cmd`` ``process_num`` times in sequence, killing any run that exceeds ``timeout`` seconds."""
    for _ in range(process_num):
        p = subprocess.Popen(cmd.split())
        try:
            p.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            p.kill()
            p.wait()  # reap the killed child


def check_process_mixed(process_num, timeout, local, remote):
    """Run ``process_num`` rounds of parallel sends, then parallel receives, of the sample files.

    Each transfer runs check_hdc_cmd() in its own process, started with a
    random delay of up to one second. ``timeout``, ``local`` and ``remote``
    are accepted but not used.
    """
    list_send = []
    list_recv = []
    sizes = {"small", "medium", "empty"}
    for i in range(process_num):
        for size in sizes:
            cmd_send = f"file send {get_local_path(f'{size}')} {get_remote_path(f'it_{size}_mix_{i}')}"
            cmd_recv = f"file recv {get_remote_path(f'it_{size}_mix_{i}')} {get_local_path(f'recv_{size}_mix_{i}')}"
            list_send.append(Process(target=check_hdc_cmd, args=(cmd_send, )))
            list_recv.append(Process(target=check_hdc_cmd, args=(cmd_recv, )))
            logging.info(f"RESULT:{cmd_send}")  # log the queued send command
    for send in list_send:
        wait_time = random.uniform(0, 1)
        send.start()
        time.sleep(wait_time)
    for send in list_send:
        send.join()

    for recv in list_recv:
        wait_time = random.uniform(0, 1)
        recv.start()
        time.sleep(wait_time)
    for recv in list_recv:
        recv.join()
    wait_time = random.uniform(0, 1)
    time.sleep(wait_time)


def execute_lines_in_file(file_path):
    """Execute the commands listed in ``file_path``.

    Each non-empty line has the form ``<repeat count>,<command>``. A leading
    ``hdc`` in the command is replaced by ``GP.hdc_head``. When the file does
    not exist it is created with one sample line first.
    """
    if not os.path.exists(file_path):
        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        modes = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open(file_path, flags, modes), 'w') as file:
            file.write("1,hdc shell ls")
    with open(file_path, 'r') as file:
        lines = file.readlines()
    for line in lines:
        line = line.strip()
        if not line:  # tolerate blank lines
            continue
        # Split on the first comma only so the command may contain commas.
        test_time, _, test_cmd = line.partition(',')
        if test_cmd.startswith("hdc"):
            # Strip only the leading "hdc", not every occurrence in the command.
            test_cmd = f"{GP.hdc_head} {test_cmd.removeprefix('hdc').lstrip()}"

        for i in range(int(test_time)):
            logging.info(f"THE {i+1}/{test_time} TEST,COMMAND IS:{test_cmd}")
            output = subprocess.check_output(test_cmd.split()).decode()
            logging.info(f"RESULT:{output}")  # log the command output


def _communicate_all(procs, timeout=512):
    """Wait for every process in ``procs``; return a list of decoded (stdout, stderr) pairs.

    A process still running after ``timeout`` seconds is killed and whatever
    it produced so far is returned for it.
    """
    results = []
    for proc in procs:
        try:
            stdout, stderr = proc.communicate(timeout=timeout)
        except subprocess.TimeoutExpired:
            proc.kill()
            stdout, stderr = proc.communicate()
        results.append((stdout.decode(), stderr.decode()))
    return results


def make_multiprocess_file(local, remote, mode, num, task_type):
    """Run ``num`` parallel hdc transfers and verify the transferred data.

    ``mode`` is "send" or "recv"; ``task_type`` is "file" (every process uses
    its own ``_<i>`` suffixed target) or "dir" (every process transfers the
    same directory). Returns False for invalid arguments, when any transfer
    does not report "FileTransfer finish", or when an MD5 check fails.
    """
    if num < 1:
        return False
    if mode not in ("send", "recv") or task_type not in ("file", "dir"):
        return False

    if task_type == "file":
        if mode == "send":
            file_list = [f"{GP.hdc_head} file send {local} {remote}_{i}" for i in range(num)]
        else:
            file_list = [f"{GP.hdc_head} file recv {remote}_{i} {local}_{i}" for i in range(num)]
    else:
        if mode == "send":
            file_list = [f"{GP.hdc_head} file send {local} {remote}" for _ in range(num)]
        else:
            file_list = [f"{GP.hdc_head} file recv {remote} {local}" for _ in range(num)]

    print(file_list[0])
    p_list = [subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE) for cmd in file_list]
    print(f"{mode} target {num} start")

    # Drain every process (no busy polling, no children left running) before
    # deciding on the overall result.
    all_finished = True
    for stdout, stderr in _communicate_all(p_list):
        if stderr:
            print(f"{stderr}")
        if stdout:
            print(f"{stdout}")
        if "FileTransfer finish" not in stdout:
            all_finished = False
    if not all_finished:
        return False

    if task_type == "file":
        if mode == "send":
            return all(_check_file(local, f"{remote}_{i}") for i in range(num))
        return all(_check_file(f"{local}_{i}", f"{remote}_{i}") for i in range(num))

    # Directory transfers all target the same location, so one check suffices.
    if mode == "send":
        return check_dir(local, f"{remote}/{os.path.basename(local)}", is_single_dir=True)
    received_dir = os.path.join(local, os.path.basename(remote))
    return check_dir(received_dir, remote, is_single_dir=True)


def hdc_get_key(cmd):
    """Run ``<hdc_head> <cmd>`` and return its decoded output."""
    test_cmd = f"{GP.hdc_head} {cmd}"
    return subprocess.check_output(test_cmd.split()).decode()


def start_subprocess_cmd(cmd, num, assert_out):
    """Run ``cmd`` in ``num`` parallel hdc processes.

    Returns False when ``num`` < 1 or, with ``assert_out`` given, when any
    process output lacks ``assert_out``; True otherwise.
    """
    if num < 1:
        return False
    full_cmd = f"{GP.hdc_head} {cmd}"
    p_list = [
        subprocess.Popen(full_cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        for _ in range(num)
    ]
    logging.info(f"{cmd} target {num} start")
    passed = True
    for stdout, stderr in _communicate_all(p_list):
        if stderr:
            logging.error(f"{stderr}")
        if stdout:
            logging.info(f"{stdout}")
        if assert_out is not None and assert_out not in stdout:
            passed = False
    return passed


def check_hdc_version(cmd, version):
    """Return True when the hdc client version is at least ``version``.

    ``version`` looks like ``"Ver: 3.1.0a"``. Versions are compared by
    concatenating their dot-separated parts and reading the result as a hex
    number. ``cmd`` is not used; ``<hdc_head> -v`` is always executed.
    Returns False when ``version`` is None.
    """
    def _convert_version_to_hex(_version):
        parts = _version.split("Ver: ")[1].split('.')
        hex_version = ''.join(parts)
        return int(hex_version, 16)

    if version is None:
        return False
    expected_version = _convert_version_to_hex(version)
    cmd = f"{GP.hdc_head} -v"
    print(f"\nexecuting command: {cmd}")
    output = subprocess.check_output(cmd.split()).decode().replace("\r", "").replace("\n", "")
    real_version = _convert_version_to_hex(output)
    print(f"--> output: {output}")
    print(f"--> your local [{version}] is"
          f" {'' if expected_version <= real_version else 'too old to'} fit the version [{output}]"
          )
    return expected_version <= real_version


def check_cmd_time(cmd, pattern, duration, times):
    """Run ``cmd`` ``times`` times and check the average duration against ``duration`` (ms).

    With ``pattern`` given every run must also contain it in its output,
    otherwise AssertionError is raised. ``duration`` defaults to 180 ms.
    """
    if times < 1:
        print("times should be bigger than 0.")
        return False
    start_time = time.time() * 1000
    print(f"{cmd} start {start_time}")
    res = []
    for _ in range(times):
        start_in = time.time() * 1000
        if pattern is None:
            subprocess.check_output(f"{GP.hdc_head} {cmd}".split())
        elif not check_shell(cmd, pattern):
            # Explicit raise so the check also holds under "python -O".
            raise AssertionError(f"pattern [{pattern}] not found in output of {cmd}")
        start_out = time.time() * 1000
        res.append(start_out - start_in)

    # Maximum, minimum and median of the individual run times.
    max_value = max(res)
    min_value = min(res)
    median_value = sorted(res)[len(res) // 2]

    print(f"{GP.hdc_head} {cmd}耗时最大值:{max_value}")
    print(f"{GP.hdc_head} {cmd}耗时最小值:{min_value}")
    print(f"{GP.hdc_head} {cmd}耗时中位数:{median_value}")

    end_time = time.time() * 1000

    # times >= 1 is guaranteed above, so the division cannot fail.
    timecost = int(end_time - start_time) / times
    print(f"{GP.hdc_head} {cmd}耗时平均值 {timecost}")

    if duration is None:
        duration = 150 * 1.2
    # 150ms is baseline timecost for hdc shell xxx cmd, 20% can be upper maybe system status
    return timecost < duration


def check_rom(baseline):
    """Check the combined size (KB, as reported by ``du``) of hdcd and libhdc against ``baseline``.

    ``baseline`` defaults to 2200. The outcome is also stored in
    ``GP.hdcd_rom`` ("<n> KB" or "error").
    """
    def _try_get_size(message):
        try:
            size = int(message.split('\t')[0])
        except ValueError:
            size = -9999 * 1024  # error size
            print(f"try get size value error, from {message}")
        return size

    def _du(path):
        cmd = f"{GP.hdc_head} shell du {path}"
        return subprocess.check_output(cmd.split()).decode()

    if baseline is None:
        baseline = 2200
    # 2200KB is the baseline of hdcd and libhdc.dylib.so size all together
    hdcd_size = _try_get_size(_du("system/bin/hdcd"))
    result_libhdc = _du("system/lib/libhdc.dylib.so")
    if "directory" in result_libhdc:
        # "No such file or directory": fall back to the 64-bit library location.
        result_libhdc64 = _du("system/lib64/libhdc.dylib.so")
        if "directory" in result_libhdc64:
            libhdc_size = 0
        else:
            libhdc_size = _try_get_size(result_libhdc64)
    else:
        libhdc_size = _try_get_size(result_libhdc)
    all_size = hdcd_size + libhdc_size
    if all_size < 0:
        GP.hdcd_rom = "error"
        return False
    GP.hdcd_rom = f"{all_size} KB"
    if all_size > baseline:
        print(f"rom size is {all_size}, overlimit baseline {baseline}")
        return False
    print(f"rom size is {all_size}, underlimit baseline {baseline}")
    return True


def run_command_with_timeout(command, timeout):
    """Run ``command`` and return ``(stdout, stderr)`` as text.

    On timeout returns ``(None, "Command timed out")``; on a non-zero exit
    status returns ``(None, <stderr of the command>)``.
    """
    try:
        result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout)
        return result.stdout.decode(), result.stderr.decode()
    except subprocess.TimeoutExpired:
        return None, "Command timed out"
    except subprocess.CalledProcessError as e:
        return None, e.stderr.decode()