1#!/usr/bin/python3.4
2# coding=utf-8
3#
4
5# Copyright (C) 2016 Huawei Technologies Co., HUTAF xDevice
6#
7# Licensed under the Apache License, Version 2.0 (the "License"); you may not
8# use this file except in compliance with the License. You may obtain a copy of
9# the License at
10#
11# http://www.apache.org/licenses/LICENSE-2.0
12#
13# Unless required by applicable law or agreed to in writing, software
14# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16# License for the specific language governing permissions and limitations under
17# the License.
18
19import copy
20import sys
21import os
22import traceback
23from typing import List, Tuple
24
25from xdevice import ConfigConst
26from xdevice import calculate_elapsed_time
27from xdevice import get_cst_time
28from xdevice import get_file_absolute_path
29from xdevice import FilePermission
30from xdevice import CaseEnd
31from xdevice import Binder
32from xdevice import Variables
33
34from devicetest.core.constants import RunResult
35from devicetest.core.constants import FileAttribute
36from devicetest.core.test_case import UpdateStep
37from devicetest.core.variables import CurCase
38from devicetest.core.variables import DeccVariable
39from devicetest.core.variables import ProjectVariables
40from devicetest.error import ErrorMessage
41from devicetest.report.generation import add_log_caching_handler
42from devicetest.report.generation import del_log_caching_handler
43from devicetest.report.generation import get_caching_logs
44from devicetest.report.generation import generate_report
45from devicetest.utils.util import get_base_name
46from devicetest.utils.util import get_dir_path
47from devicetest.utils.util import import_from_file
48
49
50class TestSuite:
51    """Base class for all test classes to inherit from.
52
53    This class gets all the controller objects from test_runner and executes
54    the test cases requested within itself.
55
56    """
57
58    def __init__(self, configs, path):
59        self.configs = configs
60        self.devices = []
61        self.device1 = None
62        self.device2 = None
63        # 透传的参数
64        self.pass_through = Variables.config.pass_through
65        self.set_devices(self.configs["devices"])
66        self.path = path
67        self.log = self.configs["log"]
68        self.error_msg = ''
69        self.trace_info = ''
70        self.case_list: List[Tuple[str, str]] = []
71        self.case_result = dict()
72        self.suite_name = self.configs.get("suite_name")
73        # 白名单用例
74        self.white_case_list = []
75        # 黑名单用例
76        self.black_case_list = []
77        # 初始化透传参数的列表
78        self.arg_list = dict()
79        self.app_result_info = dict()
80        self._test_args_para_parse(self.configs["testargs"])
81        # 往DeviceTest的用例中注入logger并防止重复初始化测试套级别的变量
82        self.inject_logger = None
83        self.cur_case = None
84        # device log
85        self.device_log = dict()
86        self.hilog = dict()
87        self.log_proc = dict()
88        self.hilog_proc = dict()
89
90        self.suite_case_results = []
91        self.suite_report_path = ""
92        self._case_log_buffer_hdl = None
93
94        # device录屏截图属性
95        self.devices_media = dict()
96
97        self._repeat = self.configs.get("request").config.repeat
98        self._repeat_round = self.configs.get("request").get_repeat_round()
99        self._round_folder = f"round{self._repeat_round}" if self._repeat > 1 else ""
100
101    def __enter__(self):
102        return self
103
104    def __exit__(self, *args):
105        pass
106
107    def _device_close(self):
108        self.log.debug("Start device close")
109        for device in self.devices:
110            device.close()
111        self.log.debug("Finish device close.")
112
113    def run(self):
114        self._init_devicetest()
115        report_path = os.path.join("details", self._round_folder, self.suite_name, self.suite_name + ".html")
116        start_time = get_cst_time()
117        # 记录录屏和截图属性
118        # self._get_screenrecorder_and_screenshot()
119        # 开始收集测试套(setup和teardown)的运行日志
120        suite_log_buffer_hdl = add_log_caching_handler()
121        try:
122            self.cur_case.set_suite_instance(self)
123            # 1.先判断是否在json中指定,否则先收集当前文件夹下所有testcase得到run_list
124            for case_path in self._get_case_list(self.path):
125                case_name = get_base_name(case_path)
126                if (self.black_case_list and case_name in self.black_case_list) \
127                        or (self.white_case_list and case_name not in self.white_case_list):
128                    self.log.warning("case name {} is in black list or not in white list, ignored".format(case_name))
129                    continue
130                self.case_list.append((case_name, case_path))
131            self.log.debug("Execute test case list: {}".format(self.case_list))
132            # 2.先执行self.setup
133            if self.run_setup():
134                # 在运行测试套子用例前,停止收集测试套setup步骤的运行日志
135                del_log_caching_handler(suite_log_buffer_hdl)
136                # 3.依次执行所有的run_list
137                # 开始收集测试套子用例的运行日志
138                self._case_log_buffer_hdl = add_log_caching_handler()
139                total_case_num = len(self.case_list)
140                for index, case in enumerate(self.case_list, 1):
141                    # self._reset_screenrecorder_and_screenshot()
142                    self.log.info("[{} / {}] Executing suite case: {}".format(index, total_case_num, case[1]))
143                    self.run_one_test_case(case)
144                # 停止收集测试套子用例的运行日志
145                del_log_caching_handler(self._case_log_buffer_hdl)
146            else:
147                self.error_msg = ErrorMessage.TestCase.Code_0203017.format(self.error_msg)
148                for case in self.case_list:
149                    self.case_result[case[0]] = {
150                        "result": RunResult.BLOCKED,
151                        "error": self.error_msg,
152                        "run_time": 0,
153                        "report": report_path
154                    }
155            self._case_log_buffer_hdl = None
156            # 在运行测试套子用例后,重新开始收集测试套teardown步骤的运行日志
157            add_log_caching_handler(buffer_hdl=suite_log_buffer_hdl)
158        finally:
159            # 4.执行self.teardown
160            self.run_teardown()
161            self.cur_case.set_suite_instance(None)
162
163        steps = self.cur_case.get_steps_info()
164        # 停止收集测试套(setup和teardown)的运行日志
165        del_log_caching_handler(suite_log_buffer_hdl)
166        if suite_log_buffer_hdl is None:
167            return
168        # 生成测试套的报告
169        self.log.info("generate suite report")
170        end_time = get_cst_time()
171        environment = self.configs.get("request").config.environment
172        suite_info = {
173            "name": self.suite_name,
174            "result": "",
175            "begin": start_time.strftime("%Y-%m-%d %H:%M:%S"),
176            "end": end_time.strftime("%Y-%m-%d %H:%M:%S"),
177            'elapsed': calculate_elapsed_time(start_time, end_time),
178            "error": "",
179            "logs": "",
180            "subcases": self.suite_case_results,
181            "devices": [] if environment is None else environment.get_description(),
182            "steps": steps
183        }
184        log_content = {
185            "content": get_caching_logs(suite_log_buffer_hdl)
186        }
187        to_file = os.path.join(self.get_case_report_path(), report_path)
188        generate_report(to_file, case=suite_info, logs=log_content)
189        del suite_log_buffer_hdl
190
191        # 往结果xml添加测试套的报告路径
192        self.suite_report_path = report_path
193        steps.clear()
194        DeccVariable.reset()
195
196    def setup(self):
197        """Setup function that will be called before executing any test suite.
198        Implementation is optional.
199        """
200        pass
201
202    def setup_start(self):
203        """
204        setup_start function that will be called after setup function.
205        Implementation is optional.
206        """
207        pass
208
209    def setup_end(self):
210        """
211        setup_end function that will be called after setup function.
212        Implementation is optional.
213        """
214        pass
215
216    def teardown(self):
217        """Teardown function that will be called after all the selected test
218        suite.
219        Implementation is optional.
220        """
221        pass
222
223    def teardown_start(self):
224        """
225        teardown_start function that will be called before Teardown function.
226        Implementation is optional.
227        """
228        pass
229
230    def teardown_end(self):
231        """
232        teardown_end function that will be called after Teardown function.
233        Implementation is optional.
234        """
235        pass
236
237    def get_params(self):
238        return self.arg_list
239
240    def set_devices(self, devices):
241        self.devices = devices
242        if not devices:
243            return
244
245        try:
246            for num, _ad in enumerate(self.devices, 1):
247                if not hasattr(_ad, "device_id") or not getattr(_ad, "device_id"):
248                    setattr(_ad, "device_id", "device{}".format(num))
249                # 兼容release2 增加id、serial
250                setattr(_ad, "id", _ad.device_id)
251                setattr(_ad, "serial", _ad.device_sn)
252                setattr(self, _ad.device_id, _ad)
253                setattr(self, "device{}".format(num), _ad)
254        except Exception as error:
255            self.log.error("Failed to initialize the device object in the "
256                           "TestCase.", error_no="01218")
257            raise error
258
259    def _get_case_list(self, path):
260        result = []
261        if len(self.configs["suitecases"]) > 0:
262            for _, case in enumerate(self.configs["suitecases"]):
263                if os.path.exists(case):
264                    case_path = case
265                else:
266                    case_path = get_file_absolute_path(case, [path,
267                                                              self.configs["resource_path"],
268                                                              self.configs["testcases_path"]])
269                result.append(case_path)
270        else:
271            all_file_list = os.listdir(path)
272            # 遍历该文件夹下的所有目录或者文件
273            for file in all_file_list:
274                filepath = os.path.join(path, file)
275                # 如果是文件夹,递归调用函数
276                if os.path.isdir(filepath):
277                    result.extend(self._get_case_list(filepath))
278                # 如果不是文件夹,保存文件路径及文件名
279                elif os.path.isfile(filepath) and \
280                        "__pycache__" not in filepath:
281                    if file.startswith(FileAttribute.TESTCASE_PREFIX) and \
282                            (file.endswith(FileAttribute.TESTCASE_POSFIX_PY) or
283                             file.endswith(FileAttribute.TESTCASE_POSFIX_PYC) or
284                             file.endswith(FileAttribute.TESTCASE_POSFIX_PYD)):
285                        result.append(filepath)
286        return result
287
288    def _exec_func(self, func, *args):
289        result = False
290        try:
291            func(*args)
292        except Exception as exception:
293            self.error_msg = str(exception)
294            self.trace_info = traceback.format_exc()
295
296            index = self.cur_case.step_index
297            if index == -1:
298                self.log.error(self.error_msg)
299                self.log.error(self.trace_info)
300            else:
301                step_error_id = f'step_error_{index}'
302                self.log.error(f'<span id="{step_error_id}">{self.error_msg}</span>')
303                self.log.error(self.trace_info)
304                _error = f'<a href="javascript:" onclick="gotoStep(\'{step_error_id}\')">{self.error_msg}</a>'
305                UpdateStep(index, error=_error)
306        else:
307            result = True
308        return result
309
310    def run_setup(self):
311        self.setup_start()
312        self.log.info("**********SetUp Starts!")
313        ret = self._exec_func(self.setup)
314        self.log.info("**********SetUp Ends!")
315        if ret:
316            self.setup_end()
317            return True
318        self.log.info("SetUp Failed!")
319        return False
320
321    def run_one_test_case(self, case: Tuple[str, str]):
322        case_name, case_path = case[0], case[1]
323        start_time = get_cst_time()
324        case_result = RunResult.FAILED
325        test_cls_instance = None
326        result_content = None   # 用例测试结果的拓展内容
327        try:
328            test_cls = import_from_file(get_dir_path(case_path), case_name)
329            self.log.info("Success to import {}.".format(case_name))
330            self._compatible_testcase(case_path, case_name)
331            with test_cls(self.configs) as test_cls_instance:
332                self.cur_case.set_case_instance(test_cls_instance)
333                test_cls_instance.run()
334            case_result, error_msg = test_cls_instance.result, test_cls_instance.error_msg
335            result_content = getattr(test_cls_instance, "result_content", None)
336        except Exception as e:
337            error_msg = str(e)
338            self.log.error("run case error! Exception: {}".format(e))
339            self.log.error(traceback.format_exc())
340
341        if test_cls_instance is None:
342            case_result = RunResult.BLOCKED
343        if test_cls_instance:
344            try:
345                del test_cls_instance
346                self.log.debug("del test case instance success")
347            except Exception as e:
348                self.log.debug(traceback.format_exc())
349                self.log.warning("del test case instance exception. Exception: {}".format(e))
350        Binder.notify_stage(CaseEnd(case_name, case_result))
351
352        end_time = get_cst_time()
353        cost = int(round((end_time - start_time).total_seconds() * 1000))
354        self.log.info("Executed case: {}, result: {}, cost time: {}ms".format(case_name, case_result, cost))
355        self.case_result[case_name] = {
356            "result": case_result, "error": error_msg,
357            "run_time": cost, "report": "", "result_content": result_content}
358
359        try:
360            self._device_close()
361        except Exception as e:
362            self.log.error("stop catch device log error! {}".format(e))
363            self.log.debug(traceback.format_exc())
364
365        if self._case_log_buffer_hdl is None:
366            return
367        # 生成子用例的报告
368        steps = self.cur_case.get_steps_info()
369        base_info = {
370            "name": case_name,
371            "result": case_result,
372            "begin": start_time.strftime("%Y-%m-%d %H:%M:%S"),
373            "end": end_time.strftime("%Y-%m-%d %H:%M:%S"),
374            'elapsed': calculate_elapsed_time(start_time, end_time),
375            "error": error_msg
376        }
377        case_info = copy.copy(base_info)
378        case_info.update({
379            "logs": "",
380            "devices": [],
381            "steps": steps
382        })
383        log_content = {
384            "content": copy.copy(get_caching_logs(self._case_log_buffer_hdl))
385        }
386        case_html = case_name + ".html"
387        report_path = os.path.join("details", self._round_folder, self.suite_name, case_html)
388        to_file = os.path.join(self.configs.get("report_path"), report_path)
389        generate_report(to_file, case=case_info, logs=log_content)
390        base_info["report"] = case_html
391        self.suite_case_results.append(base_info)
392        # 清空日志缓存
393        self._case_log_buffer_hdl.buffer.clear()
394        steps.clear()
395        # 往结果xml添加子用例的报告路径
396        self.case_result[case_name]["report"] = report_path
397        # 将用例实例对象和用例名置为空
398        self.cur_case.set_case_instance(None)
399        self.cur_case.set_name("")
400
401    def run_teardown(self):
402        self.log.info("**********TearDown Starts!")
403        self.teardown_start()
404        self._exec_func(self.teardown)
405        self.teardown_end()
406        self.log.info("**********TearDown Ends!")
407
408    def _test_args_para_parse(self, paras):
409        paras = dict(paras)
410        for para_name in paras.keys():
411            para_name = para_name.strip()
412            para_values = paras.get(para_name, [])
413            if para_name == "class":
414                self.white_case_list.extend(para_values)
415            elif para_name == "notClass":
416                self.black_case_list.extend(para_values)
417            elif para_name == "para":
418                for arg in para_values:
419                    key, value = arg.split("#")
420                    self.arg_list[key] = value
421            elif para_name == "deveco_planet_info":
422                for app_info in para_values:
423                    key, value = app_info.split("#")
424                    if key == "task_type":
425                        setattr(sys, "category", value)
426                    else:
427                        self.app_result_info[key] = value
428                        setattr(sys, "app_result_info", self.app_result_info)
429            else:
430                continue
431
432        self.configs["pass_through"] = self.pass_through
433        self.configs["arg_list"] = self.arg_list
434
435    def get_case_report_path(self):
436        return self.configs["report_path"]
437
438    def _compatible_testcase(self, case_path, case_name):
439        DeccVariable.cur_case().set_name(case_name)
440        project_var = ProjectVariables(self.inject_logger)
441        project_var.execute_case_name = case_name
442        project_var.cur_case_full_path = case_path
443        project_var.task_report_dir = self.get_case_report_path()
444        self.configs["project"] = project_var
445
446    def _init_devicetest(self):
447        self.cur_case = CurCase(self.log)
448        self.cur_case.suite_name = self.suite_name
449        self.cur_case.set_case_screenshot_dir(
450            None, self.get_case_report_path(), None,
451            repeat=self._repeat, repeat_round=self._repeat_round)
452        DeccVariable.set_cur_case_obj(self.cur_case)
453