1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import shutil
21import time
22from dataclasses import dataclass
23
24from xdevice import DeviceTestType
25from xdevice import IDriver
26from xdevice import Plugin
27from xdevice import platform_logger
28from xdevice import DeviceLabelType
29from xdevice import ShellHandler
30from xdevice import ExecuteTerminate
31from xdevice import get_plugin
32from xdevice import JsonParser
33from xdevice import get_config_value
34from xdevice import get_kit_instances
35from xdevice import check_result_report
36from xdevice import get_device_log_file
37from xdevice import get_test_component_version
38from xdevice import ParamError
39from ohos.constants import ParserType
40from ohos.constants import ComType
41from ohos.constants import CKit
42from ohos.exception import LiteDeviceExecuteCommandError
43from core.utils import get_filename_extension
44from core.testkit.kit_lite import DeployKit
45from core.config.resource_manager import ResourceManager
46from core.config.config_manager import UserConfigManager
47
48__all__ = ["LiteUnitTest", "CTestDriver", "JSUnitTestLiteDriver"]
49LOG = platform_logger("LiteUnitTest")
50
51
52def get_level_para_string(level_string):
53    level_list = list(set(level_string.split(",")))
54    level_para_string = ""
55    for item in level_list:
56        if not item.isdigit():
57            continue
58        item = item.strip(" ")
59        level_para_string = f"{level_para_string}Level{item},"
60    level_para_string = level_para_string.strip(",")
61    return level_para_string
62
63
64@dataclass
65class GTestConst(object):
66    exec_para_filter = "--gtest_filter"
67    exec_para_level = "--gtest_testsize"
68
69
70@Plugin(type=Plugin.DRIVER, id=DeviceTestType.lite_cpp_test)
71class LiteUnitTest(IDriver):
72    """
73    lite gtest test driver for L1
74    """
75    config = None
76    log = platform_logger("LiteUnitTest")
77    nfs_dir = ""
78    mnt_cmd = ""
79    lite_device = None
80    result = None
81
82    def __check_failed__(self, msg):
83        self.log.error("check failed {}".format(msg))
84        return
85
86    def __check_environment__(self, device_options):
87        pass
88
89    def __check_config__(self, config):
90        """
91        1. check serial protocol
92        2. login device
93        3. NFS is available
94        :param config: serial device
95        :return:
96        """
97        self.log.error("Lite driver check config:{}".format(config))
98
99    def __execute__(self, request):
100        """
101
102        1. select test case by subsystem, module, suite
103        2. open test dir
104        3、execute single test case, eg. ./test_demo
105        :param request: contains test condition, sub_system
106            module_name,test_suit,
107        test_case,test_level,test_case_dir
108        :return:
109        """
110        self.log.debug("Test suite FilePath: %s" %
111                      request.root.source.source_file)
112        self.lite_device = request.config.environment.devices[0]
113        self.lite_device.connect()
114        if not self._before_execute_test():
115            self.log.error("open test dir failed")
116            return
117        self.log.debug("open test dir success")
118        if self._execute_test(request) == "":
119            self.log.error("execute test command failed")
120            return
121        self.log.info("execute test command success")
122        if not self._after_execute_test(request):
123            self.log.error("after execute test failed")
124            return
125        self.log.info("lite device execute request success")
126
127    def __result__(self):
128        pass
129
130    def show_help_info(self):
131        """
132        show help info.
133        """
134        self.log.info("this is test driver for cpp test")
135        return
136
137    def show_driver_info(self):
138        """
139        show driver info.
140        """
141        self.log.info("this is test driver for cpp test")
142        return
143
144    def _mount_nfs_server(self):
145        #before execute each suits bin, mount nfs
146        self.mnt_cmd = "mount {}".format(UserConfigManager().get_user_config(
147            "NFS").get("mnt_cmd"))
148        if self.mnt_cmd == "mount ":
149            self.log.error("no configure for mount command")
150            return
151
152        filter_result, status, _ = \
153            self.lite_device.execute_command_with_timeout(
154            self.mnt_cmd, case_type=DeviceTestType.lite_cpp_test, timeout=3)
155        if "already mounted" in filter_result:
156            self.log.info("nfs has been mounted")
157            return
158
159        for i in range(0, 2):
160            if status:
161                self.log.info("execute mount command success")
162                return
163
164            self.log.info("try mount %d" % (i + 2))
165            _, status, _ = self.lite_device.execute_command_with_timeout(
166                self.mnt_cmd, case_type=DeviceTestType.lite_cpp_test,
167                timeout=3)
168
169        self.log.error("execute mount command failed")
170
171    def _before_execute_test(self):
172        """
173        need copy test case to nfs dir
174        :param request: nfs dir, test case path
175        :return:
176        """
177        self.nfs_dir = \
178            UserConfigManager().get_user_config("NFS").get("host_dir")
179        if self.nfs_dir == "":
180            self.log.error("no configure for nfs directory")
181            return False
182
183        self._mount_nfs_server()
184        _, status, _ = \
185            self.lite_device.execute_command_with_timeout("cd /{}".format(
186                UserConfigManager().get_user_config("NFS").get("board_dir")),
187            case_type=DeviceTestType.lite_cpp_test)
188        if not status:
189            self.log.error("pre execute command failed")
190            return False
191
192        self.log.info("pre execute command success")
193        return True
194
195    def _execute_test(self, request):
196        test_case = request.root.source.source_file
197        self.config = request.config
198        test_para = self._get_test_para(self.config.testcase,
199                                       self.config.testlevel)
200        case_name = os.path.basename(test_case)
201        if os.path.exists(os.path.join(self.nfs_dir, case_name)):
202            os.remove(os.path.join(self.nfs_dir, case_name))
203        result_name = case_name + ".xml"
204        result_file = os.path.join(self.nfs_dir, result_name)
205        if os.path.exists(result_file):
206            os.remove(result_file)
207        shutil.copyfile(test_case, os.path.join(self.nfs_dir, case_name))
208        # push resource files
209        resource_manager = ResourceManager()
210        resource_data_dic, resource_dir = \
211            resource_manager.get_resource_data_dic(test_case)
212        resource_manager.lite_process_preparer_data(resource_data_dic, resource_dir)
213        self.lite_device.execute_command_with_timeout(
214            "chmod 777 {}".format(case_name),
215            case_type=DeviceTestType.lite_cpp_test)
216        test_command = "./%s %s" % (case_name, test_para)
217        case_result, status, _ = \
218            self.lite_device.execute_command_with_timeout(
219            test_command, case_type=DeviceTestType.lite_cpp_test)
220        if status:
221            self.log.info("test case result:\n %s" % case_result)
222            return
223
224        self.log.error("failed case: %s" % test_case)
225
226    def _get_test_para(self, testcase, testlevel):
227        if "" != testcase and "" == testlevel:
228            test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
229        elif "" == testcase and "" != testlevel:
230            level_para = get_level_para_string(testlevel)
231            test_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
232        else:
233            test_para = ""
234        return test_para
235
236    def _after_execute_test(self, request):
237        """
238        copy test result to result dir
239        :param request:
240        :return:
241        """
242        if request.config is None:
243            self.log.error("test config is null")
244            return False
245
246        report_path = request.config.report_path
247        test_result = os.path.join(report_path, "result")
248        test_case = request.root.source.source_file
249        case_name = os.path.basename(test_case)
250        if not os.path.exists(test_result):
251            os.mkdir(test_result)
252        sub_system_module = test_case.split(
253            "unittest" + os.sep)[1].split(os.sep + "bin")[0]
254        if os.sep in sub_system_module:
255            sub_system = sub_system_module.split(os.sep)[0]
256            module_name = sub_system_module.split(os.sep)[1]
257            subsystem_dir = os.path.join(test_result, sub_system)
258            if not os.path.exists(subsystem_dir):
259                os.mkdir(subsystem_dir)
260            module_dir = os.path.join(subsystem_dir, module_name)
261            if not os.path.exists(module_dir):
262                os.mkdir(module_dir)
263            test_result = module_dir
264        else:
265            if sub_system_module != "":
266                test_result = os.path.join(test_result, sub_system_module)
267                if not os.path.exists(test_result):
268                    os.mkdir(test_result)
269        result_name = case_name + ".xml"
270        result_file = os.path.join(self.nfs_dir, result_name)
271        if not self._check_xml_exist(result_name):
272            self.log.error("result xml file %s not exist." % result_name)
273        if not os.path.exists(result_file):
274            self.log.error("file %s not exist." % result_file)
275            self._clear_nfs_space()
276            return False
277
278        file_name = os.path.basename(result_file)
279        final_result = os.path.join(test_result, file_name)
280        shutil.copyfile(result_file,
281                        final_result)
282        self.log.info("after execute test")
283        self._clear_nfs_space()
284        self.lite_device.close()
285        return True
286
287    def _clear_nfs_space(self):
288        _, status, _ = \
289            self.lite_device.execute_command_with_timeout(
290                "cd ..",
291                case_type=DeviceTestType.lite_cpp_test)
292        _, status, _ = \
293            self.lite_device.execute_command_with_timeout(
294                "umount %s" % UserConfigManager().get_user_config("NFS").get("board_dir"),
295                case_type=DeviceTestType.lite_cpp_test)
296        shutil.rmtree(self.nfs_dir)
297        os.mkdir(self.nfs_dir)
298
299    def _check_xml_exist(self, xml_file, timeout=10):
300        ls_command = \
301            "ls /%s" % \
302            UserConfigManager().get_user_config("NFS").get("board_dir")
303        start_time = time.time()
304        while time.time() - start_time < timeout:
305            result, _, _ = self.lite_device.execute_command_with_timeout(
306                command=ls_command, case_type=DeviceTestType.cpp_test_lite,
307                timeout=5, receiver=None)
308            if xml_file in result:
309                return True
310
311            time.sleep(5)
312        return False
313
314
315@Plugin(type=Plugin.DRIVER, id=DeviceTestType.ctest_lite)
316class CTestDriver(IDriver):
317    """
318    CTest is a test that runs a native test package on given lite device.
319    """
320    config = None
321    result = ""
322    error_message = ""
323    version_cmd = "AT+CSV"
324
325    def __init__(self):
326        self.file_name = ""
327
328    def __check_environment__(self, device_options):
329        if len(device_options) != 1 or \
330                device_options[0].label != DeviceLabelType.wifiiot:
331            self.error_message = "check environment failed"
332            return False
333        return True
334
335    def __check_config__(self, config=None):
336        del config
337        self.config = None
338
339    def __execute__(self, request):
340        from xdevice import Variables
341        try:
342            self.config = request.config
343            self.config.device = request.config.environment.devices[0]
344
345            if request.config.resource_path:
346                current_dir = request.config.resource_path
347            else:
348                current_dir = Variables.exec_dir
349
350            config_file = request.root.source.config_file.strip()
351            if config_file:
352                source = os.path.join(current_dir, config_file)
353                self.file_name = os.path.basename(config_file).split(".")[0]
354            else:
355                source = request.root.source.source_string.strip()
356
357            self._run_ctest(source=source, request=request)
358
359        except (LiteDeviceExecuteCommandError, Exception) as exception:
360            LOG.error(exception, error_no=getattr(exception, "error_no",
361                                                  "00000"))
362            self.error_message = exception
363        finally:
364            if request.root.source.test_name.startswith("{"):
365                report_name = "report"
366            else:
367                report_name = get_filename_extension(
368                    request.root.source.test_name)[0]
369
370            self.result = check_result_report(request.config.report_path,
371                                              self.result,
372                                              self.error_message,
373                                              report_name)
374
375    def __result__(self):
376        return self.result if os.path.exists(self.result) else ""
377
378    def _run_ctest(self, source=None, request=None):
379        if not source:
380            LOG.error("Error: %s don't exist." % source, error_no="00101")
381            return
382
383        try:
384            parsers = get_plugin(Plugin.PARSER, ParserType.ctest_lite)
385            version = get_test_component_version(self.config)
386            parser_instances = []
387            for parser in parsers:
388                parser_instance = parser.__class__()
389                parser_instance.suites_name = self.file_name
390                parser_instance.product_info.setdefault("Version", version)
391                parser_instance.listeners = request.listeners
392                parser_instances.append(parser_instance)
393            handler = ShellHandler(parser_instances)
394
395            reset_cmd = self._reset_device(request, source)
396            self.result = "%s.xml" % os.path.join(request.config.report_path,
397                                                  "result", self.file_name)
398
399            self.config.device.device.com_dict.get(
400                ComType.deploy_com).connect()
401
402            result, _, error = self.config.device.device. \
403                execute_command_with_timeout(
404                    command=reset_cmd,
405                    case_type=DeviceTestType.ctest_lite,
406                    key=ComType.deploy_com,
407                    timeout=90,
408                    receiver=handler)
409
410            device_log_file = get_device_log_file(request.config.report_path,
411                                                  request.config.device.
412                                                  __get_serial__())
413
414            device_log_file_open = os.open(device_log_file, os.O_WRONLY |
415                os.O_CREAT | os.O_APPEND, 0o755)
416            with os.fdopen(device_log_file_open, "a") as file_name:
417                file_name.write("{}{}".format(
418                    "\n".join(result.split("\n")[0:-1]), "\n"))
419                file_name.flush()
420        finally:
421            self.config.device.device.com_dict.get(ComType.deploy_com).close()
422
423    def _reset_device(self, request, source):
424        json_config = JsonParser(source)
425        reset_cmd = []
426        kit_instances = get_kit_instances(json_config,
427                                          request.config.resource_path,
428                                          request.config.testcases_path)
429        from xdevice import Binder
430
431        for (kit_instance, kit_info) in zip(kit_instances,
432                                            json_config.get_kits()):
433            if not isinstance(kit_instance, DeployKit):
434                continue
435            if not self.file_name:
436                self.file_name = get_config_value(
437                    'burn_file', kit_info)[0].split("\\")[-1].split(".")[0]
438            reset_cmd = kit_instance.burn_command
439            if not Binder.is_executing():
440                raise ExecuteTerminate("ExecuteTerminate", error_no="00300")
441
442            kit_instance.__setup__(self.config.device,
443                source_file=request.root.source.source_file.strip())
444
445        reset_cmd = [int(item, 16) for item in reset_cmd]
446        return reset_cmd
447
448
449@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test_lite)
450class JSUnitTestLiteDriver(IDriver):
451    """
452    JSUnitTestDriver is a Test that runs a native test package on given device.
453    """
454
455    def __init__(self):
456        self.result = ""
457        self.error_message = ""
458        self.kits = []
459        self.config = None
460
461    def __check_environment__(self, device_options):
462        pass
463
464    def __check_config__(self, config):
465        pass
466
467    def __execute__(self, request):
468        try:
469            LOG.debug("Start execute xdevice extension JSUnit Test")
470
471            self.config = request.config
472            self.config.device = request.config.environment.devices[0]
473
474            config_file = request.root.source.config_file
475            suite_file = request.root.source.source_file
476
477            if not suite_file:
478                raise ParamError(
479                    "test source '%s' not exists" %
480                    request.root.source.source_string, error_no="00101")
481
482            if not os.path.exists(config_file):
483                LOG.error("Error: Test cases don't exist %s." % config_file,
484                          error_no="00101")
485                raise ParamError(
486                    "Error: Test cases don't exist %s." % config_file,
487                    error_no="00101")
488
489            self.file_name = os.path.basename(
490                request.root.source.source_file.strip()).split(".")[0]
491
492            self.result = "%s.xml" % os.path.join(
493                request.config.report_path, "result", self.file_name)
494
495            json_config = JsonParser(config_file)
496            self.kits = get_kit_instances(json_config,
497                                          self.config.resource_path,
498                                          self.config.testcases_path)
499
500            self._get_driver_config(json_config)
501
502            from xdevice import Binder
503            for kit in self.kits:
504                if not Binder.is_executing():
505                    raise ExecuteTerminate("ExecuteTerminate",
506                                           error_no="00300")
507                if kit.__class__.__name__ == CKit.liteinstall:
508                    kit.bundle_name = self.config.bundle_name
509                kit.__setup__(self.config.device, request=request)
510
511            self._run_jsunit(request)
512
513        except Exception as exception:
514            self.error_message = exception
515        finally:
516            report_name = "report" if request.root.source. \
517                test_name.startswith("{") else get_filename_extension(
518                request.root.source.test_name)[0]
519
520            self.result = check_result_report(
521                request.config.report_path, self.result, self.error_message,
522                report_name)
523
524            for kit in self.kits:
525                kit.__teardown__(self.config.device)
526
527            self.config.device.close()
528
529    def __result__(self):
530        return self.result if os.path.exists(self.result) else ""
531
532    def _get_driver_config(self, json_config):
533        bundle_name = get_config_value('bundle-name',
534                                       json_config.get_driver(), False)
535        if not bundle_name:
536            raise ParamError("Can't find bundle-name in config file.",
537                             error_no="00108")
538        else:
539            self.config.bundle_name = bundle_name
540
541        ability = get_config_value('ability',
542                                   json_config.get_driver(), False)
543        if not ability:
544            self.config.ability = "default"
545        else:
546            self.config.ability = ability
547
548    def _run_jsunit(self, request):
549        parser_instances = []
550        parsers = get_plugin(Plugin.PARSER, ParserType.jsuit_test_lite)
551        for parser in parsers:
552            parser_instance = parser.__class__()
553            parser_instance.suites_name = self.file_name
554            parser_instance.listeners = request.listeners
555            parser_instances.append(parser_instance)
556        handler = ShellHandler(parser_instances)
557
558        command = "./bin/aa start -p %s -n %s" % \
559                  (self.config.bundle_name, self.config.ability)
560        result, _, error = self.config.device.execute_command_with_timeout(
561            command=command,
562            timeout=300,
563            receiver=handler)
564
565        device_log_file = get_device_log_file(request.config.report_path,
566            request.config.device.
567            __get_serial__())
568
569        device_log_file_open = os.open(device_log_file, os.O_WRONLY |
570            os.O_CREAT | os.O_APPEND, 0o755)
571        with os.fdopen(device_log_file_open, "a") as file_name:
572            file_name.write("{}{}".format(
573                "\n".join(result.split("\n")[0:-1]), "\n"))
574            file_name.flush()
575
576