#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
import os
import shutil
import stat
import time

from xdevice import ParamError
from xdevice import get_device_log_file
from xdevice import check_result_report
from xdevice import get_kit_instances
from xdevice import do_module_kit_setup
from xdevice import do_module_kit_teardown
from xdevice import get_config_value
from xdevice import Plugin
from xdevice import DeviceTestType
from xdevice import IDriver
from xdevice import get_plugin
from xdevice import CommonParserType
from xdevice import ShellHandler
from xdevice import ConfigConst
from xdevice import JsonParser
from xdevice import TestDescription
from xdevice import platform_logger

from ohos.constants import CKit
from ohos.executor.listener import CollectingPassListener
from core.driver.drivers import update_xml
from core.driver.drivers import get_result_savepath

__all__ = ["OHJSUnitTestDriver", "oh_jsunit_para_parse"]

LOG = platform_logger("Drivers")
TIME_OUT = 300 * 1000


def oh_jsunit_para_parse(runner, junit_paras):
    """Translate supported test parameters into runner arguments.

    Walks *junit_paras* (a mapping of parameter name -> list of values)
    and forwards recognized parameters to ``runner.add_arg``.  Unknown
    parameter names and out-of-range values are silently skipped — this
    function is deliberately best-effort.

    :param runner: object exposing ``add_arg(name, value)``
                   (an OHJSUnitTestRunner instance in this file)
    :param junit_paras: mapping of parameter names to lists of values
    """
    junit_paras = dict(junit_paras)
    test_type_list = ["function", "performance", "reliability", "security"]
    size_list = ["small", "medium", "large"]
    level_list = ["0", "1", "2", "3"]
    for para_name in junit_paras.keys():
        para_name = para_name.strip()
        para_values = junit_paras.get(para_name, [])
        if not para_values:
            # Fix: an empty value list would raise IndexError below on
            # para_values[0]; skip it, consistent with the silent-skip
            # convention used for unrecognized values.
            continue
        if para_name == "class":
            runner.add_arg(para_name, ",".join(para_values))
        elif para_name == "notClass":
            runner.add_arg(para_name, ",".join(para_values))
        elif para_name == "testType":
            if para_values[0] not in test_type_list:
                continue
            # function/performance/reliability/security
            runner.add_arg(para_name, para_values[0])
        elif para_name == "size":
            if para_values[0] not in size_list:
                continue
            # size small/medium/large
            runner.add_arg(para_name, para_values[0])
        elif para_name == "level":
            if para_values[0] not in level_list:
                continue
            # level 0/1/2/3 (level_list above only accepts these four)
            runner.add_arg(para_name, para_values[0])
        elif para_name == "stress":
            runner.add_arg(para_name, para_values[0])


class OHJSUnitTestRunner:
    """Builds and executes 'aa test' shell commands for OH JSUnit suites.

    Holds the '-s key value' argument list, performs the dry run used to
    enumerate expected tests, and notifies an observer (the shell output
    parser) when a run finishes.
    """

    # Upper bound for rerun attempts when the driver has rerun enabled.
    MAX_RETRY_TIMES = 3

    def __init__(self, config):
        self.arg_list = {}              # name -> value, rendered by get_args_command()
        self.suites_name = None         # module name, passed to the parser
        self.config = config
        self.rerun_attemp = 3
        self.suite_recorder = {}
        self.finished = False
        self.expect_tests_dict = dict() # filled by dry_run()
        self.finished_observer = None   # set by _get_shell_handler()
        self.retry_times = 1
        self.compile_mode = ""          # "esmodule" selects the ets runner path

    def dry_run(self):
        """Execute the suite with '-s dryRun true' to list expected tests.

        Returns the parser's collected test list and caches the
        per-suite dict in ``self.expect_tests_dict``.
        """
        parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit_list)
        if parsers:
            parsers = parsers[:1]
        parser_instances = []
        for parser in parsers:
            parser_instance = parser.__class__()
            parser_instances.append(parser_instance)
        handler = ShellHandler(parser_instances)
        command = self._get_dry_run_command()
        self.config.device.execute_shell_command(
            command, timeout=self.config.timeout, receiver=handler, retry=0)
        self.expect_tests_dict = parser_instances[0].tests_dict
        return parser_instances[0].tests

    def run(self, listener):
        """Execute the real test command, streaming output to *listener*."""
        handler = self._get_shell_handler(listener)
        command = self._get_run_command()
        self.config.device.execute_shell_command(
            command, timeout=self.config.timeout, receiver=handler, retry=0)

    def notify_finished(self):
        """Tell the output parser the task ended and consume one retry."""
        if self.finished_observer:
            self.finished_observer.notify_task_finished()
        self.retry_times -= 1

    def get_oh_test_runner_path(self):
        """Return the test runner name; esmodule builds need the full path."""
        if self.compile_mode == "esmodule":
            return "/ets/testrunner/OpenHarmonyTestRunner"
        else:
            return "OpenHarmonyTestRunner"

    def add_arg(self, name, value):
        # Silently ignore empty names/values so callers need not pre-check.
        if not name or not value:
            return
        self.arg_list[name] = value

    def remove_arg(self, name):
        if not name:
            return
        if name in self.arg_list:
            del self.arg_list[name]

    def get_args_command(self):
        """Render arg_list as 'aa test' flags ('wait_time' maps to -w)."""
        args_commands = ""
        for key, value in self.arg_list.items():
            if "wait_time" == key:
                args_commands = "%s -w %s " % (args_commands, value)
            else:
                args_commands = "%s -s %s %s " % (args_commands, key, value)
        return args_commands

    def _get_shell_handler(self, listener):
        """Build a ShellHandler wired to the oh_jsunit parser plugin."""
        parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_jsunit)
        if parsers:
            parsers = parsers[:1]
        parser_instances = []
        for parser in parsers:
            parser_instance = parser.__class__()
            parser_instance.suites_name = self.suites_name
            parser_instance.listeners = listener
            parser_instance.runner = self
            parser_instances.append(parser_instance)
            # The (single) parser doubles as the finished-observer.
            self.finished_observer = parser_instance
        handler = ShellHandler(parser_instances)
        return handler

    def _get_run_command(self):
        command = ""
        if self.config.package_name:
            # aa test -p ${packageName} -b ${bundleName}-s
            # unittest OpenHarmonyTestRunner
            command = "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner" \
                      " {}".format(self.config.package_name,
                                   self.config.bundle_name,
                                   self.get_args_command())
        elif self.config.module_name:
            # aa test -m ${moduleName} -b ${bundleName}
            # -s unittest OpenHarmonyTestRunner
            command = "aa test -m {} -b {} -s unittest {} {}".format(
                self.config.module_name, self.config.bundle_name,
                self.get_oh_test_runner_path(), self.get_args_command())
        return command
183 184 def _get_dry_run_command(self): 185 command = "" 186 if self.config.package_name: 187 command = "aa test -p {} -b {} -s unittest OpenHarmonyTestRunner" \ 188 " {} -s dryRun true".format(self.config.package_name, 189 self.config.bundle_name, 190 self.get_args_command()) 191 elif self.config.module_name: 192 command = "aa test -m {} -b {} -s unittest {}" \ 193 " {} -s dryRun true".format(self.config.module_name, 194 self.config.bundle_name, 195 self.get_oh_test_runner_path(), 196 self.get_args_command()) 197 198 return command 199 200 201@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_jsunit_test) 202class OHJSUnitTestDriver(IDriver): 203 """ 204 OHJSUnitTestDriver is a Test that runs a native test package on 205 given device. 206 """ 207 208 def __init__(self): 209 self.timeout = 80 * 1000 210 self.start_time = None 211 self.result = "" 212 self.error_message = "" 213 self.kits = [] 214 self.config = None 215 self.runner = None 216 self.rerun = True 217 self.rerun_all = True 218 # log 219 self.device_log = None 220 self.hilog = None 221 self.log_proc = None 222 self.hilog_proc = None 223 224 def __check_environment__(self, device_options): 225 pass 226 227 def __check_config__(self, config): 228 pass 229 230 def __execute__(self, request): 231 try: 232 LOG.debug("Developer_test Start execute OpenHarmony JSUnitTest") 233 self.config = request.config 234 self.config.device = request.config.environment.devices[0] 235 236 config_file = request.root.source.config_file 237 suite_file = request.root.source.source_file 238 result_save_path = get_result_savepath(suite_file, self.config.report_path) 239 self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name()) 240 if not suite_file: 241 raise ParamError( 242 "test source '%s' not exists" % 243 request.root.source.source_string, error_no="00110") 244 LOG.debug("Test case file path: %s" % suite_file) 245 self.config.device.set_device_report_path(request.config.report_path) 246 self.hilog = 
get_device_log_file(request.config.report_path, 247 request.config.device.__get_serial__() + "_" + request. 248 get_module_name(), 249 "device_hilog") 250 251 hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 252 0o755) 253 self.config.device.device_log_collector.add_log_address(self.device_log, self.hilog) 254 self.config.device.execute_shell_command(command="hilog -r") 255 with os.fdopen(hilog_open, "a") as hilog_file_pipe: 256 if hasattr(self.config, ConfigConst.device_log) \ 257 and self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \ 258 and hasattr(self.config.device, "clear_crash_log"): 259 self.config.device.device_log_collector.clear_crash_log() 260 self.log_proc, self.hilog_proc = self.config.device.device_log_collector.\ 261 start_catch_device_log(hilog_file_pipe=hilog_file_pipe) 262 self._run_oh_jsunit(config_file, request) 263 except Exception as exception: 264 self.error_message = exception 265 if not getattr(exception, "error_no", ""): 266 setattr(exception, "error_no", "03409") 267 LOG.exception(self.error_message, exc_info=True, error_no="03409") 268 raise exception 269 finally: 270 try: 271 self._handle_logs(request) 272 finally: 273 xml_path = os.path.join( 274 request.config.report_path, "result", 275 '.'.join((request.get_module_name(), "xml"))) 276 shutil.move(xml_path, self.result) 277 self.result = check_result_report( 278 request.config.report_path, self.result, self.error_message) 279 update_xml(request.root.source.source_file, self.result) 280 281 def __result__(self): 282 return self.result if os.path.exists(self.result) else "" 283 284 def _run_oh_jsunit(self, config_file, request): 285 try: 286 if not os.path.exists(config_file): 287 LOG.error("Error: Test cases don't exist %s." % config_file) 288 raise ParamError( 289 "Error: Test cases don't exist %s." 
% config_file, 290 error_no="00102") 291 json_config = JsonParser(config_file) 292 self.kits = get_kit_instances(json_config, 293 self.config.resource_path, 294 self.config.testcases_path) 295 296 self._get_driver_config(json_config) 297 self.config.device.connector_command("target mount") 298 self._start_smart_perf() 299 do_module_kit_setup(request, self.kits) 300 self.runner = OHJSUnitTestRunner(self.config) 301 self.runner.suites_name = request.get_module_name() 302 self._get_runner_config(json_config) 303 if hasattr(self.config, "history_report_path") and \ 304 self.config.testargs.get("test"): 305 self._do_test_retry(request.listeners, self.config.testargs) 306 else: 307 if self.rerun: 308 self.runner.retry_times = self.runner.MAX_RETRY_TIMES 309 # execute test case 310 self._do_tf_suite() 311 self._make_exclude_list_file(request) 312 oh_jsunit_para_parse(self.runner, self.config.testargs) 313 self._do_test_run(listener=request.listeners) 314 315 finally: 316 do_module_kit_teardown(request) 317 318 def _get_driver_config(self, json_config): 319 package = get_config_value('package-name', 320 json_config.get_driver(), False) 321 module = get_config_value('module-name', 322 json_config.get_driver(), False) 323 bundle = get_config_value('bundle-name', 324 json_config. 
get_driver(), False) 325 is_rerun = get_config_value('rerun', json_config.get_driver(), False) 326 327 self.config.package_name = package 328 self.config.module_name = module 329 self.config.bundle_name = bundle 330 self.rerun = True if is_rerun == 'true' else False 331 332 if not package and not module: 333 raise ParamError("Neither package nor module is found" 334 " in config file.", error_no="03201") 335 timeout_config = get_config_value("shell-timeout", 336 json_config.get_driver(), False) 337 if timeout_config: 338 self.config.timeout = int(timeout_config) 339 else: 340 self.config.timeout = TIME_OUT 341 342 def _get_runner_config(self, json_config): 343 test_timeout = get_config_value('test-timeout', 344 json_config.get_driver(), False) 345 if test_timeout: 346 self.runner.add_arg("wait_time", int(test_timeout)) 347 348 testcase_timeout = get_config_value('testcase-timeout', 349 json_config.get_driver(), False) 350 if testcase_timeout: 351 self.runner.add_arg("timeout", int(testcase_timeout)) 352 self.runner.compile_mode = get_config_value( 353 'compile-mode', json_config.get_driver(), False) 354 355 def _do_test_run(self, listener): 356 test_to_run = self._collect_test_to_run() 357 LOG.info("Collected suite count is: {}, test count is: {}". 
358 format(len(self.runner.expect_tests_dict.keys()), 359 len(test_to_run) if test_to_run else 0)) 360 if not test_to_run or not self.rerun: 361 self.runner.run(listener) 362 self.runner.notify_finished() 363 else: 364 self._run_with_rerun(listener, test_to_run) 365 366 def _collect_test_to_run(self): 367 run_results = self.runner.dry_run() 368 return run_results 369 370 def _run_tests(self, listener): 371 test_tracker = CollectingPassListener() 372 listener_copy = listener.copy() 373 listener_copy.append(test_tracker) 374 self.runner.run(listener_copy) 375 test_run = test_tracker.get_current_run_results() 376 return test_run 377 378 def _run_with_rerun(self, listener, expected_tests): 379 LOG.debug("Developer_test Ready to run with rerun, expect run: %s" 380 % len(expected_tests)) 381 test_run = self._run_tests(listener) 382 self.runner.retry_times -= 1 383 LOG.debug("Run with rerun, has run: %s" % len(test_run) 384 if test_run else 0) 385 if len(test_run) < len(expected_tests): 386 expected_tests = TestDescription.remove_test(expected_tests, 387 test_run) 388 if not expected_tests: 389 LOG.debug("No tests to re-run twice,please check") 390 self.runner.notify_finished() 391 else: 392 self._rerun_twice(expected_tests, listener) 393 else: 394 LOG.debug("Rerun once success") 395 self.runner.notify_finished() 396 397 def _rerun_twice(self, expected_tests, listener): 398 tests = [] 399 for test in expected_tests: 400 tests.append("%s#%s" % (test.class_name, test.test_name)) 401 self.runner.add_arg("class", ",".join(tests)) 402 LOG.debug("Ready to rerun twice, expect run: %s" % len(expected_tests)) 403 test_run = self._run_tests(listener) 404 self.runner.retry_times -= 1 405 LOG.debug("Rerun twice, has run: %s" % len(test_run)) 406 if len(test_run) < len(expected_tests): 407 expected_tests = TestDescription.remove_test(expected_tests, 408 test_run) 409 if not expected_tests: 410 LOG.debug("No tests to re-run third,please check") 411 self.runner.notify_finished() 412 
else: 413 self._rerun_third(expected_tests, listener) 414 else: 415 LOG.debug("Rerun twice success") 416 self.runner.notify_finished() 417 418 def _rerun_third(self, expected_tests, listener): 419 tests = [] 420 for test in expected_tests: 421 tests.append("%s#%s" % (test.class_name, test.test_name)) 422 self.runner.add_arg("class", ",".join(tests)) 423 LOG.debug("Rerun to rerun third, expect run: %s" % len(expected_tests)) 424 self._run_tests(listener) 425 LOG.debug("Rerun third success") 426 self.runner.notify_finished() 427 428 def _make_exclude_list_file(self, request): 429 if "all-test-file-exclude-filter" in self.config.testargs: 430 json_file_list = self.config.testargs.get( 431 "all-test-file-exclude-filter") 432 self.config.testargs.pop("all-test-file-exclude-filter") 433 if not json_file_list: 434 LOG.warning("all-test-file-exclude-filter value is empty!") 435 else: 436 if not os.path.isfile(json_file_list[0]): 437 LOG.warning( 438 "[{}] is not a valid file".format(json_file_list[0])) 439 return 440 file_open = os.open(json_file_list[0], os.O_RDONLY, 441 stat.S_IWUSR | stat.S_IRUSR) 442 with os.fdopen(file_open, "r") as file_handler: 443 json_data = json.load(file_handler) 444 exclude_list = json_data.get( 445 DeviceTestType.oh_jsunit_test, []) 446 filter_list = [] 447 for exclude in exclude_list: 448 if request.get_module_name() not in exclude: 449 continue 450 filter_list.extend(exclude.get(request.get_module_name())) 451 if not isinstance(self.config.testargs, dict): 452 return 453 if 'notClass' in self.config.testargs.keys(): 454 filter_list.extend(self.config.testargs.get('notClass', [])) 455 self.config.testargs.update({"notClass": filter_list}) 456 457 def _do_test_retry(self, listener, testargs): 458 tests_dict = dict() 459 case_list = list() 460 for test in testargs.get("test"): 461 test_item = test.split("#") 462 if len(test_item) != 2: 463 continue 464 case_list.append(test) 465 if test_item[0] not in tests_dict: 466 
tests_dict.update({test_item[0] : []}) 467 tests_dict.get(test_item[0]).append( 468 TestDescription(test_item[0], test_item[1])) 469 self.runner.add_arg("class", ",".join(case_list)) 470 self.runner.expect_tests_dict = tests_dict 471 self.config.testargs.pop("test") 472 self.runner.run(listener) 473 self.runner.notify_finished() 474 475 def _do_tf_suite(self): 476 if hasattr(self.config, "tf_suite") and \ 477 self.config.tf_suite.get("cases", []): 478 case_list = self.config["tf_suite"]["cases"] 479 self.config.testargs.update({"class": case_list}) 480 481 def _start_smart_perf(self): 482 if not hasattr(self.config, ConfigConst.kits_in_module): 483 return 484 if CKit.smartperf not in self.config.get(ConfigConst.kits_in_module): 485 return 486 sp_kits = get_plugin(Plugin.TEST_KIT, CKit.smartperf)[0] 487 sp_kits.target_name = self.config.bundle_name 488 param_config = self.config.get(ConfigConst.kits_params).get( 489 CKit.smartperf, "") 490 sp_kits.__check_config__(param_config) 491 self.kits.insert(0, sp_kits) 492 493 def _handle_logs(self, request): 494 serial = "{}_{}".format(str(self.config.device.__get_serial__()), time.time_ns()) 495 log_tar_file_name = "{}".format(str(serial).replace(":", "_")) 496 if hasattr(self.config, ConfigConst.device_log) and \ 497 self.config.device_log.get(ConfigConst.tag_enable) == ConfigConst.device_log_on \ 498 and hasattr(self.config.device, "start_get_crash_log"): 499 self.config.device.device_log_collector.\ 500 start_get_crash_log(log_tar_file_name, module_name=request.get_module_name()) 501 self.config.device.device_log_collector.\ 502 remove_log_address(self.device_log, self.hilog) 503 self.config.device.device_log_collector.\ 504 stop_catch_device_log(self.log_proc) 505 self.config.device.device_log_collector.\ 506 stop_catch_device_log(self.hilog_proc) 507