1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import platform
19import random
20import shutil
21import subprocess
22from pydoc import classname
23import time
24import os
25import sys
26import datetime
27import xml.etree.ElementTree as ElementTree
28
29from core.constants import SchedulerType
30from xdevice import Plugin
31from xdevice import get_plugin
32from xdevice import platform_logger
33from xdevice import DeviceTestType
34from xdevice import Binder
35from core.utils import get_build_output_path
36from core.utils import scan_support_product
37from core.utils import is_lite_product
38from core.common import is_open_source_product
39from core.command.parameter import Parameter
40from core.command.distribute_execute import DbinderTest
41from core.testcase.testcase_manager import TestCaseManager
42from core.config.config_manager import UserConfigManager
43from core.config.parse_parts_config import ParsePartsConfig
44from core.config.resource_manager import ResourceManager
45
46LOG = platform_logger("Run")
47
48
49class Run(object):
50
51    history_cmd_list = []
52
53    @classmethod
54    def get_history(self):
55        return self.history_cmd_list
56
57    @classmethod
58    def get_target_out_path(cls, product_form):
59        target_out_path = UserConfigManager().get_test_cases_dir()
60        if target_out_path == "":
61            target_out_path = os.path.join(
62                get_build_output_path(product_form),
63                "packages",
64                product_form)
65        target_out_path = os.path.abspath(target_out_path)
66        return target_out_path
67
68    @classmethod
69    def _build_test_cases(cls, options):
70        if options.coverage:
71            LOG.info("Coverage testing, no need to compile testcases")
72            return True
73
74        is_build_testcase = UserConfigManager().get_user_config_flag(
75            "build", "testcase")
76        project_root_path = sys.source_code_root_path
77        if is_build_testcase and project_root_path != "":
78            from core.build.build_manager import BuildManager
79            build_manager = BuildManager()
80            return build_manager.build_testcases(project_root_path, options)
81        else:
82            return True
83
84    @classmethod
85    def _check_test_dictionary(cls, test_dictionary):
86        is_valid_status = False
87        key_list = sorted(test_dictionary.keys())
88        for key in key_list:
89            file_list = test_dictionary[key]
90            if len(file_list) > 0:
91                is_valid_status = True
92                break
93        return is_valid_status
94
95    @classmethod
96    def get_tests_out_path(cls, product_form):
97        testcase_path = UserConfigManager().get_test_cases_dir()
98        if testcase_path == "":
99            all_product_list = scan_support_product()
100            if product_form in all_product_list:
101                if is_open_source_product(product_form):
102                    testcase_path = os.path.abspath(os.path.join(
103                        get_build_output_path(product_form),
104                        "tests"))
105                else:
106                    testcase_path = os.path.abspath(os.path.join(
107                        get_build_output_path(product_form),
108                        "tests"))
109            else:
110                testcase_path = os.path.join(
111                    get_build_output_path(product_form), "tests")
112        LOG.info("testcase_path=%s" % testcase_path)
113        return testcase_path
114
115    @classmethod
116    def get_xts_tests_out_path(cls, product_form, testtype):
117        xts_testcase_path = UserConfigManager().get_test_cases_dir()
118        if xts_testcase_path == "":
119            xts_testcase_path = os.path.abspath(os.path.join(
120                get_build_output_path(product_form),
121                "suites",
122                testtype[0],
123                "testcases"))
124        LOG.info("xts_testcase_path=%s" % xts_testcase_path)
125        return xts_testcase_path
126
127    @classmethod
128    def get_external_deps_out_path(cls, product_form):
129        external_deps_path = os.path.abspath(os.path.join(
130            get_build_output_path(product_form),
131            "part_deps_info",
132            "part_deps_info.json"))
133        LOG.info("external_deps_path=%s" % external_deps_path)
134        return external_deps_path
135
136    @classmethod
137    def get_coverage_outpath(cls, options):
138        coverage_out_path = ""
139        if options.coverage:
140            coverage_out_path = get_build_output_path(options.productform)
141            if coverage_out_path == "":
142                coverage_out_path = UserConfigManager().get_user_config(
143                    "coverage").get("outpath", "")
144            if coverage_out_path == "":
145                LOG.error("Coverage test: coverage_outpath is empty.")
146        return coverage_out_path
147
148    @classmethod
149    def get_part_deps_list(cls, productform, testpart):
150        #获取预处理部件间依赖的编译结果路径
151        external_deps_path = cls.get_external_deps_out_path(productform)
152        external_deps_path_list = TestCaseManager().get_part_deps_files(external_deps_path, testpart)
153        return external_deps_path_list
154
155    def process_command_run(self, command, options):
156        current_raw_cmd = ",".join(list(map(str, options.current_raw_cmd.split(" "))))
157        if options.coverage and platform.system() != "Windows":
158            if not options.pullgcda:
159                push_cov_path = os.path.join(sys.framework_root_dir, "local_coverage/push_coverage_so/push_coverage.py")
160                if os.path.exists(push_cov_path):
161                    if str(options.testpart) == "[]" and str(options.subsystem) == "[]":
162                        LOG.info("No subsystem or part input. Not push coverage so.")
163                    elif str(options.testpart) != "[]" and str(options.subsystem) != "[]":
164                        LOG.info("Subsystem or part, there can be only one parameter exist. Not push coverage so.")
165                    else:
166                        if str(options.testpart) != "[]":
167                            param = str(options.testpart)
168                            subprocess.run("python3 {} {} {}".format(
169                                push_cov_path, "testpart", param), shell=True)
170                        else:
171                            param = str(options.subsystem)
172                            subprocess.run("python3 {} {} {}".format(
173                                push_cov_path, "subsystem", param), shell=True)
174                else:
175                    print(f"{push_cov_path} not exists.")
176
177            init_gcov_path = os.path.join(sys.framework_root_dir, "local_coverage/resident_service/init_gcov.py")
178            if os.path.exists(init_gcov_path):
179                subprocess.run("python3 %s command_str=%s" % (
180                    init_gcov_path, current_raw_cmd), shell=True)
181            else:
182                print(f"{init_gcov_path} not exists.")
183
184        para = Parameter()
185        test_type_list = para.get_testtype_list(options.testtype)
186        if len(test_type_list) == 0:
187            LOG.error("The testtype parameter is incorrect.")
188            return
189        options.testtype = test_type_list
190
191        parser = ParsePartsConfig(options.productform)
192        partname_list = parser.get_part_list(
193            options.subsystem,
194            options.testpart)
195        options.partname_list = partname_list
196        options.coverage_outpath = self.get_coverage_outpath(options)
197
198        LOG.info("")
199        LOG.info("------------------------------------")
200        LOG.info("Input parameter:")
201        LOG.info("productform   = %s" % options.productform)
202        LOG.info("testtype      = %s" % str(options.testtype))
203        LOG.info("subsystem     = %s" % str(options.subsystem))
204        LOG.info("testpart      = %s" % str(options.testpart))
205        LOG.info("testmodule    = %s" % options.testmodule)
206        LOG.info("testsuit      = %s" % options.testsuit)
207        LOG.info("testcase      = %s" % options.testcase)
208        LOG.info("testlevel     = %s" % options.testlevel)
209        LOG.info("testargs     = %s" % options.testargs)
210        LOG.info("repeat     = %s" % options.repeat)
211        LOG.info("retry         = %s" % options.retry)
212        LOG.info("historylist   = %s" % options.historylist)
213        LOG.info("runhistory   = %s" % options.runhistory)
214        LOG.info("partname_list = %s" % str(options.partname_list))
215        LOG.info("partdeps = %s" % options.partdeps)
216        LOG.info("------------------------------------")
217        LOG.info("")
218
219        if not para.check_run_parameter(options):
220            LOG.error("Input parameter is incorrect.")
221            return
222
223        current_time = datetime.datetime.now()
224        #记录命令运行历史
225        need_record_history = False
226        cmd_record = {
227            "time" : str(current_time),
228            "raw_cmd" : options.current_raw_cmd,
229            "result" : "unknown",
230            "command": command,
231            "options": options
232        }
233        if not ("-hl" in options.current_raw_cmd or "-rh" in options.current_raw_cmd \
234            or "--retry" in options.current_raw_cmd):
235            need_record_history = True
236
237        #打印历史记录
238        if options.historylist:
239            print("The latest command history is: %d" % len(self.history_cmd_list))
240            for index, cmd_record in enumerate(self.history_cmd_list):
241                print("%d. [%s] - [%s]::[%s]" % (index + 1, cmd_record["time"],
242                      cmd_record["raw_cmd"], cmd_record["result"]))
243            return
244        #重新运行历史里的一条命令
245        if options.runhistory > 0:
246            #如果记录大于10则认为非法
247            if options.runhistory > 10 or options.runhistory > len(self.history_cmd_list):
248                print("input history command[%d] out of range:", options.runhistory)
249                return
250            cmd_record = self.history_cmd_list[options.runhistory - 1]
251            print("run history command:", cmd_record["raw_cmd"])
252            need_record_history = False
253            command = cmd_record["command"]
254            options = cmd_record["options"]
255
256        if options.retry:
257            if len(self.history_cmd_list) <= 0:
258                LOG.info("No history command exsit")
259                return
260            history_cmd = self.history_cmd_list[-1]
261            command = history_cmd["command"]
262            options = history_cmd["options"]
263            from xdevice import Variables
264            latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml")
265            tree = ElementTree.parse(latest_report_path)
266            root = tree.getroot()
267            has_failed_case = 0
268            test_targets = {}
269            fail_list = []
270            for child in root:
271                print(child.tag, ":", child.attrib)
272                for grand in child:
273                    print(grand.tag, ":", grand.attrib)
274                    for sub_child in grand:
275                        if sub_child.tag == 'failure':
276                            fail_case = grand.attrib["classname"] + "#" + grand.attrib["name"]
277                            fail_list.append(fail_case)
278                            has_failed_case += 1
279                            break
280            test_targets["class"] = fail_list
281            setattr(options, "testargs", test_targets)
282            setattr(options, "scheduler", "Scheduler")
283            print("retry option:", options)
284            if has_failed_case > 0:
285                if not self._build_test_cases(options):
286                    LOG.error("Build test cases failed.")
287                    return
288                scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
289                                    plugin_id=SchedulerType.SCHEDULER)[0]
290                scheduler.exec_command(command, options)
291            else:
292                LOG.info("No testcase to retry")
293            return
294
295        if not self._build_test_cases(options):
296            LOG.error("Build test cases failed.")
297            return
298
299        if "partdeps" == options.partdeps:
300            self.get_part_deps_list(options.productform, options.testpart)
301            options.testcases_path = self.get_external_deps_out_path(options.productform)
302            LOG.info("partdeps = %s" % options.partdeps)
303
304        if "acts" in options.testtype or "hats" in options.testtype or "hits" in options.testtype:
305            test_dict = self.get_xts_test_dict(options)
306            options.testcases_path = self.get_xts_tests_out_path(options.productform, options.testtype)
307            options.resource_path = self.get_xts_tests_out_path(options.productform, options.testtype)
308        else:
309            test_dict = self.get_test_dict(options)
310
311        if not self._check_test_dictionary(test_dict):
312            LOG.error("The test file list is empty.")
313            return
314        if options.coverage and platform.system() != "Windows":
315            coverage_path = os.path.join(sys.framework_root_dir, "reports/coverage")
316            if os.path.exists(coverage_path):
317                coverage_process = subprocess.Popen("rm -rf %s" % coverage_path, shell=True)
318                coverage_process.communicate()
319
320        if ("distributedtest" in options.testtype and
321                len(options.testtype) == 1):
322            from core.command.distribute_utils import get_test_case
323            from core.command.distribute_utils \
324                import check_ditributetest_environment
325            from core.command.distribute_utils import make_device_info_file
326            from core.command.distribute_utils import make_reports
327
328            local_time = time.localtime()
329            create_time = time.strftime('%Y-%m-%d-%H-%M-%S', local_time)
330            start_time = time.strftime('%Y-%m-%d %H:%M:%S', local_time)
331
332            if not check_ditributetest_environment():
333                return
334
335            output_test = get_test_case(test_dict.get("CXX", None))
336            if not output_test:
337                return
338
339            result_rootpath = os.path.join(sys.framework_root_dir,
340                "reports",
341                create_time)
342
343            log_path = os.path.join(result_rootpath, "log")
344            tmp_path = os.path.join(result_rootpath, "temp")
345            os.makedirs(log_path, exist_ok=True)
346            os.makedirs(tmp_path, exist_ok=True)
347
348            Binder.get_runtime_log().start_task_log(log_path)
349            make_device_info_file(tmp_path)
350
351            for case in output_test:
352                agent_target_name = case["agent_target_name"]
353                major_target_name = case["major_target_name"]
354                manager = DbinderTest(result_rootpath, case["suits_dir"])
355                manager.setUp()
356                manager.test_distribute(major_target_name, agent_target_name, options)
357                manager.tearDown()
358
359            make_reports(result_rootpath, start_time)
360            Binder.get_runtime_log().stop_task_logcat()
361        else:
362            options.testdict = test_dict
363            options.target_outpath = self.get_target_out_path(
364                options.productform)
365            setattr(options, "scheduler", "Scheduler")
366            scheduler = get_plugin(plugin_type=Plugin.SCHEDULER,
367                                   plugin_id=SchedulerType.SCHEDULER)[0]
368            if scheduler is None:
369                LOG.error("Can not find the scheduler plugin.")
370            else:
371                options.testcases_path = self.get_tests_out_path(options.productform)
372                options.resource_path = os.path.abspath(os.path.join(
373                    sys.framework_root_dir, "..", "resource"))
374                if is_lite_product(options.productform,
375                                   sys.source_code_root_path):
376                    if options.productform.find("wifiiot") != -1:
377                        Binder.get_tdd_config().update_test_type_in_source(
378                            ".bin", DeviceTestType.ctest_lite)
379                        Binder.get_tdd_config().update_ext_type_in_source(
380                            "BIN", DeviceTestType.ctest_lite)
381                    else:
382                        print("productform is not wifiiot")
383                scheduler.exec_command(command, options)
384        if need_record_history:
385            #读文件获取运行结果
386            from xdevice import Variables
387            latest_report_path = os.path.join(Variables.temp_dir, "latest/summary_report.xml")
388            with open(latest_report_path) as report_file:
389                for report_line in report_file:
390                    if "testsuites name=\"summary_report\"" in report_line:
391                        result = report_line.replace("\n", "")
392                        result = result.replace("<testsuites name=\"summary_report\" ", "")
393                        result = result.replace(">", "")
394                        cmd_record["result"] = result
395                        break
396            if len(self.history_cmd_list) >= 10:
397                del self.history_cmd_list[0]
398            self.history_cmd_list.append(cmd_record)
399
400        if "fuzztest" == options.testtype[0] and options.coverage is False:
401            report = get_plugin(plugin_type=Plugin.REPORTER, plugin_id="ALL")[0]
402            latest_corpus_path = os.path.join(sys.framework_root_dir, "reports", "latest_corpus")
403            if os.path.exists(latest_corpus_path):
404                shutil.rmtree(latest_corpus_path)
405            shutil.copytree(os.path.join(report.report_path, "result"), latest_corpus_path)
406
407        if options.coverage and platform.system() != "Windows":
408            pull_service_gcov_path = os.path.join(
409                sys.framework_root_dir, "local_coverage/resident_service/pull_service_gcda.py")
410            if os.path.exists(pull_service_gcov_path):
411                subprocess.run("python3 %s command_str=%s" % (pull_service_gcov_path, current_raw_cmd), shell=True)
412            else:
413                print(f"{pull_service_gcov_path} not exists.")
414
415            if not options.pullgcda:
416                cov_main_file_path = os.path.join(sys.framework_root_dir, "local_coverage/coverage_tools.py")
417                testpart = ",".join(list(map(str, options.partname_list)))
418                if os.path.exists(cov_main_file_path):
419                    subprocess.run("python3 %s testpart=%s" % (
420                        cov_main_file_path, testpart), shell=True)
421                else:
422                    print(f"{cov_main_file_path} not exists.")
423        return
424
425    def get_xts_test_dict(self, options):
426        # 获取XTS测试用例编译结果路径
427        xts_test_case_path = self.get_xts_tests_out_path(options.productform, options.testtype)
428        if not os.path.exists(xts_test_case_path):
429            LOG.error("%s is not exist." % xts_test_case_path)
430            return {}
431        xts_test_dict = TestCaseManager().get_xts_test_files(xts_test_case_path, options)
432        return xts_test_dict
433
434    def get_test_dict(self, options):
435        # 获取测试用例编译结果路径
436        test_case_path = self.get_tests_out_path(options.productform)
437        if not os.path.exists(test_case_path):
438            LOG.error("%s is not exist." % test_case_path)
439            return {}
440
441        test_dict = TestCaseManager().get_test_files(test_case_path, options)
442        return test_dict
443