1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import json
20import os
21import re
22import shutil
23import subprocess
24import sys
25import time
26import platform
27import zipfile
28import stat
29import random
30import xml.etree.ElementTree as ET
31from dataclasses import dataclass
32from json import JSONDecodeError
33
34from xdevice import DeviceTestType, check_result_report
35from xdevice import DeviceLabelType
36from xdevice import CommonParserType
37from xdevice import ExecuteTerminate
38from xdevice import DeviceError
39from xdevice import ShellHandler
40
41from xdevice import IDriver
42from xdevice import platform_logger
43from xdevice import Plugin
44from xdevice import get_plugin
45from ohos.environment.dmlib import process_command_ret
46from core.utils import get_decode
47from core.utils import get_fuzzer_path
48from core.config.resource_manager import ResourceManager
49from core.config.config_manager import FuzzerConfigManager
50
51__all__ = [
52    "CppTestDriver",
53    "JSUnitTestDriver",
54    "disable_keyguard",
55    "GTestConst"]
56
57LOG = platform_logger("Drivers")
58DEFAULT_TEST_PATH = "/%s/%s/" % ("data", "test")
59OBJ = "obj"
60_ACE_LOG_MARKER = " a0c0d0"
61TIME_OUT = 900 * 1000
62JS_TIMEOUT = 10
63CYCLE_TIMES = 50
64
65FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL
66MODES = stat.S_IWUSR | stat.S_IRUSR
67
68
69class CollectingOutputReceiver:
70    def __init__(self):
71        self.output = ""
72
73    def __read__(self, output):
74        self.output = "%s%s" % (self.output, output)
75
76    def __error__(self, message):
77        pass
78
79    def __done__(self, result_code="", message=""):
80        pass
81
82
83class DisplayOutputReceiver:
84    def __init__(self):
85        self.output = ""
86        self.unfinished_line = ""
87
88    def __read__(self, output):
89        self.output = "%s%s" % (self.output, output)
90        lines = self._process_output(output)
91        for line in lines:
92            line = line.strip()
93            if line:
94                LOG.info(get_decode(line))
95
96    def __error__(self, message):
97        pass
98
99    def __done__(self, result_code="", message=""):
100        pass
101
102    def _process_output(self, output, end_mark="\n"):
103        content = output
104        if self.unfinished_line:
105            content = "".join((self.unfinished_line, content))
106            self.unfinished_line = ""
107        lines = content.split(end_mark)
108        if content.endswith(end_mark):
109            return lines[:-1]
110        else:
111            self.unfinished_line = lines[-1]
112            return lines[:-1]
113
114
115@dataclass
116class GTestConst(object):
117    exec_para_filter = "--gtest_filter"
118    exec_para_level = "--gtest_testsize"
119    exec_acts_para_filter = "--jstest_filter"
120    exec_acts_para_level = "--jstest_testsize"
121
122
123def get_device_log_file(report_path, serial=None, log_name="device_log"):
124    from xdevice import Variables
125    log_path = os.path.join(report_path, Variables.report_vars.log_dir)
126    os.makedirs(log_path, exist_ok=True)
127
128    serial = serial or time.time_ns()
129    device_file_name = "{}_{}.log".format(log_name, serial)
130    device_log_file = os.path.join(log_path, device_file_name)
131    return device_log_file
132
133
134def get_level_para_string(level_string):
135    level_list = list(set(level_string.split(",")))
136    level_para_string = ""
137    for item in level_list:
138        if not item.isdigit():
139            continue
140        item = item.strip(" ")
141        level_para_string = f"{level_para_string}Level{item},"
142    level_para_string = level_para_string.strip(",")
143    return level_para_string
144
145
146def get_result_savepath(testsuit_path, result_rootpath):
147    findkey = os.sep + "tests" + os.sep
148    filedir, _ = os.path.split(testsuit_path)
149    pos = filedir.find(findkey)
150    if -1 != pos:
151        subpath = filedir[pos + len(findkey):]
152        pos1 = subpath.find(os.sep)
153        if -1 != pos1:
154            subpath = subpath[pos1 + len(os.sep):]
155            result_path = os.path.join(result_rootpath, "result", subpath)
156        else:
157            result_path = os.path.join(result_rootpath, "result")
158    else:
159        result_path = os.path.join(result_rootpath, "result")
160
161    if not os.path.exists(result_path):
162        os.makedirs(result_path)
163
164    LOG.info("result_savepath = " + result_path)
165    return result_path
166
167
168def get_test_log_savepath(result_rootpath, result_suit_path):
169    suit_path = result_suit_path.split("result")[-1].strip(os.sep).split(os.sep)[0]
170    test_log_path = os.path.join(result_rootpath, "log", "test_log", suit_path)
171
172    if not os.path.exists(test_log_path):
173        os.makedirs(test_log_path)
174
175    LOG.info("test_log_savepath = {}".format(test_log_path))
176    return test_log_path
177
178
179def update_xml(suite_file, result_xml):
180    suite_path_txt = suite_file.split(".")[0] + "_path.txt"
181    if os.path.exists(suite_path_txt) and os.path.exists(result_xml):
182        with open(suite_path_txt, "r") as path_text:
183            line = path_text.readline().replace("//", "").strip()
184        tree = ET.parse(result_xml)
185        tree.getroot().attrib["path"] = line
186        tree.getroot().attrib["name"] = os.path.basename(suite_file.split(".")[0])
187
188        test_suit = os.path.basename(result_xml).split(".")[0]
189        log_path = os.path.abspath(result_xml).split("result")[0]
190        crash_path = os.path.join(log_path, "log", test_suit)
191        all_items = os.listdir(crash_path)
192
193        # 筛选以crash开头的目录
194        matching_dirs = [os.path.join(crash_path, item) for item in all_items if
195                         os.path.isdir(os.path.join(crash_path, item))
196                         and item.startswith(f"crash_log_{test_suit}")]
197        if len(matching_dirs) >= 1:
198            tree.getroot().attrib["is_crash"] = "1"
199        tree.write(result_xml)
200
201
202# all testsuit common Unavailable test result xml
203def _create_empty_result_file(filepath, filename, error_message):
204    error_message = str(error_message)
205    error_message = error_message.replace("\"", "")
206    error_message = error_message.replace("<", "")
207    error_message = error_message.replace(">", "")
208    error_message = error_message.replace("&", "")
209    if filename.endswith(".hap"):
210        filename = filename.split(".")[0]
211    if not os.path.exists(filepath):
212        with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc:
213            time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
214                                       time.localtime())
215            file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
216            file_desc.write(
217                '<testsuites tests="0" failures="0" '
218                'disabled="0" errors="0" timestamp="%s" '
219                'time="0" name="%s" unavailable="1">\n' % (time_stamp, filename))
220            file_desc.write(
221                '  <testsuite name="%s" tests="0" failures="0" '
222                'disabled="0" errors="0" time="0.0" '
223                'unavailable="1" message="%s">\n' %
224                (filename, error_message))
225            file_desc.write('  </testsuite>\n')
226            file_desc.write('</testsuites>\n')
227    return
228
229
230def _unlock_screen(device):
231    device.execute_shell_command("svc power stayon true")
232    time.sleep(1)
233
234
235def _unlock_device(device):
236    device.execute_shell_command("input keyevent 82")
237    time.sleep(1)
238    device.execute_shell_command("wm dismiss-keyguard")
239    time.sleep(1)
240
241
242def _lock_screen(device):
243    device.execute_shell_command("svc power stayon false")
244    time.sleep(1)
245
246
247def disable_keyguard(device):
248    _unlock_screen(device)
249    _unlock_device(device)
250
251
252def _sleep_according_to_result(result):
253    if result:
254        time.sleep(1)
255
256
257def _create_fuzz_crash_file(filepath, filename):
258    if not os.path.exists(filepath):
259        with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc:
260            time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
261                                       time.localtime())
262            file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
263            file_desc.write(
264                '<testsuites disabled="0" name="%s" '
265                'time="300" timestamp="%s" errors="0" '
266                'failures="1" tests="1">\n' % (filename, time_stamp))
267            file_desc.write(
268                '  <testsuite disabled="0" name="%s" time="300" '
269                'errors="0" failures="1" tests="1">\n' % filename)
270            file_desc.write(
271                '    <testcase name="%s" time="300" classname="%s" '
272                'status="run">\n' % (filename, filename))
273            file_desc.write(
274                '      <failure type="" '
275                'message="Fuzzer crash. See ERROR in log file">\n')
276            file_desc.write('      </failure>\n')
277            file_desc.write('    </testcase>\n')
278            file_desc.write('  </testsuite>\n')
279            file_desc.write('</testsuites>\n')
280    return
281
282
283def _create_fuzz_pass_file(filepath, filename):
284    if not os.path.exists(filepath):
285        with os.fdopen(os.open(filepath, FLAGS, MODES), 'w') as file_desc:
286            time_stamp = time.strftime("%Y-%m-%d %H:%M:%S",
287                                       time.localtime())
288            file_desc.write('<?xml version="1.0" encoding="UTF-8"?>\n')
289            file_desc.write(
290                '<testsuites disabled="0" name="%s" '
291                'time="300" timestamp="%s" errors="0" '
292                'failures="0" tests="1">\n' % (filename, time_stamp))
293            file_desc.write(
294                '  <testsuite disabled="0" name="%s" time="300" '
295                'errors="0" failures="0" tests="1">\n' % filename)
296            file_desc.write(
297                '    <testcase name="%s" time="300" classname="%s" '
298                'status="run"/>\n' % (filename, filename))
299            file_desc.write('  </testsuite>\n')
300            file_desc.write('</testsuites>\n')
301    return
302
303
304def _create_fuzz_result_file(filepath, filename, error_message):
305    error_message = str(error_message)
306    error_message = error_message.replace("\"", "")
307    error_message = error_message.replace("<", "")
308    error_message = error_message.replace(">", "")
309    error_message = error_message.replace("&", "")
310    if "AddressSanitizer" in error_message:
311        LOG.error("FUZZ TEST CRASH")
312        _create_fuzz_crash_file(filepath, filename)
313    elif re.search(r'Done (\b\d+\b) runs in (\b\d+\b) second',
314                   error_message, re.M) is not None:
315        LOG.info("FUZZ TEST PASS")
316        _create_fuzz_pass_file(filepath, filename)
317    else:
318        LOG.error("FUZZ TEST UNAVAILABLE")
319        _create_empty_result_file(filepath, filename, error_message)
320    return
321
322
323class ResultManager(object):
324    def __init__(self, testsuit_path, config):
325        self.testsuite_path = testsuit_path
326        self.config = config
327        self.result_rootpath = self.config.report_path
328        self.device = self.config.device
329        if testsuit_path.endswith(".hap"):
330            self.device_testpath = self.config.test_hap_out_path
331        else:
332            self.device_testpath = self.config.target_test_path
333        self.testsuite_name = os.path.basename(self.testsuite_path)
334        self.is_coverage = False
335
336    def set_is_coverage(self, is_coverage):
337        self.is_coverage = is_coverage
338
339    def get_test_results(self, error_message=""):
340        # Get test result files
341        filepath, _ = self.obtain_test_result_file()
342        if "fuzztest" == self.config.testtype[0]:
343            LOG.info("create fuzz test report")
344            _create_fuzz_result_file(filepath, self.testsuite_name,
345                                     error_message)
346            if not self.is_coverage:
347                self._obtain_fuzz_corpus()
348
349        if not os.path.exists(filepath):
350            _create_empty_result_file(filepath, self.testsuite_name,
351                                      error_message)
352        if "benchmark" == self.config.testtype[0]:
353            self._obtain_benchmark_result()
354        # Get coverage data files
355        if self.is_coverage:
356            self.obtain_coverage_data()
357
358        return filepath
359
360    def get_test_results_hidelog(self, error_message=""):
361        # Get test result files
362        result_file_path, test_log_path = self.obtain_test_result_file()
363        log_content = ""
364        if not error_message:
365            if os.path.exists(test_log_path):
366                with open(test_log_path, "r") as log:
367                    log_content = log.readlines()
368            else:
369                LOG.error("{}: Test log not exist.".format(test_log_path))
370        else:
371            log_content = error_message
372
373        if "fuzztest" == self.config.testtype[0]:
374            LOG.info("create fuzz test report")
375            _create_fuzz_result_file(result_file_path, self.testsuite_name,
376                                     log_content)
377            if not self.is_coverage:
378                self._obtain_fuzz_corpus()
379
380        if not os.path.exists(result_file_path):
381            _create_empty_result_file(result_file_path, self.testsuite_name,
382                                      log_content)
383        if "benchmark" == self.config.testtype[0]:
384            self._obtain_benchmark_result()
385        # Get coverage data files
386        if self.is_coverage:
387            self.obtain_coverage_data()
388
389        return result_file_path
390
391    def get_result_sub_save_path(self):
392        find_key = os.sep + "benchmark" + os.sep
393        file_dir, _ = os.path.split(self.testsuite_path)
394        pos = file_dir.find(find_key)
395        subpath = ""
396        if -1 != pos:
397            subpath = file_dir[pos + len(find_key):]
398        LOG.info("subpath = " + subpath)
399        return subpath
400
401    def obtain_test_result_file(self):
402        result_save_path = get_result_savepath(self.testsuite_path,
403                                               self.result_rootpath)
404
405        result_file_path = os.path.join(result_save_path,
406                                        "%s.xml" % self.testsuite_name)
407
408        result_josn_file_path = os.path.join(result_save_path,
409                                             "%s.json" % self.testsuite_name)
410
411        if self.testsuite_path.endswith('.hap'):
412            remote_result_file = os.path.join(self.device_testpath,
413                                              "testcase_result.xml")
414            remote_json_result_file = os.path.join(self.device_testpath,
415                                                   "%s.json" % self.testsuite_name)
416        else:
417            remote_result_file = os.path.join(self.device_testpath,
418                                              "%s.xml" % self.testsuite_name)
419            remote_json_result_file = os.path.join(self.device_testpath,
420                                                   "%s.json" % self.testsuite_name)
421
422        if self.config.testtype[0] != "fuzztest":
423            if self.device.is_file_exist(remote_result_file):
424                self.device.pull_file(remote_result_file, result_file_path)
425            elif self.device.is_file_exist(remote_json_result_file):
426                self.device.pull_file(remote_json_result_file,
427                                      result_josn_file_path)
428                result_file_path = result_josn_file_path
429            else:
430                LOG.info("%s not exist", remote_result_file)
431
432        if self.config.hidelog:
433            remote_log_result_file = os.path.join(self.device_testpath,
434                                                  "%s.log" % self.testsuite_name)
435            test_log_save_path = get_test_log_savepath(self.result_rootpath, result_save_path)
436            test_log_file_path = os.path.join(test_log_save_path,
437                                              "%s.log" % self.testsuite_name)
438            self.device.pull_file(remote_log_result_file, test_log_file_path)
439            return result_file_path, test_log_file_path
440
441        return result_file_path, ""
442
443    def make_empty_result_file(self, error_message=""):
444        result_savepath = get_result_savepath(self.testsuite_path,
445                                              self.result_rootpath)
446        result_filepath = os.path.join(result_savepath, "%s.xml" %
447                                       self.testsuite_name)
448        if not os.path.exists(result_filepath):
449            _create_empty_result_file(result_filepath,
450                                      self.testsuite_name, error_message)
451
452    def is_exist_target_in_device(self, path, target):
453        if platform.system() == "Windows":
454            command = '\"ls -l %s | grep %s\"' % (path, target)
455        else:
456            command = "ls -l %s | grep %s" % (path, target)
457
458        check_result = False
459        stdout_info = self.device.execute_shell_command(command)
460        if stdout_info != "" and stdout_info.find(target) != -1:
461            check_result = True
462        return check_result
463
464    def obtain_coverage_data(self):
465        cov_root_dir = os.path.abspath(os.path.join(
466            self.result_rootpath,
467            "..",
468            "coverage",
469            "data",
470            "exec"))
471
472
473        tests_path = self.config.testcases_path
474        test_type = self.testsuite_path.split(tests_path)[1].strip(os.sep).split(os.sep)[0]
475        cxx_cov_path = os.path.abspath(os.path.join(
476            self.result_rootpath,
477            "..",
478            "coverage",
479            "data",
480            "cxx",
481            self.testsuite_name + '_' + test_type))
482
483        if os.path.basename(self.testsuite_name).startswith("rust_"):
484            target_name = "lib.unstripped"
485        else:
486            target_name = OBJ
487        if self.is_exist_target_in_device(DEFAULT_TEST_PATH, target_name):
488            if not os.path.exists(cxx_cov_path):
489                os.makedirs(cxx_cov_path)
490            else:
491                cxx_cov_path = cxx_cov_path + f"_{str(int(time.time()))}"
492            self.config.device.execute_shell_command(
493                "cd %s; tar -czf %s.tar.gz %s" % (DEFAULT_TEST_PATH, target_name, target_name))
494            src_file_tar = os.path.join(DEFAULT_TEST_PATH, "%s.tar.gz" % target_name)
495            self.device.pull_file(src_file_tar, cxx_cov_path, is_create=True, timeout=TIME_OUT)
496            tar_path = os.path.join(cxx_cov_path, "%s.tar.gz" % target_name)
497            if platform.system() == "Windows":
498                process = subprocess.Popen("tar -zxf %s -C %s" % (tar_path, cxx_cov_path), shell=True)
499                process.communicate()
500                os.remove(tar_path)
501                os.rename(os.path.join(cxx_cov_path, target_name), os.path.join(cxx_cov_path, OBJ))
502            else:
503                subprocess.Popen("tar -zxf %s -C %s > /dev/null 2>&1" %
504                                 (tar_path, cxx_cov_path), shell=True).communicate()
505                subprocess.Popen("rm -rf %s" % tar_path, shell=True).communicate()
506                if target_name != OBJ:
507                    subprocess.Popen("mv %s %s" % (os.path.join(cxx_cov_path, target_name),
508                                                   os.path.join(cxx_cov_path, OBJ)), shell=True).communicate()
509
510    def _obtain_fuzz_corpus(self):
511        command = f"cd {DEFAULT_TEST_PATH}; tar czf {self.testsuite_name}_corpus.tar.gz corpus;"
512        self.config.device.execute_shell_command(command)
513        result_save_path = get_result_savepath(self.testsuite_path, self.result_rootpath)
514        LOG.info(f"fuzz_dir = {result_save_path}")
515        self.device.pull_file(f"{DEFAULT_TEST_PATH}/{self.testsuite_name}_corpus.tar.gz", result_save_path)
516
517    def _obtain_benchmark_result(self):
518        benchmark_root_dir = os.path.abspath(
519            os.path.join(self.result_rootpath, "benchmark"))
520        benchmark_dir = os.path.abspath(
521            os.path.join(benchmark_root_dir,
522                         self.get_result_sub_save_path(),
523                         self.testsuite_name))
524
525        if not os.path.exists(benchmark_dir):
526            os.makedirs(benchmark_dir)
527
528        LOG.info("benchmark_dir = %s" % benchmark_dir)
529        self.device.pull_file(os.path.join(self.device_testpath,
530                                           "%s.json" % self.testsuite_name), benchmark_dir)
531        if not os.path.exists(os.path.join(benchmark_dir,
532                                           "%s.json" % self.testsuite_name)):
533            os.rmdir(benchmark_dir)
534        return benchmark_dir
535
536
537@Plugin(type=Plugin.DRIVER, id=DeviceTestType.cpp_test)
538class CppTestDriver(IDriver):
539    """
540    CppTest is a Test that runs a native test package on given device.
541    """
542    # test driver config
543    config = None
544    result = ""
545
546    def __check_environment__(self, device_options):
547        if len(device_options) == 1 and device_options[0].label is None:
548            return True
549        if len(device_options) != 1 or \
550                device_options[0].label != DeviceLabelType.phone:
551            return False
552        return True
553
554    def __check_config__(self, config):
555        pass
556
557    def __result__(self):
558        return self.result if os.path.exists(self.result) else ""
559
560    def __execute__(self, request):
561        try:
562            self.config = request.config
563            self.config.target_test_path = DEFAULT_TEST_PATH
564            self.config.device = request.config.environment.devices[0]
565
566            suite_file = request.root.source.source_file
567            LOG.debug("Testsuite FilePath: %s" % suite_file)
568
569            if not suite_file:
570                LOG.error("test source '%s' not exists" %
571                          request.root.source.source_string)
572                return
573
574            if not self.config.device:
575                result = ResultManager(suite_file, self.config)
576                result.set_is_coverage(False)
577                result.make_empty_result_file(
578                    "No test device is found. ")
579                return
580            self.config.device.set_device_report_path(request.config.report_path)
581            self.config.device.device_log_collector.start_hilog_task()
582            self._init_gtest()
583            self._run_gtest(suite_file)
584
585        finally:
586            log_path = get_result_savepath(request.root.source.source_file, request.config.report_path)
587            suit_name = os.path.basename(request.root.source.source_file)
588            xml_path = os.path.join(log_path, f"{suit_name}.xml")
589            if not os.path.exists(xml_path):
590                _create_empty_result_file(xml_path, suit_name, "ERROR")
591            serial = "{}_{}".format(str(request.config.device.__get_serial__()), time.time_ns())
592            log_tar_file_name = "{}_{}".format(request.get_module_name(), str(serial).replace(
593                ":", "_"))
594            self.config.device.device_log_collector.stop_hilog_task(
595                log_tar_file_name, module_name=request.get_module_name())
596            update_xml(request.root.source.source_file, xml_path)
597
598    @staticmethod
599    def _alter_init(name):
600        with open(name, "rb") as f:
601            lines = f.read()
602        str_content = lines.decode("utf-8")
603
604        pattern_sharp = '^\s*#.*$(\n|\r\n)'
605        pattern_star = '/\*.*?\*/(\n|\r\n)+'
606        pattern_xml = '<!--[\s\S]*?-->(\n|\r\n)+'
607
608        if re.match(pattern_sharp, str_content, flags=re.M):
609            striped_content = re.sub(pattern_sharp, '', str_content, flags=re.M)
610        elif re.findall(pattern_star, str_content, flags=re.S):
611            striped_content = re.sub(pattern_star, '', str_content, flags=re.S)
612        elif re.findall(pattern_xml, str_content, flags=re.S):
613            striped_content = re.sub(pattern_xml, '', str_content, flags=re.S)
614        else:
615            striped_content = str_content
616
617        striped_bt = striped_content.encode("utf-8")
618        if os.path.exists(name):
619            os.remove(name)
620        with os.fdopen(os.open(name, FLAGS, MODES), 'wb') as f:
621            f.write(striped_bt)
622
623    def _init_gtest(self):
624        self.config.device.connector_command("target mount")
625        self.config.device.execute_shell_command(
626            "rm -rf %s" % self.config.target_test_path)
627        self.config.device.execute_shell_command(
628            "mkdir -p %s" % self.config.target_test_path)
629        self.config.device.execute_shell_command(
630            "mount -o rw,remount,rw /")
631        if "fuzztest" == self.config.testtype[0]:
632            self.config.device.execute_shell_command(
633                "mkdir -p %s" % os.path.join(self.config.target_test_path,
634                                             "corpus"))
635
636    def _gtest_command(self, suite_file):
637        filename = os.path.basename(suite_file)
638        test_para = self._get_test_para(self.config.testcase,
639                                        self.config.testlevel,
640                                        self.config.testtype,
641                                        self.config.target_test_path,
642                                        suite_file,
643                                        filename)
644
645        # execute testcase
646        if not self.config.coverage:
647            if self.config.random == "random":
648                seed = random.randint(1, 100)
649                command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s --gtest_shuffle --gtest_random_seed=%d" % (
650                    self.config.target_test_path,
651                    filename,
652                    filename,
653                    test_para,
654                    seed)
655            else:
656                command = "cd %s; rm -rf %s.xml; chmod +x *; ./%s %s" % (
657                    self.config.target_test_path,
658                    filename,
659                    filename,
660                    test_para)
661        else:
662            coverage_outpath = self.config.coverage_outpath
663            if coverage_outpath:
664                strip_num = len(coverage_outpath.strip("/").split("/"))
665            else:
666                ohos_config_path = os.path.join(sys.source_code_root_path, "out", "ohos_config.json")
667                with open(ohos_config_path, 'r') as json_file:
668                    json_info = json.load(json_file)
669                    out_path = json_info.get("out_path")
670                strip_num = len(out_path.strip("/").split("/"))
671            if "fuzztest" == self.config.testtype[0]:
672                self._push_corpus_cov_if_exist(suite_file)
673                command = f"cd {self.config.target_test_path}; tar zxf {filename}_corpus.tar.gz; \
674                                        rm -rf {filename}.xml; chmod +x *; GCOV_PREFIX={DEFAULT_TEST_PATH}; \
675                                        GCOV_PREFIX_STRIP={strip_num} ./{filename} {test_para}"
676            else:
677                command = "cd %s; rm -rf %s.xml; chmod +x *; GCOV_PREFIX=%s " \
678                          "GCOV_PREFIX_STRIP=%s ./%s %s" % \
679                          (self.config.target_test_path,
680                           filename,
681                           DEFAULT_TEST_PATH,
682                           str(strip_num),
683                           filename,
684                           test_para)
685
686        if self.config.hidelog:
687            command += " > {}.log 2>&1".format(filename)
688
689        return command
690
691    def _run_gtest(self, suite_file):
692        from xdevice import Variables
693        is_coverage_test = True if self.config.coverage else False
694
695        # push testsuite file
696        self.config.device.push_file(suite_file, self.config.target_test_path)
697        self.config.device.execute_shell_command(
698            "hilog -d %s" % (os.path.join(self.config.target_test_path,
699                                          os.path.basename(suite_file)))
700        )
701        self._push_corpus_if_exist(suite_file)
702
703        # push resource files
704        resource_manager = ResourceManager()
705        resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file)
706        resource_manager.process_preparer_data(resource_data_dic, resource_dir,
707                                               self.config.device)
708
709        command = self._gtest_command(suite_file)
710
711        result = ResultManager(suite_file, self.config)
712        result.set_is_coverage(is_coverage_test)
713
714        try:
715            # get result
716            if self.config.hidelog:
717                return_message = ""
718                display_receiver = CollectingOutputReceiver()
719                self.config.device.execute_shell_command(
720                    command,
721                    receiver=display_receiver,
722                    timeout=TIME_OUT,
723                    retry=0)
724            else:
725                display_receiver = DisplayOutputReceiver()
726                self.config.device.execute_shell_command(
727                    command,
728                    receiver=display_receiver,
729                    timeout=TIME_OUT,
730                    retry=0)
731                return_message = display_receiver.output
732        except (ExecuteTerminate, DeviceError) as exception:
733            return_message = str(exception.args)
734
735        if self.config.hidelog:
736            self.result = result.get_test_results_hidelog(return_message)
737        else:
738            self.result = result.get_test_results(return_message)
739
740        resource_manager.process_cleaner_data(resource_data_dic,
741                                              resource_dir,
742                                              self.config.device)
743
744    def _push_corpus_cov_if_exist(self, suite_file):
745        corpus_path = suite_file.split("fuzztest")[-1].strip(os.sep)
746        cov_file = os.path.join(
747            sys.framework_root_dir, "reports", "latest_corpus", corpus_path + "_corpus.tar.gz")
748        LOG.info("corpus_cov file :%s" % str(cov_file))
749        self.config.device.push_file(cov_file, os.path.join(self.config.target_test_path))
750
751    def _push_corpus_if_exist(self, suite_file):
752        if "fuzztest" == self.config.testtype[0]:
753            corpus_path = os.path.join(get_fuzzer_path(suite_file), "corpus")
754            if not os.path.isdir(corpus_path):
755                return
756
757            corpus_dirs = []
758            corpus_file_list = []
759
760            for root, _, files in os.walk(corpus_path):
761                if not files:
762                    continue
763
764                corpus_dir = root.split("corpus")[-1]
765                if corpus_dir != "":
766                    corpus_dirs.append(corpus_dir)
767
768                for file in files:
769                    cp_file = os.path.normcase(os.path.join(root, file))
770                    corpus_file_list.append(cp_file)
771                    if file == "init":
772                        self._alter_init(cp_file)
773
774            # mkdir corpus files dir
775            if corpus_dirs:
776                for corpus in corpus_dirs:
777                    mkdir_corpus_command = f"shell; mkdir -p {corpus}"
778                    self.config.device.connector_command(mkdir_corpus_command)
779
780            # push corpus file
781            if corpus_file_list:
782                for corpus_file in corpus_file_list:
783                    self.config.device.push_file(corpus_file,
784                                                 os.path.join(self.config.target_test_path, "corpus"))
785
786    def _get_test_para(self,
787                       testcase,
788                       testlevel,
789                       testtype,
790                       target_test_path,
791                       suite_file,
792                       filename):
793        if "benchmark" == testtype[0]:
794            test_para = (" --benchmark_out_format=json"
795                         " --benchmark_out=%s%s.json") % (
796                            target_test_path, filename)
797            return test_para
798
799        if "" != testcase and "" == testlevel:
800            test_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
801        elif "" == testcase and "" != testlevel:
802            level_para = get_level_para_string(testlevel)
803            test_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
804        else:
805            test_para = ""
806
807        if "fuzztest" == testtype[0]:
808            cfg_list = FuzzerConfigManager(os.path.join(get_fuzzer_path(
809                suite_file), "project.xml")).get_fuzzer_config("fuzztest")
810            LOG.info("config list :%s" % str(cfg_list))
811            if self.config.coverage:
812                test_para += "corpus -runs=0" + \
813                             " -max_len=" + cfg_list[0] + \
814                             " -max_total_time=" + cfg_list[1] + \
815                             " -rss_limit_mb=" + cfg_list[2]
816            else:
817                test_para += "corpus -max_len=" + cfg_list[0] + \
818                             " -max_total_time=" + cfg_list[1] + \
819                             " -rss_limit_mb=" + cfg_list[2]
820
821        return test_para
822
823
824@Plugin(type=Plugin.DRIVER, id=DeviceTestType.jsunit_test)
825class JSUnitTestDriver(IDriver):
826    """
827    JSUnitTestDriver is a Test that runs a native test package on given device.
828    """
829
830    def __init__(self):
831        self.config = None
832        self.result = ""
833        self.start_time = None
834        self.ability_name = ""
835        self.package_name = ""
836        # log
837        self.hilog = None
838        self.hilog_proc = None
839
840    def __check_environment__(self, device_options):
841        pass
842
843    def __check_config__(self, config):
844        pass
845
846    def __result__(self):
847        return self.result if os.path.exists(self.result) else ""
848
849    def __execute__(self, request):
850        try:
851            LOG.info("developer_test driver")
852            self.config = request.config
853            self.config.target_test_path = DEFAULT_TEST_PATH
854            self.config.device = request.config.environment.devices[0]
855
856            suite_file = request.root.source.source_file
857            result_save_path = get_result_savepath(suite_file, self.config.report_path)
858            self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name())
859            if not suite_file:
860                LOG.error("test source '%s' not exists" %
861                          request.root.source.source_string)
862                return
863
864            if not self.config.device:
865                result = ResultManager(suite_file, self.config)
866                result.set_is_coverage(False)
867                result.make_empty_result_file(
868                    "No test device is found")
869                return
870
871            package_name, ability_name = self._get_package_and_ability_name(
872                suite_file)
873            self.package_name = package_name
874            self.ability_name = ability_name
875            self.config.test_hap_out_path = \
876                "/data/data/%s/files/" % self.package_name
877            self.config.device.connector_command("shell hilog -r")
878
879            self.hilog = get_device_log_file(
880                request.config.report_path,
881                request.config.device.__get_serial__() + "_" + request.
882                get_module_name(),
883                "device_hilog")
884
885            hilog_open = os.open(self.hilog, os.O_WRONLY | os.O_CREAT | os.O_APPEND,
886                                 0o755)
887
888            with os.fdopen(hilog_open, "a") as hilog_file_pipe:
889                self.config.device.device_log_collector.add_log_address(None, self.hilog)
890                _, self.hilog_proc = self.config.device.device_log_collector.\
891                    start_catch_device_log(hilog_file_pipe=hilog_file_pipe)
892                self._init_jsunit_test()
893                self._run_jsunit(suite_file, self.hilog)
894                hilog_file_pipe.flush()
895                self.generate_console_output(self.hilog, request)
896                xml_path = os.path.join(
897                    request.config.report_path, "result",
898                    '.'.join((request.get_module_name(), "xml")))
899                shutil.move(xml_path, self.result)
900        finally:
901            self.config.device.device_log_collector.remove_log_address(None, self.hilog)
902            self.config.device.device_log_collector.stop_catch_device_log(self.hilog_proc)
903            update_xml(request.root.source.source_file, self.result)
904
905    @staticmethod
906    def _get_acts_test_para(testcase,
907                            testlevel,
908                            testtype,
909                            target_test_path,
910                            suite_file,
911                            filename):
912        if "actstest" == testtype[0]:
913            test_para = (" --actstest_out_format=json"
914                         " --actstest_out=%s%s.json") % (
915                            target_test_path, filename)
916            return test_para
917
918        if "" != testcase and "" == testlevel:
919            test_para = "%s=%s" % (GTestConst.exec_acts_para_filter, testcase)
920        elif "" == testcase and "" != testlevel:
921            level_para = get_level_para_string(testlevel)
922            test_para = "%s=%s" % (GTestConst.exec_acts_para_level, level_para)
923        else:
924            test_para = ""
925        return test_para
926
927    @staticmethod
928    def _get_hats_test_para(testcase,
929                            testlevel,
930                            testtype,
931                            target_test_path,
932                            suite_file,
933                            filename):
934        if "hatstest" == testtype[0]:
935            test_hats_para = (" --hatstest_out_format=json"
936                         " --hatstest_out=%s%s.json") % (
937                            target_test_path, filename)
938            return test_hats_para
939
940        if "" != testcase and "" == testlevel:
941            test_hats_para = "%s=%s" % (GTestConst.exec_para_filter, testcase)
942        elif "" == testcase and "" != testlevel:
943            level_para = get_level_para_string(testlevel)
944            test_hats_para = "%s=%s" % (GTestConst.exec_para_level, level_para)
945        else:
946            test_hats_para = ""
947        return test_hats_para
948
949    @classmethod
950    def _get_json_shell_timeout(cls, json_filepath):
951        test_timeout = 0
952        try:
953            with open(json_filepath, 'r') as json_file:
954                data_dic = json.load(json_file)
955                if not data_dic:
956                    return test_timeout
957                else:
958                    if "driver" in data_dic.keys():
959                        driver_dict = data_dic.get("driver")
960                        if driver_dict and "test-timeout" in driver_dict.keys():
961                            test_timeout = int(driver_dict["shell-timeout"]) / 1000
962                    return test_timeout
963        except JSONDecodeError:
964            return test_timeout
965        finally:
966            print(" get json shell timeout finally")
967
968    @staticmethod
969    def _get_package_and_ability_name(hap_filepath):
970        package_name = ""
971        ability_name = ""
972        if os.path.exists(hap_filepath):
973            filename = os.path.basename(hap_filepath)
974
975            # unzip the hap file
976            hap_bak_path = os.path.abspath(os.path.join(
977                os.path.dirname(hap_filepath),
978                "%s.bak" % filename))
979            zf_desc = zipfile.ZipFile(hap_filepath)
980            try:
981                zf_desc.extractall(path=hap_bak_path)
982            except RuntimeError as error:
983                print("Unzip error: ", hap_bak_path)
984            zf_desc.close()
985
986            # verify config.json file
987            app_profile_path = os.path.join(hap_bak_path, "config.json")
988            if not os.path.exists(app_profile_path):
989                print("file %s not exist" % app_profile_path)
990                return package_name, ability_name
991
992            if os.path.isdir(app_profile_path):
993                print("%s is a folder, and not a file" % app_profile_path)
994                return package_name, ability_name
995
996            # get package_name and ability_name value
997            load_dict = {}
998            with open(app_profile_path, 'r') as load_f:
999                load_dict = json.load(load_f)
1000            profile_list = load_dict.values()
1001            for profile in profile_list:
1002                package_name = profile.get("package")
1003                if not package_name:
1004                    continue
1005                abilities = profile.get("abilities")
1006                for abilitie in abilities:
1007                    abilities_name = abilitie.get("name")
1008                    if abilities_name.startswith("."):
1009                        ability_name = package_name + abilities_name[
1010                                                      abilities_name.find("."):]
1011                    else:
1012                        ability_name = abilities_name
1013                    break
1014                break
1015
1016            # delete hap_bak_path
1017            if os.path.exists(hap_bak_path):
1018                shutil.rmtree(hap_bak_path)
1019        else:
1020            print("file %s not exist" % hap_filepath)
1021        return package_name, ability_name
1022
1023    def generate_console_output(self, device_log_file, request):
1024        result_message = self.read_device_log(device_log_file)
1025
1026        report_name = request.get_module_name()
1027        parsers = get_plugin(
1028            Plugin.PARSER, CommonParserType.jsunit)
1029        if parsers:
1030            parsers = parsers[:1]
1031        for listener in request.listeners:
1032            listener.device_sn = self.config.device.device_sn
1033        parser_instances = []
1034
1035        for parser in parsers:
1036            parser_instance = parser.__class__()
1037            parser_instance.suites_name = report_name
1038            parser_instance.suite_name = report_name
1039            parser_instance.listeners = request.listeners
1040            parser_instances.append(parser_instance)
1041        handler = ShellHandler(parser_instances)
1042        process_command_ret(result_message, handler)
1043
1044    def read_device_log(self, device_log_file):
1045        result_message = ""
1046        with open(device_log_file, "r", encoding='utf-8',
1047                  errors='ignore') as file_read_pipe:
1048            while True:
1049                data = file_read_pipe.readline()
1050                if not data:
1051                    break
1052                # only filter JSApp log
1053                if data.lower().find(_ACE_LOG_MARKER) != -1:
1054                    result_message += data
1055                    if data.find("[end] run suites end") != -1:
1056                        break
1057        return result_message
1058
1059    def start_hap_execute(self):
1060        try:
1061            command = "aa start -d 123 -a %s.MainAbility -b %s" \
1062                      % (self.package_name, self.package_name)
1063            self.start_time = time.time()
1064            result_value = self.config.device.execute_shell_command(
1065                command, timeout=TIME_OUT)
1066
1067            if "success" in str(result_value).lower():
1068                LOG.info("execute %s's testcase success. result value=%s"
1069                         % (self.package_name, result_value))
1070            else:
1071                LOG.info("execute %s's testcase failed. result value=%s"
1072                         % (self.package_name, result_value))
1073
1074            _sleep_according_to_result(result_value)
1075            return_message = result_value
1076        except (ExecuteTerminate, DeviceError) as exception:
1077            return_message = exception.args
1078
1079        return return_message
1080
1081    def _init_jsunit_test(self):
1082        self.config.device.connector_command("target mount")
1083        self.config.device.execute_shell_command(
1084            "rm -rf %s" % self.config.target_test_path)
1085        self.config.device.execute_shell_command(
1086            "mkdir -p %s" % self.config.target_test_path)
1087        self.config.device.execute_shell_command(
1088            "mount -o rw,remount,rw /")
1089
1090    def _run_jsunit(self, suite_file, device_log_file):
1091        filename = os.path.basename(suite_file)
1092        _, suffix_name = os.path.splitext(filename)
1093
1094        resource_manager = ResourceManager()
1095        resource_data_dic, resource_dir = resource_manager.get_resource_data_dic(suite_file)
1096        if suffix_name == ".hap":
1097            json_file_path = suite_file.replace(".hap", ".json")
1098            if os.path.exists(json_file_path):
1099                timeout = self._get_json_shell_timeout(json_file_path)
1100            else:
1101                timeout = ResourceManager.get_nodeattrib_data(resource_data_dic)
1102        else:
1103            timeout = ResourceManager.get_nodeattrib_data(resource_data_dic)
1104        resource_manager.process_preparer_data(resource_data_dic, resource_dir, self.config.device)
1105        main_result = self._install_hap(suite_file)
1106        result = ResultManager(suite_file, self.config)
1107        if main_result:
1108            self._execute_hapfile_jsunittest()
1109            try:
1110                status = False
1111                actiontime = JS_TIMEOUT
1112                times = CYCLE_TIMES
1113                if timeout:
1114                    actiontime = timeout
1115                    times = 1
1116                with open(device_log_file, "r", encoding='utf-8',
1117                          errors='ignore') as file_read_pipe:
1118                    for i in range(0, times):
1119                        if status:
1120                            break
1121                        else:
1122                            time.sleep(float(actiontime))
1123                        start_time = int(time.time())
1124                        while True:
1125                            data = file_read_pipe.readline()
1126                            if data.lower().find(_ACE_LOG_MARKER) != -1 and data.find("[end] run suites end") != -1:
1127                                LOG.info("execute testcase successfully.")
1128                                status = True
1129                                break
1130                            if int(time.time()) - start_time > 5:
1131                                break
1132            finally:
1133                _lock_screen(self.config.device)
1134                self._uninstall_hap(self.package_name)
1135        else:
1136            self.result = result.get_test_results("Error: install hap failed")
1137            LOG.error("Error: install hap failed")
1138
1139        resource_manager.process_cleaner_data(resource_data_dic, resource_dir, self.config.device)
1140
1141    def _execute_hapfile_jsunittest(self):
1142        _unlock_screen(self.config.device)
1143        _unlock_device(self.config.device)
1144
1145        try:
1146            return_message = self.start_hap_execute()
1147        except (ExecuteTerminate, DeviceError) as exception:
1148            return_message = str(exception.args)
1149
1150        return return_message
1151
1152    def _install_hap(self, suite_file):
1153        message = self.config.device.connector_command("install %s" % suite_file)
1154        message = str(message).rstrip()
1155        if message == "" or "success" in message:
1156            return_code = True
1157            if message != "":
1158                LOG.info(message)
1159        else:
1160            return_code = False
1161            if message != "":
1162                LOG.warning(message)
1163
1164        _sleep_according_to_result(return_code)
1165        return return_code
1166
1167    def _uninstall_hap(self, package_name):
1168        return_message = self.config.device.execute_shell_command(
1169            "bm uninstall -n %s" % package_name)
1170        _sleep_according_to_result(return_message)
1171        return return_message
1172
1173
1174@Plugin(type=Plugin.DRIVER, id=DeviceTestType.oh_rust_test)
1175class OHRustTestDriver(IDriver):
1176    def __init__(self):
1177        self.result = ""
1178        self.error_message = ""
1179        self.config = None
1180
1181    def __check_environment__(self, device_options):
1182        pass
1183
1184    def __check_config__(self, config):
1185        pass
1186
1187    def __execute__(self, request):
1188        try:
1189            LOG.debug("Start to execute open harmony rust test")
1190            self.config = request.config
1191            self.config.device = request.config.environment.devices[0]
1192            self.config.target_test_path = DEFAULT_TEST_PATH
1193            suite_file = request.root.source.source_file
1194            LOG.debug("Testsuite filepath:{}".format(suite_file))
1195
1196            if not suite_file:
1197                LOG.error("test source '{}' not exists".format(
1198                    request.root.source.source_string))
1199                return
1200
1201            result_save_path = get_result_savepath(suite_file, self.config.report_path)
1202            self.result = os.path.join(result_save_path, "%s.xml" % request.get_module_name())
1203            self.config.device.set_device_report_path(request.config.report_path)
1204            self.config.device.device_log_collector.start_hilog_task()
1205            self._init_oh_rust()
1206            self._run_oh_rust(suite_file, request)
1207        except Exception as exception:
1208            self.error_message = exception
1209            if not getattr(exception, "error_no", ""):
1210                setattr(exception, "error_no", "03409")
1211            LOG.exception(self.error_message, exc_info=False, error_no="03409")
1212        finally:
1213            serial = "{}_{}".format(str(request.config.device.__get_serial__()),
1214                                    time.time_ns())
1215            log_tar_file_name = "{}_{}".format(
1216                request.get_module_name(), str(serial).replace(":", "_"))
1217            self.config.device.device_log_collector.stop_hilog_task(
1218                log_tar_file_name, module_name=request.get_module_name())
1219            xml_path = os.path.join(
1220                request.config.report_path, "result",
1221                '.'.join((request.get_module_name(), "xml")))
1222            shutil.move(xml_path, self.result)
1223            self.result = check_result_report(
1224                request.config.report_path, self.result, self.error_message)
1225            update_xml(request.root.source.source_file, self.result)
1226
1227    def __result__(self):
1228        return self.result if os.path.exists(self.result) else ""
1229
1230    def _init_oh_rust(self):
1231        self.config.device.connector_command("target mount")
1232        self.config.device.execute_shell_command(
1233            "rm -rf %s" % self.config.target_test_path)
1234        self.config.device.execute_shell_command(
1235            "mkdir -p %s" % self.config.target_test_path)
1236        self.config.device.execute_shell_command(
1237            "mount -o rw,remount,rw /")
1238
1239    def _run_oh_rust(self, suite_file, request=None):
1240        self.config.device.push_file(suite_file, self.config.target_test_path)
1241        self.config.device.execute_shell_command(
1242            "hilog -d %s" % (os.path.join(self.config.target_test_path,
1243                                          os.path.basename(suite_file)))
1244        )
1245        resource_manager = ResourceManager()
1246        resource_data_dict, resource_dir = \
1247            resource_manager.get_resource_data_dic(suite_file)
1248        resource_manager.process_preparer_data(resource_data_dict,
1249                                               resource_dir,
1250                                               self.config.device)
1251        for listener in request.listeners:
1252            listener.device_sn = self.config.device.device_sn
1253
1254        parsers = get_plugin(Plugin.PARSER, CommonParserType.oh_rust)
1255        if parsers:
1256            parsers = parsers[:1]
1257        parser_instances = []
1258        for parser in parsers:
1259            parser_instance = parser.__class__()
1260            parser_instance.suite_name = request.get_module_name()
1261            parser_instance.listeners = request.listeners
1262            parser_instances.append(parser_instance)
1263        handler = ShellHandler(parser_instances)
1264        if self.config.coverage:
1265            command = "cd {}; chmod +x *; GCOV_PREFIX=. ./{}".format(
1266                self.config.target_test_path, os.path.basename(suite_file))
1267        else:
1268            command = "cd {}; chmod +x *; ./{}".format(
1269                self.config.target_test_path, os.path.basename(suite_file))
1270        self.config.device.execute_shell_command(
1271            command, timeout=TIME_OUT, receiver=handler, retry=0)
1272        if self.config.coverage:
1273            result = ResultManager(suite_file, self.config)
1274            result.obtain_coverage_data()
1275        resource_manager.process_cleaner_data(resource_data_dict, resource_dir,
1276                                              self.config.device)
1277