1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import copy
20import os
21import sys
22import traceback
23from typing import Union
24
25from xdevice import calculate_elapsed_time
26from xdevice import check_result_report
27from xdevice import StateRecorder
28from xdevice import LifeCycle
29from xdevice import ResultCode
30from xdevice import get_cst_time
31from xdevice import platform_logger
32from xdevice import EnvPool
33from xdevice import CaseEnd
34from xdevice import Binder
35
36from devicetest.runner.prepare import PrepareHandler
37from devicetest.core.constants import RunResult
38from devicetest.utils.util import clean_sys_resource
39from devicetest.utils.util import get_base_name
40from devicetest.utils.util import get_dir_path
41from devicetest.utils.util import import_from_file
42from devicetest.core.variables import DeccVariable
43from devicetest.core.variables import ProjectVariables
44from devicetest.core.variables import CurCase
45from devicetest.core.exception import DeviceTestError
46from devicetest.core.test_case import DeviceRoot
47from devicetest.core.test_case import BaseCase
48from devicetest.error import ErrorMessage
49from devicetest.log.logger import DeviceTestLog as Log
50from devicetest.report.generation import add_log_caching_handler
51from devicetest.report.generation import del_log_caching_handler
52from devicetest.report.generation import get_caching_logs
53from devicetest.report.generation import generate_report
54
55
class RunnerMode:
    """Names of the modes a TestRunner can be initialised in."""

    # Set by TestRunner.init_case_runner.
    DEBUG = "debug"
    # Default mode; also set by TestRunner.init_pipeline_runner.
    PIPELINE = "pipeline"
59
60
61class TestRunner:
    """Executes test cases and records their results.
    """
64
65    def __init__(self):
66        self.run_mode = RunnerMode.PIPELINE
67        self.run_list = None
68        self.no_run_list = None
69        self.running = None
70        self.configs = None
71        self.devices = None
72        self.log = Log
73        self.start_time = None
74        self.test_results = None
75        self.upload_result_handler = None
76        self.project = None
77        self.prepare = None
78        self.cur_case = None
79        self._repeat = 1
80        self._repeat_round = 1
81
82    def init_pipeline_runner(self, run_list, configs, devices, upload_result_handler):
83        self.run_list = run_list
84        self.no_run_list = copy.copy(self.run_list)
85        self.running = False
86        self.configs = configs
87        self.devices = devices
88        self.start_time = get_cst_time()
89        self.test_results = []
90        self.upload_result_handler = upload_result_handler
91        self.project = ProjectVariables(self.log)
92        self.prepare = None
93        self.__init_project_variables()
94        self.run_mode = RunnerMode.PIPELINE
95        self._repeat = self.configs.get("request").config.repeat
96        self._repeat_round = self.configs.get("request").get_repeat_round()
97
98    def init_case_runner(self, run_list: Union[BaseCase, list]):
99        # simple case runner
100        self.run_list = run_list
101        self.run_mode = RunnerMode.DEBUG
102        self.log = platform_logger("TestRunner")
103
104    def __init_project_variables(self):
105        """
106        testargs:为xDevice透传过来的数据,用户调用CONFIG可获取
107        :return:
108        """
109        self.log.debug("configs:{}".format(self.configs))
110        testcases_path = self.configs.get('testcases_path', "")
111        testargs = self.configs.get("testargs", {})
112        self.__flash_run_list(testargs)
113
114        self.cur_case = CurCase(self.log)
115        self.project.set_project_path()
116        self.project.set_testcase_path(testcases_path)
117        self.project.set_task_report_dir(self.configs.get("report_path"))
118        self.project.set_resource_path(self.get_local_resource_path())
119
120    def get_local_resource_path(self):
121        local_resource_path = os.path.join(
122            self.project.project_path, "testcases", "DeviceTest", "resource")
123        return local_resource_path
124
125    def get_local_aw_path(self):
126        local_aw_path = os.path.join(
127            self.project.project_path, "testcases", "DeviceTest", "aw")
128        return local_aw_path
129
130    def __flash_run_list(self, testargs):
131        """
132        retry 场景更新run list
133        :param testargs:
134        :return:
135        """
136        get_test = testargs.get("test")
137        self.log.info("get test:{}".format(get_test))
138        retry_test_list = self.parse_retry_test_list(get_test)
139        if retry_test_list is not None:
140            self.run_list = retry_test_list
141            self.no_run_list = copy.copy(self.run_list)
142            self.log.info("retry test list:{}".format(retry_test_list))
143
144    def parse_retry_test_list(self, retry_test_list):
145        if retry_test_list is None:
146            return None
147        elif not isinstance(retry_test_list, list):
148            err_msg = ErrorMessage.TestCase.Code_0203005
149            self.log.error(err_msg)
150            raise DeviceTestError(err_msg)
151
152        elif len(retry_test_list) == 1 and "#" not in str(retry_test_list[0]):
153            return None
154        else:
155            history_case_list = []
156            history_case_dict = dict()
157            retry_case_list = []
158            for abd_file_path in self.run_list:
159                base_file_name = get_base_name(abd_file_path)
160                if base_file_name not in history_case_dict.keys():
161                    history_case_dict.update({base_file_name: []})
162                history_case_dict.get(base_file_name).append(abd_file_path)
163                history_case_list.append(base_file_name)
164            self.log.debug("history case list:{}".format(history_case_list))
165
166            for _value in retry_test_list:
167                case_id = str(_value).split("#")[0]
168                if case_id in history_case_dict.keys():
169                    retry_case_list.append(history_case_dict.get(case_id)[0])
170            return retry_case_list
171
172    def parse_config(self, test_configs):
173        pass
174
175    def add_value_to_configs(self):
176        self.configs["log"] = self.log
177        self.configs["devices"] = self.devices
178        self.configs["project"] = self.project
179
    def run(self):
        """Execute the run list.

        Always goes through the pipeline flow; ``self.run_mode`` is not
        consulted here.
        """
        self._pipeline_run()
182
183    def _pipeline_run(self):
184        self.running = True
185        aw_path = self.add_aw_path_to_sys(self.project.aw_path)
186        self.log.info("Executing run list {}.".format(self.run_list))
187
188        self.add_value_to_configs()
189
190        self.prepare = PrepareHandler(self.log, self.cur_case,
191                                      self.project, self.configs,
192                                      self.devices, self.run_list)
193        # **********混合root和非root**************
194        try:
195            for device in self.devices:
196                if hasattr(device, "is_root"):
197                    DeviceRoot.is_root_device = device.is_root
198                    self.log.debug(DeviceRoot.is_root_device)
199                    setattr(device, "is_device_root", DeviceRoot.is_root_device)
200
201        except Exception as _:
202            self.log.error('set branch api error.')
203        # **************混合root和非root end**********************
204        self.prepare.run_prepare()
205
206        for test_cls_name in self.run_list:
207            case_name = get_base_name(test_cls_name)
208            if self.project.record.is_shutdown(raise_exception=False):
209                break
210            self.log.info("Executing test class {}".format(test_cls_name))
211            self.project.execute_case_name = case_name
212            self.run_test_class(test_cls_name, case_name)
213        self.prepare.run_prepare(is_teardown=True)
214        clean_sys_resource(file_path=aw_path)
215        DeccVariable.reset()
216
217    def add_aw_path_to_sys(self, aw_path):
218
219        sys_aw_path = os.path.dirname(aw_path)
220        if os.path.exists(sys_aw_path):
221            sys.path.insert(1, sys_aw_path)
222            self.log.info("add {} to sys path.".format(sys_aw_path))
223            return sys_aw_path
224        return None
225
    def run_test_class(self, case_path, case_name):
        """Import one test case, run it, then write its report and record.

        The case class is imported from the directory of case_path, used as
        a context manager and run. Whatever happens, a CaseEnd stage is
        notified, an HTML report is generated under "details" and the
        result is recorded.

        Args:
            case_path: case path
            case_name: case name
        Returns:
            A tuple ``(case_result, error_msg)``.
        """
        # Start collecting logs
        case_log_buffer_hdl = add_log_caching_handler()

        # Defaults that are reported when the case cannot be imported or run.
        tests = "__init__"
        case_result = RunResult.FAILED
        start_time = get_cst_time()
        case_dir_path = get_dir_path(case_path)
        test_cls_instance = None

        # Extended content of the case test result
        result_content = None
        try:
            self.project.cur_case_full_path = case_path
            DeccVariable.set_cur_case_obj(self.cur_case)
            test_cls = import_from_file(case_dir_path, case_name)
            self.log.info("Success to import {}.".format(case_name))
            with test_cls(self.configs) as test_cls_instance:
                self.cur_case.set_case_instance(test_cls_instance)
                test_cls_instance.run()

            # The case ran: replace the defaults with what it recorded.
            tests = test_cls_instance.tests
            start_time = test_cls_instance.start_time

            case_result = test_cls_instance.result
            error_msg = test_cls_instance.error_msg
            result_content = getattr(test_cls_instance, "result_content", None)
        except ImportError as exception:
            # error_msg is assigned either above or in one of these handlers.
            error_msg = str(exception)
            self.log.error(error_msg)
            self.log.error(traceback.format_exc())
        except Exception as exception:
            error_msg = ErrorMessage.TestCase.Code_0203002.format(exception)
            self.log.error(error_msg)
            self.log.error(traceback.format_exc())
        if test_cls_instance:
            try:
                # Drop the local reference to the case instance.
                del test_cls_instance
                self.log.debug("del test_cls_instance success.")
            except Exception as exception:
                self.log.warning("del test_cls_instance exception. {}".format(exception))

        Binder.notify_stage(CaseEnd(case_name, case_result, error_msg))

        end_time = get_cst_time()
        environment = self.configs.get("request").config.environment
        steps = self.cur_case.get_steps_info()
        # Stop collecting logs
        del_log_caching_handler(case_log_buffer_hdl)
        # Generate the report
        case_info = {
            "name": case_name,
            "result": case_result,
            "begin": start_time.strftime("%Y-%m-%d %H:%M:%S"),
            "end": end_time.strftime("%Y-%m-%d %H:%M:%S"),
            'elapsed': calculate_elapsed_time(start_time, end_time),
            "error": error_msg,
            "logs": "",
            "devices": [] if environment is None else environment.get_description(),
            "steps": steps
        }
        log_content = {
            "content": get_caching_logs(case_log_buffer_hdl)
        }
        # Reports go into a per-round folder only when the task is repeated.
        round_folder = f"round{self._repeat_round}" if self._repeat > 1 else ""
        report_path = os.path.join("details", round_folder, case_name + ".html")
        to_file = os.path.join(self.project.task_report_dir, report_path)
        generate_report(to_file, case=case_info, logs=log_content)
        # The steps were read before the report; clear them once it is written.
        steps.clear()
        del case_log_buffer_hdl
        self.cur_case.set_case_instance(None)
        self.record_current_case_result(
            case_name, tests, case_result, start_time, error_msg, report_path,
            result_content=result_content)
        return case_result, error_msg
311
312    def record_current_case_result(self, case_name, tests, case_result,
313                                   start_time, error_msg, report, **kwargs):
314        test_result = self.record_cls_result(
315            case_name, tests, case_result, start_time, error_msg, report, **kwargs)
316        self.log.debug("test result: {}".format(test_result))
317        self.test_results.append(test_result)
318        self.upload_result_handler.report_handler.test_results.append(test_result)
319
320    def stop(self):
321        """
322        Releases resources from test run. Should be called right after run()
323        finishes.
324        """
325        if self.running:
326            self.running = False
327
328    @staticmethod
329    def record_cls_result(case_name, tests_step, result, start_time, error, report, **kwargs):
330        dict_result = {
331            "case_name": case_name,
332            "tests_step": tests_step or "__init__",
333            "result": result or RunResult.FAILED,
334            "start_time": start_time or get_cst_time(),
335            "error": error,
336            "end_time": get_cst_time(),
337            "report": report
338        }
339        dict_result.update(kwargs)
340        return dict_result
341
342
343class TestSuiteRunner:
344    """
345    executes test suite cases
346    """
347
348    def __init__(self, suite, configs, devices):
349        self.suite = suite
350        self.running = False
351        self.configs = configs
352        self.devices = devices
353        self.log = Log
354        self.start_time = get_cst_time()
355        self.listeners = self.configs["listeners"]
356        self.state_machine = StateRecorder()
357        self.suite_name = ""
358
359    def add_value_to_configs(self):
360        self.configs["log"] = self.log
361        self.configs["devices"] = self.devices
362        self.configs["suite_name"] = self.suite_name
363
364    def run(self):
365        self.running = True
366        self.log.info("Executing test suite: {}.".format(self.suite))
367
368        self.suite_name = get_base_name(self.suite)
369        self.add_value_to_configs()
370        self.run_test_suite(self.suite)
371
    def run_test_suite(self, test_cls_name):
        """Import the suite class, run it and report to the listeners.

        Listeners are told that the suites and the suite started, the suite
        class is imported from the directory of test_cls_name and run as a
        context manager, and the suites-ended notification is sent with the
        resulting error message. A result xml named after the suite is then
        checked under "<report_path>/result".

        Args:
            test_cls_name: the suite to execute; its directory is used for
                the import.
        Returns:
            The suite's error_msg, or the formatted Code_0203017 message
            when an exception was caught.
        """
        suite_dir_path = get_dir_path(test_cls_name)
        test_cls_instance = None
        self.handle_suites_started()
        self.handle_suite_started()
        try:
            test_cls = import_from_file(suite_dir_path, self.suite_name)
            self.log.info("Success to import {}.".format(self.suite_name))
            self.configs["cur_suite"] = test_cls
            with test_cls(self.configs, suite_dir_path) as test_cls_instance:
                test_cls_instance.run()

            error_msg = test_cls_instance.error_msg
            # Only reached when the suite ran without raising.
            self.handle_suite_ended(test_cls_instance)
        except Exception as e:
            # error_msg is assigned either above or here.
            error_msg = ErrorMessage.TestCase.Code_0203017.format(e)
            self.log.error(error_msg)
            self.log.error(traceback.format_exc())
        self.handle_suites_ended(error_msg)
        result_path = os.path.join(self.configs["report_path"], "result")
        report_file = os.path.join(result_path, "%s.xml" % self.suite_name)
        os.makedirs(result_path, exist_ok=True)
        check_result_report("", report_file, error_message=error_msg)
        if test_cls_instance:
            try:
                # Drop the local reference to the suite instance.
                del test_cls_instance
                self.log.debug("del test suite instance success.")
            except Exception as e:
                self.log.warning("del test suite instance exception. "
                                 "Exception: {}".format(e))
        return error_msg
412
413    def stop(self):
414        """
415        Releases resources from test run. Should be called right after run()
416        finishes.
417        """
418        if self.running:
419            self.running = False
420
421    def handle_suites_started(self):
422        self.state_machine.get_suites(reset=True)
423        test_suites = self.state_machine.get_suites()
424        test_suites.suites_name = self.suite_name
425        test_suites.test_num = 0
426        for listener in self.listeners:
427            suite_report = copy.copy(test_suites)
428            listener.__started__(LifeCycle.TestSuites, suite_report)
429
430    def handle_suites_ended(self, error_msg):
431        suites = self.state_machine.get_suites()
432        suites.is_completed = True
433        suites.stacktrace = error_msg
434        for listener in self.listeners:
435            listener.__ended__(LifeCycle.TestSuites, suites)
436
437    def handle_suite_started(self):
438        self.state_machine.suite(reset=True)
439        self.state_machine.running_test_index = 0
440        test_suite = self.state_machine.suite()
441        test_suite.suite_name = self.suite_name
442        test_suite.test_num = 0
443        for listener in self.listeners:
444            suite_report = copy.copy(test_suite)
445            listener.__started__(LifeCycle.TestSuite, suite_report)
446
447    def handle_suite_ended(self, testsuite_cls):
448        suite = self.state_machine.suite()
449        suites = self.state_machine.get_suites()
450        self.handle_one_case_result(testsuite_cls)
451        suite.is_completed = True
452        # 设置测试套的报告路径
453        suite.report = testsuite_cls.suite_report_path
454        for listener in self.listeners:
455            listener.__ended__(LifeCycle.TestSuite, copy.copy(suite), is_clear=True)
456        suites.run_time += suite.run_time
457
458    def handle_one_case_result(self, testsuite_cls):
459        status_dict = {RunResult.PASSED: ResultCode.PASSED,
460                       RunResult.FAILED: ResultCode.FAILED,
461                       RunResult.BLOCKED: ResultCode.BLOCKED,
462                       "ignore": ResultCode.SKIPPED}
463        for case_name, case_result in testsuite_cls.case_result.items():
464            result = case_result.get("result")
465            error = case_result.get("error")
466            run_time = case_result.get("run_time")
467            report = case_result.get("report")
468            result_content = case_result.get("result_content")
469
470            test_result = self.state_machine.test(reset=True)
471            test_suite = self.state_machine.suite()
472            test_result.test_class = test_suite.suite_name
473            test_result.test_name = case_name
474            test_result.code = status_dict.get(result).value
475            test_result.stacktrace = error
476            test_result.run_time = run_time
477            test_result.report = report
478            if result_content:
479                test_result.result_content = result_content
480            test_result.current = self.state_machine.running_test_index + 1
481
482            self.state_machine.suite().run_time += run_time
483            for listener in self.listeners:
484                listener.__started__(
485                    LifeCycle.TestCase, copy.copy(test_result))
486            test_suites = self.state_machine.get_suites()
487            test_suites.test_num += 1
488            for listener in self.listeners:
489                listener.__ended__(
490                    LifeCycle.TestCase, copy.copy(test_result))
491            self.state_machine.running_test_index += 1
492