1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import copy
20import os
21import sys
22
23from xdevice import HostDrivenTestType
24from xdevice import ModeType
25from xdevice import DeviceError
26from xdevice import LiteDeviceError
27from xdevice import ParamError
28from xdevice import ReportException
29from xdevice import ExecuteTerminate
30from xdevice import IDriver
31from xdevice import platform_logger
32from xdevice import Plugin
33from xdevice import JsonParser
34from xdevice import get_config_value
35from xdevice import do_module_kit_teardown
36from xdevice import get_filename_extension
37from xdevice import get_file_absolute_path
38from xdevice import get_kit_instances
39from xdevice import check_result_report
40from xdevice import check_mode
41from xdevice import SuiteReporter
42from devicetest.error import ErrorMessage
43
44LOG = platform_logger("WindowsTest")
45PY_SUFFIX = ".py"
46PYD_SUFFIX = ".pyd"
47
48
49@Plugin(type=Plugin.DRIVER, id=HostDrivenTestType.windows_test)
50class WindowsTestDriver(IDriver):
51    """
52    DeviceTest is a Test that runs a host-driven test on given devices.
53    """
54    # test driver config
55    config = None
56    result = ""
57    error_message = ""
58    py_file = ""
59
60    def __init__(self):
61        self.linux_host = ""
62        self.linux_directory = ""
63        self.hilog_file_pipes = []
64
65    def __check_environment__(self, device_options):
66        pass
67
68    def __check_config__(self, config=None):
69        pass
70
71    def __init_nfs_server__(self, request=None):
72        pass
73
74    def __execute__(self, request):
75        try:
76            # set self.config
77            self.config = request.config
78            # get source, json config and kits
79            if request.get_config_file():
80                source = request.get_config_file()
81                LOG.debug("Test config file path: %s" % source)
82            else:
83                source = request.get_source_string()
84                LOG.debug("Test String: %s" % source)
85
86            if not source:
87                err_msg = ErrorMessage.TestCase.Code_0203010.format(request.get_source_file())
88                LOG.error(err_msg)
89                raise ParamError(err_msg)
90
91            json_config = JsonParser(source)
92            kits = get_kit_instances(json_config, request.config.resource_path,
93                                     request.config.testcases_path)
94
95            test_name = request.get_module_name()
96            self.result = os.path.join(request.config.report_path, "result", "%s.xml" % test_name)
97
98            # set configs keys
99            configs = self._set_configs(json_config, kits, request)
100
101            # get test list
102            test_list = self._get_test_list(json_config, request, source)
103            if not test_list:
104                raise ParamError(ErrorMessage.TestCase.Code_0203011)
105            self._run_devicetest(configs, test_list)
106        except (ReportException, ModuleNotFoundError, ExecuteTerminate,
107                SyntaxError, ValueError, AttributeError, TypeError,
108                KeyboardInterrupt, ParamError, DeviceError, LiteDeviceError) \
109                as exception:
110            error_no = getattr(exception, "error_no", "00000")
111            LOG.exception(exception, exc_info=False, error_no=error_no)
112            self.error_message = exception
113
114        finally:
115            self._handle_finally(request)
116
117    def _get_test_list(self, json_config, request, source):
118        test_list = get_config_value('py_file', json_config.get_driver(),
119                                     is_list=True)
120        if str(request.root.source.source_file).endswith(PYD_SUFFIX) or \
121                str(request.root.source.source_file).endswith(PY_SUFFIX):
122            test_list = [request.root.source.source_file]
123
124        if not test_list and os.path.exists(source):
125            test_list = _get_dict_test_list(os.path.dirname(source))
126
127        # check test list
128        testcase = request.get("testcase")
129        testcase_list = []
130        if testcase:
131            testcase_list = str(testcase).split(";")
132
133        checked_test_list = []
134        for _, test in enumerate(test_list):
135            if not os.path.exists(test):
136                try:
137                    absolute_file = get_file_absolute_path(test, [
138                        self.config.resource_path, self.config.testcases_path])
139                except ParamError as error:
140                    LOG.error(error, error_no=error.error_no)
141                    continue
142            else:
143                absolute_file = test
144
145            file_name = get_filename_extension(absolute_file)[0]
146            if not testcase_list or file_name in testcase_list:
147                checked_test_list.append(absolute_file)
148            else:
149                LOG.info("Test '%s' is ignored", absolute_file)
150        if checked_test_list:
151            LOG.info("Test list: {}".format(checked_test_list))
152        else:
153            err_msg = ErrorMessage.TestCase.Code_0203012
154            LOG.error(err_msg)
155            raise ParamError(err_msg)
156        return checked_test_list
157
158    def _set_configs(self, json_config, kits, request):
159        configs = dict()
160        configs["testargs"] = self.config.testargs or {}
161        configs["testcases_path"] = self.config.testcases_path or ""
162        configs["request"] = request
163        configs["test_name"] = request.get_module_name()
164        configs["report_path"] = request.config.report_path
165        configs["execute"] = get_config_value(
166            'execute', json_config.get_driver(), False)
167        return configs
168
169    def _handle_finally(self, request):
170        from xdevice import Binder
171
172        # do kit teardown
173        do_module_kit_teardown(request)
174
175        # check result report
176        report_name = request.root.source.test_name if \
177            not request.root.source.test_name.startswith("{") \
178            else "report"
179        module_name = request.get_module_name()
180        if Binder.session().mode != ModeType.decc:
181            self.result = check_result_report(
182                request.config.report_path, self.result, self.error_message,
183                report_name, module_name)
184        else:
185            tmp_list = copy.copy(SuiteReporter.get_report_result())
186            if self.result not in [report_path for report_path, _ in tmp_list]:
187                if not self.error_message:
188                    self.error_message = "Case not execute[01205]"
189                self.result = check_result_report(
190                    request.config.report_path, self.result,
191                    self.error_message, report_name, module_name)
192
193    def _create_tmp_folder(self, request):
194        if request.root.source.source_file.strip():
195            folder_name = "task_%s_%s" % (self.config.tmp_id,
196                                          request.root.source.test_name)
197        else:
198            folder_name = "task_%s_report" % self.config.tmp_id
199
200        tmp_sub_folder = os.path.join(self.config.tmp_folder, folder_name)
201        os.makedirs(tmp_sub_folder, exist_ok=True)
202        return tmp_sub_folder
203
204    def _run_devicetest(self, configs, test_list):
205        from xdevice import Variables
206
207        # insert paths for loading _devicetest module and testcases
208        devicetest_module = os.path.join(Variables.modules_dir, "_devicetest")
209        if os.path.exists(devicetest_module):
210            sys.path.insert(1, devicetest_module)
211        if configs["testcases_path"]:
212            sys.path.insert(1, configs["testcases_path"])
213
214        # apply data to devicetest module about resource path
215        request = configs.get('request', None)
216        if request:
217            sys.ecotest_resource_path = request.config.resource_path
218
219
220        # run devicetest
221        from devicetest.main import DeviceTest
222        device_test = DeviceTest(test_list, configs, None, self.result)
223        device_test.run()
224
225    def __result__(self):
226        if check_mode(ModeType.decc):
227            return self.result
228        return self.result if os.path.exists(self.result) else ""
229
230
231def _get_dict_test_list(module_path):
232    test_list = []
233    for root, _, files in os.walk(module_path):
234        for _file in files:
235            if _file.endswith(".py") or _file.endswith(".pyd"):
236                test_list.append(os.path.join(root, _file))
237    return test_list
238