#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import threading
import time

from devicetest.core.constants import RunResult
from devicetest.core.constants import RunSection
from devicetest.core.record import ProjectRecord
from devicetest.log.logger import DeviceTestLog as log
from devicetest.utils.time_util import TimeHandler
from devicetest.utils.util import get_base_name
from xdevice import is_env_pool_run_mode
from xdevice import Variables


def get_decrypt_resource_path():
    """Return the resource directory configured in ``Variables.res_dir``."""
    return Variables.res_dir


def set_resource_path(resource_path):
    """Store ``resource_path`` on the shared ``DeccVariable.project`` object."""
    DeccVariable.project.resource_path = resource_path


def get_testsuit_path():
    """Return the test-suite path held by the shared project object."""
    return DeccVariable.project.test_suite_path


def get_project_path():
    """
    get project path

    Prefers the path already stored on ``DeccVariable.project``; otherwise
    falls back to the parent directory of ``Variables.top_dir``.

    :return: project_path
    :raises Exception: when the fallback path is empty or does not exist
    """
    if DeccVariable.project.project_path:
        return DeccVariable.project.project_path

    project_path = os.path.dirname(Variables.top_dir)
    # os.path.dirname() returns a string (possibly empty), never None, so
    # test for emptiness rather than identity with None.
    if not project_path:
        log.info("project path is None.")
        raise Exception("project path is empty")
    if not os.path.exists(project_path):
        log.info("project path not exists.")
        log.debug("project path:{}".format(project_path))
        raise Exception("project path does not exist: {}".format(project_path))
    return project_path


class CurCase:
    """Mutable state of the test case (or test suite) currently running."""

    def __init__(self, _log):
        """
        Initialise all case-level fields with their defaults.

        :param _log: logger object; its ``debug`` method is used by setters
        """
        # Case-level parameters
        self.log = _log
        self.step_total = 0  # number of tests
        self.run_section = ""  # RunSection.SETUP
        self.case_result = RunResult.PASSED  # execution result of the current case
        self.name = ''  # class/method name, i.e. the case name (case_id)
        self.suite_name = ""  # test suite this case belongs to
        self.error_msg = ''  # failure message of the case
        self.case_screenshot_dir = None  # directory for failure screenshots of the current case
        self.case_flash_error_msg = False  # whether the current case has updated errorMsg
        self.is_upload_method_result = False  # whether the first failed step has been reported

        self.step_section = RunSection.SETUP  # case method stage: setup/test/teardown
        self.step_index = -1  # index of the current step
        self.step_error_msg = ''  # failure message of the current step
        self.step_fail_msg = ''  # user-specified failure message
        self.step_result = RunResult.PASSED  # execution result of the current step
        # step records of the test case (including suite sub-cases):
        # step name, result, cost, screenshots, etc.
        self.steps_info = []
        # step records of the test suite: step name, result, cost, screenshots, etc.
        self.suite_steps_info = []
        # whether case operation steps are recorded automatically; when False
        # they have to be added manually through record_step
        self.auto_record_steps_info = False

        self.test_method = TestMethod(self.log)
        self.cur_check_cmd = CurCheckCmd()

        # Failure-screenshot related
        self.checkepr = False  # meaning unclear (question left by the original author)
        self.image_num = 0
        self.video_num = 0
        self.dump_xml_num = 0

        # prepare related
        self.status = 0
        self.description = ''  # VAR.CurCase.Description, used by memoryLeakReport.py
        self.log_details_path = "./log/test_run_details.log"
        self.log_path = "./log/test_run_summary.log"
        self.report_path = ''
        self.iperf_path = None  # wifi related

        # windows
        self.win_capture_path = ''  # WinCapturePath
        self.exact_start_time = ''  # VAR.CurCase.ExactStartTime, used by memoryLeakReport.py
        self.start_time = ''  # VAR.CurCase.StartTime, used by memoryLeakReport.py

        self.case_name_file_path = ''  # VAR.CurCase.CaseName.FilePath
        self.device_log = DeviceLog()

        self.case_instance = None
        self.suite_instance = None

        self.devices = list()

        self.is_capture_step_screen = False
        self.is_record_step_screen = False
        self.set_step_screen()

    def set_step_screen(self):
        """
        Choose between recording and capturing the screen for each step.

        Recording is enabled when the ``screenrecorder`` task argument equals
        ``"true"`` (case-insensitive); otherwise per-step capture is enabled.
        """
        if Variables.config.taskargs.get("screenrecorder", "").lower() == "true":
            self.is_record_step_screen = True
        else:
            self.is_capture_step_screen = True

    @property
    def testcase(self):
        """The case instance set through :meth:`set_case_instance`."""
        return self.case_instance

    @property
    def testsuite(self):
        """The suite instance set through :meth:`set_suite_instance`."""
        return self.suite_instance

    def set_case_instance(self, instance):
        """Store the case instance and, if it is truthy, adopt its devices."""
        self.case_instance = instance
        if instance:
            self.devices = instance.devices

    def set_suite_instance(self, instance):
        """Store the suite instance and, if it is truthy, adopt its devices."""
        self.suite_instance = instance
        if instance:
            self.devices = instance.devices

    def set_error_msg(self, error_msg):
        """Log and store the case failure message."""
        self.log.debug("set CurCase error msg as: {}".format(error_msg))
        self.error_msg = error_msg

    def set_run_section(self, run_section):
        """Log and store the current run section."""
        self.log.debug("set CurCase run section as: {}".format(run_section))
        self.run_section = run_section

    def set_case_result(self, case_result):
        """Store and log the case result."""
        self.case_result = case_result
        self.log.debug(
            "set CurCase case result as: {}".format(self.case_result))

    def set_step_total(self, step_total):
        """Store and log the total number of steps."""
        self.step_total = step_total
        self.log.debug(
            "set CurCase step total as: {}".format(self.step_total))

    def set_name(self, name):
        """Store and log the case name."""
        self.name = name
        self.log.debug("set CurCase name as: {}".format(self.name))

    def set_step_section(self, step_section):
        """Store and log the current step section."""
        self.step_section = step_section
        self.log.debug(
            "set CurCase step section as: {}".format(self.step_section))

    def set_case_screenshot_dir(self, test_suite_path, task_report_dir, cur_case_full_path,
                                repeat=1, repeat_round=1):
        """
        Work out and store the screenshot directory of the current case.

        The default is ``<task_report_dir>/details/<roundN>`` (the round
        folder is only used when ``repeat`` is greater than 1). In env-pool
        run mode the report directory itself is used. When the case path can
        be split into exactly two parts by ``test_suite_path``, the directory
        becomes the report directory joined with the part after the suite
        path.

        :param test_suite_path: path of the test-suite project
        :param task_report_dir: report directory of the task
        :param cur_case_full_path: full path of the case being executed
        :param repeat: total number of repeat rounds
        :param repeat_round: current round number
        """
        round_folder = f"round{repeat_round}" if repeat > 1 else ""
        case_screenshot_dir = os.path.join(task_report_dir, "details", round_folder)
        if is_env_pool_run_mode():
            case_screenshot_dir = task_report_dir
        case_abs_path_base_name = get_base_name(cur_case_full_path, is_abs_name=True)
        if case_abs_path_base_name and test_suite_path:
            self.log.debug("case_abs_path_base_name:{}, test_suite_path:{}"
                           .format(case_abs_path_base_name, test_suite_path))
            _list = case_abs_path_base_name.split(test_suite_path)
            # Exactly two parts means the suite path occurs once in the case path.
            if len(_list) == 2:
                case_screenshot_dir = os.path.abspath(
                    os.path.join(task_report_dir, _list[1].strip(os.sep)))
        self.case_screenshot_dir = case_screenshot_dir
        self.log.debug("set case screenshot dir path as: {}".format(
            self.case_screenshot_dir))

    def init_stage_var(self, test_method_name,
                       run_section=None, is_error_msg=False):
        """
        Prepare the state for a new stage of the case.

        :param test_method_name: name handed to ``TestMethod.init_test_method``
        :param run_section: when truthy, becomes the current run section
        :param is_error_msg: forwarded to ``TestMethod.init_test_method``
        """
        if run_section:
            self.set_run_section(run_section)
        self.test_method.init_test_method(test_method_name,
                                          is_error_msg=is_error_msg)

    def set_checkepr(self, checkepr):
        """Store and log the ``checkepr`` flag."""
        self.checkepr = checkepr
        self.log.debug("set project checkepr as: {}".format(self.checkepr))

    def flash_error_msg_and_result(self, error_msg):
        """
        Record a failure without overwriting an earlier one.

        The error message is only set where none exists yet (case and test
        method), and results are only changed from PASSED to FAILED.
        """
        if not self.error_msg:
            self.set_error_msg(error_msg)
        if not self.test_method.error_msg:
            self.test_method.set_error_msg(error_msg)
        if self.case_result == RunResult.PASSED:
            self.set_case_result(RunResult.FAILED)
        if self.test_method.result == RunResult.PASSED:
            self.test_method.set_result(RunResult.FAILED)

    def set_step_index(self, index):
        """Store the index of the current step."""
        self.step_index = index

    def set_step_info(self, name, **kwargs):
        """
        Append a new step record and return its index.

        :param name: step name, also used for the step screenshots
        :param kwargs: extra step information, see ``__update_step_info``
        :return: index of the new step in the active record list
        """
        # Compute the cost, i.e. the time elapsed between consecutive step records
        steps_info = self._get_steps_info_obj()
        index = len(steps_info)
        if index > 0:
            last_step = steps_info[-1]
            last_step["cost"] = round(time.time() - last_step.get("_timestamp"), 3)
        log.info(f'<div class="step" id="{index}">{name}</div>')
        shots = self._capture_step_screen(name)
        step = {
            "name": name, "error": "", "cost": 0, "screenshot": shots,
            "_timestamp": time.time(), "extras": {}
        }
        self.__update_step_info(step, **kwargs)
        steps_info.append(step)
        self.set_step_index(index)
        return index

    def update_step_info(self, index, **kwargs):
        """
        Update the step record at ``index``.

        An out-of-range index is logged as a warning and otherwise ignored.

        :param index: position of the step in the active record list
        :param kwargs: step information, see ``__update_step_info``
        """
        steps_info = self._get_steps_info_obj()
        max_index = len(steps_info) - 1
        if not 0 <= index <= max_index:
            log.warning(f"update step info failed, index must be in [0, {max_index}]")
            return
        step = steps_info[index]
        self.__update_step_info(step, **kwargs)

    def update_step_shots(self, link, name):
        """
        Attach a screenshot to the most recent step record.

        Does nothing when ``link`` or ``name`` is empty or no step exists.

        :param link: screenshot link; backslashes are replaced by slashes
        :param name: display name of the screenshot
        """
        if not link or not name:
            return
        steps_info = self._get_steps_info_obj()
        if len(steps_info) == 0:
            return
        step = steps_info[-1]
        self.__update_step_info(step, screenshot={"link": link.replace("\\", "/"), "name": name})

    @staticmethod
    def __update_step_info(step, **kwargs):
        """
        Update the information of a step.

        ``error`` replaces the stored value, ``screenshot`` is appended to the
        step's screenshot list, the remaining built-in keys are ignored, and
        any other key is stored as a string under ``extras``.
        """
        # builtin_keys are the internally defined display fields
        builtin_keys = ["name", "error", "cost", "screenshot", "_timestamp"]
        for k, v in kwargs.items():
            if k not in builtin_keys:
                step.get("extras").update({k: str(v)})
                continue
            if k == "error":
                step.update({k: v})
            elif k == "screenshot":
                step.get("screenshot").append(v)

    def get_steps_info(self):
        """
        Return the active step record list.

        The cost of the last step is refreshed first, because it is otherwise
        only computed when the next step gets recorded.
        """
        steps_info = self._get_steps_info_obj()
        if len(steps_info) > 0:
            last_step = steps_info[-1]
            last_step["cost"] = round(time.time() - last_step.get("_timestamp"), 3)
        return steps_info

    def _get_steps_info_obj(self):
        """Return the case record list if a case instance is set, else the suite's."""
        return self.steps_info if self.case_instance is not None else self.suite_steps_info

    def _capture_step_screen(self, step_name):
        """
        take a screenshot of each device after each step is performed

        :param step_name: name used for the picture and the returned entries
        :return: list of ``{"link": ..., "name": ...}`` dicts, one for every
            device whose screenshot file exists; empty when step capture is
            disabled
        """
        shots = []
        if self.is_capture_step_screen:
            from devicetest.controllers.tools.screen_agent import ScreenAgent
            for device in self.devices:
                path, link = ScreenAgent.capture_step_picture(TimeHandler.get_now_datetime(), step_name, device)
                # Skip devices for which no picture file was produced.
                if not path or not os.path.exists(path):
                    continue
                shots.append({"link": link.replace("\\", "/"), "name": step_name})
        return shots


class TestMethod:
    """State of the test method (stage) currently being executed."""

    def __init__(self, _log):
        """
        :param _log: logger object; its ``debug`` method is used by setters
        """
        # Step-level parameters
        self.log = _log
        self.name = 'setup'
        self.result = RunResult.PASSED
        self.level = ''
        self.error_msg = ''
        self.method_return = ''
        self.func_ret = []
        self.step_flash_fail_msg = False

    def set_result(self, result=None):
        """Store and log the result; a falsy value means ``RunResult.PASSED``."""
        self.result = result or RunResult.PASSED
        self.log.debug(
            "set TestMethod result as: {}".format(self.result))

    def set_error_msg(self, error_msg):
        """Store and log the error message."""
        self.error_msg = error_msg
        self.log.debug(
            "set TestMethod error msg as: {}".format(self.error_msg))

    def init_test_method(self, name, is_error_msg=False):
        """
        Reset the state for a new test method.

        :param name: name of the test method
        :param is_error_msg: when True the stored error message is cleared
        """
        # Plain values: the previous trailing commas turned both attributes
        # into one-element tuples, e.g. ('test_x',) instead of 'test_x'.
        self.level = ''
        self.name = name
        self.result = RunResult.PASSED
        if is_error_msg:
            self.error_msg = ''
        self.func_ret.clear()

    def init_aw_method(self):
        """Reset error message, result, fail-message flag and return values."""
        self.error_msg = ''
        self.result = RunResult.PASSED
        self.step_flash_fail_msg = False  # whether the current step has updated failMsg
        self.func_ret.clear()
        self.log.debug("init aw method.")


class CurStep:
    """Empty placeholder class."""
    pass


class Prepare:
    """Holds the prepare path and its configuration."""

    def __init__(self):
        self.path = ''
        self.config = {}

    def set_prepare_path(self, path):
        """Store ``path`` when it is truthy, then log the passed value."""
        if path:
            self.path = path
        log.debug("prepare path:{}".format(path))


class Settings:
    """Class-level settings values."""
    language = ''
    product = ''  # VAR.Settings.Product


class Event:
    """Class-level event configuration."""
    # NOTE: class attribute, so the same dict is shared by every instance.
    configs = {}  # VAR.Event.Configs


class RedirectLog:
    """Class-level redirect-log values."""
    task_name = ""  # VAR.Project.RedirectLog.TaskName


class DeviceLog:
    """Class-level device-log values."""
    # NOTE: class attribute, so the same list is shared by every instance.
    ftp_path = []  # VAR.CurCase.DeviceLog.FthPath


class ProjectVariables:
    """Project-wide state shared through ``DeccVariable.project``."""

    def __init__(self, _log):
        """
        :param _log: logger object; its ``debug`` method is used by setters
        """
        # Project-level parameters
        self.log = _log
        self.record = ProjectRecord(_log)
        self.project_path = ''  # xdevice project path
        self.aw_path = ''  # aw path of the test suite
        self.testcase_path = ''
        self.resource_path = ''  # resource path of the test-suite project
        self.test_suite_path = ''  # test-suite project path
        self.task_report_dir = ''  # framework log path for test cases
        self.prepare = Prepare()  # prepare related
        self.cur_case_full_path = ''  # full path of the case currently being executed
        self.execute_case_name = None  # id of the case currently being executed
        self.config_json = {}  # user-defined common parameters
        self.property_config = []  # user-defined device-related parameters
        self.retry_test_list = []
        self.devicename = {}
        self.step_debug = ''

        self.monkey = False  # extension/monkey/monkey.py VAR.Project.Monkey
        # VAR.Project.TaskID, used by memoryLeakReport.py; recorded here for now
        self.task_id = ""
        self.settings = Settings()  # used in target: VAR.Settings.Language
        self.total = 0  # VAR.Project.Total
        self.start_time = ''  # VAR.Project.StartTime, used by memoryLeakReport.py
        self.exact_start_time = ''  # VAR.Project.ExactStartTime, used by memoryLeakReport.py
        self.finish = 0  # VAR.Project.Finish

        self.config = {
        }

        self.event = Event()
        self.test_file = ''  # VAR.Project.TestFile
        self.is_ticc_server = False  # GlobalParam.IS_TICC_SERVER
        self.redirect_log = RedirectLog()

    def set_project_path(self, project_path=None):
        """Store ``project_path``, falling back to :func:`get_project_path`."""
        self.project_path = project_path or get_project_path()
        self.log.debug("project path is: {}".format(self.project_path))

    def set_aw_path(self, aw_path):
        """Store ``aw_path`` when it is truthy and log it."""
        if aw_path:
            self.aw_path = aw_path
            self.log.debug("aw path is: {}".format(self.aw_path))

    def set_testcase_path(self, testcase_path):
        """Store and log the test-case path."""
        self.testcase_path = testcase_path
        self.log.debug("testcase path is: {}".format(self.testcase_path))

    def set_settings(self, settings):
        """Store ``settings`` when it is truthy and log it."""
        if settings:
            self.settings = settings
            self.log.debug("settings is: {}".format(self.settings))

    def set_test_suite_path(self, test_suite_path):
        """Store ``test_suite_path`` when it is truthy and log it."""
        if test_suite_path:
            self.test_suite_path = test_suite_path
            self.log.debug("test suite path is: {}".format(self.test_suite_path))

    def set_task_report_dir(self, task_report_dir):
        """Store ``task_report_dir`` when it is truthy and log it."""
        if task_report_dir:
            self.task_report_dir = task_report_dir
            self.log.debug("task report dir: {}".format(self.task_report_dir))

    def set_resource_path(self, resource_path):
        """Store ``resource_path`` when it is truthy and log it."""
        if resource_path:
            self.resource_path = resource_path
            self.log.debug("resource path is: {}".format(self.resource_path))

    def set_config_json(self, config_json):
        """Store and log the user-defined common parameters."""
        self.config_json = config_json
        self.log.debug("config json is: {}".format(self.config_json))

    def set_property_config(self, property_config):
        """Store and log the user-defined device-related parameters."""
        self.property_config = property_config
        self.log.debug("property config is: {}".format(self.property_config))

    def set_devicename(self, devicename):
        """Store ``devicename`` when it is truthy and log it."""
        if devicename:
            self.devicename = devicename
            self.log.debug("devicename is: {}".format(self.devicename))


class CurCheckCmd:
    """Holds the three values of the current check command."""

    def __init__(self):
        # case check parameters (question mark left by the original author)
        self.through = ""
        self.expect = ""
        self.actual = ""

    def get_cur_check_status(self):
        """Return True only when ``through``, ``expect`` and ``actual`` are all truthy."""
        return all((self.through, self.expect, self.actual))

    def get_cur_check_msg(self):
        """Return the check values formatted as one message string."""
        return "{}, expect:{}, actual:{}".format(self.through, self.expect,
                                                 self.actual)


class DeccVariable:
    """Class-level registry of the project object and the per-thread current case."""

    # Current-case objects keyed by thread ident; access is guarded by the lock.
    __cur_case = {}
    __thread_lock = threading.Lock()
    project = ProjectVariables(log)

    @classmethod
    def set_project_obj(cls, project_obj):
        """Replace the shared project object."""
        log.info("init DeccVariable project object")
        cls.project = project_obj

    @classmethod
    def set_cur_case_obj(cls, cur_case_obj):
        """Register ``cur_case_obj`` as the current case of the calling thread."""
        log.info("init DeccVariable cur case object")
        with cls.__thread_lock:
            cls.__cur_case[cls.__cur_case_key()] = cur_case_obj

    @classmethod
    def cur_case(cls):
        """Return the current case of the calling thread, or None if unset."""
        with cls.__thread_lock:
            return cls.__cur_case.get(cls.__cur_case_key(), None)

    @classmethod
    def __cur_case_key(cls):
        """Return the registry key of the calling thread (its ident)."""
        return threading.current_thread().ident

    @classmethod
    def reset(cls):
        """Remove the current-case entry of the calling thread, if any."""
        log.info("reset DeccVariable")
        with cls.__thread_lock:
            cls.__cur_case.pop(cls.__cur_case_key(), None)