#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import copy
import json
import sys
from json import JSONDecodeError
from core.utils import get_build_output_path
from core.common import is_open_source_product

from core.utils import get_file_list_by_postfix
from core.config.config_manager import FilterConfigManager
from xdevice import platform_logger
from xdevice import DeviceTestType
from xdevice import Binder

LOG = platform_logger("TestcaseManager")

# Template of the result dictionary: one list of suite file paths per test
# file type. Always deep-copied before use so the lists are never shared.
TESTFILE_TYPE_DATA_DIC = {
    "DEX": [],
    "HAP": [],
    "PYT": [],
    "CXX": [],
    "BIN": [],
    "OHJST": [],
    "JST": [],
    "LTPPosix": [],
    "OHRust": []
}
# File suffixes that are never treated as test suites.
FILTER_SUFFIX_NAME_LIST = [".TOC", ".info", ".pyc"]


class TestCaseManager(object):
    """Collect test suite files from a build output tree, grouped by type."""

    @classmethod
    def get_valid_suite_file(cls, test_case_out_path, suite_file, options):
        """Tell whether ``suite_file`` is selected by the command options.

        Args:
            test_case_out_path: root directory the suite file must live under.
            suite_file: full path of the candidate suite file.
            options: object providing ``partname_list``, ``testmodule`` and
                ``testsuit``.

        Returns:
            True when the file passes the suite / part / module filters.
        """
        partlist = options.partname_list
        testmodule = options.testmodule
        testsuit = options.testsuit

        if not suite_file.startswith(test_case_out_path):
            return False

        if testsuit != "":
            short_name, _ = os.path.splitext(os.path.basename(suite_file))
            # NOTE(review): suites are split on "," here, while the HAP/XTS
            # matching in this class splits on ";" - confirm which separator
            # the command line really uses.
            for test in testsuit.split(','):
                # Prefix match in either direction, evaluated per requested
                # suite (the reverse check used to look at the whole option
                # string, so it only ever worked for the first suite).
                if short_name.startswith(test) or \
                        test.startswith(short_name):
                    return True
            return False

        # The prefix was verified above, so slice it off instead of replacing
        # every occurrence of it inside the path.
        suitfile_subpath = suite_file[len(test_case_out_path):].strip(os.sep)

        if not partlist:
            if testmodule == "":
                return True
            path_items = suitfile_subpath.split(os.sep)
            # The second path component is compared with the module name.
            return len(path_items) > 2 and testmodule == path_items[1]

        for partname in partlist:
            expected_prefix = partname + os.sep
            if testmodule != "":
                expected_prefix += testmodule + os.sep
            if suitfile_subpath.startswith(expected_prefix):
                return True
        return False

    @classmethod
    def check_python_test_file(cls, suite_file):
        """Return True for ``.py`` files whose base name starts with ``test_``."""
        if not suite_file.endswith(".py"):
            return False
        return os.path.basename(suite_file).startswith("test_")

    @classmethod
    def _load_hap_sidecar(cls, hap_file_path, suffix):
        """Load the JSON file that sits next to a ``.hap`` file.

        Args:
            hap_file_path: path of the HAP package.
            suffix: extension of the sidecar file, e.g. ``".json"``.

        Returns:
            The decoded dictionary, or ``{}`` when the path is not a ``.hap``
            file, the sidecar does not exist, cannot be decoded, or does not
            contain a JSON object.
        """
        if not hap_file_path.endswith(".hap"):
            return {}
        # Swap only the extension; str.replace would also rewrite ".hap"
        # occurring in a directory name.
        sidecar_path = os.path.splitext(hap_file_path)[0] + suffix
        if not os.path.exists(sidecar_path):
            return {}
        try:
            with open(sidecar_path, 'r', encoding="utf-8") as sidecar_file:
                data_dic = json.load(sidecar_file)
        except (JSONDecodeError, UnicodeDecodeError) as error:
            # One broken config file must not abort the whole scan.
            LOG.error("%s is not a valid json file: %s" % (sidecar_path, error))
            return {}
        return data_dic if isinstance(data_dic, dict) else {}

    @classmethod
    def check_hap_test_file(cls, hap_file_path):
        """Tell whether the HAP's ``.json`` config declares a test file.

        Returns:
            True when at least one entry of the ``kits`` list contains the
            ``test-file-name`` key, False in every other case (including a
            missing or malformed config file).
        """
        data_dic = cls.get_hap_json(hap_file_path)
        if not data_dic:
            return False
        kits_list = data_dic.get("kits") or []
        return any(isinstance(kits_dict, dict) and
                   "test-file-name" in kits_dict
                   for kits_dict in kits_list)

    @classmethod
    def get_hap_test_driver(cls, hap_file_path):
        """Return the ``driver.type`` value from the HAP's ``.json`` config.

        Returns:
            The configured driver type, or ``""`` when the config is missing
            or has no usable ``driver`` entry. The value of ``type`` is
            returned as is, so it is ``None`` when that key is absent.
        """
        data_dic = cls.get_hap_json(hap_file_path)
        if not data_dic or "driver" not in data_dic:
            return ""
        driver_dict = data_dic.get("driver")
        if not driver_dict:
            LOG.error("%s has not set driver." % hap_file_path)
            return ""
        return driver_dict.get("type")

    @classmethod
    def get_hap_json(cls, hap_file_path):
        """Return the content of ``<name>.json`` next to ``<name>.hap``.

        Returns ``{}`` when the file is missing or cannot be decoded.
        """
        return cls._load_hap_sidecar(hap_file_path, ".json")

    @classmethod
    def get_hap_part_json(cls, hap_file_path):
        """Return the content of ``<name>.moduleInfo`` next to ``<name>.hap``.

        Returns ``{}`` when the file is missing or cannot be decoded.
        """
        return cls._load_hap_sidecar(hap_file_path, ".moduleInfo")

    @classmethod
    def get_part_name_test_file(cls, hap_file_path):
        """Return the ``part`` value of the HAP's moduleInfo, or ``""``."""
        data_dic = cls.get_hap_part_json(hap_file_path)
        if not data_dic:
            return ""
        return data_dic.get("part", "")

    def get_test_files(self, test_case_path, options):
        """Collect suite files for every test type listed in the options.

        Args:
            test_case_path: root directory holding one folder per test type.
            options: object providing ``testtype`` plus the attributes used
                by the filters of this class.

        Returns:
            Dictionary mapping each test file type to a list of file paths.
        """
        LOG.info("test case path: " + test_case_path)
        LOG.info("test type list: " + str(options.testtype))
        suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        if not os.path.exists(test_case_path):
            LOG.error("%s is not exist." % test_case_path)
            return suit_file_dic
        if len(options.testtype) != 0:
            suit_file_dic = self.get_test_file_data(
                test_case_path,
                options.testtype,
                options)
        return suit_file_dic

    def get_test_file_data(self, test_case_path, test_type_list, options):
        """Merge the suite files found for each test type into one dictionary."""
        suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        for test_type in test_type_list:
            temp_dic = self.get_test_file_data_by_test_type(
                test_case_path,
                test_type,
                options)
            for key in suit_file_dic:
                suit_file_dic[key] = suit_file_dic[key] + temp_dic.get(key, [])
        return suit_file_dic

    def get_test_file_data_by_test_type(self, test_case_path,
                                        test_type, options):
        """Collect suite files below ``<test_case_path>/<test_type>``.

        Returns the empty template dictionary when that directory is missing.
        """
        test_case_out_path = os.path.join(test_case_path, test_type)
        if not os.path.exists(test_case_out_path):
            LOG.error("Test case dir does not exist. %s" % test_case_out_path)
            return copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        LOG.info("The test case directory: %s" % test_case_out_path)
        return self.get_all_test_file(test_case_out_path, options)

    def get_all_test_file(self, test_case_out_path, options):
        """Scan the output directory and classify every accepted suite file.

        Args:
            test_case_out_path: directory whose sub-folders are scanned.
            options: object providing ``productform``, ``current_raw_cmd``,
                ``partname_list``, ``testpart``, ``testsuit`` and the
                attributes read by ``get_valid_suite_file``.

        Returns:
            Dictionary mapping each test file type to a list of file paths.
        """
        suite_file_dictionary = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        filter_part_list = FilterConfigManager().get_filtering_list(
            "subsystem_name", options.productform)
        filter_list_test_file = FilterConfigManager().get_filtering_list(
            "testfile_name", options.productform)
        # Walk every folder below the test case output directory; each
        # folder corresponds to one subsystem.
        command_list = options.current_raw_cmd.split(" ")
        restrict_to_parts = "-ss" in command_list or "-tp" in command_list
        resource_marker = os.sep + "resource" + os.sep
        for part_name in os.listdir(test_case_out_path):
            if restrict_to_parts and part_name not in options.partname_list:
                continue
            part_case_dir = os.path.join(test_case_out_path, part_name)
            if not os.path.isdir(part_case_dir):
                continue
            # Skip subsystems listed under <subsystem_name> in the filter
            # configuration file.
            if part_name in filter_part_list:
                continue

            # Collect the paths of all files below the subsystem directory.
            suite_file_list = get_file_list_by_postfix(part_case_dir)
            for suite_file in suite_file_list:
                # Files located under a "resource" directory are skipped.
                if resource_marker in suite_file.replace(
                        test_case_out_path, ""):
                    continue

                file_name = os.path.basename(suite_file)
                # Skip files listed under <testfile_name> in the filter
                # configuration file.
                if file_name in filter_list_test_file:
                    continue

                prefix_name, suffix_name = os.path.splitext(file_name)
                if suffix_name in FILTER_SUFFIX_NAME_LIST:
                    continue

                if not self.get_valid_suite_file(test_case_out_path,
                                                 suite_file,
                                                 options):
                    continue

                if suffix_name == ".dex":
                    suite_file_dictionary.get("DEX").append(suite_file)
                elif suffix_name == ".hap":
                    # Read the driver type once; every lookup re-reads the
                    # HAP's json config from disk.
                    driver_type = self.get_hap_test_driver(suite_file)
                    if driver_type == "OHJSUnitTest":
                        # For stage tests, -tp only keeps HAPs whose
                        # moduleInfo "part" equals the requested part and
                        # -ts only keeps HAPs whose name matches exactly.
                        # This is the same rule as for XTS files.
                        if not self.check_xts_config_match(
                                options, prefix_name, suite_file):
                            continue
                        if not self.check_hap_test_file(suite_file):
                            continue
                        suite_file_dictionary.get("OHJST").append(suite_file)
                    elif driver_type == "JSUnitTest":
                        suite_file_dictionary.get("JST").append(suite_file)
                elif suffix_name == ".py":
                    if not self.check_python_test_file(suite_file):
                        continue
                    suite_file_dictionary.get("PYT").append(suite_file)
                elif suffix_name == "":
                    if file_name.startswith("rust_"):
                        Binder.get_tdd_config().update_test_type_in_source(
                            "OHRust", DeviceTestType.oh_rust_test)
                        suite_file_dictionary.get("OHRust").append(suite_file)
                    else:
                        suite_file_dictionary.get("CXX").append(suite_file)
                elif suffix_name == ".bin":
                    suite_file_dictionary.get("BIN").append(suite_file)

        return suite_file_dictionary

    def get_part_deps_files(self, external_deps_path, testpart):
        """Return the dependency list recorded for ``testpart[0]``.

        Args:
            external_deps_path: path of the json file holding, per part name,
                its list of external dependencies.
            testpart: list whose first element is the part name to look up.

        Returns:
            The list stored for the part, or ``[]`` when the file is missing
            or empty, ``testpart`` is empty, or the part is not recorded.
        """
        LOG.info("external_deps_path:" + external_deps_path)
        if not os.path.exists(external_deps_path):
            LOG.error("Part deps info %s is not exist." % external_deps_path)
            return []
        with open(external_deps_path, 'r') as json_file:
            data_dic = json.load(json_file)
        if not data_dic:
            LOG.error("data_dic is empty.")
            return []
        if not testpart:
            # Indexing an empty list would raise IndexError.
            LOG.error("testpart is empty, can not get part deps.")
            return []
        test_part = testpart[0]
        if test_part not in data_dic:
            LOG.error("%s is not in part deps info json." % test_part)
            return []
        external_deps_part_list = data_dic.get(test_part)
        LOG.info("external_deps_part_list = %s" % external_deps_part_list)
        return external_deps_part_list

    def check_xts_config_match(self, options, prefix_name, xts_suite_file):
        """Apply the ``-tp`` / ``-ts`` filters to one suite file.

        Args:
            options: object providing ``testpart`` and ``testsuit``.
            prefix_name: file name of the suite without its extension.
            xts_suite_file: full path of the suite file.

        Returns:
            True when the file should be queued for execution.
        """
        # When -tp is given, only files whose moduleInfo "part" equals the
        # requested part name are queued.
        if options.testpart != [] and \
                options.testpart[0] != self.get_part_name_test_file(
                    xts_suite_file):
            return False
        # When -ts is given, only files whose name matches exactly are queued.
        if options.testsuit != "":
            return prefix_name in options.testsuit.split(";")
        return True

    def get_xts_test_files(self, xts_test_case_path, options):
        """Collect and classify the suite files of an XTS output directory.

        Returns:
            Dictionary mapping each test file type to a list of file paths.
        """
        LOG.info("xts test case path: " + xts_test_case_path)
        xts_suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
        if not os.path.exists(xts_test_case_path):
            LOG.error("xts %s is not exist." % xts_test_case_path)
            return xts_suit_file_dic
        # Collect the paths of all files below the XTS output directory.
        xts_suite_file_list = get_file_list_by_postfix(xts_test_case_path)
        for xts_suite_file in xts_suite_file_list:
            file_name = os.path.basename(xts_suite_file)
            prefix_name, suffix_name = os.path.splitext(file_name)
            if not self.check_xts_config_match(options, prefix_name,
                                               xts_suite_file):
                continue
            if suffix_name == "":
                if file_name == "HatsOpenPosixTest":
                    xts_suit_file_dic.get("LTPPosix").append(xts_suite_file)
                else:
                    xts_suit_file_dic.get("CXX").append(xts_suite_file)
            elif suffix_name == ".hap":
                # Read the driver type once instead of once per comparison.
                driver_type = self.get_hap_test_driver(xts_suite_file)
                if driver_type == "OHJSUnitTest":
                    xts_suit_file_dic.get("OHJST").append(xts_suite_file)
                elif driver_type == "JSUnitTest":
                    xts_suit_file_dic.get("JST").append(xts_suite_file)
        return xts_suit_file_dic