1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import json 20import os 21import re 22import shutil 23import subprocess 24import sys 25import time 26import platform 27import zipfile 28import stat 29import random 30import xml.etree.ElementTree as ET 31from dataclasses import dataclass 32from json import JSONDecodeError 33 34from xdevice import DeviceTestType, check_result_report 35from xdevice import DeviceLabelType 36from xdevice import CommonParserType 37from xdevice import ExecuteTerminate 38from xdevice import DeviceError 39from xdevice import ShellHandler 40 41from xdevice import IDriver 42from xdevice import platform_logger 43from xdevice import Plugin 44from xdevice import get_plugin 45from ohos.environment.dmlib import process_command_ret 46from core.utils import get_decode 47from core.utils import get_fuzzer_path 48from core.config.resource_manager import ResourceManager 49from core.config.config_manager import FuzzerConfigManager 50 51__all__ = [ 52 "CppTestDriver", 53 "JSUnitTestDriver", 54 "disable_keyguard", 55 "GTestConst"] 56 57LOG = platform_logger("Drivers") 58DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test") 59OBJ = "obj" 60_ACE_LOG_MARKER = " a0c0d0" 61TIME_OUT = 900 * 1000 62JS_TIMEOUT = 10 63CYCLE_TIMES = 50 64 65FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL 66MODES = stat.S_IWUSR | stat.S_IRUSR 67 68 69class CollectingOutputReceiver: 70 def __init__(self): 71 self.output = "" 72 73 def __read__(self, output): 74 self.output = "%s%s" % (self.output, output) 75 76 def __error__(self, message): 77 pass 78 79 def __done__(self, result_code="", message=""): 80 pass 81 82 83class DisplayOutputReceiver: 84 def __init__(self): 85 self.output = "" 86 self.unfinished_line = "" 87 88 def __read__(self, output): 89 self.output = "%s%s" % (self.output, output) 90 lines = self._process_output(output) 91 for line in lines: 92 line = line.strip() 93 if line: 94 LOG.info(get_decode(line)) 95 96 def __error__(self, message): 97 pass 98 99 def __done__(self, result_code="", message=""): 100 pass 101 102 def _process_output(self, output, end_mark="\n"): 103 content = output 104 if self.unfinished_line: 105 content = "".join((self.unfinished_line, content)) 106 self.unfinished_line = "" 107 lines = content.split(end_mark) 108 if content.endswith(end_mark): 109 return lines[:-1] 110 else: 111 self.unfinished_line = lines[-1] 112 return lines[:-1] 113 114 115@dataclass 116class GTestConst(object): 117 exec_para_filter = "--gtest_filter" 118 exec_para_level = "--gtest_testsize" 119 exec_acts_para_filter = "--jstest_filter" 120 exec_acts_para_level = "--jstest_testsize" 121 122 123def get_device_log_file(report_path, serial=None, log_name="device_log"): 124 from xdevice import Variables 125 log_path = os.path.join(report_path, Variables.report_vars.log_dir) 126 os.makedirs(log_path, exist_ok=True) 127 128 serial = serial or time.time_ns() 129 device_file_name = "{}_{}.log".format(log_name, serial) 130 device_log_file = os.path.join(log_path, device_file_name) 131 return device_log_file 132 133 134def get_level_para_string(level_string): 135 level_list = list(set(level_string.split(","))) 136 level_para_string = "" 137 for item in level_list: 138 if not item.isdigit(): 139 continue 140 item = item.strip(" ") 141 level_para_string = f"{level_para_string}Level{item}," 142 level_para_string = level_para_string.strip(",") 143 return level_para_string 144 145 146def get_result_savepath(testsuit_path, result_rootpath): 147 findkey = os.sep + "tests" + os.sep 148 filedir, _ = os.path.split(testsuit_path) 149 pos = filedir.find(findkey) 150 if -1 != pos: 151 subpath = filedir[pos + len(findkey):] 152 pos1 = subpath.find(os.sep) 153 if -1 != pos1: 154 subpath = subpath[pos1 + len(os.sep):] 155 result_path = os.path.join(result_rootpath, "result", subpath) 156 else: 157 result_path = os.path.join(result_rootpath, "result") 158 else: 159 result_path = os.path.join(result_rootpath, "result") 160 161 if not os.path.exists(result_path): 162 os.makedirs(result_path) 163 164 LOG.info("result_savepath = " + result_path) 165 return result_path 166 167 168def get_test_log_savepath(result_rootpath, result_suit_path): 169 suit_path = result_suit_path.split("result")[-1].strip(os.sep).split(os.sep)[0] 170 test_log_path = os.path.join(result_rootpath, "log", "test_log", suit_path) 171 172 if not os.path.exists(test_log_path): 173 os.makedirs(test_log_path) 174 175 LOG.info("test_log_savepath = {}".format(test_log_path)) 176 return test_log_path 177 178 179def update_xml(suite_file, result_xml): 180 suite_path_txt = suite_file.split(".")[0] + "_path.txt" 181 if os.path.exists(suite_path_txt) and os.path.exists(result_xml): 182 with open(suite_path_txt, "r") as path_text: 183 line = path_text.readline().replace("//", "").strip() 184 tree = ET.parse(result_xml) 185 tree.getroot().attrib["path"] = line 186 tree.getroot().attrib["name"] = os.path.basename(suite_file.split(".")[0]) 187 188 test_suit = os.path.basename(result_xml).split(".")[0] 189 log_path = os.path.abspath(result_xml).split("result")[0] 190 crash_path = os.path.join(log_path, "log", test_suit) 191 all_items = os.listdir(crash_path) 192 193 # 筛选以crash开头的目录 194 matching_dirs = [os.path.join(crash_path, item) for item in all_items if 195 os.path.isdir(os.path.join(crash_path, item)) 196 and item.startswith(f"crash_log_{test_suit}")] 197 if len(matching_dirs) >= 1: 198 tree.getroot().attrib["is_crash"] = "1" 199 tree.write(result_xml) 200 201 202# all testsuit common Unavailable test result xml 203def _create_empty_result_file(filepath, filename, error_message): 204 error_message = str(error_message) 205 error_message = error_message.replace("\"", "") 206 error_message = error_message.replace("<", "") 207 error_message = error_message.replace(">", "") 208 error_message = error_message.replace("&", "") 209 if filename.endswith(".hap"): 210 filename = filename.split(".")[0] 211 if not os.path.exists(filepath): 212 with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: 213 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 214 time.localtime()) 215 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 216 file_desc.write( 217 '<testsuites tests="0" failures="0" ' 218 'disabled="0" errors="0" timestamp="%s" ' 219 'time="0" name="%s" unavailable="1">\n' % (time_stamp, filename)) 220 file_desc.write( 221 ' <testsuite name="%s" tests="0" failures="0" ' 222 'disabled="0" errors="0" time="0.0" ' 223 'unavailable="1" message="%s">\n' % 224 (filename, error_message)) 225 file_desc.write(' </testsuite>\n') 226 file_desc.write('</testsuites>\n') 227 return 228 229 230def _unlock_screen(device): 231 device.execute_shell_command("svc power stayon true") 232 time.sleep(1) 233 234 235def _unlock_device(device): 236 device.execute_shell_command("input keyevent 82") 237 time.sleep(1) 238 device.execute_shell_command("wm dismiss-keyguard") 239 time.sleep(1) 240 241 242def _lock_screen(device): 243 device.execute_shell_command("svc power stayon false") 244 time.sleep(1) 245 246 247def disable_keyguard(device): 248 _unlock_screen(device) 249 _unlock_device(device) 250 251 252def _sleep_according_to_result(result): 253 if result: 254 time.sleep(1) 255 256 257def _create_fuzz_crash_file(filepath, filename): 258 if not os.path.exists(filepath): 259 with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: 260 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 261 time.localtime()) 262 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 263 file_desc.write( 264 '<testsuites disabled="0" name="%s" ' 265 'time="300" timestamp="%s" errors="0" ' 266 'failures="1" tests="1">\n' % (filename, time_stamp)) 267 file_desc.write( 268 ' <testsuite disabled="0" name="%s" time="300" ' 269 'errors="0" failures="1" tests="1">\n' % filename) 270 file_desc.write( 271 ' <testcase name="%s" time="300" classname="%s" ' 272 'status="run">\n' % (filename, filename)) 273 file_desc.write( 274 ' <failure type="" ' 275 'message="Fuzzer crash. See ERROR in log file">\n') 276 file_desc.write(' </failure>\n') 277 file_desc.write(' </testcase>\n') 278 file_desc.write(' </testsuite>\n') 279 file_desc.write('</testsuites>\n') 280 return 281 282 283def _create_fuzz_pass_file(filepath, filename): 284 if not os.path.exists(filepath): 285 with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc: 286 time_stamp = time.strftime("%Y-%m-%d %H:%M:%S", 287 time.localtime()) 288 file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n') 289 file_desc.write( 290 '<testsuites disabled="0" name="%s" ' 291 'time="300" timestamp="%s" errors="0" ' 292 'failures="0" tests="1">\n' % (filename, time_stamp)) 293 file_desc.write( 294 ' <testsuite disabled="0" name="%s" time="300" ' 295 'errors="0" failures="0" tests="1">\n' % filename) 296 file_desc.write( 297 ' <testcase name="%s" time="300" classname="%s" ' 298 'status="run"/>\n' % (filename, filename)) 299 file_desc.write(' </testsuite>\n') 300 file_desc.write('</testsuites>\n') 301 return 302 303 304def _create_fuzz_result_file(filepath, filename, error_message): 305 error_message = str(error_message) 306 error_message = error_message.replace("\"", "") 307 error_message = error_message.replace("<", "") 308 error_message = error_message.replace(">", "") 309 error_message = error_message.replace("&", "") 310 if "AddressSanitizer" in error_message: 311 LOG.error("FUZZ TEST CRASH") 312 _create_fuzz_crash_file(filepath, filename) 313 elif re.search(r'Done (\b\d+\b) runs in (\b\d+\b) second', 314 error_message, re.M) is not None: 315 LOG.info("FUZZ TEST PASS") 316 _create_fuzz_pass_file(filepath, filename) 317 else: 318 LOG.error("FUZZ TEST UNAVAILABLE") 319 _create_empty_result_file(filepath, filename, error_message) 320 return 321 322 323class ResultManager(object): 324 def __init__(self, testsuit_path, config): 325 self.testsuite_path = testsuit_path 326 self.config = config 327 self.result_rootpath = self.config.report_path 328 self.device = self.config.device 329 if testsuit_path.endswith(".hap"): 330 self.device_testpath = self.config.test_hap_out_path 331 else: 332 self.device_testpath = self.config.target_test_path 333 self.testsuite_name = os.path.basename(self.testsuite_path) 334 self.is_coverage = False 335 336 def set_is_coverage(self, is_coverage): 337 self.is_coverage = is_coverage 338 339 def get_test_results(self, error_message=""): 340 # Get test result files 341 filepath, _ = self.obtain_test_result_file() 342 if "fuzztest" == self.config.testtype[0]: 343 LOG.info("create fuzz test report") 344 _create_fuzz_result_file(filepath, self.testsuite_name, 345 error_message) 346 if not self.is_coverage: 347 self._obtain_fuzz_corpus() 348 349 if not os.path.exists(filepath): 350 _create_empty_result_file(filepath, self.testsuite_name, 351 error_message) 352 if "benchmark" == self.config.testtype[0]: 353 self._obtain_benchmark_result() 354 # Get coverage data files 355 if self.is_coverage: 356 self.obtain_coverage_data() 357 358 return filepath 359 360 def get_test_results_hidelog(self, error_message=""): 361 # Get test result files 362 result_file_path, test_log_path = self.obtain_test_result_file() 363 log_content = "" 364 if not error_message: 365 if os.path.exists(test_log_path): 366 with open(test_log_path, "r") as log: 367 log_content = log.readlines() 368 else: 369 LOG.error("{}: Test log not exist.".format(test_log_path)) 370 else: 371 log_content = error_message 372 373 if "fuzztest" == self.config.testtype[0]: 374 LOG.info("create fuzz test report") 375 _create_fuzz_result_file(result_file_path, self.testsuite_name, 376 log_content) 377 if not self.is_coverage: 378 self._obtain_fuzz_corpus() 379 380 if not os.path.exists(result_file_path): 381 _create_empty_result_file(result_file_path, self.testsuite_name, 382 log_content) 383 if "benchmark" == self.config.testtype[0]: 384 self._obtain_benchmark_result() 385 # Get coverage data files 386 if self.is_coverage: 387 self.obtain_coverage_data() 388 389 return result_file_path 390 391 def get_result_sub_save_path(self): 392 find_key = os.sep + "benchmark" + os.sep 393 file_dir, _ = os.path.split(self.testsuite_path) 394 pos = file_dir.find(find_key) 395 subpath = "" 396 if -1 != pos: 397 subpath = file_dir[pos + len(find_key):] 398 LOG.info("subpath = " + subpath) 399 return subpath 400 401 def obtain_test_result_file(self): 402 result_save_path = get_result_savepath(self.testsuite_path, 403 self.result_rootpath) 404 405 result_file_path = os.path.join(result_save_path, 406 "%s.xml" % self.testsuite_name) 407 408 result_josn_file_path = os.path.join(result_save_path, 409 "%s.json" % self.testsuite_name) 410 411 if self.testsuite_path.endswith('.hap'): 412 remote_result_file = os.path.join(self.device_testpath, 413 "testcase_result.xml") 414 remote_json_result_file = os.path.join(self.device_testpath, 415 "%s.json" % self.testsuite_name) 416 else: 417 remote_result_file = os.path.join(self.device_testpath, 418 "%s.xml" % self.testsuite_name) 419 remote_json_result_file = os.path.join(self.device_testpath, 420 "%s.json" % self.testsuite_name) 421 422 if self.config.testtype[0] != "fuzztest": 423 if self.device.is_file_exist(remote_result_file): 424 self.device.pull_file(remote_result_file, result_file_path) 425 elif self.device.is_file_exist(remote_json_result_file): 426 self.device.pull_file(remote_json_result_file, 427 result_josn_file_path) 428 result_file_path = result_josn_file_path 429 else: 430 LOG.info("%s not exist", remote_result_file) 431 432 if self.config.hidelog: 433 remote_log_result_file = os.path.join(self.device_testpath, 434 "%s.log" % self.testsuite_name) 435 test_log_save_path = get_test_log_savepath(self.result_rootpath, result_save_path) 436 test_log_file_path = os.path.join(test_log_save_path, 437 "%s.log" % self.testsuite_name) 438 self.device.pull_file(remote_log_result_file, test_log_file_path) 439 return result_file_path, test_log_file_path 440 441 return result_file_path, "" 442 443 def make_empty_result_file(self, error_message=""): 444 result_savepath = get_result_savepath(self.testsuite_path, 445 self.result_rootpath) 446 result_filepath = os.path.join(result_savepath, "%s.xml" % 447 self.testsuite_name) 448 if not os.path.exists(result_filepath): 449 _create_empty_result_file(result_filepath, 450 self.testsuite_name, error_message) 451 452 def is_exist_target_in_device(self, path, target): 453 if platform.system() == "Windows": 454 command = '\"ls -l %s | grep %s\"' % (path, target) 455 else: 456 command = "ls -l %s | grep %s" % (path, target) 457 458 check_result = False 459 stdout_info = self.device.execute_shell_command(command) 460 if stdout_info != "" and stdout_info.find(target) != -1: 461 check_result = True 462 return check_result 463 464 def obtain_coverage_data(self): 465 cov_root_dir = os.path.abspath(os.path.join( 466 self.result_rootpath, 467 "..", 468 "coverage", 469 "data", 470 "exec")) 471 472 473 tests_path = self.config.testcases_path 474 test_type = self.testsuite_path.split(tests_path)[1].strip(os.sep).split(os.sep)[0] 475 cxx_cov_path = os.path.abspath(os.path.join( 476 self.result_rootpath, 477 "..", 478 "coverage", 479 "data", 480 "cxx", 481 self.testsuite_name + '_' + test_type)) 482 483 if os.path.basename(self.testsuite_name).startswith("rust_"): 484 target_name = "lib.unstripped" 485 else: 486 target_name = OBJ 487 if self.is_exist_target_in_device(DEFAULT_TEST_PATH, target_name): 488 if not os.path.exists(cxx_cov_path): 489 os.makedirs(cxx_cov_path) 490 else: 491 cxx_cov_path = cxx_cov_path + f"_{str(int(time.time()))}" 492 self.config.device.execute_shell_command( 493 "cd %s; tar -czf %s.tar.gz %s" % (DEFAULT_TEST_PATH, target_name, target_name)) 494 src_file_tar = os.path.join(DEFAULT_TEST_PATH, "%s.tar.gz" % target_name) 495 self.device.pull_file(src_file_tar, cxx_cov_path, is_create=True, timeout=TIME_OUT) 496 tar_path = os.path.join(cxx_cov_path, "%s.tar.gz" % target_name) 497 if platform.system() == "Windows": 498 process = subprocess.Popen("tar -zxf %s -C %s" % (tar_path, cxx_cov_path), shell=True) 499 process.communicate() 500 os.remove(tar_path) 501 os.rename(os.path.join(cxx_cov_path, target_name), os.path.join(cxx_cov_path, OBJ)) 502 else: 503 subprocess.Popen("tar -zxf %s -C %s > /dev/null 2>&1" % 504 (tar_path, cxx_cov_path), shell=True).communicate() 505 subprocess.Popen("rm -rf %s" % tar_path, shell=True).communicate() 506 if target_name != OBJ: 507 subprocess.Popen("mv %s %s" % (os.path.join(cxx_cov_path, target_name), 508 os.path.join(cxx_cov_path, OBJ)), shell=True).communicate() 509 510 def _obtain_fuzz_corpus(self): 511 command = f"cd {DEFAULT_TEST_PATH}; tar czf {self.testsuite_name}_corpus.tar.gz corpus;" 512 self.config.device.execute_shell_command(command) 513 result_save_path = get_result_savepath(self.testsuite_path, self.result_rootpath) 514 LOG.info(f"fuzz_dir = {result_save_path}") 515 self.device.pull_file(f"{DEFAULT_TEST_PATH}/{self.testsuite_name}_corpus.tar.gz", result_save_path) 516 517 def _obtain_benchmark_result(self): 518 benchmark_root_dir = os.path.abspath( 519 os.path.join(self.result_rootpath, "benchmark")) 520 benchmark_dir = os.path.abspath( 521 os.path.join(benchmark_root_dir, 522 self.get_result_sub_save_path(), 523 self.testsuite_name)) 524 525 if not os.path.exists(benchmark_dir): 526 os.makedirs(benchmark_dir) 527 528 LOG.info("benchmark_dir = %s" % benchmark_dir) 529 self.device.pull_file(os.path.join(self.device_testpath, 530 "%s.json" % self.testsuite_name), benchmark_dir) 531 if not os.path.exists(os.path.join(benchmark_dir, 532 "%s.json" % self.testsuite_name)): 533 os.rmdir(benchmark_dir) 534 return benchmark_dir 535 536 537@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test) 538class CppTestDriver(IDriver): 539 """ 540 CppTest is a Test that runs a native test package on given device. 541 """ 542 # test driver config 543 config = None 544 result = "" 545 546 def __check_environment__(self, device_options): 547 if len(device_options) == 1 and device_options[0].label is None: 548 return True 549 if len(device_options) != 1 or \ 550 device_options[0].label != DeviceLabelType.phone: 551 return False 552 return True 553 554 def __check_config__(self, config): 555 pass 556 557 def __result__(self): 558 return self.result if os.path.exists(self.result) else "" 559 560 def __execute__(self, request): 561 try: 562 self.config = request.config 563 self.config.target_test_path = DEFAULT_TEST_PATH 564 self.config.device = request.config.environment.devices[0] 565 566 suite_file = request.root.source.source_file 567 LOG.debug("Testsuite FilePath: %s" % suite_file) 568 569 if not suite_file: 570 LOG.error("test source '%s' not exists" % 571 request.root.source.source_string) 572 return 573 574 if not self.config.device: 575 result = ResultManager(suite_file, self.config) 576 result.set_is_coverage(False) 577 result.make_empty_result_file( 578 "No test device is found. ") 579 return 580 self.config.device.set_device_report_path(request.config.report_path) 581 self.config.device.device_log_collector.start_hilog_task() 582 self._init_gtest() 583 self._run_gtest(suite_file) 584 585 finally: 586 log_path = get_result_savepath(request.root.source.source_file, request.config.report_path) 587 suit_name = os.path.basename(request.root.source.source_file) 588 xml_path = os.path.join(log_path, f"{suit_name}.xml") 589 if not os.path.exists(xml_path): 590 _create_empty_result_file(xml_path, suit_name, "ERROR") 591 serial = "{}_{}".format(str(request.config.device.__get_serial__()), time.time_ns()) 592 log_tar_file_name = "{}_{}".format(request.get_module_name(), str(serial).replace( 593 ":", "_")) 594 self.config.device.device_log_collector.stop_hilog_task( 595 log_tar_file_name, module_name=request.get_module_name()) 596 update_xml(request.root.source.source_file, xml_path) 597 598 @staticmethod 599 def _alter_init(name): 600 with open(name, "rb") as f: 601 lines = f.read() 602 str_content = lines.decode("utf-8") 603 604 pattern_sharp = '^\s*#.*$(\n|\r\n)' 605 pattern_star = '/\*.*?\*/(\n|\r\n)+' 606 pattern_xml = '<!--[\s\S]*?-->(\n|\r\n)+' 607 608 if re.match(pattern_sharp, str_content, flags=re.M): 609 striped_content = re.sub(pattern_sharp, '', str_content, flags=re.M) 610 elif re.findall(pattern_star, str_content, flags=re.S): 611 striped_content = re.sub(pattern_star, '', str_content, flags=re.S) 612 elif re.findall(pattern_xml, str_content, flags=re.S): 613 striped_content = re.sub(pattern_xml, '', str_content, flags=re.S) 614 else: 615 striped_content = str_content 616 617 striped_bt = striped_content.encode("utf-8") 618 if os.path.exists(name): 619 os.remove(name) 620 with os.fdopen(os.open(name, FLAGS, MODES), 'wb') as f: 621 f.write(striped_bt) 622 623 def _init_gtest(self): 624 self.config.device.connector_command("target mount") 625 self.config.device.execute_shell_command( 626 "rm -rf %s" % self.config.target_test_path) 627 self.config.device.execute_shell_command( 628 "mkdir -p %s" % self.config.target_test_path) 629 self.config.device.execute_shell_command( 630 "mount -o rw,remount,rw /") 631 if "fuzztest" == self.config.testtype[0]: 632 self.config.device.execute_shell_command( 633 "mkdir -p %s" % os.path.join(self.config.target_test_path, 634 "corpus")) 635 636 def _gtest_command(self, suite_file): 637 filename = os.path.basename(suite_file) 638 test_para = self._get_test_para(self.config.testcase, 639 self.config.testlevel, 640 self.config.testtype, 641 self.config.target_test_path, 642 suite_file, 643 filename) 644 645 # execute testcase 646 if not self.config.coverage: 647 if self.config.random == "random": 648 seed = random.randint(1, 100) 649 command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s --gtest_shuffle --gtest_random_seed=%d" % ( 650 self.config.target_test_path, 651 filename, 652 filename, 653 test_para, 654 seed) 655 else: 656 command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s" % ( 657 self.config.target_test_path, 658 filename, 659 filename, 660 test_para) 661 else: 662 coverage_outpath = self.config.coverage_outpath 663 if coverage_outpath: 664 strip_num = len(coverage_outpath.strip("/").split("/")) 665 else: 666 ohos_config_path = os.path.join(sys.source_code_root_path, "out", "ohos_config.json") 667 with open(ohos_config_path, 'r') as json_file: 668 json_info = json.load(json_file) 669 out_path = json_info.get("out_path") 670 strip_num = len(out_path.strip("/").split("/")) 671 if "fuzztest" == self.config.testtype[0]: 672 self._push_corpus_cov_if_exist(suite_file) 673 command = f"cd {self.config.target_test_path}; tar zxf {filename}_corpus.tar.gz; \ 674 rm -rf {filename}.xml; chmod +x *; GCOV_PREFIX={DEFAULT_TEST_PATH}; \ 675 GCOV_PREFIX_STRIP={strip_num} ./{filename} {test_para}" 676 else: 677 command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=%s " \ 678 "GCOV_PREFIX_STRIP=%s ./%s %s" % \ 679 (self.config.target_test_path, 680 filename, 681 DEFAULT_TEST_PATH, 682 str(strip_num), 683 filename, 684 test_para) 685 686 if self.config.hidelog: 687 command += " > {}.log 2>&1".format(filename) 688 689 return command 690 691 def _run_gtest(self, suite_file): 692 from xdevice import Variables 693 is_coverage_test = True if self.config.coverage else False 694 695 # push testsuite file 696 self.config.device.push_file(suite_file, self.config.target_test_path) 697 self.config.device.execute_shell_command( 698 "hilog -d %s" % (os.path.join(self.config.target_test_path, 699 os.path.basename(suite_file))) 700 ) 701 self._push_corpus_if_exist(suite_file) 702 703 # push resource files 704 resource_manager = ResourceManager() 705 resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file) 706 resource_manager.process_preparer_data(resource_data_dic, resource_dir, 707 self.config.device) 708 709 command = self._gtest_command(suite_file) 710 711 result = ResultManager(suite_file, self.config) 712 result.set_is_coverage(is_coverage_test) 713 714 try: 715 # get result 716 if self.config.hidelog: 717 return_message = "" 718 display_receiver = CollectingOutputReceiver() 719 self.config.device.execute_shell_command( 720 command, 721 receiver=display_receiver, 722 timeout=TIME_OUT, 723 retry=0) 724 else: 725 display_receiver = DisplayOutputReceiver() 726 self.config.device.execute_shell_command( 727 command, 728 receiver=display_receiver, 729 timeout=TIME_OUT, 730 retry=0) 731 return_message = display_receiver.output 732 except (ExecuteTerminate, DeviceError) as exception: 733 return_message = str(exception.args) 734 735 if self.config.hidelog: 736 self.result = result.get_test_results_hidelog(return_message) 737 else: 738 self.result = result.get_test_results(return_message) 739 740 resource_manager.process_cleaner_data(resource_data_dic, 741 resource_dir, 742 self.config.device) 743 744 def _push_corpus_cov_if_exist(self, suite_file): 745 corpus_path = suite_file.split("fuzztest")[-1].strip(os.sep) 746 cov_file = os.path.join( 747 sys.framework_root_dir, "reports", "latest_corpus", corpus_path + "_corpus.tar.gz") 748 LOG.info("corpus_cov file :%s" % str(cov_file)) 749 self.config.device.push_file(cov_file, os.path.join(self.config.target_test_path)) 750 751 def _push_corpus_if_exist(self, suite_file): 752 if "fuzztest" == self.config.testtype[0]: 753 corpus_path = os.path.join(get_fuzzer_path(suite_file), "corpus") 754 if not os.path.isdir(corpus_path): 755 return 756 757 corpus_dirs = [] 758 corpus_file_list = [] 759 760 for root, _, files in os.walk(corpus_path): 761 if not files: 762 continue 763 764 corpus_dir = root.split("corpus")[-1] 765 if corpus_dir != "": 766 corpus_dirs.append(corpus_dir) 767 768 for file in files: 769 cp_file = os.path.normcase(os.path.join(root, file)) 770 corpus_file_list.append(cp_file) 771 if file == "init": 772 self._alter_init(cp_file) 773 774 # mkdir corpus files dir 775 if corpus_dirs: 776 for corpus in corpus_dirs: 777 mkdir_corpus_command = f"shell; mkdir -p {corpus}" 778 self.config.device.connector_command(mkdir_corpus_command) 779 780 # push corpus file 781 if corpus_file_list: 782 for corpus_file in corpus_file_list: 783 self.config.device.push_file(corpus_file, 784 os.path.join(self.config.target_test_path, "corpus")) 785 786 def _get_test_para(self, 787 testcase, 788 testlevel, 789 testtype, 790 target_test_path, 791 suite_file, 792 filename): 793 if "benchmark" == testtype[0]: 794 test_para = (" --benchmark_out_format=json" 795 " --benchmark_out=%s%s.json") % ( 796 target_test_path, filename) 797 return test_para 798 799 if "" != testcase and "" == testlevel: 800 test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) 801 elif "" == testcase and "" != testlevel: 802 level_para = get_level_para_string(testlevel) 803 test_para = "%s=%s" % (GTestConst.exec_para_level, level_para) 804 else: 805 test_para = "" 806 807 if "fuzztest" == testtype[0]: 808 cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path( 809 suite_file), "project.xml")).get_fuzzer_config("fuzztest") 810 LOG.info("config list :%s" % str(cfg_list)) 811 if self.config.coverage: 812 test_para += "corpus -runs=0" + \ 813 " -max_len=" + cfg_list[0] + \ 814 " -max_total_time=" + cfg_list[1] + \ 815 " -rss_limit_mb=" + cfg_list[2] 816 else: 817 test_para += "corpus -max_len=" + cfg_list[0] + \ 818 " -max_total_time=" + cfg_list[1] + \ 819 " -rss_limit_mb=" + cfg_list[2] 820 821 return test_para 822 823 824@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test) 825class JSUnitTestDriver(IDriver): 826 """ 827 JSUnitTestDriver is a Test that runs a native test package on given device. 828 """ 829 830 def __init__(self): 831 self.config = None 832 self.result = "" 833 self.start_time = None 834 self.ability_name = "" 835 self.package_name = "" 836 # log 837 self.hilog = None 838 self.hilog_proc = None 839 840 def __check_environment__(self, device_options): 841 pass 842 843 def __check_config__(self, config): 844 pass 845 846 def __result__(self): 847 return self.result if os.path.exists(self.result) else "" 848 849 def __execute__(self, request): 850 try: 851 LOG.info("developer_test driver") 852 self.config = request.config 853 self.config.target_test_path = DEFAULT_TEST_PATH 854 self.config.device = request.config.environment.devices[0] 855 856 suite_file = request.root.source.source_file 857 result_save_path = get_result_savepath(suite_file, self.config.report_path) 858 self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name()) 859 if not suite_file: 860 LOG.error("test source '%s' not exists" % 861 request.root.source.source_string) 862 return 863 864 if not self.config.device: 865 result = ResultManager(suite_file, self.config) 866 result.set_is_coverage(False) 867 result.make_empty_result_file( 868 "No test device is found") 869 return 870 871 package_name, ability_name = self._get_package_and_ability_name( 872 suite_file) 873 self.package_name = package_name 874 self.ability_name = ability_name 875 self.config.test_hap_out_path = \ 876 "/data/data/%s/files/" % self.package_name 877 self.config.device.connector_command("shell hilog -r") 878 879 self.hilog = get_device_log_file( 880 request.config.report_path, 881 request.config.device.__get_serial__() + "_" + request. 882 get_module_name(), 883 "device_hilog") 884 885 hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 886 0o755) 887 888 with os.fdopen(hilog_open, "a") as hilog_file_pipe: 889 self.config.device.device_log_collector.add_log_address(None, self.hilog) 890 _, self.hilog_proc = self.config.device.device_log_collector.\ 891 start_catch_device_log(hilog_file_pipe=hilog_file_pipe) 892 self._init_jsunit_test() 893 self._run_jsunit(suite_file, self.hilog) 894 hilog_file_pipe.flush() 895 self.generate_console_output(self.hilog, request) 896 xml_path = os.path.join( 897 request.config.report_path, "result", 898 '.'.join((request.get_module_name(), "xml"))) 899 shutil.move(xml_path, self.result) 900 finally: 901 self.config.device.device_log_collector.remove_log_address(None, self.hilog) 902 self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc) 903 update_xml(request.root.source.source_file, self.result) 904 905 @staticmethod 906 def _get_acts_test_para(testcase, 907 testlevel, 908 testtype, 909 target_test_path, 910 suite_file, 911 filename): 912 if "actstest" == testtype[0]: 913 test_para = (" --actstest_out_format=json" 914 " --actstest_out=%s%s.json") % ( 915 target_test_path, filename) 916 return test_para 917 918 if "" != testcase and "" == testlevel: 919 test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase) 920 elif "" == testcase and "" != testlevel: 921 level_para = get_level_para_string(testlevel) 922 test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para) 923 else: 924 test_para = "" 925 return test_para 926 927 @staticmethod 928 def _get_hats_test_para(testcase, 929 testlevel, 930 testtype, 931 target_test_path, 932 suite_file, 933 filename): 934 if "hatstest" == testtype[0]: 935 test_hats_para = (" --hatstest_out_format=json" 936 " --hatstest_out=%s%s.json") % ( 937 target_test_path, filename) 938 return test_hats_para 939 940 if "" != testcase and "" == testlevel: 941 test_hats_para = "%s=%s" % (GTestConst.exec_para_filter, testcase) 942 elif "" == testcase and "" != testlevel: 943 level_para = get_level_para_string(testlevel) 944 test_hats_para = "%s=%s" % (GTestConst.exec_para_level, level_para) 945 else: 946 test_hats_para = "" 947 return test_hats_para 948 949 @classmethod 950 def _get_json_shell_timeout(cls, json_filepath): 951 test_timeout = 0 952 try: 953 with open(json_filepath, 'r') as json_file: 954 data_dic = json.load(json_file) 955 if not data_dic: 956 return test_timeout 957 else: 958 if "driver" in data_dic.keys(): 959 driver_dict = data_dic.get("driver") 960 if driver_dict and "test-timeout" in driver_dict.keys(): 961 test_timeout = int(driver_dict["shell-timeout"]) / 1000 962 return test_timeout 963 except JSONDecodeError: 964 return test_timeout 965 finally: 966 print(" get json shell timeout finally") 967 968 @staticmethod 969 def _get_package_and_ability_name(hap_filepath): 970 package_name = "" 971 ability_name = "" 972 if os.path.exists(hap_filepath): 973 filename = os.path.basename(hap_filepath) 974 975 # unzip the hap file 976 hap_bak_path = os.path.abspath(os.path.join( 977 os.path.dirname(hap_filepath), 978 "%s.bak" % filename)) 979 zf_desc = zipfile.ZipFile(hap_filepath) 980 try: 981 zf_desc.extractall(path=hap_bak_path) 982 except RuntimeError as error: 983 print("Unzip error: ", hap_bak_path) 984 zf_desc.close() 985 986 # verify config.json file 987 app_profile_path = os.path.join(hap_bak_path, "config.json") 988 if not os.path.exists(app_profile_path): 989 print("file %s not exist" % app_profile_path) 990 return package_name, ability_name 991 992 if os.path.isdir(app_profile_path): 993 print("%s is a folder, and not a file" % app_profile_path) 994 return package_name, ability_name 995 996 # get package_name and ability_name value 997 load_dict = {} 998 with open(app_profile_path, 'r') as load_f: 999 load_dict = json.load(load_f) 1000 profile_list = load_dict.values() 1001 for profile in profile_list: 1002 package_name = profile.get("package") 1003 if not package_name: 1004 continue 1005 abilities = profile.get("abilities") 1006 for abilitie in abilities: 1007 abilities_name = abilitie.get("name") 1008 if abilities_name.startswith("."): 1009 ability_name = package_name + abilities_name[ 1010 abilities_name.find("."):] 1011 else: 1012 ability_name = abilities_name 1013 break 1014 break 1015 1016 # delete hap_bak_path 1017 if os.path.exists(hap_bak_path): 1018 shutil.rmtree(hap_bak_path) 1019 else: 1020 print("file %s not exist" % hap_filepath) 1021 return package_name, ability_name 1022 1023 def generate_console_output(self, device_log_file, request): 1024 result_message = self.read_device_log(device_log_file) 1025 1026 report_name = request.get_module_name() 1027 parsers = get_plugin( 1028 Plugin.PARSER, CommonParserType.jsunit) 1029 if parsers: 1030 parsers = parsers[:1] 1031 for listener in request.listeners: 1032 listener.device_sn = self.config.device.device_sn 1033 parser_instances = [] 1034 1035 for parser in parsers: 1036 parser_instance = parser.__class__() 1037 parser_instance.suites_name = report_name 1038 parser_instance.suite_name = report_name 1039 parser_instance.listeners = request.listeners 1040 parser_instances.append(parser_instance) 1041 handler = ShellHandler(parser_instances) 1042 process_command_ret(result_message, handler) 1043 1044 def read_device_log(self, device_log_file): 1045 result_message = "" 1046 with open(device_log_file, "r", encoding='utf-8', 1047 errors='ignore') as file_read_pipe: 1048 while True: 1049 data = file_read_pipe.readline() 1050 if not data: 1051 break 1052 # only filter JSApp log 1053 if data.lower().find(_ACE_LOG_MARKER) != -1: 1054 result_message += data 1055 if data.find("[end] run suites end") != -1: 1056 break 1057 return result_message 1058 1059 def start_hap_execute(self): 1060 try: 1061 command = "aa start -d 123 -a %s.MainAbility -b %s" \ 1062 % (self.package_name, self.package_name) 1063 self.start_time = time.time() 1064 result_value = self.config.device.execute_shell_command( 1065 command, timeout=TIME_OUT) 1066 1067 if "success" in str(result_value).lower(): 1068 LOG.info("execute %s's testcase success. result value=%s" 1069 % (self.package_name, result_value)) 1070 else: 1071 LOG.info("execute %s's testcase failed. result value=%s" 1072 % (self.package_name, result_value)) 1073 1074 _sleep_according_to_result(result_value) 1075 return_message = result_value 1076 except (ExecuteTerminate, DeviceError) as exception: 1077 return_message = exception.args 1078 1079 return return_message 1080 1081 def _init_jsunit_test(self): 1082 self.config.device.connector_command("target mount") 1083 self.config.device.execute_shell_command( 1084 "rm -rf %s" % self.config.target_test_path) 1085 self.config.device.execute_shell_command( 1086 "mkdir -p %s" % self.config.target_test_path) 1087 self.config.device.execute_shell_command( 1088 "mount -o rw,remount,rw /") 1089 1090 def _run_jsunit(self, suite_file, device_log_file): 1091 filename = os.path.basename(suite_file) 1092 _, suffix_name = os.path.splitext(filename) 1093 1094 resource_manager = ResourceManager() 1095 resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file) 1096 if suffix_name == ".hap": 1097 json_file_path = suite_file.replace(".hap", ".json") 1098 if os.path.exists(json_file_path): 1099 timeout = self._get_json_shell_timeout(json_file_path) 1100 else: 1101 timeout = ResourceManager.get_nodeattrib_data(resource_data_dic) 1102 else: 1103 timeout = ResourceManager.get_nodeattrib_data(resource_data_dic) 1104 resource_manager.process_preparer_data(resource_data_dic, resource_dir, self.config.device) 1105 main_result = self._install_hap(suite_file) 1106 result = ResultManager(suite_file, self.config) 1107 if main_result: 1108 self._execute_hapfile_jsunittest() 1109 try: 1110 status = False 1111 actiontime = JS_TIMEOUT 1112 times = CYCLE_TIMES 1113 if timeout: 1114 actiontime = timeout 1115 times = 1 1116 with open(device_log_file, "r", encoding='utf-8', 1117 errors='ignore') as file_read_pipe: 1118 for i in range(0, times): 1119 if status: 1120 break 1121 else: 1122 time.sleep(float(actiontime)) 1123 start_time = int(time.time()) 1124 while True: 1125 data = file_read_pipe.readline() 1126 if data.lower().find(_ACE_LOG_MARKER) != -1 and data.find("[end] run suites end") != -1: 1127 LOG.info("execute testcase successfully.") 1128 status = True 1129 break 1130 if int(time.time()) - start_time > 5: 1131 break 1132 finally: 1133 _lock_screen(self.config.device) 1134 self._uninstall_hap(self.package_name) 1135 else: 1136 self.result = result.get_test_results("Error: install hap failed") 1137 LOG.error("Error: install hap failed") 1138 1139 resource_manager.process_cleaner_data(resource_data_dic, resource_dir, self.config.device) 1140 1141 def _execute_hapfile_jsunittest(self): 1142 _unlock_screen(self.config.device) 1143 _unlock_device(self.config.device) 1144 1145 try: 1146 return_message = self.start_hap_execute() 1147 except (ExecuteTerminate, DeviceError) as exception: 1148 return_message = str(exception.args) 1149 1150 return return_message 1151 1152 def _install_hap(self, suite_file): 1153 message = self.config.device.connector_command("install %s" % suite_file) 1154 message = str(message).rstrip() 1155 if message == "" or "success" in message: 1156 return_code = True 1157 if message != "": 1158 LOG.info(message) 1159 else: 1160 return_code = False 1161 if message != "": 1162 LOG.warning(message) 1163 1164 _sleep_according_to_result(return_code) 1165 return return_code 1166 1167 def _uninstall_hap(self, package_name): 1168 return_message = self.config.device.execute_shell_command( 1169 "bm uninstall -n %s" % package_name) 1170 _sleep_according_to_result(return_message) 1171 return return_message 1172 1173 1174@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test) 1175class OHRustTestDriver(IDriver): 1176 def __init__(self): 1177 self.result = "" 1178 self.error_message = "" 1179 self.config = None 1180 1181 def __check_environment__(self, device_options): 1182 pass 1183 1184 def __check_config__(self, config): 1185 pass 1186 1187 def __execute__(self, request): 1188 try: 1189 LOG.debug("Start to execute open harmony rust test") 1190 self.config = request.config 1191 self.config.device = request.config.environment.devices[0] 1192 self.config.target_test_path = DEFAULT_TEST_PATH 1193 suite_file = request.root.source.source_file 1194 LOG.debug("Testsuite filepath:{}".format(suite_file)) 1195 1196 if not suite_file: 1197 LOG.error("test source '{}' not exists".format( 1198 request.root.source.source_string)) 1199 return 1200 1201 result_save_path = get_result_savepath(suite_file, self.config.report_path) 1202 self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name()) 1203 self.config.device.set_device_report_path(request.config.report_path) 1204 self.config.device.device_log_collector.start_hilog_task() 1205 self._init_oh_rust() 1206 self._run_oh_rust(suite_file, request) 1207 except Exception as exception: 1208 self.error_message = exception 1209 if not getattr(exception, "error_no", ""): 1210 setattr(exception, "error_no", "03409") 1211 LOG.exception(self.error_message, exc_info=False, error_no="03409") 1212 finally: 1213 serial = "{}_{}".format(str(request.config.device.__get_serial__()), 1214 time.time_ns()) 1215 log_tar_file_name = "{}_{}".format( 1216 request.get_module_name(), str(serial).replace(":", "_")) 1217 self.config.device.device_log_collector.stop_hilog_task( 1218 log_tar_file_name, module_name=request.get_module_name()) 1219 xml_path = os.path.join( 1220 request.config.report_path, "result", 1221 '.'.join((request.get_module_name(), "xml"))) 1222 shutil.move(xml_path, self.result) 1223 self.result = check_result_report( 1224 request.config.report_path, self.result, self.error_message) 1225 update_xml(request.root.source.source_file, self.result) 1226 1227 def __result__(self): 1228 return self.result if os.path.exists(self.result) else "" 1229 1230 def _init_oh_rust(self): 1231 self.config.device.connector_command("target mount") 1232 self.config.device.execute_shell_command( 1233 "rm -rf %s" % self.config.target_test_path) 1234 self.config.device.execute_shell_command( 1235 "mkdir -p %s" % self.config.target_test_path) 1236 self.config.device.execute_shell_command( 1237 "mount -o rw,remount,rw /") 1238 1239 def _run_oh_rust(self, suite_file, request=None): 1240 self.config.device.push_file(suite_file, self.config.target_test_path) 1241 self.config.device.execute_shell_command( 1242 "hilog -d %s" % (os.path.join(self.config.target_test_path, 1243 os.path.basename(suite_file))) 1244 ) 1245 resource_manager = ResourceManager() 1246 resource_data_dict, resource_dir = \ 1247 resource_manager.get_resource_data_dic(suite_file) 1248 resource_manager.process_preparer_data(resource_data_dict, 1249 resource_dir, 1250 self.config.device) 1251 for listener in request.listeners: 1252 listener.device_sn = self.config.device.device_sn 1253 1254 parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust) 1255 if parsers: 1256 parsers = parsers[:1] 1257 parser_instances = [] 1258 for parser in parsers: 1259 parser_instance = parser.__class__() 1260 parser_instance.suite_name = request.get_module_name() 1261 parser_instance.listeners = request.listeners 1262 parser_instances.append(parser_instance) 1263 handler = ShellHandler(parser_instances) 1264 if self.config.coverage: 1265 command = "cd {}; chmod +x *; GCOV_PREFIX=. ./{}".format( 1266 self.config.target_test_path, os.path.basename(suite_file)) 1267 else: 1268 command = "cd {}; chmod +x *; ./{}".format( 1269 self.config.target_test_path, os.path.basename(suite_file)) 1270 self.config.device.execute_shell_command( 1271 command, timeout=TIME_OUT, receiver=handler, retry=0) 1272 if self.config.coverage: 1273 result = ResultManager(suite_file, self.config) 1274 result.obtain_coverage_data() 1275 resource_manager.process_cleaner_data(resource_data_dict, resource_dir, 1276 self.config.device) 1277