#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import threading
import time

from devicetest.core.constants import RunResult
from devicetest.core.constants import RunSection
from devicetest.core.record import ProjectRecord
from devicetest.log.logger import DeviceTestLog as log
from devicetest.utils.time_util import TimeHandler
from devicetest.utils.util import get_base_name
from xdevice import is_env_pool_run_mode
from xdevice import Variables


def get_decrypt_resource_path():
    """Return ``Variables.res_dir``."""
    return Variables.res_dir


def set_resource_path(resource_path):
    """Store ``resource_path`` on the shared ``DeccVariable.project`` object."""
    DeccVariable.project.resource_path = resource_path


def get_testsuit_path():
    """Return the test suite path held by ``DeccVariable.project``."""
    return DeccVariable.project.test_suite_path


def get_project_path():
    """
    Get the project path.

    The path already stored on ``DeccVariable.project`` wins; otherwise the
    parent directory of ``Variables.top_dir`` is used.

    :return: project_path
    :raises Exception: if the derived path is empty or does not exist
    """
    configured_path = DeccVariable.project.project_path
    if configured_path:
        return configured_path

    project_path = os.path.dirname(Variables.top_dir)
    if not project_path:
        log.info("project path is None.")
        raise Exception("project path is None.")
    if not os.path.exists(project_path):
        log.info("project path not exists.")
        log.debug("project path:{}".format(project_path))
        raise Exception("project path not exists: {}".format(project_path))
    return project_path


class CurCase:
    """Mutable state of the test case (or suite) that is currently running."""

    def __init__(self, _log):
        # Case-level parameters
        self.log = _log
        self.step_total = 0  # number of tests
        self.run_section = ""  # RunSection.SETUP
        self.case_result = RunResult.PASSED  # result of the current case
        self.name = ''  # class method name, i.e. the case name (case_id)
        self.suite_name = ""  # test suite the case belongs to
        self.error_msg = ''  # case failure message
        # directory where failure screenshots of the current case are saved
        self.case_screenshot_dir = None
        # whether the current case has updated errorMsg
        self.case_flash_error_msg = False
        # whether the current case has already reported its first failed step
        self.is_upload_method_result = False
        # case method phase: setup/test/teardown
        self.step_section = RunSection.SETUP
        self.step_index = -1  # index of the current step
        self.step_error_msg = ''  # failure message of the current step
        self.step_fail_msg = ''  # failure message specified by the user
        self.step_result = RunResult.PASSED  # result of the current step
        # step records of test cases (including sub-cases of a suite):
        # step name, result, cost, screenshots, ...
        self.steps_info = []
        # step records of the test suite: step name, result, cost,
        # screenshots, ...
        self.suite_steps_info = []
        # Records case operation steps by default; when set to False,
        # record_step has to be called manually to add them
        self.auto_record_steps_info = False
        self.test_method = TestMethod(self.log)
        self.cur_check_cmd = CurCheckCmd()
        # Failure-screenshot related
        self.checkepr = False  # meaning unclear?
        self.image_num = 0
        self.video_num = 0
        self.dump_xml_num = 0
        # prepare related
        self.status = 0
        # VAR.CurCase.Description, used by memoryLeakReport.py
        self.description = ''
        self.log_details_path = "./log/test_run_details.log"
        self.log_path = "./log/test_run_summary.log"
        self.report_path = ''
        self.iperf_path = None  # wifi related
        # windows
        self.win_capture_path = ''  # WinCapturePath
        # VAR.CurCase.ExactStartTime, used by memoryLeakReport.py
        self.exact_start_time = ''
        # VAR.CurCase.StartTime, used by memoryLeakReport.py
        self.start_time = ''
        self.case_name_file_path = ''  # VAR.CurCase.CaseName.FilePath
        self.device_log = DeviceLog()

        self.case_instance = None
        self.suite_instance = None
        self.devices = []

        self.is_capture_step_screen = False
        self.is_record_step_screen = False
        self.set_step_screen()

    def set_step_screen(self):
        """Enable step recording or step capture from the task arguments.

        Recording is enabled when the ``screenrecorder`` task argument equals
        ``"true"`` (case-insensitive); otherwise capturing is enabled.
        """
        flag = Variables.config.taskargs.get("screenrecorder", "").lower()
        if flag == "true":
            self.is_record_step_screen = True
        else:
            self.is_capture_step_screen = True

    @property
    def testcase(self):
        """The case instance set through ``set_case_instance``."""
        return self.case_instance

    @property
    def testsuite(self):
        """The suite instance set through ``set_suite_instance``."""
        return self.suite_instance

    def set_case_instance(self, instance):
        """Store the case instance and, if truthy, adopt its ``devices``."""
        self.case_instance = instance
        if instance:
            self.devices = instance.devices

    def set_suite_instance(self, instance):
        """Store the suite instance and, if truthy, adopt its ``devices``."""
        self.suite_instance = instance
        if instance:
            self.devices = instance.devices

    def set_error_msg(self, error_msg):
        """Set the case failure message."""
        self.log.debug("set CurCase error msg as: {}".format(error_msg))
        self.error_msg = error_msg

    def set_run_section(self, run_section):
        """Set the run section of the case."""
        self.log.debug("set CurCase run section as: {}".format(run_section))
        self.run_section = run_section

    def set_case_result(self, case_result):
        """Set the result of the case."""
        self.case_result = case_result
        self.log.debug(
            "set CurCase case result as: {}".format(self.case_result))

    def set_step_total(self, step_total):
        """Set the total number of steps."""
        self.step_total = step_total
        self.log.debug(
            "set CurCase step total as: {}".format(self.step_total))

    def set_name(self, name):
        """Set the case name."""
        self.name = name
        self.log.debug("set CurCase name as: {}".format(self.name))

    def set_step_section(self, step_section):
        """Set the step section of the case."""
        self.step_section = step_section
        self.log.debug(
            "set CurCase step section as: {}".format(self.step_section))

    def set_case_screenshot_dir(self, test_suite_path, task_report_dir,
                                cur_case_full_path, repeat=1, repeat_round=1):
        """Compute and store the screenshot directory of the current case.

        :param test_suite_path: path used to split the case base name
        :param task_report_dir: report directory of the task
        :param cur_case_full_path: full path of the current case file
        :param repeat: total repeat count; a ``round<N>`` folder is only
            used when it is greater than 1
        :param repeat_round: current repeat round
        """
        round_folder = f"round{repeat_round}" if repeat > 1 else ""
        case_screenshot_dir = os.path.join(
            task_report_dir, "details", round_folder)
        if is_env_pool_run_mode():
            case_screenshot_dir = task_report_dir
        # NOTE(review): the nesting of the block below was reconstructed
        # from a whitespace-collapsed source - confirm that it belongs at
        # function level rather than inside the env-pool branch.
        case_abs_path_base_name = get_base_name(
            cur_case_full_path, is_abs_name=True)
        if case_abs_path_base_name and test_suite_path:
            self.log.debug("case_abs_path_base_name:{}, test_suite_path:{}"
                           .format(case_abs_path_base_name, test_suite_path))
            _list = case_abs_path_base_name.split(test_suite_path)
            # Exactly one occurrence of the suite path: use what follows it
            # as the sub-directory below the report directory.
            if len(_list) == 2:
                case_screenshot_dir = os.path.abspath(
                    os.path.join(task_report_dir, _list[1].strip(os.sep)))
        self.case_screenshot_dir = case_screenshot_dir
        self.log.debug("set case screenshot dir path as: {}".format(
            self.case_screenshot_dir))

    def init_stage_var(self, test_method_name,
                       run_section=None, is_error_msg=False):
        """Optionally set the run section, then reset the test method."""
        if run_section:
            self.set_run_section(run_section)
        self.test_method.init_test_method(test_method_name,
                                          is_error_msg=is_error_msg)

    def set_checkepr(self, checkepr):
        """Set the ``checkepr`` flag."""
        self.checkepr = checkepr
        self.log.debug("set project checkepr as: {}".format(self.checkepr))

    def flash_error_msg_and_result(self, error_msg):
        """Record a failure without overwriting an earlier one.

        Error messages are only set when still empty and results are only
        switched to FAILED when still PASSED, for both the case and its
        current test method.
        """
        if not self.error_msg:
            self.set_error_msg(error_msg)
        if not self.test_method.error_msg:
            self.test_method.set_error_msg(error_msg)
        if self.case_result == RunResult.PASSED:
            self.set_case_result(RunResult.FAILED)
        if self.test_method.result == RunResult.PASSED:
            self.test_method.set_result(RunResult.FAILED)

    def set_step_index(self, index):
        """Set the index of the current step."""
        self.step_index = index

    def set_step_info(self, name, **kwargs):
        """Append a new step record and return its index.

        :param name: step name
        :param kwargs: extra fields, handled by ``__update_step_info``
        :return: index of the new step in the active record list
        """
        steps_info = self._get_steps_info_obj()
        index = len(steps_info)
        # Compute the elapsed time, i.e. the time difference between the
        # previous step record and this one
        self._refresh_last_step_cost(steps_info)
        # NOTE(review): this literal was split across physical lines in the
        # source under review; reproduced as the step name wrapped in
        # newlines - confirm against version control.
        log.info(f'\n{name}\n')
        shots = self._capture_step_screen(name)
        step = {
            "name": name,
            "error": "",
            "cost": 0,
            "screenshot": shots,
            "_timestamp": time.time(),
            "extras": {}
        }
        self.__update_step_info(step, **kwargs)
        steps_info.append(step)
        self.set_step_index(index)
        return index

    def update_step_info(self, index, **kwargs):
        """Update the step record at ``index``; warn if it is out of range."""
        steps_info = self._get_steps_info_obj()
        max_index = len(steps_info) - 1
        if not 0 <= index <= max_index:
            log.warning(f"update step info failed, index must be in [0, {max_index}]")
            return
        self.__update_step_info(steps_info[index], **kwargs)

    def update_step_shots(self, link, name):
        """Attach a screenshot to the most recent step record, if any."""
        if not link or not name:
            return
        steps_info = self._get_steps_info_obj()
        if not steps_info:
            return
        shot = {"link": link.replace("\\", "/"), "name": name}
        self.__update_step_info(steps_info[-1], screenshot=shot)

    @staticmethod
    def __update_step_info(step, **kwargs):
        """Update the information of a step record.

        Keys outside ``builtin_keys`` are stored, stringified, under
        ``extras``. Of the built-in keys only ``error`` (replaced) and
        ``screenshot`` (appended) are applied; the others are ignored.
        """
        # builtin_keys: fields whose presentation is defined internally
        builtin_keys = ["name", "error", "cost", "screenshot", "_timestamp"]
        for key, value in kwargs.items():
            if key not in builtin_keys:
                step.get("extras").update({key: str(value)})
                continue
            if key == "error":
                step.update({key: value})
            elif key == "screenshot":
                step.get("screenshot").append(value)

    def get_steps_info(self):
        """Return the active step records after refreshing the last cost."""
        steps_info = self._get_steps_info_obj()
        self._refresh_last_step_cost(steps_info)
        return steps_info

    @staticmethod
    def _refresh_last_step_cost(steps_info):
        """Set ``cost`` of the last record to the seconds since it started."""
        if steps_info:
            last_step = steps_info[-1]
            last_step["cost"] = round(
                time.time() - last_step.get("_timestamp"), 3)

    def _get_steps_info_obj(self):
        """Return the record list of the test case or of the test suite.

        The case list is used whenever a case instance is set.
        """
        if self.case_instance is not None:
            return self.steps_info
        return self.suite_steps_info

    def _capture_step_screen(self, step_name):
        """
        take a screenshot of each device after each step is performed
        """
        shots = []
        if self.is_capture_step_screen:
            # Imported at call time, as in the original code.
            from devicetest.controllers.tools.screen_agent import ScreenAgent
            for device in self.devices:
                path, link = ScreenAgent.capture_step_picture(
                    TimeHandler.get_now_datetime(), step_name, device)
                # Skip devices for which no screenshot file was produced.
                if not path or not os.path.exists(path):
                    continue
                shots.append(
                    {"link": link.replace("\\", "/"), "name": step_name})
        return shots


class TestMethod:
    """State of the test method (step) that is currently running."""

    def __init__(self, _log):
        # Step-level parameters
        self.log = _log
        self.name = 'setup'
        self.result = RunResult.PASSED
        self.level = ''
        self.error_msg = ''
        self.method_return = ''
        self.func_ret = []
        self.step_flash_fail_msg = False

    def set_result(self, result=None):
        """Set the method result; a falsy value means PASSED."""
        self.result = result or RunResult.PASSED
        self.log.debug(
            "set TestMethod result as: {}".format(self.result))

    def set_error_msg(self, error_msg):
        """Set the method failure message."""
        self.error_msg = error_msg
        self.log.debug(
            "set TestMethod error msg as: {}".format(self.error_msg))

    def init_test_method(self, name, is_error_msg=False):
        """Reset the state for a new test method.

        :param name: name of the test method
        :param is_error_msg: when true, the stored error message is cleared
        """
        # Plain assignments: a trailing comma here would store 1-tuples
        # instead of the string values.
        self.level = ''
        self.name = name
        self.result = RunResult.PASSED
        if is_error_msg:
            self.error_msg = ''
        self.func_ret.clear()

    def init_aw_method(self):
        """Reset error message, result, fail-msg flag and return values."""
        self.error_msg = ''
        self.result = RunResult.PASSED
        # whether the current step has updated failMsg
        self.step_flash_fail_msg = False
        self.func_ret.clear()
        self.log.debug("init aw method.")


class CurStep:
    pass


class Prepare:
    """Holder for the prepare path and configuration."""

    def __init__(self):
        self.path = ''
        self.config = {}

    def set_prepare_path(self, path):
        """Store ``path`` when it is truthy."""
        # NOTE(review): whether the debug call sits inside the guard was
        # reconstructed from a whitespace-collapsed source - confirm.
        if path:
            self.path = path
            log.debug("prepare path:{}".format(path))


class Settings:
    language = ''
    product = ''  # VAR.Settings.Product


class Event:
    configs = {}  # VAR.Event.Configs


class RedirectLog:
    task_name = ""  # VAR.Project.RedirectLog.TaskName


class DeviceLog:
    ftp_path = []  # VAR.CurCase.DeviceLog.FthPath


class ProjectVariables:
    """Project-level state shared through ``DeccVariable.project``."""

    def __init__(self, _log):
        # Project-level parameters
        self.log = _log
        self.record = ProjectRecord(_log)
        self.project_path = ''  # xdevice project path
        self.aw_path = ''  # aw path of the test suite
        self.testcase_path = ''
        self.resource_path = ''  # resource path of the test suite project
        self.test_suite_path = ''  # path of the test suite project
        self.task_report_dir = ''  # framework log path of the test cases
        self.prepare = Prepare()  # prepare related
        # full path of the case that is currently executing
        self.cur_case_full_path = ''
        # id of the case that is currently executing
        self.execute_case_name = None
        self.config_json = {}  # user-defined common parameters
        self.property_config = []  # user-defined device-related parameters
        self.retry_test_list = []
        self.devicename = {}
        self.step_debug = ''

        self.monkey = False  # extension/monkey/monkey.py VAR.Project.Monkey
        # VAR.Project.TaskID, used in memoryLeakReport.py; field recorded
        # for now (VAR.Project.TaskId)
        self.task_id = ""
        self.settings = Settings()  # used in target: VAR.Settings.Language
        self.total = 0  # VAR.Project.Total
        # VAR.Project.StartTime, used by memoryLeakReport.py
        self.start_time = ''
        # VAR.Project.ExactStartTime, used by memoryLeakReport.py
        self.exact_start_time = ''
        self.finish = 0  # VAR.Project.Finish

        self.config = {}

        self.event = Event()
        self.test_file = ''  # VAR.Project.TestFile
        self.is_ticc_server = False  # GlobalParam.IS_TICC_SERVER
        self.redirect_log = RedirectLog()

    # NOTE(review): in the guarded setters below, the debug call is placed
    # inside the ``if``; the nesting was reconstructed from a
    # whitespace-collapsed source - confirm against version control.

    def set_project_path(self, project_path=None):
        """Set the project path, falling back to ``get_project_path()``."""
        self.project_path = project_path or get_project_path()
        self.log.debug("project path is: {}".format(self.project_path))

    def set_aw_path(self, aw_path):
        """Set the aw path when a truthy value is given."""
        if aw_path:
            self.aw_path = aw_path
            self.log.debug("aw path is: {}".format(self.aw_path))

    def set_testcase_path(self, testcase_path):
        """Set the testcase path."""
        self.testcase_path = testcase_path
        self.log.debug("testcase path is: {}".format(self.testcase_path))

    def set_settings(self, settings):
        """Set the settings object when a truthy value is given."""
        if settings:
            self.settings = settings
            self.log.debug("settings is: {}".format(self.settings))

    def set_test_suite_path(self, test_suite_path):
        """Set the test suite path when a truthy value is given."""
        if test_suite_path:
            self.test_suite_path = test_suite_path
            self.log.debug(
                "test suite path is: {}".format(self.test_suite_path))

    def set_task_report_dir(self, task_report_dir):
        """Set the task report directory when a truthy value is given."""
        if task_report_dir:
            self.task_report_dir = task_report_dir
            self.log.debug(
                "task report dir: {}".format(self.task_report_dir))

    def set_resource_path(self, resource_path):
        """Set the resource path when a truthy value is given."""
        if resource_path:
            self.resource_path = resource_path
            self.log.debug(
                "resource path is: {}".format(self.resource_path))

    def set_config_json(self, config_json):
        """Set the user-defined common parameters."""
        self.config_json = config_json
        self.log.debug("config json is: {}".format(self.config_json))

    def set_property_config(self, property_config):
        """Set the user-defined device-related parameters."""
        self.property_config = property_config
        self.log.debug(
            "property config is: {}".format(self.property_config))

    def set_devicename(self, devicename):
        """Set the device name mapping when a truthy value is given."""
        if devicename:
            self.devicename = devicename
            self.log.debug("devicename is: {}".format(self.devicename))


class CurCheckCmd:
    """Holder for the ``through``/``expect``/``actual`` values of a check."""

    def __init__(self):
        # Case check parameters?
        self.through = ""
        self.expect = ""
        self.actual = ""

    def get_cur_check_status(self):
        """Return True only when all three fields are truthy."""
        return all([self.through, self.expect, self.actual])

    def get_cur_check_msg(self):
        """Return the three fields formatted as one message."""
        return "{}, expect:{}, actual:{}".format(
            self.through, self.expect, self.actual)


class DeccVariable:
    """Class-level registry of the project object and per-thread cases.

    Current-case objects are stored in a dict keyed by the ident of the
    calling thread; access to that dict is guarded by a lock.
    """

    __cur_case = {}
    __thread_lock = threading.Lock()
    project = ProjectVariables(log)

    @classmethod
    def set_project_obj(cls, project_obj):
        """Replace the shared project object."""
        log.info("init DeccVariable project object")
        cls.project = project_obj

    @classmethod
    def set_cur_case_obj(cls, cur_case_obj):
        """Register ``cur_case_obj`` for the calling thread."""
        log.info("init DeccVariable cur case object")
        with cls.__thread_lock:
            cls.__cur_case[cls.__cur_case_key()] = cur_case_obj

    @classmethod
    def cur_case(cls):
        """Return the case registered for the calling thread, or None."""
        with cls.__thread_lock:
            return cls.__cur_case.get(cls.__cur_case_key(), None)

    @classmethod
    def __cur_case_key(cls):
        """Return the registry key: the ident of the current thread."""
        return threading.current_thread().ident

    @classmethod
    def reset(cls):
        """Remove the case registered for the calling thread, if any."""
        log.info("reset DeccVariable")
        with cls.__thread_lock:
            cls.__cur_case.pop(cls.__cur_case_key(), None)