1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import threading
21import time
22
23from devicetest.core.constants import RunResult
24from devicetest.core.constants import RunSection
25from devicetest.core.record import ProjectRecord
26from devicetest.log.logger import DeviceTestLog as log
27from devicetest.utils.time_util import TimeHandler
28from devicetest.utils.util import get_base_name
29from xdevice import is_env_pool_run_mode
30from xdevice import Variables
31
32
def get_decrypt_resource_path():
    """Return the resource directory stored in ``Variables.res_dir``."""
    resource_dir = Variables.res_dir
    return resource_dir
35
36
def set_resource_path(resource_path):
    """Store ``resource_path`` on the shared ``DeccVariable.project`` object.

    :param resource_path: value assigned as-is (no validation, no logging)
    """
    project = DeccVariable.project
    project.resource_path = resource_path
39
40
def get_testsuit_path():
    """Return ``test_suite_path`` of the shared ``DeccVariable.project`` object."""
    project = DeccVariable.project
    return project.test_suite_path
43
44
def get_project_path():
    """
    get project path

    Returns the path already stored on ``DeccVariable.project`` when it is
    set; otherwise derives it as the parent directory of ``Variables.top_dir``.

    :return: project_path
    :raises ValueError: when ``Variables.top_dir`` is None
    :raises FileNotFoundError: when the derived directory does not exist
    """
    cached_path = DeccVariable.project.project_path
    if cached_path:
        return cached_path

    top_dir = Variables.top_dir
    # os.path.dirname() never returns None (it raises TypeError on a None
    # argument), so the "is None" check has to be done on its input.
    if top_dir is None:
        log.info("project path is None.")
        raise ValueError("project path is None: Variables.top_dir is not set")

    project_path = os.path.dirname(top_dir)
    if not os.path.exists(project_path):
        log.info("project path not exists.")
        log.debug("project path:{}".format(project_path))
        raise FileNotFoundError(
            "project path not exists: {}".format(project_path))
    return project_path
66
67
class CurCase:
    """Mutable state of the test case (or test suite) that is currently running.

    Holds the case result, error messages, per-step records and the
    screenshot settings, plus small setters that log every change through
    the logger passed to the constructor.
    """

    def __init__(self, _log):
        # ---- case-level parameters ----
        self.log = _log
        self.step_total = 0  # number of tests
        self.run_section = ""  # e.g. RunSection.SETUP
        self.case_result = RunResult.PASSED  # result of the current case
        self.name = ''  # class method name, i.e. the case name (case_id)
        self.suite_name = ""  # test suite the case belongs to
        self.error_msg = ''  # failure message of the case
        self.case_screenshot_dir = None  # directory for failure screenshots of the current case
        self.case_flash_error_msg = False  # whether the current case has updated errorMsg
        self.is_upload_method_result = False  # whether the first failed step has been reported

        # ---- step-level parameters ----
        self.step_section = RunSection.SETUP  # case method: setup/test/teardown
        self.step_index = -1  # index of the current step
        self.step_error_msg = ''  # failure message of the current step
        self.step_fail_msg = ''  # failure message specified by the user
        self.step_result = RunResult.PASSED  # result of the current step
        # step records of the test case (including sub-cases of a suite):
        # step name, result, cost, screenshots, ...
        self.steps_info = []
        # step records of the test suite: step name, result, cost, screenshots, ...
        self.suite_steps_info = []
        # whether step info is recorded automatically; when False the steps
        # have to be added manually through record_step
        self.auto_record_steps_info = False

        self.test_method = TestMethod(self.log)
        self.cur_check_cmd = CurCheckCmd()

        # ---- failure screenshot related ----
        self.checkepr = False  # meaning unclear (question left by the original author)
        self.image_num = 0
        self.video_num = 0
        self.dump_xml_num = 0

        # ---- prepare related ----
        self.status = 0
        self.description = ''  # VAR.CurCase.Description, used by memoryLeakReport.py
        self.log_details_path = "./log/test_run_details.log"
        self.log_path = "./log/test_run_summary.log"
        self.report_path = ''
        self.iperf_path = None  # wifi related

        # ---- windows ----
        self.win_capture_path = ''  # WinCapturePath
        self.exact_start_time = ''  # VAR.CurCase.ExactStartTime, used by memoryLeakReport.py
        self.start_time = ''  # VAR.CurCase.StartTime, used by memoryLeakReport.py

        self.case_name_file_path = ''  # VAR.CurCase.CaseName.FilePath
        self.device_log = DeviceLog()

        self.case_instance = None
        self.suite_instance = None

        self.devices = []

        # exactly one of the two flags is switched on by set_step_screen()
        self.is_capture_step_screen = False
        self.is_record_step_screen = False
        self.set_step_screen()

    def set_step_screen(self):
        """Enable step recording when task arg "screenrecorder" is "true", else step screenshots."""
        recorder_flag = Variables.config.taskargs.get("screenrecorder", "")
        if recorder_flag.lower() == "true":
            self.is_record_step_screen = True
            return
        self.is_capture_step_screen = True

    @property
    def testcase(self):
        """The case instance set through set_case_instance (None when unset)."""
        return self.case_instance

    @property
    def testsuite(self):
        """The suite instance set through set_suite_instance (None when unset)."""
        return self.suite_instance

    def set_case_instance(self, instance):
        """Remember the running case instance and, when it is truthy, adopt its devices."""
        self.case_instance = instance
        if not instance:
            return
        self.devices = instance.devices

    def set_suite_instance(self, instance):
        """Remember the running suite instance and, when it is truthy, adopt its devices."""
        self.suite_instance = instance
        if not instance:
            return
        self.devices = instance.devices

    def set_error_msg(self, error_msg):
        """Log and store the case failure message."""
        self.log.debug(f"set CurCase error msg as: {error_msg}")
        self.error_msg = error_msg

    def set_run_section(self, run_section):
        """Log and store the run section."""
        self.log.debug(f"set CurCase run section as: {run_section}")
        self.run_section = run_section

    def set_case_result(self, case_result):
        """Store and log the case result."""
        self.case_result = case_result
        self.log.debug(f"set CurCase case result as: {self.case_result}")

    def set_step_total(self, step_total):
        """Store and log the total number of steps."""
        self.step_total = step_total
        self.log.debug(f"set CurCase step total as: {self.step_total}")

    def set_name(self, name):
        """Store and log the case name."""
        self.name = name
        self.log.debug(f"set CurCase name as: {self.name}")

    def set_step_section(self, step_section):
        """Store and log the step section."""
        self.step_section = step_section
        self.log.debug(f"set CurCase step section as: {self.step_section}")

    def set_case_screenshot_dir(self, test_suite_path, task_report_dir, cur_case_full_path,
                                repeat=1, repeat_round=1):
        """Compute and store the directory used for the screenshots of the current case.

        The default is ``<task_report_dir>/details/<round folder>`` (the round
        folder is only used when ``repeat`` is greater than 1). In env-pool run
        mode ``task_report_dir`` itself is used. When the case base name splits
        into exactly two parts around ``test_suite_path``, the directory becomes
        the absolute path of ``task_report_dir`` joined with the second part.
        """
        if repeat > 1:
            round_folder = f"round{repeat_round}"
        else:
            round_folder = ""
        screenshot_dir = os.path.join(task_report_dir, "details", round_folder)
        if is_env_pool_run_mode():
            screenshot_dir = task_report_dir

        base_name = get_base_name(cur_case_full_path, is_abs_name=True)
        if base_name and test_suite_path:
            self.log.debug(
                f"case_abs_path_base_name:{base_name}, test_suite_path:{test_suite_path}")
            parts = base_name.split(test_suite_path)
            if len(parts) == 2:
                relative_part = parts[1].strip(os.sep)
                screenshot_dir = os.path.abspath(
                    os.path.join(task_report_dir, relative_part))

        self.case_screenshot_dir = screenshot_dir
        self.log.debug(f"set case screenshot dir path as: {self.case_screenshot_dir}")

    def init_stage_var(self, test_method_name,
                       run_section=None, is_error_msg=False):
        """Optionally switch the run section, then re-initialise the test method state."""
        if run_section:
            self.set_run_section(run_section)
        self.test_method.init_test_method(
            test_method_name, is_error_msg=is_error_msg)

    def set_checkepr(self, checkepr):
        """Store and log the checkepr flag."""
        self.checkepr = checkepr
        self.log.debug(f"set project checkepr as: {self.checkepr}")

    def flash_error_msg_and_result(self, error_msg):
        """Record a failure on the case and on the current test method.

        Existing error messages are kept (only empty ones are filled in) and
        only results that are still PASSED are switched to FAILED.
        """
        method = self.test_method
        if not self.error_msg:
            self.set_error_msg(error_msg)
        if not method.error_msg:
            method.set_error_msg(error_msg)
        if self.case_result == RunResult.PASSED:
            self.set_case_result(RunResult.FAILED)
        if method.result == RunResult.PASSED:
            method.set_result(RunResult.FAILED)

    def set_step_index(self, index):
        """Store the index of the current step."""
        self.step_index = index

    def set_step_info(self, name, **kwargs):
        """Append a new step record and return its index.

        The cost of the previous step is closed first: it is the time elapsed
        since that step was recorded.
        """
        records = self._get_steps_info_obj()
        index = len(records)
        if records:
            previous = records[-1]
            previous["cost"] = round(time.time() - previous.get("_timestamp"), 3)
        log.info(f'<div class="step" id="{index}">{name}</div>')
        step = {
            "name": name,
            "error": "",
            "cost": 0,
            "screenshot": self._capture_step_screen(name),
            "_timestamp": time.time(),
            "extras": {},
        }
        self.__update_step_info(step, **kwargs)
        records.append(step)
        self.set_step_index(index)
        return index

    def update_step_info(self, index, **kwargs):
        """Update the step record at ``index``; warn and do nothing when out of range."""
        records = self._get_steps_info_obj()
        last_index = len(records) - 1
        if index < 0 or index > last_index:
            log.warning(f"update step info failed, index must be in [0, {last_index}]")
            return
        self.__update_step_info(records[index], **kwargs)

    def update_step_shots(self, link, name):
        """Attach a screenshot (link with forward slashes, name) to the latest step."""
        if not (link and name):
            return
        records = self._get_steps_info_obj()
        if not records:
            return
        shot = {"link": link.replace("\\", "/"), "name": name}
        self.__update_step_info(records[-1], screenshot=shot)

    @staticmethod
    def __update_step_info(step, **kwargs):
        """Update the information of a step record.

        "error" replaces the stored error and "screenshot" is appended to the
        screenshot list; the other reserved keys are ignored. Any other key is
        stored, converted to str, under "extras".
        """
        # reserved keys: the information displayed by the framework itself
        reserved_keys = ("name", "error", "cost", "screenshot", "_timestamp")
        for key, value in kwargs.items():
            if key == "error":
                step["error"] = value
            elif key == "screenshot":
                step.get("screenshot").append(value)
            elif key not in reserved_keys:
                step.get("extras").update({key: str(value)})

    def get_steps_info(self):
        """Return the active step records after refreshing the cost of the last step."""
        records = self._get_steps_info_obj()
        if records:
            latest = records[-1]
            latest["cost"] = round(time.time() - latest.get("_timestamp"), 3)
        return records

    def _get_steps_info_obj(self):
        """Return the case records when a case instance is set, else the suite records."""
        if self.case_instance is None:
            return self.suite_steps_info
        return self.steps_info

    def _capture_step_screen(self, step_name):
        """
        take a screenshot of each device after each step is performed
        """
        if not self.is_capture_step_screen:
            return []
        from devicetest.controllers.tools.screen_agent import ScreenAgent
        captured = []
        for device in self.devices:
            path, link = ScreenAgent.capture_step_picture(
                TimeHandler.get_now_datetime(), step_name, device)
            # keep only screenshots that were really written to disk
            if path and os.path.exists(path):
                captured.append({"link": link.replace("\\", "/"), "name": step_name})
        return captured
293
294
class TestMethod:
    """State of the test method being executed: name, result, level, error message, return values."""

    def __init__(self, _log):
        # step-level parameters
        self.log = _log
        self.name = 'setup'
        self.result = RunResult.PASSED
        self.level = ''
        self.error_msg = ''
        self.method_return = ''
        self.func_ret = []
        self.step_flash_fail_msg = False

    def set_result(self, result=None):
        """Store and log the method result; a falsy value falls back to RunResult.PASSED."""
        self.result = result or RunResult.PASSED
        self.log.debug(
            "set TestMethod result as: {}".format(self.result))

    def set_error_msg(self, error_msg):
        """Store and log the method error message."""
        self.error_msg = error_msg
        self.log.debug(
            "set TestMethod error msg as: {}".format(self.error_msg))

    def init_test_method(self, name, is_error_msg=False):
        """Reset the state for a new test method.

        :param name: name of the test method about to run
        :param is_error_msg: when True the stored error message is cleared too
        """
        # Plain strings, as in __init__. A trailing comma after each of these
        # assignments used to store 1-tuples (('',) and (name,)) instead.
        self.level = ''
        self.name = name
        self.result = RunResult.PASSED
        if is_error_msg:
            self.error_msg = ''
        self.func_ret.clear()

    def init_aw_method(self):
        """Reset error message, result, fail-message flag and return values before an aw call."""
        self.error_msg = ''
        self.result = RunResult.PASSED
        self.step_flash_fail_msg = False  # whether the current step has updated failMsg
        self.func_ret.clear()
        self.log.debug("init aw method.")
331
332
class CurStep:
    """Empty placeholder class; it defines no attributes or methods."""
335
336
class Prepare:
    """Prepare-stage settings: a path and a config dict."""

    def __init__(self):
        self.path = ''
        self.config = {}

    def set_prepare_path(self, path):
        """Store ``path`` when it is truthy; the received value is logged either way."""
        self.path = path if path else self.path
        log.debug(f"prepare path:{path}")
346
347
class Settings:
    """Container for the ``language`` and ``product`` settings (both default to '')."""

    language = ''
    # VAR.Settings.Product
    product = ''
351
352
class Event:
    """Container for event configuration."""

    # VAR.Event.Configs
    # NOTE(review): class-level dict, so every Event instance shares this one
    # object -- confirm that the sharing is intended.
    configs = {}
355
356
class RedirectLog:
    """Container for the task name of a redirected log."""

    # VAR.Project.RedirectLog.TaskName
    task_name = ""
359
360
class DeviceLog:
    """Device-log locations (``ftp_path``) collected for one owner object."""

    # Class-level default, kept so that ``DeviceLog.ftp_path`` still resolves
    # for code that reads it on the class.
    ftp_path = []  # VAR.CurCase.DeviceLog.FthPath

    def __init__(self):
        # Give every instance its own list. With only the class attribute all
        # instances appended to the same list object, so entries recorded
        # through one instance were visible through every other one.
        self.ftp_path = []
363
364
class ProjectVariables:
    """Project-level variables: paths, user configuration and bookkeeping fields.

    Setters log every value through the logger passed to the constructor.
    """

    def __init__(self, _log):
        # ---- project-level parameters ----
        self.log = _log
        self.record = ProjectRecord(_log)
        self.project_path = ''  # xdevice project path
        self.aw_path = ''  # aw path of the test suite
        self.testcase_path = ''
        self.resource_path = ''  # resource path of the test suite project
        self.test_suite_path = ''  # path of the test suite project
        self.task_report_dir = ''  # framework log path of the test cases
        self.prepare = Prepare()  # prepare related
        self.cur_case_full_path = ''  # full path of the case being executed
        self.execute_case_name = None  # id of the case being executed
        self.config_json = {}  # common parameters defined by the user
        self.property_config = []  # device related parameters defined by the user
        self.retry_test_list = []
        self.devicename = {}
        self.step_debug = ''

        self.monkey = False  # extension/monkey/monkey.py VAR.Project.Monkey
        self.task_id = ""  # VAR.Project.TaskID, used by memoryLeakReport.py
        self.settings = Settings()  # used by target: VAR.Settings.Language
        self.total = 0  # VAR.Project.Total
        self.start_time = ''  # VAR.Project.StartTime, used by memoryLeakReport.py
        self.exact_start_time = ''  # VAR.Project.ExactStartTime, used by memoryLeakReport.py
        self.finish = 0  # VAR.Project.Finish

        self.config = {}

        self.event = Event()
        self.test_file = ''  # VAR.Project.TestFile
        self.is_ticc_server = False  # GlobalParam.IS_TICC_SERVER
        self.redirect_log = RedirectLog()

    def set_project_path(self, project_path=None):
        """Store the project path, falling back to get_project_path() when falsy."""
        self.project_path = project_path or get_project_path()
        self.log.debug(f"project path is: {self.project_path}")

    def set_aw_path(self, aw_path):
        """Store and log the aw path; a falsy value is ignored without logging."""
        if not aw_path:
            return
        self.aw_path = aw_path
        self.log.debug(f"aw path is: {self.aw_path}")

    def set_testcase_path(self, testcase_path):
        """Store and log the testcase path."""
        self.testcase_path = testcase_path
        self.log.debug(f"testcase path is: {self.testcase_path}")

    def set_settings(self, settings):
        """Store the settings when truthy; the current value is logged either way."""
        if settings:
            self.settings = settings
        self.log.debug(f"settings is: {self.settings}")

    def set_test_suite_path(self, test_suite_path):
        """Store the test suite path when truthy; the current value is logged either way."""
        if test_suite_path:
            self.test_suite_path = test_suite_path
        self.log.debug(f"test suite path is: {self.test_suite_path}")

    def set_task_report_dir(self, task_report_dir):
        """Store the task report dir when truthy; the current value is logged either way."""
        if task_report_dir:
            self.task_report_dir = task_report_dir
        self.log.debug(f"task report dir: {self.task_report_dir}")

    def set_resource_path(self, resource_path):
        """Store the resource path when truthy; the current value is logged either way."""
        if resource_path:
            self.resource_path = resource_path
        self.log.debug(f"resource path is: {self.resource_path}")

    def set_config_json(self, config_json):
        """Store and log the user-defined common parameters."""
        self.config_json = config_json
        self.log.debug(f"config json is: {self.config_json}")

    def set_property_config(self, property_config):
        """Store and log the user-defined device parameters."""
        self.property_config = property_config
        self.log.debug(f"property config is: {self.property_config}")

    def set_devicename(self, devicename):
        """Store the device names when truthy; the current value is logged either way."""
        if devicename:
            self.devicename = devicename
        self.log.debug(f"devicename is: {self.devicename}")
447
448
class CurCheckCmd:
    """The three values of a check: ``through``, ``expect`` and ``actual``."""

    def __init__(self):
        # case check parameters (purpose questioned by the original author)
        self.through = ""
        self.expect = ""
        self.actual = ""

    def get_cur_check_status(self):
        """Return True only when through, expect and actual are all truthy."""
        return bool(self.through and self.expect and self.actual)

    def get_cur_check_msg(self):
        """Return the three values formatted as one message."""
        return f"{self.through}, expect:{self.expect}, actual:{self.actual}"
464
465
class DeccVariable:
    """Class-level registry of the project variables and the current case of each thread.

    Current-case objects are stored in a dict keyed by the ident of the
    calling thread; every access to that dict happens under a lock.
    """

    __cur_case = {}
    __thread_lock = threading.Lock()
    project = ProjectVariables(log)

    @classmethod
    def set_project_obj(cls, project_obj):
        """Replace the shared project object."""
        log.info("init DeccVariable project object")
        cls.project = project_obj

    @classmethod
    def set_cur_case_obj(cls, cur_case_obj):
        """Register ``cur_case_obj`` as the current case of the calling thread."""
        log.info("init DeccVariable cur case object")
        thread_key = cls.__cur_case_key()
        with cls.__thread_lock:
            cls.__cur_case[thread_key] = cur_case_obj

    @classmethod
    def cur_case(cls):
        """Return the current case of the calling thread, or None when none is registered."""
        thread_key = cls.__cur_case_key()
        with cls.__thread_lock:
            return cls.__cur_case.get(thread_key)

    @classmethod
    def __cur_case_key(cls):
        """Return the dict key for the calling thread: its thread ident."""
        current = threading.current_thread()
        return current.ident

    @classmethod
    def reset(cls):
        """Drop the current case registered for the calling thread, if any."""
        log.info("reset DeccVariable")
        thread_key = cls.__cur_case_key()
        with cls.__thread_lock:
            cls.__cur_case.pop(thread_key, None)
498