1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import os 20import traceback 21import platform 22from xml.etree import ElementTree 23 24from devicetest.core.exception import TestPrepareError 25from devicetest.core.constants import RunResult 26from devicetest.error import ErrorMessage 27from devicetest.utils.util import get_base_name 28from devicetest.utils.util import import_from_file 29 30 31class PrepareHandler: 32 33 def __init__(self, log, cur_case, project, configs, devices, run_list): 34 self.log = log 35 self.cur_case = cur_case 36 self.project = project 37 self.configs = configs 38 self.devices = devices 39 self.is_run_prepare = False 40 self.parse_test_list(run_list) 41 42 def parse_prepare_config(self, case_path, xml_path): 43 try: 44 self.log.debug("parse prepare config case path:{}". 45 format(case_path)) 46 case_name = get_base_name(case_path) 47 prepares = ElementTree.parse(xml_path).findall("prepare") 48 for cls in prepares: 49 cls_name = get_base_name(cls.attrib["class"].strip()) 50 for case in cls.findall("testcase"): 51 name = get_base_name(case.attrib["name"].strip()) 52 if name and case_name == name: 53 if self.flash_prepare_config(case_name, cls_name): 54 break 55 except Exception: 56 self.log.debug(traceback.format_exc()) 57 self.log.error("parse prepare config exception error.") 58 59 def flash_prepare_config(self, case_name, cls_name): 60 if cls_name not in self.project.prepare.config.keys(): 61 self.project.prepare.config[cls_name] = {} 62 self.project.prepare.config[cls_name]['status'] = 'unexecuted' 63 self.project.prepare.config[cls_name]['cases'] = [] 64 if case_name not in self.project.prepare.config[cls_name]['cases']: 65 self.project.prepare.config[cls_name]['cases'].append(case_name) 66 return True 67 return False 68 69 def parse_test_list(self, test_list): 70 """Parse user provided test list into internal format for test_runner. 71 """ 72 if not self.project.prepare.path: 73 return 74 xml_path = os.path.join(self.project.prepare.path, 'prepare.xml') 75 self.log.debug("prepare xml path:{}".format(xml_path)) 76 if os.access(xml_path, os.F_OK): 77 for elem in test_list: 78 self._parse_one_test_specifier(elem, xml_path) 79 self.log.debug("prepare config:{}".format( 80 self.project.prepare.config)) 81 if self.project.prepare.config: 82 self.is_run_prepare = True 83 else: 84 self.log.warning( 85 "{} not exists, please check.".format(xml_path)) 86 87 def _parse_one_test_specifier(self, item, xml_path): 88 sys_type = platform.system() 89 if sys_type == "Windows": 90 tokens = item.split(';') 91 elif sys_type == "Linux": 92 tokens = item.split(':') 93 elif sys_type == "Darwin": 94 tokens = item.split(':') 95 else: 96 self.log.error("system '{}' is not support".format(sys_type)) 97 raise TestPrepareError(ErrorMessage.Common.Code_0201011) 98 if len(tokens) > 2: 99 raise TestPrepareError(ErrorMessage.Common.Code_0201012.format(item)) 100 if len(tokens) == 1: 101 # This should be considered a test class name 102 self.parse_prepare_config(tokens[0], xml_path) 103 elif len(tokens) == 2: 104 test_cls_name, test_case_names = tokens 105 for elem in test_case_names.split(','): 106 self.validate_test_name(elem.strip()) 107 self.parse_prepare_config(test_cls_name, xml_path) 108 109 def validate_test_name(self, name): 110 """Checks if a test name is valid. """ 111 if name == "" or name is None or len(name) < 1: 112 raise TestPrepareError(ErrorMessage.Common.Code_0201013.format(name)) 113 114 def _init_run_prepare(self, test_cls_name): 115 """ 116 prepare变量清理 117 Args: 118 test_cls_name: 119 Returns: 120 """ 121 self.cur_case.log_details_path = "./log/test_run_details.log" 122 self.cur_case.log_path = "./log/test_run_summary.log" 123 self.cur_case.set_case_screenshot_dir(self.project.test_suite_path, 124 self.project.task_report_dir, 125 test_cls_name) 126 self.cur_case.report_path = self.cur_case.case_screenshot_dir + ".html" 127 self.cur_case.case_result = RunResult.PASSED 128 self.cur_case.description = "" 129 self.cur_case.error_msg = "" 130 self.cur_case.status = 0 131 self.cur_case.image_num = 0 132 self.cur_case.dump_xml_num = 0 133 134 def run_prepare(self, is_teardown=False): 135 if not self.is_run_prepare: 136 return 137 func = 'teardown' if is_teardown else 'setup' 138 self.log.debug("in prepare {}".format(func)) 139 error_msg = None 140 try: 141 for cls, val in self.project.prepare.config.items(): 142 if self.project.prepare.path: 143 prepare_path = self.project.prepare.path 144 else: 145 prepare_path = os.path.join( 146 self.project.test_suite_path, 'prepare') 147 148 self.log.debug("prepare path:{}".format(prepare_path)) 149 test_cls_name = os.path.join(prepare_path, cls + '.py') 150 if not os.access(test_cls_name, os.F_OK): 151 test_cls_name = os.path.join(prepare_path, 152 cls + '.pyd') 153 if not os.access(test_cls_name, os.F_OK): 154 py_path = os.path.join(prepare_path, cls + '.py') 155 # .py/.pyd 156 msg = "{} or {}".format(py_path, py_path + "d") 157 raise TestPrepareError(ErrorMessage.Common.Code_0201014.format(msg)) 158 self.log.info("import prepare script:{}".format(cls)) 159 self.project.cur_case_full_path = test_cls_name 160 test_cls = import_from_file(prepare_path, cls) 161 self.log.debug( 162 "Success to import {}.".format(test_cls_name)) 163 with test_cls(self.configs) as test_instance: 164 if 'setup' == func: 165 if 'unexecuted' == val['status']: 166 self.project.prepare.config[cls][ 167 'status'] = 'executed' 168 result = test_instance._exec_func( 169 test_instance.setup) 170 if not result: 171 raise TestPrepareError(ErrorMessage.Common.Code_0201015) 172 else: 173 if 'executed' == val['status']: 174 self.project.prepare.config[cls][ 175 'status'] = 'finsh' 176 result = test_instance._exec_func( 177 test_instance.teardown) 178 if not result: 179 self.log.warning(ErrorMessage.Common.Code_0201016) 180 181 except TestPrepareError as e: 182 error_msg = str(e) 183 self.log.error(error_msg) 184 self.log.error(traceback.format_exc()) 185 186 except Exception as e: 187 error_msg = "run prepare error! {}".format(e) 188 self.log.error(error_msg) 189 self.log.error(traceback.format_exc()) 190 191 finally: 192 self.log.debug("exit prepare {}".format(func)) 193 if error_msg is not None: 194 raise TestPrepareError(error_msg) 195