1#!/usr/bin/python3.4 2# coding=utf-8 3# 4 5# Copyright (C) 2016 Huawei Technologies Co., HUTAF xDevice 6# 7# Licensed under the Apache License, Version 2.0 (the "License"); you may not 8# use this file except in compliance with the License. You may obtain a copy of 9# the License at 10# 11# http://www.apache.org/licenses/LICENSE-2.0 12# 13# Unless required by applicable law or agreed to in writing, software 14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 16# License for the specific language governing permissions and limitations under 17# the License. 18 19import copy 20import sys 21import os 22import traceback 23from typing import List, Tuple 24 25from xdevice import ConfigConst 26from xdevice import calculate_elapsed_time 27from xdevice import get_cst_time 28from xdevice import get_file_absolute_path 29from xdevice import FilePermission 30from xdevice import CaseEnd 31from xdevice import Binder 32from xdevice import Variables 33 34from devicetest.core.constants import RunResult 35from devicetest.core.constants import FileAttribute 36from devicetest.core.test_case import UpdateStep 37from devicetest.core.variables import CurCase 38from devicetest.core.variables import DeccVariable 39from devicetest.core.variables import ProjectVariables 40from devicetest.error import ErrorMessage 41from devicetest.report.generation import add_log_caching_handler 42from devicetest.report.generation import del_log_caching_handler 43from devicetest.report.generation import get_caching_logs 44from devicetest.report.generation import generate_report 45from devicetest.utils.util import get_base_name 46from devicetest.utils.util import get_dir_path 47from devicetest.utils.util import import_from_file 48 49 50class TestSuite: 51 """Base class for all test classes to inherit from. 52 53 This class gets all the controller objects from test_runner and executes 54 the test cases requested within itself. 55 56 """ 57 58 def __init__(self, configs, path): 59 self.configs = configs 60 self.devices = [] 61 self.device1 = None 62 self.device2 = None 63 # 透传的参数 64 self.pass_through = Variables.config.pass_through 65 self.set_devices(self.configs["devices"]) 66 self.path = path 67 self.log = self.configs["log"] 68 self.error_msg = '' 69 self.trace_info = '' 70 self.case_list: List[Tuple[str, str]] = [] 71 self.case_result = dict() 72 self.suite_name = self.configs.get("suite_name") 73 # 白名单用例 74 self.white_case_list = [] 75 # 黑名单用例 76 self.black_case_list = [] 77 # 初始化透传参数的列表 78 self.arg_list = dict() 79 self.app_result_info = dict() 80 self._test_args_para_parse(self.configs["testargs"]) 81 # 往DeviceTest的用例中注入logger并防止重复初始化测试套级别的变量 82 self.inject_logger = None 83 self.cur_case = None 84 # device log 85 self.device_log = dict() 86 self.hilog = dict() 87 self.log_proc = dict() 88 self.hilog_proc = dict() 89 90 self.suite_case_results = [] 91 self.suite_report_path = "" 92 self._case_log_buffer_hdl = None 93 94 # device录屏截图属性 95 self.devices_media = dict() 96 97 self._repeat = self.configs.get("request").config.repeat 98 self._repeat_round = self.configs.get("request").get_repeat_round() 99 self._round_folder = f"round{self._repeat_round}" if self._repeat > 1 else "" 100 101 def __enter__(self): 102 return self 103 104 def __exit__(self, *args): 105 pass 106 107 def _device_close(self): 108 self.log.debug("Start device close") 109 for device in self.devices: 110 device.close() 111 self.log.debug("Finish device close.") 112 113 def run(self): 114 self._init_devicetest() 115 report_path = os.path.join("details", self._round_folder, self.suite_name, self.suite_name + ".html") 116 start_time = get_cst_time() 117 # 记录录屏和截图属性 118 # self._get_screenrecorder_and_screenshot() 119 # 开始收集测试套(setup和teardown)的运行日志 120 suite_log_buffer_hdl = add_log_caching_handler() 121 try: 122 self.cur_case.set_suite_instance(self) 123 # 1.先判断是否在json中指定,否则先收集当前文件夹下所有testcase得到run_list 124 for case_path in self._get_case_list(self.path): 125 case_name = get_base_name(case_path) 126 if (self.black_case_list and case_name in self.black_case_list) \ 127 or (self.white_case_list and case_name not in self.white_case_list): 128 self.log.warning("case name {} is in black list or not in white list, ignored".format(case_name)) 129 continue 130 self.case_list.append((case_name, case_path)) 131 self.log.debug("Execute test case list: {}".format(self.case_list)) 132 # 2.先执行self.setup 133 if self.run_setup(): 134 # 在运行测试套子用例前,停止收集测试套setup步骤的运行日志 135 del_log_caching_handler(suite_log_buffer_hdl) 136 # 3.依次执行所有的run_list 137 # 开始收集测试套子用例的运行日志 138 self._case_log_buffer_hdl = add_log_caching_handler() 139 total_case_num = len(self.case_list) 140 for index, case in enumerate(self.case_list, 1): 141 # self._reset_screenrecorder_and_screenshot() 142 self.log.info("[{} / {}] Executing suite case: {}".format(index, total_case_num, case[1])) 143 self.run_one_test_case(case) 144 # 停止收集测试套子用例的运行日志 145 del_log_caching_handler(self._case_log_buffer_hdl) 146 else: 147 self.error_msg = ErrorMessage.TestCase.Code_0203017.format(self.error_msg) 148 for case in self.case_list: 149 self.case_result[case[0]] = { 150 "result": RunResult.BLOCKED, 151 "error": self.error_msg, 152 "run_time": 0, 153 "report": report_path 154 } 155 self._case_log_buffer_hdl = None 156 # 在运行测试套子用例后,重新开始收集测试套teardown步骤的运行日志 157 add_log_caching_handler(buffer_hdl=suite_log_buffer_hdl) 158 finally: 159 # 4.执行self.teardown 160 self.run_teardown() 161 self.cur_case.set_suite_instance(None) 162 163 steps = self.cur_case.get_steps_info() 164 # 停止收集测试套(setup和teardown)的运行日志 165 del_log_caching_handler(suite_log_buffer_hdl) 166 if suite_log_buffer_hdl is None: 167 return 168 # 生成测试套的报告 169 self.log.info("generate suite report") 170 end_time = get_cst_time() 171 environment = self.configs.get("request").config.environment 172 suite_info = { 173 "name": self.suite_name, 174 "result": "", 175 "begin": start_time.strftime("%Y-%m-%d %H:%M:%S"), 176 "end": end_time.strftime("%Y-%m-%d %H:%M:%S"), 177 'elapsed': calculate_elapsed_time(start_time, end_time), 178 "error": "", 179 "logs": "", 180 "subcases": self.suite_case_results, 181 "devices": [] if environment is None else environment.get_description(), 182 "steps": steps 183 } 184 log_content = { 185 "content": get_caching_logs(suite_log_buffer_hdl) 186 } 187 to_file = os.path.join(self.get_case_report_path(), report_path) 188 generate_report(to_file, case=suite_info, logs=log_content) 189 del suite_log_buffer_hdl 190 191 # 往结果xml添加测试套的报告路径 192 self.suite_report_path = report_path 193 steps.clear() 194 DeccVariable.reset() 195 196 def setup(self): 197 """Setup function that will be called before executing any test suite. 198 Implementation is optional. 199 """ 200 pass 201 202 def setup_start(self): 203 """ 204 setup_start function that will be called after setup function. 205 Implementation is optional. 206 """ 207 pass 208 209 def setup_end(self): 210 """ 211 setup_end function that will be called after setup function. 212 Implementation is optional. 213 """ 214 pass 215 216 def teardown(self): 217 """Teardown function that will be called after all the selected test 218 suite. 219 Implementation is optional. 220 """ 221 pass 222 223 def teardown_start(self): 224 """ 225 teardown_start function that will be called before Teardown function. 226 Implementation is optional. 227 """ 228 pass 229 230 def teardown_end(self): 231 """ 232 teardown_end function that will be called after Teardown function. 233 Implementation is optional. 234 """ 235 pass 236 237 def get_params(self): 238 return self.arg_list 239 240 def set_devices(self, devices): 241 self.devices = devices 242 if not devices: 243 return 244 245 try: 246 for num, _ad in enumerate(self.devices, 1): 247 if not hasattr(_ad, "device_id") or not getattr(_ad, "device_id"): 248 setattr(_ad, "device_id", "device{}".format(num)) 249 # 兼容release2 增加id、serial 250 setattr(_ad, "id", _ad.device_id) 251 setattr(_ad, "serial", _ad.device_sn) 252 setattr(self, _ad.device_id, _ad) 253 setattr(self, "device{}".format(num), _ad) 254 except Exception as error: 255 self.log.error("Failed to initialize the device object in the " 256 "TestCase.", error_no="01218") 257 raise error 258 259 def _get_case_list(self, path): 260 result = [] 261 if len(self.configs["suitecases"]) > 0: 262 for _, case in enumerate(self.configs["suitecases"]): 263 if os.path.exists(case): 264 case_path = case 265 else: 266 case_path = get_file_absolute_path(case, [path, 267 self.configs["resource_path"], 268 self.configs["testcases_path"]]) 269 result.append(case_path) 270 else: 271 all_file_list = os.listdir(path) 272 # 遍历该文件夹下的所有目录或者文件 273 for file in all_file_list: 274 filepath = os.path.join(path, file) 275 # 如果是文件夹,递归调用函数 276 if os.path.isdir(filepath): 277 result.extend(self._get_case_list(filepath)) 278 # 如果不是文件夹,保存文件路径及文件名 279 elif os.path.isfile(filepath) and \ 280 "__pycache__" not in filepath: 281 if file.startswith(FileAttribute.TESTCASE_PREFIX) and \ 282 (file.endswith(FileAttribute.TESTCASE_POSFIX_PY) or 283 file.endswith(FileAttribute.TESTCASE_POSFIX_PYC) or 284 file.endswith(FileAttribute.TESTCASE_POSFIX_PYD)): 285 result.append(filepath) 286 return result 287 288 def _exec_func(self, func, *args): 289 result = False 290 try: 291 func(*args) 292 except Exception as exception: 293 self.error_msg = str(exception) 294 self.trace_info = traceback.format_exc() 295 296 index = self.cur_case.step_index 297 if index == -1: 298 self.log.error(self.error_msg) 299 self.log.error(self.trace_info) 300 else: 301 step_error_id = f'step_error_{index}' 302 self.log.error(f'<span id="{step_error_id}">{self.error_msg}</span>') 303 self.log.error(self.trace_info) 304 _error = f'<a href="javascript:" onclick="gotoStep(\'{step_error_id}\')">{self.error_msg}</a>' 305 UpdateStep(index, error=_error) 306 else: 307 result = True 308 return result 309 310 def run_setup(self): 311 self.setup_start() 312 self.log.info("**********SetUp Starts!") 313 ret = self._exec_func(self.setup) 314 self.log.info("**********SetUp Ends!") 315 if ret: 316 self.setup_end() 317 return True 318 self.log.info("SetUp Failed!") 319 return False 320 321 def run_one_test_case(self, case: Tuple[str, str]): 322 case_name, case_path = case[0], case[1] 323 start_time = get_cst_time() 324 case_result = RunResult.FAILED 325 test_cls_instance = None 326 result_content = None # 用例测试结果的拓展内容 327 try: 328 test_cls = import_from_file(get_dir_path(case_path), case_name) 329 self.log.info("Success to import {}.".format(case_name)) 330 self._compatible_testcase(case_path, case_name) 331 with test_cls(self.configs) as test_cls_instance: 332 self.cur_case.set_case_instance(test_cls_instance) 333 test_cls_instance.run() 334 case_result, error_msg = test_cls_instance.result, test_cls_instance.error_msg 335 result_content = getattr(test_cls_instance, "result_content", None) 336 except Exception as e: 337 error_msg = str(e) 338 self.log.error("run case error! Exception: {}".format(e)) 339 self.log.error(traceback.format_exc()) 340 341 if test_cls_instance is None: 342 case_result = RunResult.BLOCKED 343 if test_cls_instance: 344 try: 345 del test_cls_instance 346 self.log.debug("del test case instance success") 347 except Exception as e: 348 self.log.debug(traceback.format_exc()) 349 self.log.warning("del test case instance exception. Exception: {}".format(e)) 350 Binder.notify_stage(CaseEnd(case_name, case_result)) 351 352 end_time = get_cst_time() 353 cost = int(round((end_time - start_time).total_seconds() * 1000)) 354 self.log.info("Executed case: {}, result: {}, cost time: {}ms".format(case_name, case_result, cost)) 355 self.case_result[case_name] = { 356 "result": case_result, "error": error_msg, 357 "run_time": cost, "report": "", "result_content": result_content} 358 359 try: 360 self._device_close() 361 except Exception as e: 362 self.log.error("stop catch device log error! {}".format(e)) 363 self.log.debug(traceback.format_exc()) 364 365 if self._case_log_buffer_hdl is None: 366 return 367 # 生成子用例的报告 368 steps = self.cur_case.get_steps_info() 369 base_info = { 370 "name": case_name, 371 "result": case_result, 372 "begin": start_time.strftime("%Y-%m-%d %H:%M:%S"), 373 "end": end_time.strftime("%Y-%m-%d %H:%M:%S"), 374 'elapsed': calculate_elapsed_time(start_time, end_time), 375 "error": error_msg 376 } 377 case_info = copy.copy(base_info) 378 case_info.update({ 379 "logs": "", 380 "devices": [], 381 "steps": steps 382 }) 383 log_content = { 384 "content": copy.copy(get_caching_logs(self._case_log_buffer_hdl)) 385 } 386 case_html = case_name + ".html" 387 report_path = os.path.join("details", self._round_folder, self.suite_name, case_html) 388 to_file = os.path.join(self.configs.get("report_path"), report_path) 389 generate_report(to_file, case=case_info, logs=log_content) 390 base_info["report"] = case_html 391 self.suite_case_results.append(base_info) 392 # 清空日志缓存 393 self._case_log_buffer_hdl.buffer.clear() 394 steps.clear() 395 # 往结果xml添加子用例的报告路径 396 self.case_result[case_name]["report"] = report_path 397 # 将用例实例对象和用例名置为空 398 self.cur_case.set_case_instance(None) 399 self.cur_case.set_name("") 400 401 def run_teardown(self): 402 self.log.info("**********TearDown Starts!") 403 self.teardown_start() 404 self._exec_func(self.teardown) 405 self.teardown_end() 406 self.log.info("**********TearDown Ends!") 407 408 def _test_args_para_parse(self, paras): 409 paras = dict(paras) 410 for para_name in paras.keys(): 411 para_name = para_name.strip() 412 para_values = paras.get(para_name, []) 413 if para_name == "class": 414 self.white_case_list.extend(para_values) 415 elif para_name == "notClass": 416 self.black_case_list.extend(para_values) 417 elif para_name == "para": 418 for arg in para_values: 419 key, value = arg.split("#") 420 self.arg_list[key] = value 421 elif para_name == "deveco_planet_info": 422 for app_info in para_values: 423 key, value = app_info.split("#") 424 if key == "task_type": 425 setattr(sys, "category", value) 426 else: 427 self.app_result_info[key] = value 428 setattr(sys, "app_result_info", self.app_result_info) 429 else: 430 continue 431 432 self.configs["pass_through"] = self.pass_through 433 self.configs["arg_list"] = self.arg_list 434 435 def get_case_report_path(self): 436 return self.configs["report_path"] 437 438 def _compatible_testcase(self, case_path, case_name): 439 DeccVariable.cur_case().set_name(case_name) 440 project_var = ProjectVariables(self.inject_logger) 441 project_var.execute_case_name = case_name 442 project_var.cur_case_full_path = case_path 443 project_var.task_report_dir = self.get_case_report_path() 444 self.configs["project"] = project_var 445 446 def _init_devicetest(self): 447 self.cur_case = CurCase(self.log) 448 self.cur_case.suite_name = self.suite_name 449 self.cur_case.set_case_screenshot_dir( 450 None, self.get_case_report_path(), None, 451 repeat=self._repeat, repeat_round=self._repeat_round) 452 DeccVariable.set_cur_case_obj(self.cur_case) 453