#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import shutil
import time
from dataclasses import dataclass

from xdevice import DeviceTestType
from xdevice import IDriver
from xdevice import Plugin
from xdevice import platform_logger
from xdevice import DeviceLabelType
from xdevice import ShellHandler
from xdevice import ExecuteTerminate
from xdevice import get_plugin
from xdevice import JsonParser
from xdevice import get_config_value
from xdevice import get_kit_instances
from xdevice import check_result_report
from xdevice import get_device_log_file
from xdevice import get_test_component_version
from xdevice import ParamError
from ohos.constants import ParserType
from ohos.constants import ComType
from ohos.constants import CKit
from ohos.exception import LiteDeviceExecuteCommandError
from core.utils import get_filename_extension
from core.testkit.kit_lite import DeployKit
from core.config.resource_manager import ResourceManager
from core.config.config_manager import UserConfigManager

__all__ = ["LiteUnitTest", "CTestDriver", "JSUnitTestLiteDriver"]
LOG = platform_logger("LiteUnitTest")


def get_level_para_string(level_string):
    """
    Convert a comma separated list of numeric levels into gtest level names.

    Every entry is stripped of surrounding whitespace first; entries that are
    not purely digits are ignored and duplicates are dropped while keeping the
    order of first appearance, e.g. "1, 2,x,1" -> "Level1,Level2".

    :param level_string: comma separated levels, e.g. "0,1,2"
    :return: comma separated "Level<N>" names, "" when nothing is valid
    """
    levels = []
    for raw_item in level_string.split(","):
        # Strip before validating, otherwise " 1" would be rejected.
        item = raw_item.strip()
        if not item.isdigit() or item in levels:
            continue
        levels.append(item)
    return ",".join(f"Level{item}" for item in levels)


def _append_device_log(request, result):
    """
    Append the output of a device command to the device log file.

    The log file is resolved with get_device_log_file() from the report path
    and the serial of the device stored in request.config. The text after the
    last newline of result is not written.

    :param request: test request carrying config.report_path and
                    config.device
    :param result: raw text returned by the device command
    """
    device_log_file = get_device_log_file(
        request.config.report_path,
        request.config.device.__get_serial__())
    log_fd = os.open(device_log_file,
                     os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o755)
    with os.fdopen(log_fd, "a") as log_file:
        log_file.write("{}{}".format(
            "\n".join(result.split("\n")[0:-1]), "\n"))
        log_file.flush()


@dataclass
class GTestConst(object):
    # Command line switches passed to the test binary.
    exec_para_filter = "--gtest_filter"
    exec_para_level = "--gtest_testsize"


@Plugin(type=Plugin.DRIVER, id=DeviceTestType.lite_cpp_test)
class LiteUnitTest(IDriver):
    """
    lite gtest test driver for L1
    """
    config = None
    log = platform_logger("LiteUnitTest")
    nfs_dir = ""
    mnt_cmd = ""
    lite_device = None
    result = None

    def __check_failed__(self, msg):
        """
        Log a failed check.
        :param msg: description of the failed check
        """
        self.log.error("check failed {}".format(msg))
        return

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        """
        1. check serial protocol
        2. login device
        3. NFS is available
        :param config: serial device
        :return:
        """
        self.log.error("Lite driver check config:{}".format(config))

    def __execute__(self, request):
        """
        Run one test binary on the lite device.

        1. select test case by subsystem, module, suite
        2. open test dir
        3. execute single test case, eg. ./test_demo
        :param request: contains test condition, sub_system
                        module_name,test_suit,
                        test_case,test_level,test_case_dir
        :return:
        """
        self.log.debug("Test suite FilePath: %s" %
                       request.root.source.source_file)
        self.lite_device = request.config.environment.devices[0]
        self.lite_device.connect()
        try:
            if not self._before_execute_test():
                self.log.error("open test dir failed")
                return
            self.log.debug("open test dir success")

            executed = self._execute_test(request)
            if executed:
                self.log.info("execute test command success")
            else:
                self.log.error("execute test command failed")

            # The post step runs even after a failed command: it picks up a
            # result file if one was written and cleans the NFS directory.
            if not self._after_execute_test(request):
                self.log.error("after execute test failed")
                return
            if executed:
                self.log.info("lite device execute request success")
        finally:
            # Release the connection on every path, not only on success.
            self.lite_device.close()

    def __result__(self):
        pass

    def show_help_info(self):
        """
        show help info.
        """
        self.log.info("this is test driver for cpp test")
        return

    def show_driver_info(self):
        """
        show driver info.
        """
        self.log.info("this is test driver for cpp test")
        return

    def _mount_nfs_server(self):
        """
        Mount the NFS share on the device before a suite binary is executed.

        The mount argument is read from the "mnt_cmd" entry of the "NFS" user
        configuration. The command is tried up to three times; the outcome is
        only logged, nothing is returned.
        """
        mount_args = UserConfigManager().get_user_config("NFS").get("mnt_cmd")
        if not mount_args:
            # Covers both a missing entry and an empty one.
            self.mnt_cmd = "mount "
            self.log.error("no configure for mount command")
            return
        self.mnt_cmd = "mount {}".format(mount_args)

        filter_result, status, _ = \
            self.lite_device.execute_command_with_timeout(
                self.mnt_cmd, case_type=DeviceTestType.lite_cpp_test,
                timeout=3)
        if "already mounted" in filter_result:
            self.log.info("nfs has been mounted")
            return

        for attempt in range(2, 4):
            if status:
                break
            self.log.info("try mount %d" % attempt)
            _, status, _ = self.lite_device.execute_command_with_timeout(
                self.mnt_cmd, case_type=DeviceTestType.lite_cpp_test,
                timeout=3)

        # Checked after the loop so the last retry is evaluated as well.
        if status:
            self.log.info("execute mount command success")
            return
        self.log.error("execute mount command failed")

    def _before_execute_test(self):
        """
        Prepare the device: mount the NFS share and change into the board dir.

        Reads "host_dir" and "board_dir" from the "NFS" user configuration.
        :return: True when the device is inside the board directory,
                 False when the NFS host dir is not configured or the
                 "cd" command failed
        """
        self.nfs_dir = \
            UserConfigManager().get_user_config("NFS").get("host_dir")
        if not self.nfs_dir:
            self.log.error("no configure for nfs directory")
            return False

        self._mount_nfs_server()
        board_dir = \
            UserConfigManager().get_user_config("NFS").get("board_dir")
        _, status, _ = self.lite_device.execute_command_with_timeout(
            "cd /{}".format(board_dir),
            case_type=DeviceTestType.lite_cpp_test)
        if not status:
            self.log.error("pre execute command failed")
            return False

        self.log.info("pre execute command success")
        return True

    def _execute_test(self, request):
        """
        Copy the test binary into the NFS dir and run it on the device.

        Stale copies of the binary and of its "<name>.xml" result are removed
        from the NFS dir first, then the binary and its resources are pushed.
        :param request: test request, provides the binary path and config
        :return: True when the device reported the command as successful,
                 False otherwise
        """
        test_case = request.root.source.source_file
        self.config = request.config
        test_para = self._get_test_para(self.config.testcase,
                                        self.config.testlevel)
        case_name = os.path.basename(test_case)
        nfs_case = os.path.join(self.nfs_dir, case_name)
        nfs_result = os.path.join(self.nfs_dir, case_name + ".xml")
        for stale_file in (nfs_case, nfs_result):
            if os.path.exists(stale_file):
                os.remove(stale_file)
        shutil.copyfile(test_case, nfs_case)

        # push resource files
        resource_manager = ResourceManager()
        resource_data_dic, resource_dir = \
            resource_manager.get_resource_data_dic(test_case)
        resource_manager.lite_process_preparer_data(resource_data_dic,
                                                    resource_dir)

        self.lite_device.execute_command_with_timeout(
            "chmod 777 {}".format(case_name),
            case_type=DeviceTestType.lite_cpp_test)
        test_command = "./%s %s" % (case_name, test_para)
        case_result, status, _ = \
            self.lite_device.execute_command_with_timeout(
                test_command, case_type=DeviceTestType.lite_cpp_test)
        if status:
            self.log.info("test case result:\n %s" % case_result)
            return True

        self.log.error("failed case: %s" % test_case)
        return False

    def _get_test_para(self, testcase, testlevel):
        """
        Build the command line argument for the test binary.

        :param testcase: test filter, "" when not set
        :param testlevel: comma separated levels, "" when not set
        :return: "--gtest_filter=<testcase>" when only testcase is set,
                 "--gtest_testsize=<levels>" when only testlevel is set,
                 "" when both or neither are set
        """
        if testcase != "" and testlevel == "":
            return "%s=%s" % (GTestConst.exec_para_filter, testcase)
        if testcase == "" and testlevel != "":
            level_para = get_level_para_string(testlevel)
            return "%s=%s" % (GTestConst.exec_para_level, level_para)
        return ""

    @staticmethod
    def _get_result_dir(report_path, test_case):
        """
        Work out the report directory for the result of test_case.

        The part of the path between "unittest<sep>" and "<sep>bin" names
        the subsystem and the module; at most its first two levels are used
        below "<report_path>/result". A path without "unittest<sep>" maps to
        "<report_path>/result" itself.
        :param report_path: root directory of the report
        :param test_case: path of the test binary
        :return: directory the result xml is copied to
        """
        result_dir = os.path.join(report_path, "result")
        pieces = test_case.split("unittest" + os.sep)
        if len(pieces) < 2:
            return result_dir
        sub_system_module = pieces[1].split(os.sep + "bin")[0]
        levels = [name for name in sub_system_module.split(os.sep)[:2]
                  if name]
        return os.path.join(result_dir, *levels)

    def _after_execute_test(self, request):
        """
        copy test result to result dir

        The NFS directory is cleaned whether or not a result was found.
        :param request: test request, provides report path and binary path
        :return: True when the result xml was copied, False otherwise
        """
        if request.config is None:
            self.log.error("test config is null")
            return False

        test_case = request.root.source.source_file
        test_result = self._get_result_dir(request.config.report_path,
                                           test_case)
        os.makedirs(test_result, exist_ok=True)

        result_name = os.path.basename(test_case) + ".xml"
        result_file = os.path.join(self.nfs_dir, result_name)
        if not self._check_xml_exist(result_name):
            # Only logged: the host side check below decides.
            self.log.error("result xml file %s not exist." % result_name)
        if not os.path.exists(result_file):
            self.log.error("file %s not exist." % result_file)
            self._clear_nfs_space()
            return False

        shutil.copyfile(result_file, os.path.join(test_result, result_name))
        self.log.info("after execute test")
        self._clear_nfs_space()
        return True

    def _clear_nfs_space(self):
        """
        Leave and unmount the board dir on the device, then empty the NFS
        host dir by recreating it.
        """
        self.lite_device.execute_command_with_timeout(
            "cd ..",
            case_type=DeviceTestType.lite_cpp_test)
        self.lite_device.execute_command_with_timeout(
            "umount %s" %
            UserConfigManager().get_user_config("NFS").get("board_dir"),
            case_type=DeviceTestType.lite_cpp_test)
        if os.path.isdir(self.nfs_dir):
            shutil.rmtree(self.nfs_dir)
        os.makedirs(self.nfs_dir, exist_ok=True)

    def _check_xml_exist(self, xml_file, timeout=10):
        """
        Poll the board dir on the device until xml_file is listed.

        Runs "ls /<board_dir>" and sleeps 5 seconds between attempts.
        :param xml_file: file name to look for in the listing
        :param timeout: seconds to keep polling
        :return: True when the name showed up in time, False otherwise
        """
        ls_command = \
            "ls /%s" % \
            UserConfigManager().get_user_config("NFS").get("board_dir")
        deadline = time.time() + timeout
        while time.time() < deadline:
            listing, _, _ = self.lite_device.execute_command_with_timeout(
                command=ls_command, case_type=DeviceTestType.cpp_test_lite,
                timeout=5, receiver=None)
            if xml_file in listing:
                return True

            time.sleep(5)
        return False


@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite)
class CTestDriver(IDriver):
    """
    CTest is a test that runs a native test package on given lite device.
    """
    config = None
    result = ""
    error_message = ""
    version_cmd = "AT+CSV"

    def __init__(self):
        self.file_name = ""

    def __check_environment__(self, device_options):
        """
        Accept exactly one device and only one labelled wifiiot.
        :param device_options: device options of the request
        :return: True when the environment fits, False otherwise
        """
        if len(device_options) == 1 and \
                device_options[0].label == DeviceLabelType.wifiiot:
            return True
        self.error_message = "check environment failed"
        return False

    def __check_config__(self, config=None):
        del config
        self.config = None

    def __execute__(self, request):
        """
        Run the ctest described by the request and produce the result report.

        Any exception is logged and stored as error message; the report is
        checked/generated in all cases.
        :param request: test request
        """
        from xdevice import Variables
        try:
            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            current_dir = request.config.resource_path or Variables.exec_dir

            config_file = request.root.source.config_file.strip()
            if config_file:
                source = os.path.join(current_dir, config_file)
                self.file_name = os.path.basename(config_file).split(".")[0]
            else:
                source = request.root.source.source_string.strip()

            self._run_ctest(source=source, request=request)

        except (LiteDeviceExecuteCommandError, Exception) as exception:
            LOG.error(exception, error_no=getattr(exception, "error_no",
                                                  "00000"))
            self.error_message = exception
        finally:
            if request.root.source.test_name.startswith("{"):
                report_name = "report"
            else:
                report_name = get_filename_extension(
                    request.root.source.test_name)[0]

            self.result = check_result_report(request.config.report_path,
                                              self.result,
                                              self.error_message,
                                              report_name)

    def __result__(self):
        return self.result if os.path.exists(self.result) else ""

    def _run_ctest(self, source=None, request=None):
        """
        Burn/reset the device and feed its output to the ctest parsers.

        :param source: path or content of the json test configuration
        :param request: test request
        """
        if not source:
            LOG.error("Error: %s don't exist." % source, error_no="00101")
            return

        try:
            version = get_test_component_version(self.config)
            parser_instances = []
            for parser in get_plugin(Plugin.PARSER, ParserType.ctest_lite):
                parser_instance = parser.__class__()
                parser_instance.suites_name = self.file_name
                parser_instance.product_info.setdefault("Version", version)
                parser_instance.listeners = request.listeners
                parser_instances.append(parser_instance)
            handler = ShellHandler(parser_instances)

            reset_cmd = self._reset_device(request, source)
            self.result = "%s.xml" % os.path.join(request.config.report_path,
                                                  "result", self.file_name)

            self.config.device.device.com_dict.get(
                ComType.deploy_com).connect()

            result, _, _ = \
                self.config.device.device.execute_command_with_timeout(
                    command=reset_cmd,
                    case_type=DeviceTestType.ctest_lite,
                    key=ComType.deploy_com,
                    timeout=90,
                    receiver=handler)

            _append_device_log(request, result)
        finally:
            self.config.device.device.com_dict.get(ComType.deploy_com).close()

    def _reset_device(self, request, source):
        """
        Set up every deploy kit of the configuration and collect the reset
        command.

        :param request: test request
        :param source: path or content of the json test configuration
        :return: burn command of the last deploy kit as a list of ints,
                 each item parsed as a hexadecimal number
        :raises ExecuteTerminate: when the execution has been stopped
        """
        json_config = JsonParser(source)
        reset_cmd = []
        kit_instances = get_kit_instances(json_config,
                                          request.config.resource_path,
                                          request.config.testcases_path)
        from xdevice import Binder

        for kit_instance, kit_info in zip(kit_instances,
                                          json_config.get_kits()):
            if not isinstance(kit_instance, DeployKit):
                continue
            if not self.file_name:
                # Fall back to the burn file name (without extension).
                self.file_name = get_config_value(
                    'burn_file', kit_info)[0].split("\\")[-1].split(".")[0]
            reset_cmd = kit_instance.burn_command
            if not Binder.is_executing():
                raise ExecuteTerminate("ExecuteTerminate", error_no="00300")

            kit_instance.__setup__(
                self.config.device,
                source_file=request.root.source.source_file.strip())

        return [int(item, 16) for item in reset_cmd]


@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite)
class JSUnitTestLiteDriver(IDriver):
    """
    JSUnitTestDriver is a Test that runs a native test package on given device.
    """

    def __init__(self):
        self.result = ""
        self.error_message = ""
        self.kits = []
        self.config = None
        self.file_name = ""

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config):
        pass

    def __execute__(self, request):
        """
        Set up the kits, start the JS unit test and produce the result
        report.

        Any exception is stored as error message; the report is
        checked/generated, the kits are torn down and the device is closed
        in all cases.
        :param request: test request
        """
        try:
            LOG.debug("Start execute xdevice extension JSUnit Test")

            self.config = request.config
            self.config.device = request.config.environment.devices[0]

            config_file = request.root.source.config_file
            suite_file = request.root.source.source_file

            if not suite_file:
                raise ParamError(
                    "test source '%s' not exists" %
                    request.root.source.source_string, error_no="00101")

            if not os.path.exists(config_file):
                LOG.error("Error: Test cases don't exist %s." % config_file,
                          error_no="00101")
                raise ParamError(
                    "Error: Test cases don't exist %s." % config_file,
                    error_no="00101")

            self.file_name = os.path.basename(
                request.root.source.source_file.strip()).split(".")[0]

            self.result = "%s.xml" % os.path.join(
                request.config.report_path, "result", self.file_name)

            json_config = JsonParser(config_file)
            self.kits = get_kit_instances(json_config,
                                          self.config.resource_path,
                                          self.config.testcases_path)

            self._get_driver_config(json_config)

            from xdevice import Binder
            for kit in self.kits:
                if not Binder.is_executing():
                    raise ExecuteTerminate("ExecuteTerminate",
                                           error_no="00300")
                if kit.__class__.__name__ == CKit.liteinstall:
                    kit.bundle_name = self.config.bundle_name
                kit.__setup__(self.config.device, request=request)

            self._run_jsunit(request)

        except Exception as exception:
            self.error_message = exception
        finally:
            if request.root.source.test_name.startswith("{"):
                report_name = "report"
            else:
                report_name = get_filename_extension(
                    request.root.source.test_name)[0]

            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message,
                report_name)

            try:
                for kit in self.kits:
                    kit.__teardown__(self.config.device)
            finally:
                # A failing teardown must not keep the device open.
                self.config.device.close()

    def __result__(self):
        return self.result if os.path.exists(self.result) else ""

    def _get_driver_config(self, json_config):
        """
        Read "bundle-name" and "ability" of the driver section into the
        config.

        :param json_config: parsed json test configuration
        :raises ParamError: when no bundle-name is configured
        """
        bundle_name = get_config_value('bundle-name',
                                       json_config.get_driver(), False)
        if not bundle_name:
            raise ParamError("Can't find bundle-name in config file.",
                             error_no="00108")
        self.config.bundle_name = bundle_name

        ability = get_config_value('ability',
                                   json_config.get_driver(), False)
        self.config.ability = ability if ability else "default"

    def _run_jsunit(self, request):
        """
        Start the configured bundle/ability on the device, feed the output to
        the JS unit parsers and append it to the device log.

        :param request: test request
        """
        parser_instances = []
        for parser in get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite):
            parser_instance = parser.__class__()
            parser_instance.suites_name = self.file_name
            parser_instance.listeners = request.listeners
            parser_instances.append(parser_instance)
        handler = ShellHandler(parser_instances)

        command = "./bin/aa start -p %s -n %s" % \
                  (self.config.bundle_name, self.config.ability)
        result, _, _ = self.config.device.execute_command_with_timeout(
            command=command,
            timeout=300,
            receiver=handler)

        _append_device_log(request, result)