176e6818aSopenharmony_ci#!/usr/bin/env python3
276e6818aSopenharmony_ci# coding=utf-8
376e6818aSopenharmony_ci
476e6818aSopenharmony_ci#
576e6818aSopenharmony_ci# Copyright (c) 2022 Huawei Device Co., Ltd.
676e6818aSopenharmony_ci# Licensed under the Apache License, Version 2.0 (the "License");
776e6818aSopenharmony_ci# you may not use this file except in compliance with the License.
876e6818aSopenharmony_ci# You may obtain a copy of the License at
976e6818aSopenharmony_ci#
1076e6818aSopenharmony_ci#     http://www.apache.org/licenses/LICENSE-2.0
1176e6818aSopenharmony_ci#
1276e6818aSopenharmony_ci# Unless required by applicable law or agreed to in writing, software
1376e6818aSopenharmony_ci# distributed under the License is distributed on an "AS IS" BASIS,
1476e6818aSopenharmony_ci# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1576e6818aSopenharmony_ci# See the License for the specific language governing permissions and
1676e6818aSopenharmony_ci# limitations under the License.
1776e6818aSopenharmony_ci#
1876e6818aSopenharmony_ci
1976e6818aSopenharmony_ciimport copy
2076e6818aSopenharmony_ciimport os
2176e6818aSopenharmony_ciimport sys
2276e6818aSopenharmony_ci
2376e6818aSopenharmony_cifrom xdevice import HostDrivenTestType
2476e6818aSopenharmony_cifrom xdevice import ModeType
2576e6818aSopenharmony_cifrom xdevice import DeviceError
2676e6818aSopenharmony_cifrom xdevice import LiteDeviceError
2776e6818aSopenharmony_cifrom xdevice import ParamError
2876e6818aSopenharmony_cifrom xdevice import ReportException
2976e6818aSopenharmony_cifrom xdevice import ExecuteTerminate
3076e6818aSopenharmony_cifrom xdevice import IDriver
3176e6818aSopenharmony_cifrom xdevice import platform_logger
3276e6818aSopenharmony_cifrom xdevice import Plugin
3376e6818aSopenharmony_cifrom xdevice import JsonParser
3476e6818aSopenharmony_cifrom xdevice import get_config_value
3576e6818aSopenharmony_cifrom xdevice import do_module_kit_teardown
3676e6818aSopenharmony_cifrom xdevice import get_filename_extension
3776e6818aSopenharmony_cifrom xdevice import get_file_absolute_path
3876e6818aSopenharmony_cifrom xdevice import get_kit_instances
3976e6818aSopenharmony_cifrom xdevice import check_result_report
4076e6818aSopenharmony_cifrom xdevice import check_mode
4176e6818aSopenharmony_cifrom xdevice import SuiteReporter
4276e6818aSopenharmony_cifrom devicetest.error import ErrorMessage
4376e6818aSopenharmony_ci
4476e6818aSopenharmony_ciLOG = platform_logger("WindowsTest")
4576e6818aSopenharmony_ciPY_SUFFIX = ".py"
4676e6818aSopenharmony_ciPYD_SUFFIX = ".pyd"
4776e6818aSopenharmony_ci
4876e6818aSopenharmony_ci
4976e6818aSopenharmony_ci@Plugin(type=Plugin.DRIVER, id=HostDrivenTestType.windows_test)
5076e6818aSopenharmony_ciclass WindowsTestDriver(IDriver):
5176e6818aSopenharmony_ci    """
5276e6818aSopenharmony_ci    DeviceTest is a Test that runs a host-driven test on given devices.
5376e6818aSopenharmony_ci    """
5476e6818aSopenharmony_ci    # test driver config
5576e6818aSopenharmony_ci    config = None
5676e6818aSopenharmony_ci    result = ""
5776e6818aSopenharmony_ci    error_message = ""
5876e6818aSopenharmony_ci    py_file = ""
5976e6818aSopenharmony_ci
6076e6818aSopenharmony_ci    def __init__(self):
6176e6818aSopenharmony_ci        self.linux_host = ""
6276e6818aSopenharmony_ci        self.linux_directory = ""
6376e6818aSopenharmony_ci        self.hilog_file_pipes = []
6476e6818aSopenharmony_ci
6576e6818aSopenharmony_ci    def __check_environment__(self, device_options):
6676e6818aSopenharmony_ci        pass
6776e6818aSopenharmony_ci
6876e6818aSopenharmony_ci    def __check_config__(self, config=None):
6976e6818aSopenharmony_ci        pass
7076e6818aSopenharmony_ci
7176e6818aSopenharmony_ci    def __init_nfs_server__(self, request=None):
7276e6818aSopenharmony_ci        pass
7376e6818aSopenharmony_ci
7476e6818aSopenharmony_ci    def __execute__(self, request):
7576e6818aSopenharmony_ci        try:
7676e6818aSopenharmony_ci            # set self.config
7776e6818aSopenharmony_ci            self.config = request.config
7876e6818aSopenharmony_ci            # get source, json config and kits
7976e6818aSopenharmony_ci            if request.get_config_file():
8076e6818aSopenharmony_ci                source = request.get_config_file()
8176e6818aSopenharmony_ci                LOG.debug("Test config file path: %s" % source)
8276e6818aSopenharmony_ci            else:
8376e6818aSopenharmony_ci                source = request.get_source_string()
8476e6818aSopenharmony_ci                LOG.debug("Test String: %s" % source)
8576e6818aSopenharmony_ci
8676e6818aSopenharmony_ci            if not source:
8776e6818aSopenharmony_ci                err_msg = ErrorMessage.TestCase.Code_0203010.format(request.get_source_file())
8876e6818aSopenharmony_ci                LOG.error(err_msg)
8976e6818aSopenharmony_ci                raise ParamError(err_msg)
9076e6818aSopenharmony_ci
9176e6818aSopenharmony_ci            json_config = JsonParser(source)
9276e6818aSopenharmony_ci            kits = get_kit_instances(json_config, request.config.resource_path,
9376e6818aSopenharmony_ci                                     request.config.testcases_path)
9476e6818aSopenharmony_ci
9576e6818aSopenharmony_ci            test_name = request.get_module_name()
9676e6818aSopenharmony_ci            self.result = os.path.join(request.config.report_path, "result", "%s.xml" % test_name)
9776e6818aSopenharmony_ci
9876e6818aSopenharmony_ci            # set configs keys
9976e6818aSopenharmony_ci            configs = self._set_configs(json_config, kits, request)
10076e6818aSopenharmony_ci
10176e6818aSopenharmony_ci            # get test list
10276e6818aSopenharmony_ci            test_list = self._get_test_list(json_config, request, source)
10376e6818aSopenharmony_ci            if not test_list:
10476e6818aSopenharmony_ci                raise ParamError(ErrorMessage.TestCase.Code_0203011)
10576e6818aSopenharmony_ci            self._run_devicetest(configs, test_list)
10676e6818aSopenharmony_ci        except (ReportException, ModuleNotFoundError, ExecuteTerminate,
10776e6818aSopenharmony_ci                SyntaxError, ValueError, AttributeError, TypeError,
10876e6818aSopenharmony_ci                KeyboardInterrupt, ParamError, DeviceError, LiteDeviceError) \
10976e6818aSopenharmony_ci                as exception:
11076e6818aSopenharmony_ci            error_no = getattr(exception, "error_no", "00000")
11176e6818aSopenharmony_ci            LOG.exception(exception, exc_info=False, error_no=error_no)
11276e6818aSopenharmony_ci            self.error_message = exception
11376e6818aSopenharmony_ci
11476e6818aSopenharmony_ci        finally:
11576e6818aSopenharmony_ci            self._handle_finally(request)
11676e6818aSopenharmony_ci
11776e6818aSopenharmony_ci    def _get_test_list(self, json_config, request, source):
11876e6818aSopenharmony_ci        test_list = get_config_value('py_file', json_config.get_driver(),
11976e6818aSopenharmony_ci                                     is_list=True)
12076e6818aSopenharmony_ci        if str(request.root.source.source_file).endswith(PYD_SUFFIX) or \
12176e6818aSopenharmony_ci                str(request.root.source.source_file).endswith(PY_SUFFIX):
12276e6818aSopenharmony_ci            test_list = [request.root.source.source_file]
12376e6818aSopenharmony_ci
12476e6818aSopenharmony_ci        if not test_list and os.path.exists(source):
12576e6818aSopenharmony_ci            test_list = _get_dict_test_list(os.path.dirname(source))
12676e6818aSopenharmony_ci
12776e6818aSopenharmony_ci        # check test list
12876e6818aSopenharmony_ci        testcase = request.get("testcase")
12976e6818aSopenharmony_ci        testcase_list = []
13076e6818aSopenharmony_ci        if testcase:
13176e6818aSopenharmony_ci            testcase_list = str(testcase).split(";")
13276e6818aSopenharmony_ci
13376e6818aSopenharmony_ci        checked_test_list = []
13476e6818aSopenharmony_ci        for _, test in enumerate(test_list):
13576e6818aSopenharmony_ci            if not os.path.exists(test):
13676e6818aSopenharmony_ci                try:
13776e6818aSopenharmony_ci                    absolute_file = get_file_absolute_path(test, [
13876e6818aSopenharmony_ci                        self.config.resource_path, self.config.testcases_path])
13976e6818aSopenharmony_ci                except ParamError as error:
14076e6818aSopenharmony_ci                    LOG.error(error, error_no=error.error_no)
14176e6818aSopenharmony_ci                    continue
14276e6818aSopenharmony_ci            else:
14376e6818aSopenharmony_ci                absolute_file = test
14476e6818aSopenharmony_ci
14576e6818aSopenharmony_ci            file_name = get_filename_extension(absolute_file)[0]
14676e6818aSopenharmony_ci            if not testcase_list or file_name in testcase_list:
14776e6818aSopenharmony_ci                checked_test_list.append(absolute_file)
14876e6818aSopenharmony_ci            else:
14976e6818aSopenharmony_ci                LOG.info("Test '%s' is ignored", absolute_file)
15076e6818aSopenharmony_ci        if checked_test_list:
15176e6818aSopenharmony_ci            LOG.info("Test list: {}".format(checked_test_list))
15276e6818aSopenharmony_ci        else:
15376e6818aSopenharmony_ci            err_msg = ErrorMessage.TestCase.Code_0203012
15476e6818aSopenharmony_ci            LOG.error(err_msg)
15576e6818aSopenharmony_ci            raise ParamError(err_msg)
15676e6818aSopenharmony_ci        return checked_test_list
15776e6818aSopenharmony_ci
15876e6818aSopenharmony_ci    def _set_configs(self, json_config, kits, request):
15976e6818aSopenharmony_ci        configs = dict()
16076e6818aSopenharmony_ci        configs["testargs"] = self.config.testargs or {}
16176e6818aSopenharmony_ci        configs["testcases_path"] = self.config.testcases_path or ""
16276e6818aSopenharmony_ci        configs["request"] = request
16376e6818aSopenharmony_ci        configs["test_name"] = request.get_module_name()
16476e6818aSopenharmony_ci        configs["report_path"] = request.config.report_path
16576e6818aSopenharmony_ci        configs["execute"] = get_config_value(
16676e6818aSopenharmony_ci            'execute', json_config.get_driver(), False)
16776e6818aSopenharmony_ci        return configs
16876e6818aSopenharmony_ci
16976e6818aSopenharmony_ci    def _handle_finally(self, request):
17076e6818aSopenharmony_ci        from xdevice import Binder
17176e6818aSopenharmony_ci
17276e6818aSopenharmony_ci        # do kit teardown
17376e6818aSopenharmony_ci        do_module_kit_teardown(request)
17476e6818aSopenharmony_ci
17576e6818aSopenharmony_ci        # check result report
17676e6818aSopenharmony_ci        report_name = request.root.source.test_name if \
17776e6818aSopenharmony_ci            not request.root.source.test_name.startswith("{") \
17876e6818aSopenharmony_ci            else "report"
17976e6818aSopenharmony_ci        module_name = request.get_module_name()
18076e6818aSopenharmony_ci        if Binder.session().mode != ModeType.decc:
18176e6818aSopenharmony_ci            self.result = check_result_report(
18276e6818aSopenharmony_ci                request.config.report_path, self.result, self.error_message,
18376e6818aSopenharmony_ci                report_name, module_name)
18476e6818aSopenharmony_ci        else:
18576e6818aSopenharmony_ci            tmp_list = copy.copy(SuiteReporter.get_report_result())
18676e6818aSopenharmony_ci            if self.result not in [report_path for report_path, _ in tmp_list]:
18776e6818aSopenharmony_ci                if not self.error_message:
18876e6818aSopenharmony_ci                    self.error_message = "Case not execute[01205]"
18976e6818aSopenharmony_ci                self.result = check_result_report(
19076e6818aSopenharmony_ci                    request.config.report_path, self.result,
19176e6818aSopenharmony_ci                    self.error_message, report_name, module_name)
19276e6818aSopenharmony_ci
19376e6818aSopenharmony_ci    def _create_tmp_folder(self, request):
19476e6818aSopenharmony_ci        if request.root.source.source_file.strip():
19576e6818aSopenharmony_ci            folder_name = "task_%s_%s" % (self.config.tmp_id,
19676e6818aSopenharmony_ci                                          request.root.source.test_name)
19776e6818aSopenharmony_ci        else:
19876e6818aSopenharmony_ci            folder_name = "task_%s_report" % self.config.tmp_id
19976e6818aSopenharmony_ci
20076e6818aSopenharmony_ci        tmp_sub_folder = os.path.join(self.config.tmp_folder, folder_name)
20176e6818aSopenharmony_ci        os.makedirs(tmp_sub_folder, exist_ok=True)
20276e6818aSopenharmony_ci        return tmp_sub_folder
20376e6818aSopenharmony_ci
20476e6818aSopenharmony_ci    def _run_devicetest(self, configs, test_list):
20576e6818aSopenharmony_ci        from xdevice import Variables
20676e6818aSopenharmony_ci
20776e6818aSopenharmony_ci        # insert paths for loading _devicetest module and testcases
20876e6818aSopenharmony_ci        devicetest_module = os.path.join(Variables.modules_dir, "_devicetest")
20976e6818aSopenharmony_ci        if os.path.exists(devicetest_module):
21076e6818aSopenharmony_ci            sys.path.insert(1, devicetest_module)
21176e6818aSopenharmony_ci        if configs["testcases_path"]:
21276e6818aSopenharmony_ci            sys.path.insert(1, configs["testcases_path"])
21376e6818aSopenharmony_ci
21476e6818aSopenharmony_ci        # apply data to devicetest module about resource path
21576e6818aSopenharmony_ci        request = configs.get('request', None)
21676e6818aSopenharmony_ci        if request:
21776e6818aSopenharmony_ci            sys.ecotest_resource_path = request.config.resource_path
21876e6818aSopenharmony_ci
21976e6818aSopenharmony_ci
22076e6818aSopenharmony_ci        # run devicetest
22176e6818aSopenharmony_ci        from devicetest.main import DeviceTest
22276e6818aSopenharmony_ci        device_test = DeviceTest(test_list, configs, None, self.result)
22376e6818aSopenharmony_ci        device_test.run()
22476e6818aSopenharmony_ci
22576e6818aSopenharmony_ci    def __result__(self):
22676e6818aSopenharmony_ci        if check_mode(ModeType.decc):
22776e6818aSopenharmony_ci            return self.result
22876e6818aSopenharmony_ci        return self.result if os.path.exists(self.result) else ""
22976e6818aSopenharmony_ci
23076e6818aSopenharmony_ci
23176e6818aSopenharmony_cidef _get_dict_test_list(module_path):
23276e6818aSopenharmony_ci    test_list = []
23376e6818aSopenharmony_ci    for root, _, files in os.walk(module_path):
23476e6818aSopenharmony_ci        for _file in files:
23576e6818aSopenharmony_ci            if _file.endswith(".py") or _file.endswith(".pyd"):
23676e6818aSopenharmony_ci                test_list.append(os.path.join(root, _file))
23776e6818aSopenharmony_ci    return test_list
238