1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2020-2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18import platform 19import random 20import shutil 21import subprocess 22from pydoc import classname 23import time 24import os 25import sys 26import datetime 27import xml.etree.ElementTree as ElementTree 28 29from core.constants import SchedulerType 30from xdevice import Plugin 31from xdevice import get_plugin 32from xdevice import platform_logger 33from xdevice import DeviceTestType 34from xdevice import Binder 35from core.utils import get_build_output_path 36from core.utils import scan_support_product 37from core.utils import is_lite_product 38from core.common import is_open_source_product 39from core.command.parameter import Parameter 40from core.command.distribute_execute import DbinderTest 41from core.testcase.testcase_manager import TestCaseManager 42from core.config.config_manager import UserConfigManager 43from core.config.parse_parts_config import ParsePartsConfig 44from core.config.resource_manager import ResourceManager 45 46LOG = platform_logger("Run") 47 48 49class Run(object): 50 51 history_cmd_list = [] 52 53 @classmethod 54 def get_history(self): 55 return self.history_cmd_list 56 57 @classmethod 58 def get_target_out_path(cls, product_form): 59 target_out_path = UserConfigManager().get_test_cases_dir() 60 if target_out_path == "": 61 target_out_path = os.path.join( 62 get_build_output_path(product_form), 63 "packages", 64 product_form) 65 target_out_path = os.path.abspath(target_out_path) 66 return target_out_path 67 68 @classmethod 69 def _build_test_cases(cls, options): 70 if options.coverage: 71 LOG.info("Coverage testing, no need to compile testcases") 72 return True 73 74 is_build_testcase = UserConfigManager().get_user_config_flag( 75 "build", "testcase") 76 project_root_path = sys.source_code_root_path 77 if is_build_testcase and project_root_path != "": 78 from core.build.build_manager import BuildManager 79 build_manager = BuildManager() 80 return build_manager.build_testcases(project_root_path, options) 81 else: 82 return True 83 84 @classmethod 85 def _check_test_dictionary(cls, test_dictionary): 86 is_valid_status = False 87 key_list = sorted(test_dictionary.keys()) 88 for key in key_list: 89 file_list = test_dictionary[key] 90 if len(file_list) > 0: 91 is_valid_status = True 92 break 93 return is_valid_status 94 95 @classmethod 96 def get_tests_out_path(cls, product_form): 97 testcase_path = UserConfigManager().get_test_cases_dir() 98 if testcase_path == "": 99 all_product_list = scan_support_product() 100 if product_form in all_product_list: 101 if is_open_source_product(product_form): 102 testcase_path = os.path.abspath(os.path.join( 103 get_build_output_path(product_form), 104 "tests")) 105 else: 106 testcase_path = os.path.abspath(os.path.join( 107 get_build_output_path(product_form), 108 "tests")) 109 else: 110 testcase_path = os.path.join( 111 get_build_output_path(product_form), "tests") 112 LOG.info("testcase_path=%s" % testcase_path) 113 return testcase_path 114 115 @classmethod 116 def get_xts_tests_out_path(cls, product_form, testtype): 117 xts_testcase_path = UserConfigManager().get_test_cases_dir() 118 if xts_testcase_path == "": 119 xts_testcase_path = os.path.abspath(os.path.join( 120 get_build_output_path(product_form), 121 "suites", 122 testtype[0], 123 "testcases")) 124 LOG.info("xts_testcase_path=%s" % xts_testcase_path) 125 return xts_testcase_path 126 127 @classmethod 128 def get_external_deps_out_path(cls, product_form): 129 external_deps_path = os.path.abspath(os.path.join( 130 get_build_output_path(product_form), 131 "part_deps_info", 132 "part_deps_info.json")) 133 LOG.info("external_deps_path=%s" % external_deps_path) 134 return external_deps_path 135 136 @classmethod 137 def get_coverage_outpath(cls, options): 138 coverage_out_path = "" 139 if options.coverage: 140 coverage_out_path = get_build_output_path(options.productform) 141 if coverage_out_path == "": 142 coverage_out_path = UserConfigManager().get_user_config( 143 "coverage").get("outpath", "") 144 if coverage_out_path == "": 145 LOG.error("Coverage test: coverage_outpath is empty.") 146 return coverage_out_path 147 148 @classmethod 149 def get_part_deps_list(cls, productform, testpart): 150 #获取预处理部件间依赖的编译结果路径 151 external_deps_path = cls.get_external_deps_out_path(productform) 152 external_deps_path_list = TestCaseManager().get_part_deps_files(external_deps_path, testpart) 153 return external_deps_path_list 154 155 def process_command_run(self, command, options): 156 current_raw_cmd = ",".join(list(map(str, options.current_raw_cmd.split(" ")))) 157 if options.coverage and platform.system() != "Windows": 158 if not options.pullgcda: 159 push_cov_path = os.path.join(sys.framework_root_dir, "local_coverage/push_coverage_so/push_coverage.py") 160 if os.path.exists(push_cov_path): 161 if str(options.testpart) == "[]" and str(options.subsystem) == "[]": 162 LOG.info("No subsystem or part input. Not push coverage so.") 163 elif str(options.testpart) != "[]" and str(options.subsystem) != "[]": 164 LOG.info("Subsystem or part, there can be only one parameter exist. Not push coverage so.") 165 else: 166 if str(options.testpart) != "[]": 167 param = str(options.testpart) 168 subprocess.run("python3 {} {} {}".format( 169 push_cov_path, "testpart", param), shell=True) 170 else: 171 param = str(options.subsystem) 172 subprocess.run("python3 {} {} {}".format( 173 push_cov_path, "subsystem", param), shell=True) 174 else: 175 print(f"{push_cov_path} not exists.") 176 177 init_gcov_path = os.path.join(sys.framework_root_dir, "local_coverage/resident_service/init_gcov.py") 178 if os.path.exists(init_gcov_path): 179 subprocess.run("python3 %s command_str=%s" % ( 180 init_gcov_path, current_raw_cmd), shell=True) 181 else: 182 print(f"{init_gcov_path} not exists.") 183 184 para = Parameter() 185 test_type_list = para.get_testtype_list(options.testtype) 186 if len(test_type_list) == 0: 187 LOG.error("The testtype parameter is incorrect.") 188 return 189 options.testtype = test_type_list 190 191 parser = ParsePartsConfig(options.productform) 192 partname_list = parser.get_part_list( 193 options.subsystem, 194 options.testpart) 195 options.partname_list = partname_list 196 options.coverage_outpath = self.get_coverage_outpath(options) 197 198 LOG.info("") 199 LOG.info("------------------------------------") 200 LOG.info("Input parameter:") 201 LOG.info("productform = %s" % options.productform) 202 LOG.info("testtype = %s" % str(options.testtype)) 203 LOG.info("subsystem = %s" % str(options.subsystem)) 204 LOG.info("testpart = %s" % str(options.testpart)) 205 LOG.info("testmodule = %s" % options.testmodule) 206 LOG.info("testsuit = %s" % options.testsuit) 207 LOG.info("testcase = %s" % options.testcase) 208 LOG.info("testlevel = %s" % options.testlevel) 209 LOG.info("testargs = %s" % options.testargs) 210 LOG.info("repeat = %s" % options.repeat) 211 LOG.info("retry = %s" % options.retry) 212 LOG.info("historylist = %s" % options.historylist) 213 LOG.info("runhistory = %s" % options.runhistory) 214 LOG.info("partname_list = %s" % str(options.partname_list)) 215 LOG.info("partdeps = %s" % options.partdeps) 216 LOG.info("------------------------------------") 217 LOG.info("") 218 219 if not para.check_run_parameter(options): 220 LOG.error("Input parameter is incorrect.") 221 return 222 223 current_time = datetime.datetime.now() 224 #记录命令运行历史 225 need_record_history = False 226 cmd_record = { 227 "time" : str(current_time), 228 "raw_cmd" : options.current_raw_cmd, 229 "result" : "unknown", 230 "command": command, 231 "options": options 232 } 233 if not ("-hl" in options.current_raw_cmd or "-rh" in options.current_raw_cmd \ 234 or "--retry" in options.current_raw_cmd): 235 need_record_history = True 236 237 #打印历史记录 238 if options.historylist: 239 print("The latest command history is: %d" % len(self.history_cmd_list)) 240 for index, cmd_record in enumerate(self.history_cmd_list): 241 print("%d. [%s] - [%s]::[%s]" % (index + 1, cmd_record["time"], 242 cmd_record["raw_cmd"], cmd_record["result"])) 243 return 244 #重新运行历史里的一条命令 245 if options.runhistory > 0: 246 #如果记录大于10则认为非法 247 if options.runhistory > 10 or options.runhistory > len(self.history_cmd_list): 248 print("input history command[%d] out of range:", options.runhistory) 249 return 250 cmd_record = self.history_cmd_list[options.runhistory - 1] 251 print("run history command:", cmd_record["raw_cmd"]) 252 need_record_history = False 253 command = cmd_record["command"] 254 options = cmd_record["options"] 255 256 if options.retry: 257 if len(self.history_cmd_list) <= 0: 258 LOG.info("No history command exsit") 259 return 260 history_cmd = self.history_cmd_list[-1] 261 command = history_cmd["command"] 262 options = history_cmd["options"] 263 from xdevice import Variables 264 latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml") 265 tree = ElementTree.parse(latest_report_path) 266 root = tree.getroot() 267 has_failed_case = 0 268 test_targets = {} 269 fail_list = [] 270 for child in root: 271 print(child.tag, ":", child.attrib) 272 for grand in child: 273 print(grand.tag, ":", grand.attrib) 274 for sub_child in grand: 275 if sub_child.tag == 'failure': 276 fail_case = grand.attrib["classname"] + "#" + grand.attrib["name"] 277 fail_list.append(fail_case) 278 has_failed_case += 1 279 break 280 test_targets["class"] = fail_list 281 setattr(options, "testargs", test_targets) 282 setattr(options, "scheduler", "Scheduler") 283 print("retry option:", options) 284 if has_failed_case > 0: 285 if not self._build_test_cases(options): 286 LOG.error("Build test cases failed.") 287 return 288 scheduler = get_plugin(plugin_type=Plugin.SCHEDULER, 289 plugin_id=SchedulerType.SCHEDULER)[0] 290 scheduler.exec_command(command, options) 291 else: 292 LOG.info("No testcase to retry") 293 return 294 295 if not self._build_test_cases(options): 296 LOG.error("Build test cases failed.") 297 return 298 299 if "partdeps" == options.partdeps: 300 self.get_part_deps_list(options.productform, options.testpart) 301 options.testcases_path = self.get_external_deps_out_path(options.productform) 302 LOG.info("partdeps = %s" % options.partdeps) 303 304 if "acts" in options.testtype or "hats" in options.testtype or "hits" in options.testtype: 305 test_dict = self.get_xts_test_dict(options) 306 options.testcases_path = self.get_xts_tests_out_path(options.productform, options.testtype) 307 options.resource_path = self.get_xts_tests_out_path(options.productform, options.testtype) 308 else: 309 test_dict = self.get_test_dict(options) 310 311 if not self._check_test_dictionary(test_dict): 312 LOG.error("The test file list is empty.") 313 return 314 if options.coverage and platform.system() != "Windows": 315 coverage_path = os.path.join(sys.framework_root_dir, "reports/coverage") 316 if os.path.exists(coverage_path): 317 coverage_process = subprocess.Popen("rm -rf %s" % coverage_path, shell=True) 318 coverage_process.communicate() 319 320 if ("distributedtest" in options.testtype and 321 len(options.testtype) == 1): 322 from core.command.distribute_utils import get_test_case 323 from core.command.distribute_utils \ 324 import check_ditributetest_environment 325 from core.command.distribute_utils import make_device_info_file 326 from core.command.distribute_utils import make_reports 327 328 local_time = time.localtime() 329 create_time = time.strftime('%Y-%m-%d-%H-%M-%S', local_time) 330 start_time = time.strftime('%Y-%m-%d %H:%M:%S', local_time) 331 332 if not check_ditributetest_environment(): 333 return 334 335 output_test = get_test_case(test_dict.get("CXX", None)) 336 if not output_test: 337 return 338 339 result_rootpath = os.path.join(sys.framework_root_dir, 340 "reports", 341 create_time) 342 343 log_path = os.path.join(result_rootpath, "log") 344 tmp_path = os.path.join(result_rootpath, "temp") 345 os.makedirs(log_path, exist_ok=True) 346 os.makedirs(tmp_path, exist_ok=True) 347 348 Binder.get_runtime_log().start_task_log(log_path) 349 make_device_info_file(tmp_path) 350 351 for case in output_test: 352 agent_target_name = case["agent_target_name"] 353 major_target_name = case["major_target_name"] 354 manager = DbinderTest(result_rootpath, case["suits_dir"]) 355 manager.setUp() 356 manager.test_distribute(major_target_name, agent_target_name, options) 357 manager.tearDown() 358 359 make_reports(result_rootpath, start_time) 360 Binder.get_runtime_log().stop_task_logcat() 361 else: 362 options.testdict = test_dict 363 options.target_outpath = self.get_target_out_path( 364 options.productform) 365 setattr(options, "scheduler", "Scheduler") 366 scheduler = get_plugin(plugin_type=Plugin.SCHEDULER, 367 plugin_id=SchedulerType.SCHEDULER)[0] 368 if scheduler is None: 369 LOG.error("Can not find the scheduler plugin.") 370 else: 371 options.testcases_path = self.get_tests_out_path(options.productform) 372 options.resource_path = os.path.abspath(os.path.join( 373 sys.framework_root_dir, "..", "resource")) 374 if is_lite_product(options.productform, 375 sys.source_code_root_path): 376 if options.productform.find("wifiiot") != -1: 377 Binder.get_tdd_config().update_test_type_in_source( 378 ".bin", DeviceTestType.ctest_lite) 379 Binder.get_tdd_config().update_ext_type_in_source( 380 "BIN", DeviceTestType.ctest_lite) 381 else: 382 print("productform is not wifiiot") 383 scheduler.exec_command(command, options) 384 if need_record_history: 385 #读文件获取运行结果 386 from xdevice import Variables 387 latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml") 388 with open(latest_report_path) as report_file: 389 for report_line in report_file: 390 if "testsuites name=\"summary_report\"" in report_line: 391 result = report_line.replace("\n", "") 392 result = result.replace("<testsuites name=\"summary_report\" ", "") 393 result = result.replace(">", "") 394 cmd_record["result"] = result 395 break 396 if len(self.history_cmd_list) >= 10: 397 del self.history_cmd_list[0] 398 self.history_cmd_list.append(cmd_record) 399 400 if "fuzztest" == options.testtype[0] and options.coverage is False: 401 report = get_plugin(plugin_type=Plugin.REPORTER, plugin_id="ALL")[0] 402 latest_corpus_path = os.path.join(sys.framework_root_dir, "reports", "latest_corpus") 403 if os.path.exists(latest_corpus_path): 404 shutil.rmtree(latest_corpus_path) 405 shutil.copytree(os.path.join(report.report_path, "result"), latest_corpus_path) 406 407 if options.coverage and platform.system() != "Windows": 408 pull_service_gcov_path = os.path.join( 409 sys.framework_root_dir, "local_coverage/resident_service/pull_service_gcda.py") 410 if os.path.exists(pull_service_gcov_path): 411 subprocess.run("python3 %s command_str=%s" % (pull_service_gcov_path, current_raw_cmd), shell=True) 412 else: 413 print(f"{pull_service_gcov_path} not exists.") 414 415 if not options.pullgcda: 416 cov_main_file_path = os.path.join(sys.framework_root_dir, "local_coverage/coverage_tools.py") 417 testpart = ",".join(list(map(str, options.partname_list))) 418 if os.path.exists(cov_main_file_path): 419 subprocess.run("python3 %s testpart=%s" % ( 420 cov_main_file_path, testpart), shell=True) 421 else: 422 print(f"{cov_main_file_path} not exists.") 423 return 424 425 def get_xts_test_dict(self, options): 426 # 获取XTS测试用例编译结果路径 427 xts_test_case_path = self.get_xts_tests_out_path(options.productform, options.testtype) 428 if not os.path.exists(xts_test_case_path): 429 LOG.error("%s is not exist." % xts_test_case_path) 430 return {} 431 xts_test_dict = TestCaseManager().get_xts_test_files(xts_test_case_path, options) 432 return xts_test_dict 433 434 def get_test_dict(self, options): 435 # 获取测试用例编译结果路径 436 test_case_path = self.get_tests_out_path(options.productform) 437 if not os.path.exists(test_case_path): 438 LOG.error("%s is not exist." % test_case_path) 439 return {} 440 441 test_dict = TestCaseManager().get_test_files(test_case_path, options) 442 return test_dict 443