1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import copy
20import os
21import sys
22
23from xdevice import get_plugin
24from xdevice import ModeType
25from xdevice import ConfigConst
26from xdevice import DeviceLabelType
27from xdevice import TestExecType
28from xdevice import DeviceError
29from xdevice import ParamError
30from xdevice import ReportException
31from xdevice import ExecuteTerminate
32from xdevice import IDriver
33from xdevice import platform_logger
34from xdevice import Plugin
35from xdevice import JsonParser
36from xdevice import get_config_value
37from xdevice import do_module_kit_setup
38from xdevice import do_module_kit_teardown
39from xdevice import get_filename_extension
40from xdevice import get_file_absolute_path
41from xdevice import get_kit_instances
42from xdevice import check_result_report
43from xdevice import check_mode
44from xdevice import SuiteReporter
45from xdevice import CaseEnd
46from xdevice import Binder
47from xdevice import HostDrivenTestType
48from xdevice import Variables
49from devicetest.constants import CKit
50from devicetest.core.constants import DeviceTestMode
51from devicetest.error import ErrorMessage
52
# Public names of this module: the two driver plugin classes defined below.
__all__ = ["DeviceTestDriver", "DeviceTestSuiteDriver"]
# Module-wide logger obtained from xdevice under the "DeviceTest" name.
LOG = platform_logger("DeviceTest")
# File suffixes the drivers recognise when resolving test case and
# test suite files.
PY_SUFFIX = ".py"
PYD_SUFFIX = ".pyd"
PYC_SUFFIX = ".pyc"
58
59
def _get_dict_test_list(module_path, file_name):
    """
    Collect every ``.py`` / ``.pyd`` file below ``module_path`` whose base name
    (file name without its last extension) equals ``file_name``.

    The directory tree is walked recursively with ``os.walk``; results are
    returned as joined paths in the order the walk produces them.  An empty
    list is returned when nothing matches.
    """
    script_suffixes = (".py", ".pyd")
    return [
        os.path.join(current_dir, entry)
        for current_dir, _, entries in os.walk(module_path)
        for entry in entries
        if entry.endswith(script_suffixes)
        and os.path.splitext(entry)[0] == file_name
    ]
68
69
def start_smart_perf(config, kits):
    """
    Put the SmartPerf kit at the head of ``kits`` when the module asks for it.

    Nothing happens unless ``config`` has the ``ConfigConst.kits_in_module``
    attribute and ``CKit.smartperf`` is contained in its value.  Otherwise the
    SmartPerf kit plugin is looked up, its ``target_name`` is taken from the
    ``bundle_name`` config entry (empty string when absent), its entry in
    ``ConfigConst.kits_params`` is handed to ``__check_config__`` and the kit
    is inserted at index 0 of ``kits``, which is modified in place.
    """
    smartperf_requested = (
        hasattr(config, ConfigConst.kits_in_module)
        and CKit.smartperf in config.get(ConfigConst.kits_in_module)
    )
    if not smartperf_requested:
        return

    # the first plugin registered under the smartperf kit id is used
    smartperf_kit = get_plugin(Plugin.TEST_KIT, CKit.smartperf)[0]
    smartperf_kit.target_name = config.get("bundle_name", "")
    all_kit_params = config.get(ConfigConst.kits_params)
    smartperf_kit.__check_config__(all_kit_params.get(CKit.smartperf, ""))
    kits.insert(0, smartperf_kit)
80
81
def handle_test_args(config, request):
    """
    Placeholder hook called by ``do_driver_execute`` once the driver configs
    have been built; it currently does nothing with ``config`` or ``request``.
    """
    return None
84
85
def do_driver_execute(driver_obj: IDriver, request):
    """
    Shared ``__execute__`` body of the DeviceTest drivers in this module.

    Steps, in order:
      1. remove the ``DeviceTestMode.MODE`` attribute from ``sys`` if present;
      2. bind ``request.config`` and the request devices to ``driver_obj``;
      3. pick the test source (config file, else source string), parse it and
         create the kit instances, optionally prepending the SmartPerf kit;
      4. compute the result xml path and let the driver build its configs,
         resolve its test list and run it.

    The exception types listed in the ``except`` clause are not propagated:
    they are logged, stored in ``driver_obj.error_message`` and reported
    through a "Failed" ``CaseEnd`` notification.  ``driver_obj._handle_finally``
    is always called at the end.
    """
    try:
        # delete sys devicetest mode
        if hasattr(sys, DeviceTestMode.MODE):
            delattr(sys, DeviceTestMode.MODE)

        # set self.config
        driver_obj.config = request.config
        driver_obj.config.devices = request.get_devices()
        needs_device = request.get("exectype") == TestExecType.device_test
        if needs_device and not driver_obj.config.devices:
            no_device_msg = ErrorMessage.TestCase.Code_0203009
            LOG.error(no_device_msg)
            raise ParamError(no_device_msg)

        # get source: the config file wins over the inline source string
        if request.get_config_file():
            source = request.get_config_file()
            LOG.debug("Test config file path: %s" % source)
        else:
            source = request.get_source_string()
            LOG.debug("Test String: %s" % source)

        if not source:
            no_source_msg = ErrorMessage.TestCase.Code_0203010.format(
                request.get_source_file())
            LOG.error(no_source_msg)
            raise ParamError(no_source_msg)

        # json config and kits
        json_config = JsonParser(source)
        kits = get_kit_instances(
            json_config,
            request.config.resource_path,
            request.config.testcases_path,
        )
        start_smart_perf(driver_obj.config, kits)
        module_name = request.get_module_name()
        driver_obj.result = os.path.join(
            request.config.report_path, "result", "%s.xml" % module_name)

        # set configs keys
        configs = driver_obj._set_configs(json_config, kits, request)

        # handle test args
        handle_test_args(config=driver_obj.config, request=request)

        # get test list
        test_list = driver_obj._get_test_list(json_config, request, source)
        if not test_list:
            raise ParamError(ErrorMessage.TestCase.Code_0203011)
        driver_obj._run(configs, test_list)
    except (ParamError, DeviceError, ReportException, ExecuteTerminate,
            ModuleNotFoundError, SyntaxError, ValueError, AttributeError,
            TypeError, KeyboardInterrupt) as error:
        # errors without an ``error_no`` attribute are logged as "00000"
        error_no = getattr(error, "error_no", "00000")
        LOG.exception(error, exc_info=False, error_no=error_no)
        driver_obj.error_message = error
        Binder.notify_stage(
            CaseEnd(request.get_module_name(), "Failed",
                    str(driver_obj.error_message)))
    finally:
        driver_obj._handle_finally(request)
143
144
@Plugin(type=Plugin.DRIVER, id=HostDrivenTestType.device_test)
class DeviceTestDriver(IDriver):
    """
    Driver plugin that runs a single host-driven DeviceTest case on the
    devices of a request.
    """
    # test driver config
    config = None
    result = ""
    error_message = ""
    py_file = ""

    def __init__(self):
        # forwarded into the run configs for ipcamera devices (see
        # _set_configs); both stay empty unless something else sets them
        self.linux_directory = ""
        self.linux_host = ""

    def __check_environment__(self, device_options):
        """No environment check is performed by this driver."""

    def __check_config__(self, config=None):
        """No config check is performed by this driver."""

    def __init_nfs_server__(self, request=None):
        """No-op in this driver; called for ipcamera devices in _set_configs."""

    def __execute__(self, request):
        """Run the request through the shared ``do_driver_execute`` flow."""
        do_driver_execute(self, request)

    def _get_test_list(self, json_config, request, source):
        """
        Resolve the single test file to run.

        Candidates come from, in order of precedence: the request's source
        file when it ends with ``.py``/``.pyd``; the ``py_file`` entry of the
        driver section in ``json_config``; files next to ``source`` that share
        its base name.  Candidates that do not exist are looked up in the
        resource and testcases paths, and the ``testcase`` request value
        (``;`` separated) filters candidates by file name.

        Returns a list holding exactly one path.  Raises ``ParamError`` when
        no candidate or more than one candidate remains.
        """
        test_list = get_config_value('py_file', json_config.get_driver(),
                                     is_list=True)
        source_file = request.root.source.source_file
        if str(source_file).endswith((PYD_SUFFIX, PY_SUFFIX)):
            test_list = [source_file]

        if not test_list and os.path.exists(source):
            base_name = os.path.splitext(os.path.basename(source))[0]
            test_list = _get_dict_test_list(os.path.dirname(source), base_name)

        # check test list
        requested = request.get("testcase")
        wanted_cases = str(requested).split(";") if requested else []

        checked = []
        for candidate in test_list:
            if os.path.exists(candidate):
                resolved = candidate
            else:
                try:
                    resolved = get_file_absolute_path(candidate, [
                        self.config.resource_path, self.config.testcases_path])
                except ParamError as error:
                    LOG.error(error, error_no=error.error_no)
                    continue

            case_name = get_filename_extension(resolved)[0]
            if wanted_cases and case_name not in wanted_cases:
                LOG.info("Test '%s' is ignored", resolved)
                continue
            checked.append(resolved)

        if not checked:
            nothing_found_msg = ErrorMessage.TestCase.Code_0203012
            LOG.error(nothing_found_msg)
            raise ParamError(nothing_found_msg)
        LOG.info("Test list: {}".format(checked))

        if len(checked) > 1:
            too_many_msg = ErrorMessage.TestCase.Code_0203013.format(
                request.get_module_name(), test_list)
            LOG.error(too_many_msg)
            raise ParamError(too_many_msg)
        return checked

    def _set_configs(self, json_config, kits, request):
        """
        Build the configs dict passed to the test run and set up the kits.

        For ipcamera devices the dict additionally receives ``linux_host``,
        ``linux_directory`` and ``kits``.
        """
        configs = {
            "testargs": self.config.testargs or {},
            "testcases_path": self.config.testcases_path or "",
            "request": request,
            "test_name": request.get_module_name(),
            "report_path": request.config.report_path,
            "execute": get_config_value(
                'execute', json_config.get_driver(), False),
        }

        for device in self.config.devices:
            # NOTE(review): kit setup is invoked once per device here, while
            # DeviceTestSuiteDriver invokes it once in total - confirm intended
            do_module_kit_setup(request, kits)
            if device.label != DeviceLabelType.ipcamera:
                continue
            # add extra keys to configs for ipcamera device
            self.__init_nfs_server__(request=request)
            configs["linux_host"] = self.linux_host
            configs["linux_directory"] = self.linux_directory
            configs["kits"] = kits

        return configs

    def _handle_finally(self, request):
        """
        Tear down the kits, close ipcamera / watch_gt / phone devices and
        make sure ``self.result`` refers to a checked result report.
        """
        # do kit teardown
        do_module_kit_teardown(request)

        # close device connect
        closable_labels = (
            DeviceLabelType.ipcamera,
            DeviceLabelType.watch_gt,
            DeviceLabelType.phone,
        )
        for device in self.config.devices:
            if device.label in closable_labels:
                device.close()

        # check result report
        source_test_name = request.root.source.test_name
        if source_test_name.startswith("{"):
            report_name = "report"
        else:
            report_name = source_test_name
        module_name = request.get_module_name()

        if Binder.session().mode == ModeType.decc:
            reported = copy.copy(SuiteReporter.get_report_result())
            if self.result in [path for path, _ in reported]:
                # in decc mode an already reported result is left untouched
                return
            if not self.error_message:
                self.error_message = "Case not execute[01205]"

        self.result = check_result_report(
            request.config.report_path, self.result, self.error_message,
            report_name, module_name)

    def _run(self, configs, test_list):
        """
        Extend ``sys.path`` for the devicetest module and the testcases, expose
        the resource path on ``sys`` and run ``DeviceTest`` on ``test_list``.
        """
        # insert paths for loading _devicetest module and testcases
        bundled_devicetest = os.path.join(Variables.modules_dir, "_devicetest")
        if os.path.exists(bundled_devicetest):
            sys.path.insert(1, bundled_devicetest)
        testcases_path = configs["testcases_path"]
        if testcases_path:
            sys.path.insert(1, testcases_path)
            sys.path.insert(1, os.path.dirname(testcases_path))

        # apply data to devicetest module about resource path
        request = configs.get('request', None)
        if request:
            sys.ecotest_resource_path = request.config.resource_path

        # run devicetest (imported here, after sys.path has been extended)
        from devicetest.main import DeviceTest
        runner = DeviceTest(test_list, configs, self.config.devices,
                            self.result)
        runner.run()

    def __result__(self):
        """
        Return the result path; outside decc mode an empty string is returned
        when that path does not exist.
        """
        if check_mode(ModeType.decc) or os.path.exists(self.result):
            return self.result
        return ""
293
294
def _get_dict_testsuite(testsuite, config):
    """
    Find the file behind a test suite name given without a suffix.

    ``.py``, ``.pyd`` and ``.pyc`` are appended in that order.  A candidate
    that exists as given is returned directly; otherwise it is looked up in
    ``config.resource_path`` and ``config.testcases_path``.  A failed lookup
    is logged and the next suffix is tried.  Returns ``None`` when no suffix
    yields a file.
    """
    for suffix in (PY_SUFFIX, PYD_SUFFIX, PYC_SUFFIX):
        candidate = "{}{}".format(testsuite, suffix)
        if os.path.exists(candidate):
            return candidate
        try:
            return get_file_absolute_path(
                candidate, [config.resource_path, config.testcases_path])
        except ParamError as error:
            LOG.error(error, error_no=error.error_no)
    return None
310
311
@Plugin(type=Plugin.DRIVER, id=HostDrivenTestType.device_testsuite)
class DeviceTestSuiteDriver(IDriver):
    """
    Driver plugin that runs a host-driven DeviceTest suite on the devices of
    a request.
    """
    # test driver config
    config = None
    result = ""
    error_message = ""
    py_file = ""

    def __init__(self):
        """Nothing to initialise; state lives in the class attributes."""

    def __check_environment__(self, device_options):
        """No environment check is performed by this driver."""

    def __check_config__(self, config=None):
        """No config check is performed by this driver."""

    def __init_nfs_server__(self, request=None):
        """No-op in this driver."""

    def __execute__(self, request):
        """Run the request through the shared ``do_driver_execute`` flow."""
        do_driver_execute(self, request)

    def _get_test_list(self, json_config, request, source):
        """
        Resolve the test suite file to run and return its path (a single
        string, not a list).

        The suite comes from the ``testsuite`` entry of the driver section in
        ``json_config``; when that is empty, the first file next to ``source``
        sharing its base name is used.  A name carrying a ``.py``/``.pyd``/
        ``.pyc`` suffix is used as is or looked up in the resource and
        testcases paths; a name without suffix goes through
        ``_get_dict_testsuite``.  Raises ``ParamError`` when no suite is
        configured or no file can be found.
        """
        testsuite = get_config_value('testsuite', json_config.get_driver(),
                                     is_list=False)

        if not testsuite and os.path.exists(source):
            base_name = os.path.splitext(os.path.basename(source))[0]
            matches = _get_dict_test_list(os.path.dirname(source), base_name)
            if matches:
                testsuite = matches[0]

        if not testsuite:
            no_suite_msg = ErrorMessage.TestCase.Code_0203014
            LOG.error(no_suite_msg)
            raise ParamError(no_suite_msg)

        if testsuite.endswith((PY_SUFFIX, PYD_SUFFIX, PYC_SUFFIX)):
            if os.path.exists(testsuite):
                resolved = testsuite
            else:
                resolved = None
                try:
                    resolved = get_file_absolute_path(testsuite, [
                        self.config.resource_path, self.config.testcases_path])
                except ParamError as error:
                    LOG.debug(error, error_no=error.error_no)
        else:
            resolved = _get_dict_testsuite(testsuite, self.config)

        if not resolved:
            not_found_msg = ErrorMessage.TestCase.Code_0203012
            LOG.error(not_found_msg)
            raise ParamError(not_found_msg)
        LOG.info("Test suite list: {}".format(resolved))
        return resolved

    def _set_configs(self, json_config, kits, request):
        """
        Build the configs dict passed to the suite run, then set up the kits
        once for the request.
        """
        configs = {
            "testargs": self.config.testargs or {},
            "testcases_path": self.config.testcases_path or "",
            "resource_path": self.config.resource_path or "",
            "request": request,
            "device_log": request.config.device_log,
            "test_name": request.get_module_name(),
            "report_path": request.config.report_path,
            "suitecases": get_config_value(
                'suitecases', json_config.get_driver(), True),
            # a copy, so the run does not share the request's listener list
            "listeners": request.listeners.copy(),
        }

        do_module_kit_setup(request, kits)

        return configs

    def _handle_finally(self, request):
        """
        Tear down the kits, close phone devices and make sure ``self.result``
        refers to a checked result report.
        """
        # do kit teardown
        do_module_kit_teardown(request)

        # close device connect
        for device in self.config.devices:
            if device.label != DeviceLabelType.phone:
                continue
            device.close()

        # check result report
        source_test_name = request.root.source.test_name
        if source_test_name.startswith("{"):
            report_name = "report"
        else:
            report_name = source_test_name
        module_name = request.get_module_name()
        self.result = check_result_report(
            request.config.report_path, self.result, self.error_message,
            report_name, module_name)

    def _run(self, configs, test_list):
        """
        Extend ``sys.path`` for the testcases, expose the resource path on
        ``sys`` and run ``DeviceTestSuite`` on ``test_list``.
        """
        testcases_path = configs["testcases_path"]
        if testcases_path:
            sys.path.insert(1, testcases_path)
            sys.path.insert(1, os.path.dirname(testcases_path))
        request = configs.get('request', None)
        if request:
            sys.ecotest_resource_path = request.config.resource_path

        # run AppTest (imported here, after sys.path has been extended)
        from devicetest.main import DeviceTestSuite
        suite_runner = DeviceTestSuite(test_list, configs,
                                       self.config.devices)
        suite_runner.run()

    def __result__(self):
        """Return the result path, or an empty string when it does not exist."""
        if os.path.exists(self.result):
            return self.result
        return ""
427