#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import os
import sys

from xdevice import get_plugin
from xdevice import ModeType
from xdevice import ConfigConst
from xdevice import DeviceLabelType
from xdevice import TestExecType
from xdevice import DeviceError
from xdevice import ParamError
from xdevice import ReportException
from xdevice import ExecuteTerminate
from xdevice import IDriver
from xdevice import platform_logger
from xdevice import Plugin
from xdevice import JsonParser
from xdevice import get_config_value
from xdevice import do_module_kit_setup
from xdevice import do_module_kit_teardown
from xdevice import get_filename_extension
from xdevice import get_file_absolute_path
from xdevice import get_kit_instances
from xdevice import check_result_report
from xdevice import check_mode
from xdevice import SuiteReporter
from xdevice import CaseEnd
from xdevice import Binder
from xdevice import HostDrivenTestType
from xdevice import Variables
from devicetest.constants import CKit
from devicetest.core.constants import DeviceTestMode
from devicetest.error import ErrorMessage

__all__ = ["DeviceTestDriver", "DeviceTestSuiteDriver"]
LOG = platform_logger("DeviceTest")
PY_SUFFIX = ".py"
PYD_SUFFIX = ".pyd"
PYC_SUFFIX = ".pyc"


def _get_dict_test_list(module_path, file_name):
    """
    Walk module_path recursively and return the full paths of all files
    that end with ".py" or ".pyd" and whose name without extension equals
    file_name.
    """
    test_list = []
    for root, _, files in os.walk(module_path):
        for _file in files:
            # str.endswith accepts a tuple, so one call covers both suffixes
            if _file.endswith((PY_SUFFIX, PYD_SUFFIX)) \
                    and file_name == os.path.splitext(_file)[0]:
                test_list.append(os.path.join(root, _file))
    return test_list


def start_smart_perf(config, kits):
    """
    Insert the smartperf kit at the front of kits when the module asks
    for it.

    Nothing happens unless config has the ConfigConst.kits_in_module
    attribute and that entry contains CKit.smartperf. Otherwise the
    smartperf test-kit plugin is fetched, its target_name is set from the
    "bundle_name" config value, its parameters are passed through
    __check_config__, and it is prepended to kits (kits is modified in
    place).
    """
    if not hasattr(config, ConfigConst.kits_in_module):
        return
    if CKit.smartperf not in config.get(ConfigConst.kits_in_module):
        return
    sp_kits = get_plugin(Plugin.TEST_KIT, CKit.smartperf)[0]
    sp_kits.target_name = config.get("bundle_name", "")
    param_config = config.get(ConfigConst.kits_params).get(CKit.smartperf, "")
    sp_kits.__check_config__(param_config)
    kits.insert(0, sp_kits)


def handle_test_args(config, request):
    """Hook called before the test list is resolved; currently a no-op."""
    pass


def do_driver_execute(driver_obj: IDriver, request):
    """
    Shared execution flow for the drivers defined in this module.

    Steps: clear the devicetest mode flag on sys, bind the request config
    and devices to the driver, load the json config and kits, compute the
    result xml path, build the run configs, resolve the test list and run
    it through the driver's _run.

    The exception types listed below are caught, logged, stored on
    driver_obj.error_message and reported as a failed CaseEnd stage.
    driver_obj._handle_finally(request) always runs afterwards.
    """
    try:
        # delete sys devicetest mode
        if hasattr(sys, DeviceTestMode.MODE):
            delattr(sys, DeviceTestMode.MODE)

        # set self.config
        driver_obj.config = request.config
        driver_obj.config.devices = request.get_devices()
        if request.get("exectype") == TestExecType.device_test and \
                not driver_obj.config.devices:
            err_msg = ErrorMessage.TestCase.Code_0203009
            LOG.error(err_msg)
            raise ParamError(err_msg)

        # get source, json config and kits
        if request.get_config_file():
            source = request.get_config_file()
            LOG.debug("Test config file path: %s" % source)
        else:
            source = request.get_source_string()
            LOG.debug("Test String: %s" % source)

        if not source:
            err_msg = ErrorMessage.TestCase.Code_0203010.format(
                request.get_source_file())
            LOG.error(err_msg)
            raise ParamError(err_msg)

        json_config = JsonParser(source)
        kits = get_kit_instances(json_config, request.config.resource_path,
                                 request.config.testcases_path)
        start_smart_perf(driver_obj.config, kits)
        test_name = request.get_module_name()
        driver_obj.result = os.path.join(request.config.report_path, "result",
                                         "%s.xml" % test_name)

        # set configs keys
        configs = driver_obj._set_configs(json_config, kits, request)

        # handle test args
        handle_test_args(config=driver_obj.config, request=request)

        # get test list
        test_list = driver_obj._get_test_list(json_config, request, source)
        if not test_list:
            raise ParamError(ErrorMessage.TestCase.Code_0203011)
        driver_obj._run(configs, test_list)
    except (ReportException, ModuleNotFoundError, ExecuteTerminate,
            SyntaxError, ValueError, AttributeError, TypeError,
            KeyboardInterrupt, ParamError, DeviceError) \
            as exception:
        # exceptions without an error_no attribute fall back to "00000"
        error_no = getattr(exception, "error_no", "00000")
        LOG.exception(exception, exc_info=False, error_no=error_no)
        driver_obj.error_message = exception
        Binder.notify_stage(
            CaseEnd(request.get_module_name(), "Failed",
                    str(driver_obj.error_message)))
    finally:
        driver_obj._handle_finally(request)


@Plugin(type=Plugin.DRIVER, id=HostDrivenTestType.device_test)
class DeviceTestDriver(IDriver):
    """
    DeviceTest is a Test that runs a host-driven test on given devices.
    """
    # test driver config
    config = None
    result = ""
    error_message = ""
    py_file = ""

    def __init__(self):
        self.linux_host = ""
        self.linux_directory = ""

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config=None):
        pass

    def __init_nfs_server__(self, request=None):
        pass

    def __execute__(self, request):
        """Run the request through the shared do_driver_execute flow."""
        do_driver_execute(self, request)

    def _get_test_list(self, json_config, request, source):
        """
        Resolve the single test file to execute.

        Candidates come from, in order: the driver's 'py_file' config
        value; the request's source file when it ends with ".pyd" or
        ".py" (this replaces the configured list); or, when still empty
        and source exists on disk, files next to source sharing its base
        name. Candidates that do not exist are resolved against
        resource_path and testcases_path; unresolved ones are logged and
        skipped. When the request carries a ";"-separated "testcase"
        filter, only matching file names are kept.

        Returns a list with exactly one absolute path.
        Raises ParamError when no candidate or more than one remains.
        """
        test_list = get_config_value('py_file', json_config.get_driver(),
                                     is_list=True)
        source_file = request.root.source.source_file
        if str(source_file).endswith((PYD_SUFFIX, PY_SUFFIX)):
            test_list = [source_file]

        if not test_list and os.path.exists(source):
            dir_name, file_name = os.path.split(source)
            file_name, _ = os.path.splitext(file_name)
            test_list = _get_dict_test_list(dir_name, file_name)

        # check test list
        testcase = request.get("testcase")
        testcase_list = str(testcase).split(";") if testcase else []

        checked_test_list = []
        for test in test_list:
            if not os.path.exists(test):
                try:
                    absolute_file = get_file_absolute_path(test, [
                        self.config.resource_path,
                        self.config.testcases_path])
                except ParamError as error:
                    LOG.error(error, error_no=error.error_no)
                    continue
            else:
                absolute_file = test

            file_name = get_filename_extension(absolute_file)[0]
            if not testcase_list or file_name in testcase_list:
                checked_test_list.append(absolute_file)
            else:
                LOG.info("Test '%s' is ignored", absolute_file)
        if checked_test_list:
            LOG.info("Test list: {}".format(checked_test_list))
        else:
            err_msg = ErrorMessage.TestCase.Code_0203012
            LOG.error(err_msg)
            raise ParamError(err_msg)
        if len(checked_test_list) > 1:
            err_msg = ErrorMessage.TestCase.Code_0203013.format(
                request.get_module_name(), test_list)
            LOG.error(err_msg)
            raise ParamError(err_msg)
        return checked_test_list

    def _set_configs(self, json_config, kits, request):
        """
        Build the configs dict handed to DeviceTest and run the module
        kit setup.

        do_module_kit_setup is invoked once per device in
        self.config.devices; ipcamera devices additionally trigger
        __init_nfs_server__ and add the linux host keys.
        """
        configs = dict()
        configs["testargs"] = self.config.testargs or {}
        configs["testcases_path"] = self.config.testcases_path or ""
        configs["request"] = request
        configs["test_name"] = request.get_module_name()
        configs["report_path"] = request.config.report_path
        configs["execute"] = get_config_value(
            'execute', json_config.get_driver(), False)

        for device in self.config.devices:
            do_module_kit_setup(request, kits)
            if device.label == DeviceLabelType.ipcamera:
                # add extra keys to configs for ipcamera device
                self.__init_nfs_server__(request=request)
                configs["linux_host"] = self.linux_host
                configs["linux_directory"] = self.linux_directory
                # NOTE(review): "kits" is only set on the ipcamera path
                # here; confirm this nesting is intended.
                configs["kits"] = kits

        return configs

    def _handle_finally(self, request):
        """
        Tear down kits, close device connections and finalize
        self.result via check_result_report.
        """
        # do kit teardown
        do_module_kit_teardown(request)

        # close device connect
        closable_labels = (DeviceLabelType.ipcamera,
                           DeviceLabelType.watch_gt,
                           DeviceLabelType.phone)
        for device in self.config.devices:
            if device.label in closable_labels:
                device.close()

        # check result report
        test_name = request.root.source.test_name
        # a test name starting with "{" is replaced by the generic "report"
        report_name = "report" if test_name.startswith("{") else test_name
        module_name = request.get_module_name()
        if Binder.session().mode != ModeType.decc:
            self.result = check_result_report(
                request.config.report_path, self.result, self.error_message,
                report_name, module_name)
        else:
            tmp_list = copy.copy(SuiteReporter.get_report_result())
            if self.result not in [report_path for report_path, _ in
                                   tmp_list]:
                if not self.error_message:
                    self.error_message = "Case not execute[01205]"
                self.result = check_result_report(
                    request.config.report_path, self.result,
                    self.error_message, report_name, module_name)

    def _run(self, configs, test_list):
        """
        Extend sys.path for the _devicetest module and the testcases,
        publish the resource path on sys, then run DeviceTest.
        """
        # insert paths for loading _devicetest module and testcases
        devicetest_module = os.path.join(Variables.modules_dir, "_devicetest")
        if os.path.exists(devicetest_module):
            sys.path.insert(1, devicetest_module)
        if configs["testcases_path"]:
            sys.path.insert(1, configs["testcases_path"])
            sys.path.insert(1, os.path.dirname(configs["testcases_path"]))

        # apply data to devicetest module about resource path
        request = configs.get('request', None)
        if request:
            sys.ecotest_resource_path = request.config.resource_path

        # run devicetest
        from devicetest.main import DeviceTest
        device_test = DeviceTest(test_list, configs, self.config.devices,
                                 self.result)
        device_test.run()

    def __result__(self):
        """
        Return the result path; outside decc mode an empty string is
        returned when that path does not exist.
        """
        if check_mode(ModeType.decc):
            return self.result
        return self.result if os.path.exists(self.result) else ""


def _get_dict_testsuite(testsuite, config):
    """
    Find the file backing a suffix-less testsuite name.

    The suffixes ".py", ".pyd" and ".pyc" are tried in that order. A
    candidate that exists as given is returned directly; otherwise it is
    resolved against config.resource_path and config.testcases_path. A
    ParamError from that lookup is logged and the next suffix is tried.
    Returns None when no suffix yields a file.
    """
    post_suffix = [PY_SUFFIX, PYD_SUFFIX, PYC_SUFFIX]
    for suffix in post_suffix:
        testsuite_file = "{}{}".format(testsuite, suffix)
        if not os.path.exists(testsuite_file):
            try:
                absolute_file = get_file_absolute_path(testsuite_file, [
                    config.resource_path, config.testcases_path])
                return absolute_file
            except ParamError as error:
                LOG.error(error, error_no=error.error_no)
                continue
        else:
            return testsuite_file
    return None


@Plugin(type=Plugin.DRIVER, id=HostDrivenTestType.device_testsuite)
class DeviceTestSuiteDriver(IDriver):
    """
    DeviceTestSuiteDriver is a Test that runs a host-driven test on given devices.
    """
    # test driver config
    config = None
    result = ""
    error_message = ""
    py_file = ""

    def __init__(self):
        pass

    def __check_environment__(self, device_options):
        pass

    def __check_config__(self, config=None):
        pass

    def __init_nfs_server__(self, request=None):
        pass

    def __execute__(self, request):
        """Run the request through the shared do_driver_execute flow."""
        do_driver_execute(self, request)

    def _get_test_list(self, json_config, request, source):
        """
        Resolve the testsuite file to execute.

        The name comes from the driver's 'testsuite' config value or,
        when that is empty and source exists on disk, from the first
        file next to source sharing its base name. A name ending with
        ".py", ".pyd" or ".pyc" is used as is when it exists, else
        resolved against resource_path and testcases_path; a suffix-less
        name goes through _get_dict_testsuite.

        Returns the testsuite path (a single string, not a list).
        Raises ParamError when no testsuite is configured or found.
        """
        testsuite = get_config_value('testsuite', json_config.get_driver(),
                                     is_list=False)

        if not testsuite and os.path.exists(source):
            dir_name, file_name = os.path.split(source)
            file_name, _ = os.path.splitext(file_name)
            temp_testsuite = _get_dict_test_list(dir_name, file_name)
            if temp_testsuite:
                testsuite = temp_testsuite[0]

        if not testsuite:
            err_msg = ErrorMessage.TestCase.Code_0203014
            LOG.error(err_msg)
            raise ParamError(err_msg)

        checked_testsuite = None
        if testsuite.endswith((PY_SUFFIX, PYD_SUFFIX, PYC_SUFFIX)):
            if not os.path.exists(testsuite):
                try:
                    checked_testsuite = get_file_absolute_path(testsuite, [
                        self.config.resource_path,
                        self.config.testcases_path])
                except ParamError as error:
                    LOG.debug(error, error_no=error.error_no)
            else:
                checked_testsuite = testsuite
        else:
            checked_testsuite = _get_dict_testsuite(testsuite, self.config)

        if checked_testsuite:
            LOG.info("Test suite list: {}".format(checked_testsuite))
        else:
            err_msg = ErrorMessage.TestCase.Code_0203012
            LOG.error(err_msg)
            raise ParamError(err_msg)
        return checked_testsuite

    def _set_configs(self, json_config, kits, request):
        """
        Build the configs dict handed to DeviceTestSuite and run the
        module kit setup once.
        """
        configs = dict()
        configs["testargs"] = self.config.testargs or {}
        configs["testcases_path"] = self.config.testcases_path or ""
        configs["resource_path"] = self.config.resource_path or ""
        configs["request"] = request
        configs["device_log"] = request.config.device_log
        configs["test_name"] = request.get_module_name()
        configs["report_path"] = request.config.report_path
        configs["suitecases"] = get_config_value(
            'suitecases', json_config.get_driver(), True)
        configs["listeners"] = request.listeners.copy()

        do_module_kit_setup(request, kits)

        return configs

    def _handle_finally(self, request):
        """
        Tear down kits, close phone device connections and finalize
        self.result via check_result_report.
        """
        # do kit teardown
        do_module_kit_teardown(request)

        # close device connect
        for device in self.config.devices:
            if device.label == DeviceLabelType.phone:
                device.close()

        # check result report
        test_name = request.root.source.test_name
        # a test name starting with "{" is replaced by the generic "report"
        report_name = "report" if test_name.startswith("{") else test_name
        module_name = request.get_module_name()
        self.result = check_result_report(
            request.config.report_path, self.result, self.error_message,
            report_name, module_name)

    def _run(self, configs, test_list):
        """
        Extend sys.path for the testcases, publish the resource path on
        sys, then run DeviceTestSuite.
        """
        if configs["testcases_path"]:
            sys.path.insert(1, configs["testcases_path"])
            sys.path.insert(1, os.path.dirname(configs["testcases_path"]))
        request = configs.get('request', None)
        if request:
            sys.ecotest_resource_path = request.config.resource_path

        # run AppTest
        from devicetest.main import DeviceTestSuite
        app_test = DeviceTestSuite(test_list, configs, self.config.devices)
        app_test.run()

    def __result__(self):
        """Return the result path, or "" when that path does not exist."""
        return self.result if os.path.exists(self.result) else ""