1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import copy 20import os 21import sys 22import traceback 23from typing import Union 24 25from xdevice import calculate_elapsed_time 26from xdevice import check_result_report 27from xdevice import StateRecorder 28from xdevice import LifeCycle 29from xdevice import ResultCode 30from xdevice import get_cst_time 31from xdevice import platform_logger 32from xdevice import EnvPool 33from xdevice import CaseEnd 34from xdevice import Binder 35 36from devicetest.runner.prepare import PrepareHandler 37from devicetest.core.constants import RunResult 38from devicetest.utils.util import clean_sys_resource 39from devicetest.utils.util import get_base_name 40from devicetest.utils.util import get_dir_path 41from devicetest.utils.util import import_from_file 42from devicetest.core.variables import DeccVariable 43from devicetest.core.variables import ProjectVariables 44from devicetest.core.variables import CurCase 45from devicetest.core.exception import DeviceTestError 46from devicetest.core.test_case import DeviceRoot 47from devicetest.core.test_case import BaseCase 48from devicetest.error import ErrorMessage 49from devicetest.log.logger import DeviceTestLog as Log 50from devicetest.report.generation import add_log_caching_handler 51from devicetest.report.generation import del_log_caching_handler 52from devicetest.report.generation import get_caching_logs 53from devicetest.report.generation import generate_report 54 55 56class RunnerMode: 57 PIPELINE = "pipeline" 58 DEBUG = "debug" 59 60 61class TestRunner: 62 """executes test cases and 63 """ 64 65 def __init__(self): 66 self.run_mode = RunnerMode.PIPELINE 67 self.run_list = None 68 self.no_run_list = None 69 self.running = None 70 self.configs = None 71 self.devices = None 72 self.log = Log 73 self.start_time = None 74 self.test_results = None 75 self.upload_result_handler = None 76 self.project = None 77 self.prepare = None 78 self.cur_case = None 79 self._repeat = 1 80 self._repeat_round = 1 81 82 def init_pipeline_runner(self, run_list, configs, devices, upload_result_handler): 83 self.run_list = run_list 84 self.no_run_list = copy.copy(self.run_list) 85 self.running = False 86 self.configs = configs 87 self.devices = devices 88 self.start_time = get_cst_time() 89 self.test_results = [] 90 self.upload_result_handler = upload_result_handler 91 self.project = ProjectVariables(self.log) 92 self.prepare = None 93 self.__init_project_variables() 94 self.run_mode = RunnerMode.PIPELINE 95 self._repeat = self.configs.get("request").config.repeat 96 self._repeat_round = self.configs.get("request").get_repeat_round() 97 98 def init_case_runner(self, run_list: Union[BaseCase, list]): 99 # simple case runner 100 self.run_list = run_list 101 self.run_mode = RunnerMode.DEBUG 102 self.log = platform_logger("TestRunner") 103 104 def __init_project_variables(self): 105 """ 106 testargs:为xDevice透传过来的数据,用户调用CONFIG可获取 107 :return: 108 """ 109 self.log.debug("configs:{}".format(self.configs)) 110 testcases_path = self.configs.get('testcases_path', "") 111 testargs = self.configs.get("testargs", {}) 112 self.__flash_run_list(testargs) 113 114 self.cur_case = CurCase(self.log) 115 self.project.set_project_path() 116 self.project.set_testcase_path(testcases_path) 117 self.project.set_task_report_dir(self.configs.get("report_path")) 118 self.project.set_resource_path(self.get_local_resource_path()) 119 120 def get_local_resource_path(self): 121 local_resource_path = os.path.join( 122 self.project.project_path, "testcases", "DeviceTest", "resource") 123 return local_resource_path 124 125 def get_local_aw_path(self): 126 local_aw_path = os.path.join( 127 self.project.project_path, "testcases", "DeviceTest", "aw") 128 return local_aw_path 129 130 def __flash_run_list(self, testargs): 131 """ 132 retry 场景更新run list 133 :param testargs: 134 :return: 135 """ 136 get_test = testargs.get("test") 137 self.log.info("get test:{}".format(get_test)) 138 retry_test_list = self.parse_retry_test_list(get_test) 139 if retry_test_list is not None: 140 self.run_list = retry_test_list 141 self.no_run_list = copy.copy(self.run_list) 142 self.log.info("retry test list:{}".format(retry_test_list)) 143 144 def parse_retry_test_list(self, retry_test_list): 145 if retry_test_list is None: 146 return None 147 elif not isinstance(retry_test_list, list): 148 err_msg = ErrorMessage.TestCase.Code_0203005 149 self.log.error(err_msg) 150 raise DeviceTestError(err_msg) 151 152 elif len(retry_test_list) == 1 and "#" not in str(retry_test_list[0]): 153 return None 154 else: 155 history_case_list = [] 156 history_case_dict = dict() 157 retry_case_list = [] 158 for abd_file_path in self.run_list: 159 base_file_name = get_base_name(abd_file_path) 160 if base_file_name not in history_case_dict.keys(): 161 history_case_dict.update({base_file_name: []}) 162 history_case_dict.get(base_file_name).append(abd_file_path) 163 history_case_list.append(base_file_name) 164 self.log.debug("history case list:{}".format(history_case_list)) 165 166 for _value in retry_test_list: 167 case_id = str(_value).split("#")[0] 168 if case_id in history_case_dict.keys(): 169 retry_case_list.append(history_case_dict.get(case_id)[0]) 170 return retry_case_list 171 172 def parse_config(self, test_configs): 173 pass 174 175 def add_value_to_configs(self): 176 self.configs["log"] = self.log 177 self.configs["devices"] = self.devices 178 self.configs["project"] = self.project 179 180 def run(self): 181 self._pipeline_run() 182 183 def _pipeline_run(self): 184 self.running = True 185 aw_path = self.add_aw_path_to_sys(self.project.aw_path) 186 self.log.info("Executing run list {}.".format(self.run_list)) 187 188 self.add_value_to_configs() 189 190 self.prepare = PrepareHandler(self.log, self.cur_case, 191 self.project, self.configs, 192 self.devices, self.run_list) 193 # **********混合root和非root************** 194 try: 195 for device in self.devices: 196 if hasattr(device, "is_root"): 197 DeviceRoot.is_root_device = device.is_root 198 self.log.debug(DeviceRoot.is_root_device) 199 setattr(device, "is_device_root", DeviceRoot.is_root_device) 200 201 except Exception as _: 202 self.log.error('set branch api error.') 203 # **************混合root和非root end********************** 204 self.prepare.run_prepare() 205 206 for test_cls_name in self.run_list: 207 case_name = get_base_name(test_cls_name) 208 if self.project.record.is_shutdown(raise_exception=False): 209 break 210 self.log.info("Executing test class {}".format(test_cls_name)) 211 self.project.execute_case_name = case_name 212 self.run_test_class(test_cls_name, case_name) 213 self.prepare.run_prepare(is_teardown=True) 214 clean_sys_resource(file_path=aw_path) 215 DeccVariable.reset() 216 217 def add_aw_path_to_sys(self, aw_path): 218 219 sys_aw_path = os.path.dirname(aw_path) 220 if os.path.exists(sys_aw_path): 221 sys.path.insert(1, sys_aw_path) 222 self.log.info("add {} to sys path.".format(sys_aw_path)) 223 return sys_aw_path 224 return None 225 226 def run_test_class(self, case_path, case_name): 227 """Instantiates and executes a test class. 228 If the test cases list is not None, all the test cases in the test 229 class should be executed. 230 Args: 231 case_path: case path 232 case_name: case name 233 Returns: 234 A tuple, with the number of cases passed at index 0, and the total 235 number of test cases at index 1. 236 """ 237 # 开始收集日志 238 case_log_buffer_hdl = add_log_caching_handler() 239 240 tests = "__init__" 241 case_result = RunResult.FAILED 242 start_time = get_cst_time() 243 case_dir_path = get_dir_path(case_path) 244 test_cls_instance = None 245 246 # 用例测试结果的拓展内容 247 result_content = None 248 try: 249 self.project.cur_case_full_path = case_path 250 DeccVariable.set_cur_case_obj(self.cur_case) 251 test_cls = import_from_file(case_dir_path, case_name) 252 self.log.info("Success to import {}.".format(case_name)) 253 with test_cls(self.configs) as test_cls_instance: 254 self.cur_case.set_case_instance(test_cls_instance) 255 test_cls_instance.run() 256 257 tests = test_cls_instance.tests 258 start_time = test_cls_instance.start_time 259 260 case_result = test_cls_instance.result 261 error_msg = test_cls_instance.error_msg 262 result_content = getattr(test_cls_instance, "result_content", None) 263 except ImportError as exception: 264 error_msg = str(exception) 265 self.log.error(error_msg) 266 self.log.error(traceback.format_exc()) 267 except Exception as exception: 268 error_msg = ErrorMessage.TestCase.Code_0203002.format(exception) 269 self.log.error(error_msg) 270 self.log.error(traceback.format_exc()) 271 if test_cls_instance: 272 try: 273 del test_cls_instance 274 self.log.debug("del test_cls_instance success.") 275 except Exception as exception: 276 self.log.warning("del test_cls_instance exception. {}".format(exception)) 277 278 Binder.notify_stage(CaseEnd(case_name, case_result, error_msg)) 279 280 end_time = get_cst_time() 281 environment = self.configs.get("request").config.environment 282 steps = self.cur_case.get_steps_info() 283 # 停止收集日志 284 del_log_caching_handler(case_log_buffer_hdl) 285 # 生成报告 286 case_info = { 287 "name": case_name, 288 "result": case_result, 289 "begin": start_time.strftime("%Y-%m-%d %H:%M:%S"), 290 "end": end_time.strftime("%Y-%m-%d %H:%M:%S"), 291 'elapsed': calculate_elapsed_time(start_time, end_time), 292 "error": error_msg, 293 "logs": "", 294 "devices": [] if environment is None else environment.get_description(), 295 "steps": steps 296 } 297 log_content = { 298 "content": get_caching_logs(case_log_buffer_hdl) 299 } 300 round_folder = f"round{self._repeat_round}" if self._repeat > 1 else "" 301 report_path = os.path.join("details", round_folder, case_name + ".html") 302 to_file = os.path.join(self.project.task_report_dir, report_path) 303 generate_report(to_file, case=case_info, logs=log_content) 304 steps.clear() 305 del case_log_buffer_hdl 306 self.cur_case.set_case_instance(None) 307 self.record_current_case_result( 308 case_name, tests, case_result, start_time, error_msg, report_path, 309 result_content=result_content) 310 return case_result, error_msg 311 312 def record_current_case_result(self, case_name, tests, case_result, 313 start_time, error_msg, report, **kwargs): 314 test_result = self.record_cls_result( 315 case_name, tests, case_result, start_time, error_msg, report, **kwargs) 316 self.log.debug("test result: {}".format(test_result)) 317 self.test_results.append(test_result) 318 self.upload_result_handler.report_handler.test_results.append(test_result) 319 320 def stop(self): 321 """ 322 Releases resources from test run. Should be called right after run() 323 finishes. 324 """ 325 if self.running: 326 self.running = False 327 328 @staticmethod 329 def record_cls_result(case_name, tests_step, result, start_time, error, report, **kwargs): 330 dict_result = { 331 "case_name": case_name, 332 "tests_step": tests_step or "__init__", 333 "result": result or RunResult.FAILED, 334 "start_time": start_time or get_cst_time(), 335 "error": error, 336 "end_time": get_cst_time(), 337 "report": report 338 } 339 dict_result.update(kwargs) 340 return dict_result 341 342 343class TestSuiteRunner: 344 """ 345 executes test suite cases 346 """ 347 348 def __init__(self, suite, configs, devices): 349 self.suite = suite 350 self.running = False 351 self.configs = configs 352 self.devices = devices 353 self.log = Log 354 self.start_time = get_cst_time() 355 self.listeners = self.configs["listeners"] 356 self.state_machine = StateRecorder() 357 self.suite_name = "" 358 359 def add_value_to_configs(self): 360 self.configs["log"] = self.log 361 self.configs["devices"] = self.devices 362 self.configs["suite_name"] = self.suite_name 363 364 def run(self): 365 self.running = True 366 self.log.info("Executing test suite: {}.".format(self.suite)) 367 368 self.suite_name = get_base_name(self.suite) 369 self.add_value_to_configs() 370 self.run_test_suite(self.suite) 371 372 def run_test_suite(self, test_cls_name): 373 """Instantiates and executes a test class. 374 If the test cases list is not None, all the test cases in the test 375 class should be executed. 376 Args: 377 test_cls_name: Name of the test class to execute. 378 Returns: 379 A tuple, with the number of cases passed at index 0, and the total 380 number of test cases at index 1. 381 """ 382 suite_dir_path = get_dir_path(test_cls_name) 383 test_cls_instance = None 384 self.handle_suites_started() 385 self.handle_suite_started() 386 try: 387 test_cls = import_from_file(suite_dir_path, self.suite_name) 388 self.log.info("Success to import {}.".format(self.suite_name)) 389 self.configs["cur_suite"] = test_cls 390 with test_cls(self.configs, suite_dir_path) as test_cls_instance: 391 test_cls_instance.run() 392 393 error_msg = test_cls_instance.error_msg 394 self.handle_suite_ended(test_cls_instance) 395 except Exception as e: 396 error_msg = ErrorMessage.TestCase.Code_0203017.format(e) 397 self.log.error(error_msg) 398 self.log.error(traceback.format_exc()) 399 self.handle_suites_ended(error_msg) 400 result_path = os.path.join(self.configs["report_path"], "result") 401 report_file = os.path.join(result_path, "%s.xml" % self.suite_name) 402 os.makedirs(result_path, exist_ok=True) 403 check_result_report("", report_file, error_message=error_msg) 404 if test_cls_instance: 405 try: 406 del test_cls_instance 407 self.log.debug("del test suite instance success.") 408 except Exception as e: 409 self.log.warning("del test suite instance exception. " 410 "Exception: {}".format(e)) 411 return error_msg 412 413 def stop(self): 414 """ 415 Releases resources from test run. Should be called right after run() 416 finishes. 417 """ 418 if self.running: 419 self.running = False 420 421 def handle_suites_started(self): 422 self.state_machine.get_suites(reset=True) 423 test_suites = self.state_machine.get_suites() 424 test_suites.suites_name = self.suite_name 425 test_suites.test_num = 0 426 for listener in self.listeners: 427 suite_report = copy.copy(test_suites) 428 listener.__started__(LifeCycle.TestSuites, suite_report) 429 430 def handle_suites_ended(self, error_msg): 431 suites = self.state_machine.get_suites() 432 suites.is_completed = True 433 suites.stacktrace = error_msg 434 for listener in self.listeners: 435 listener.__ended__(LifeCycle.TestSuites, suites) 436 437 def handle_suite_started(self): 438 self.state_machine.suite(reset=True) 439 self.state_machine.running_test_index = 0 440 test_suite = self.state_machine.suite() 441 test_suite.suite_name = self.suite_name 442 test_suite.test_num = 0 443 for listener in self.listeners: 444 suite_report = copy.copy(test_suite) 445 listener.__started__(LifeCycle.TestSuite, suite_report) 446 447 def handle_suite_ended(self, testsuite_cls): 448 suite = self.state_machine.suite() 449 suites = self.state_machine.get_suites() 450 self.handle_one_case_result(testsuite_cls) 451 suite.is_completed = True 452 # 设置测试套的报告路径 453 suite.report = testsuite_cls.suite_report_path 454 for listener in self.listeners: 455 listener.__ended__(LifeCycle.TestSuite, copy.copy(suite), is_clear=True) 456 suites.run_time += suite.run_time 457 458 def handle_one_case_result(self, testsuite_cls): 459 status_dict = {RunResult.PASSED: ResultCode.PASSED, 460 RunResult.FAILED: ResultCode.FAILED, 461 RunResult.BLOCKED: ResultCode.BLOCKED, 462 "ignore": ResultCode.SKIPPED} 463 for case_name, case_result in testsuite_cls.case_result.items(): 464 result = case_result.get("result") 465 error = case_result.get("error") 466 run_time = case_result.get("run_time") 467 report = case_result.get("report") 468 result_content = case_result.get("result_content") 469 470 test_result = self.state_machine.test(reset=True) 471 test_suite = self.state_machine.suite() 472 test_result.test_class = test_suite.suite_name 473 test_result.test_name = case_name 474 test_result.code = status_dict.get(result).value 475 test_result.stacktrace = error 476 test_result.run_time = run_time 477 test_result.report = report 478 if result_content: 479 test_result.result_content = result_content 480 test_result.current = self.state_machine.running_test_index + 1 481 482 self.state_machine.suite().run_time += run_time 483 for listener in self.listeners: 484 listener.__started__( 485 LifeCycle.TestCase, copy.copy(test_result)) 486 test_suites = self.state_machine.get_suites() 487 test_suites.test_num += 1 488 for listener in self.listeners: 489 listener.__ended__( 490 LifeCycle.TestCase, copy.copy(test_result)) 491 self.state_machine.running_test_index += 1 492