1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import traceback
21import platform
22from xml.etree import ElementTree
23
24from devicetest.core.exception import TestPrepareError
25from devicetest.core.constants import RunResult
26from devicetest.error import ErrorMessage
27from devicetest.utils.util import get_base_name
28from devicetest.utils.util import import_from_file
29
30
31class PrepareHandler:
32
33    def __init__(self, log, cur_case, project, configs, devices, run_list):
34        self.log = log
35        self.cur_case = cur_case
36        self.project = project
37        self.configs = configs
38        self.devices = devices
39        self.is_run_prepare = False
40        self.parse_test_list(run_list)
41
42    def parse_prepare_config(self, case_path, xml_path):
43        try:
44            self.log.debug("parse prepare config case path:{}".
45                           format(case_path))
46            case_name = get_base_name(case_path)
47            prepares = ElementTree.parse(xml_path).findall("prepare")
48            for cls in prepares:
49                cls_name = get_base_name(cls.attrib["class"].strip())
50                for case in cls.findall("testcase"):
51                    name = get_base_name(case.attrib["name"].strip())
52                    if name and case_name == name:
53                        if self.flash_prepare_config(case_name, cls_name):
54                            break
55        except Exception:
56            self.log.debug(traceback.format_exc())
57            self.log.error("parse prepare config exception error.")
58
59    def flash_prepare_config(self, case_name, cls_name):
60        if cls_name not in self.project.prepare.config.keys():
61            self.project.prepare.config[cls_name] = {}
62            self.project.prepare.config[cls_name]['status'] = 'unexecuted'
63            self.project.prepare.config[cls_name]['cases'] = []
64        if case_name not in self.project.prepare.config[cls_name]['cases']:
65            self.project.prepare.config[cls_name]['cases'].append(case_name)
66            return True
67        return False
68
69    def parse_test_list(self, test_list):
70        """Parse user provided test list into internal format for test_runner.
71        """
72        if not self.project.prepare.path:
73            return
74        xml_path = os.path.join(self.project.prepare.path, 'prepare.xml')
75        self.log.debug("prepare xml path:{}".format(xml_path))
76        if os.access(xml_path, os.F_OK):
77            for elem in test_list:
78                self._parse_one_test_specifier(elem, xml_path)
79            self.log.debug("prepare config:{}".format(
80                self.project.prepare.config))
81            if self.project.prepare.config:
82                self.is_run_prepare = True
83        else:
84            self.log.warning(
85                "{} not exists, please check.".format(xml_path))
86
87    def _parse_one_test_specifier(self, item, xml_path):
88        sys_type = platform.system()
89        if sys_type == "Windows":
90            tokens = item.split(';')
91        elif sys_type == "Linux":
92            tokens = item.split(':')
93        elif sys_type == "Darwin":
94            tokens = item.split(':')
95        else:
96            self.log.error("system '{}' is not support".format(sys_type))
97            raise TestPrepareError(ErrorMessage.Common.Code_0201011)
98        if len(tokens) > 2:
99            raise TestPrepareError(ErrorMessage.Common.Code_0201012.format(item))
100        if len(tokens) == 1:
101            # This should be considered a test class name
102            self.parse_prepare_config(tokens[0], xml_path)
103        elif len(tokens) == 2:
104            test_cls_name, test_case_names = tokens
105            for elem in test_case_names.split(','):
106                self.validate_test_name(elem.strip())
107                self.parse_prepare_config(test_cls_name, xml_path)
108
109    def validate_test_name(self, name):
110        """Checks if a test name is valid. """
111        if name == "" or name is None or len(name) < 1:
112            raise TestPrepareError(ErrorMessage.Common.Code_0201013.format(name))
113
114    def _init_run_prepare(self, test_cls_name):
115        """
116        prepare变量清理
117        Args:
118            test_cls_name:
119        Returns:
120        """
121        self.cur_case.log_details_path = "./log/test_run_details.log"
122        self.cur_case.log_path = "./log/test_run_summary.log"
123        self.cur_case.set_case_screenshot_dir(self.project.test_suite_path,
124                                              self.project.task_report_dir,
125                                              test_cls_name)
126        self.cur_case.report_path = self.cur_case.case_screenshot_dir + ".html"
127        self.cur_case.case_result = RunResult.PASSED
128        self.cur_case.description = ""
129        self.cur_case.error_msg = ""
130        self.cur_case.status = 0
131        self.cur_case.image_num = 0
132        self.cur_case.dump_xml_num = 0
133
134    def run_prepare(self, is_teardown=False):
135        if not self.is_run_prepare:
136            return
137        func = 'teardown' if is_teardown else 'setup'
138        self.log.debug("in prepare {}".format(func))
139        error_msg = None
140        try:
141            for cls, val in self.project.prepare.config.items():
142                if self.project.prepare.path:
143                    prepare_path = self.project.prepare.path
144                else:
145                    prepare_path = os.path.join(
146                        self.project.test_suite_path, 'prepare')
147
148                self.log.debug("prepare path:{}".format(prepare_path))
149                test_cls_name = os.path.join(prepare_path, cls + '.py')
150                if not os.access(test_cls_name, os.F_OK):
151                    test_cls_name = os.path.join(prepare_path,
152                                                 cls + '.pyd')
153                    if not os.access(test_cls_name, os.F_OK):
154                        py_path = os.path.join(prepare_path, cls + '.py')
155                        # .py/.pyd
156                        msg = "{} or {}".format(py_path, py_path + "d")
157                        raise TestPrepareError(ErrorMessage.Common.Code_0201014.format(msg))
158                self.log.info("import prepare script:{}".format(cls))
159                self.project.cur_case_full_path = test_cls_name
160                test_cls = import_from_file(prepare_path, cls)
161                self.log.debug(
162                    "Success to import {}.".format(test_cls_name))
163                with test_cls(self.configs) as test_instance:
164                    if 'setup' == func:
165                        if 'unexecuted' == val['status']:
166                            self.project.prepare.config[cls][
167                                'status'] = 'executed'
168                            result = test_instance._exec_func(
169                                test_instance.setup)
170                            if not result:
171                                raise TestPrepareError(ErrorMessage.Common.Code_0201015)
172                    else:
173                        if 'executed' == val['status']:
174                            self.project.prepare.config[cls][
175                                'status'] = 'finsh'
176                            result = test_instance._exec_func(
177                                test_instance.teardown)
178                            if not result:
179                                self.log.warning(ErrorMessage.Common.Code_0201016)
180
181        except TestPrepareError as e:
182            error_msg = str(e)
183            self.log.error(error_msg)
184            self.log.error(traceback.format_exc())
185
186        except Exception as e:
187            error_msg = "run prepare error! {}".format(e)
188            self.log.error(error_msg)
189            self.log.error(traceback.format_exc())
190
191        finally:
192            self.log.debug("exit prepare {}".format(func))
193            if error_msg is not None:
194                raise TestPrepareError(error_msg)
195