1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import copy
21import json
22import sys
23from json import JSONDecodeError
24from core.utils import get_build_output_path
25from core.common import is_open_source_product
26
27from core.utils import get_file_list_by_postfix
28from core.config.config_manager import FilterConfigManager
29from xdevice import platform_logger
30from xdevice import DeviceTestType
31from xdevice import Binder
32
33LOG = platform_logger("TestcaseManager")
34
35TESTFILE_TYPE_DATA_DIC = {
36    "DEX": [],
37    "HAP": [],
38    "PYT": [],
39    "CXX": [],
40    "BIN": [],
41    "OHJST": [],
42    "JST": [],
43    "LTPPosix": [],
44    "OHRust": []
45}
46FILTER_SUFFIX_NAME_LIST = [".TOC", ".info", ".pyc"]
47
48
49class TestCaseManager(object):
50    @classmethod
51    def get_valid_suite_file(cls, test_case_out_path, suite_file, options):
52        partlist = options.partname_list
53        testmodule = options.testmodule
54        testsuit = options.testsuit
55
56        if not suite_file.startswith(test_case_out_path):
57            return False
58
59        if testsuit != "":
60            short_name, _ = os.path.splitext(os.path.basename(suite_file))
61            testsuit_list = testsuit.split(',')
62            for test in testsuit_list:
63                if short_name.startswith(test) or \
64                        testsuit.startswith(short_name):
65                    return True
66            return False
67
68        is_valid_status = False
69        suitfile_subpath = suite_file.replace(test_case_out_path, "")
70        suitfile_subpath = suitfile_subpath.strip(os.sep)
71        if len(partlist) == 0:
72            if testmodule != "":
73                temp_list = suitfile_subpath.split(os.sep)
74                if len(temp_list) > 2 and testmodule == temp_list[1]:
75                    is_valid_status = True
76            else:
77                is_valid_status = True
78        else:
79            for partname in partlist:
80                if testmodule != "":
81                    if suitfile_subpath.startswith(
82                            partname + os.sep + testmodule + os.sep):
83                        is_valid_status = True
84                        break
85                else:
86                    if suitfile_subpath.startswith(partname + os.sep):
87                        is_valid_status = True
88                        break
89        return is_valid_status
90
91    @classmethod
92    def check_python_test_file(cls, suite_file):
93        if suite_file.endswith(".py"):
94            filename = os.path.basename(suite_file)
95            if filename.startswith("test_"):
96                return True
97        return False
98
99    @classmethod
100    def check_hap_test_file(cls, hap_file_path):
101        try:
102            if hap_file_path.endswith(".hap"):
103                json_file_path = hap_file_path.replace(".hap", ".json")
104                if os.path.exists(json_file_path):
105                    with open(json_file_path, 'r') as json_file:
106                        data_dic = json.load(json_file)
107                        if not data_dic:
108                            return False
109                        else:
110                            if "kits" in data_dic.keys():
111                                kits_list = data_dic.get("kits")
112                                if len(kits_list) > 0:
113                                    for kits_dict in kits_list:
114                                        if "test-file-name" not in kits_dict.keys():
115                                            continue
116                                        else:
117                                            return True
118                                else:
119                                    return False
120            return False
121        except JSONDecodeError:
122            return False
123        finally:
124            print(" check hap test file finally")
125
126    @classmethod
127    def get_hap_test_driver(cls, hap_file_path):
128        data_dic = cls.get_hap_json(hap_file_path)
129        if not data_dic:
130            return ""
131        else:
132            if "driver" in data_dic.keys():
133                driver_dict = data_dic.get("driver")
134                if bool(driver_dict):
135                    driver_type = driver_dict.get("type")
136                    return driver_type
137                else:
138                    LOG.error("%s has not set driver." % hap_file_path)
139                    return ""
140            else:
141                return ""
142
143    @classmethod
144    def get_hap_json(cls, hap_file_path):
145        if hap_file_path.endswith(".hap"):
146            json_file_path = hap_file_path.replace(".hap", ".json")
147            if os.path.exists(json_file_path):
148                with open(json_file_path, 'r') as json_file:
149                    data_dic = json.load(json_file)
150                    return data_dic
151            else:
152                return {}
153        else:
154            return {}
155
156    @classmethod
157    def get_hap_part_json(cls, hap_file_path):
158        if hap_file_path.endswith(".hap"):
159            json_file_path = hap_file_path.replace(".hap", ".moduleInfo")
160            if os.path.exists(json_file_path):
161                with open(json_file_path, 'r') as json_file:
162                    data_dic = json.load(json_file)
163                    return data_dic
164            else:
165                return {}
166        else:
167            return {}
168
169    @classmethod
170    def get_part_name_test_file(cls, hap_file_path):
171        data_dic = cls.get_hap_part_json(hap_file_path)
172        if not data_dic:
173            return ""
174        else:
175            if "part" in data_dic.keys():
176                part_name = data_dic["part"]
177                return part_name
178            else:
179                return ""
180
181    def get_test_files(self, test_case_path, options):
182        LOG.info("test case path: " + test_case_path)
183        LOG.info("test type list: " + str(options.testtype))
184        suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
185        if os.path.exists(test_case_path):
186            if len(options.testtype) != 0:
187                test_type_list = options.testtype
188                suit_file_dic = self.get_test_file_data(
189                    test_case_path,
190                    test_type_list,
191                    options)
192        else:
193            LOG.error("%s is not exist." % test_case_path)
194        return suit_file_dic
195
196    def get_test_file_data(self, test_case_path, test_type_list, options):
197        suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
198        for test_type in test_type_list:
199            temp_dic = self.get_test_file_data_by_test_type(
200                test_case_path,
201                test_type,
202                options)
203            for key, value in suit_file_dic.items():
204                suit_file_dic[key] = value + temp_dic.get(key)
205        return suit_file_dic
206
207    def get_test_file_data_by_test_type(self, test_case_path,
208                                        test_type, options):
209        suit_file_dictionary = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
210        test_case_out_path = os.path.join(test_case_path, test_type)
211        if os.path.exists(test_case_out_path):
212            LOG.info("The test case directory: %s" % test_case_out_path)
213            return self.get_all_test_file(test_case_out_path, options)
214        else:
215            LOG.error("Test case dir does not exist. %s" % test_case_out_path)
216        return suit_file_dictionary
217
218    def get_all_test_file(self, test_case_out_path, options):
219        suite_file_dictionary = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
220        filter_part_list = FilterConfigManager().get_filtering_list(
221            "subsystem_name", options.productform)
222        filter_list_test_file = FilterConfigManager().get_filtering_list(
223            "testfile_name", options.productform)
224        # 遍历测试用例输出目录下面的所有文件夹,每个文件夹对应一个子系统
225        command_list = options.current_raw_cmd.split(" ")
226        for part_name in os.listdir(test_case_out_path):
227            if "-ss" in command_list or "-tp" in command_list:
228                if part_name not in options.partname_list:
229                    continue
230            part_case_dir = os.path.join(test_case_out_path, part_name)
231            if not os.path.isdir(part_case_dir):
232                continue
233            # 如果子系统在fiter_config.xml配置文件的<subsystem_name>下面配置过,则过滤
234            if part_name in filter_part_list:
235                continue
236
237            # 获取子系统目录下面的所有文件路径列表
238            suite_file_list = get_file_list_by_postfix(part_case_dir)
239            for suite_file in suite_file_list:
240                # 如果文件在resource目录下面,需要过滤
241                if -1 != suite_file.replace(test_case_out_path, "").find(
242                        os.sep + "resource" + os.sep):
243                    continue
244
245                file_name = os.path.basename(suite_file)
246                # 如果文件在fiter_config.xml配置文件的<testfile_name>下面配置过,则过滤
247                if file_name in filter_list_test_file:
248                    continue
249
250                prefix_name, suffix_name = os.path.splitext(file_name)
251                if suffix_name in FILTER_SUFFIX_NAME_LIST:
252                    continue
253
254                if not self.get_valid_suite_file(test_case_out_path,
255                                                suite_file,
256                                                options):
257                    continue
258
259                if suffix_name == ".dex":
260                    suite_file_dictionary.get("DEX").append(suite_file)
261                elif suffix_name == ".hap":
262                    if self.get_hap_test_driver(suite_file) == "OHJSUnitTest":
263                        # 如果stage测试指定了-tp,只有部件名与moduleInfo中part一致的HAP包才会加入最终执行的队列
264                        if options.testpart != [] and options.testpart[0] != self.get_part_name_test_file(
265                                suite_file):
266                            continue
267                        # 如果stage测试指定了-ts,只有完全匹配的HAP包才会加入最终执行的队列
268                        if options.testsuit != "":
269                            testsuit_list = options.testsuit.split(";")
270                            is_match = False
271                            for suite_item in testsuit_list:
272                                if suite_item == prefix_name:
273                                    is_match = True
274                                    break
275                            if not is_match:
276                                continue
277                        if not self.check_hap_test_file(suite_file):
278                            continue
279                        suite_file_dictionary.get("OHJST").append(suite_file)
280                    if self.get_hap_test_driver(suite_file) == "JSUnitTest":
281                        suite_file_dictionary.get("JST").append(suite_file)
282                elif suffix_name == ".py":
283                    if not self.check_python_test_file(suite_file):
284                        continue
285                    suite_file_dictionary.get("PYT").append(suite_file)
286                elif suffix_name == "":
287                    if file_name.startswith("rust_"):
288                        Binder.get_tdd_config().update_test_type_in_source(
289                            "OHRust", DeviceTestType.oh_rust_test)
290                        suite_file_dictionary.get("OHRust").append(suite_file)
291                    else:
292                        suite_file_dictionary.get("CXX").append(suite_file)
293                elif suffix_name == ".bin":
294                    suite_file_dictionary.get("BIN").append(suite_file)
295
296        return suite_file_dictionary
297
298    def get_part_deps_files(self, external_deps_path, testpart):
299        LOG.info("external_deps_path:" + external_deps_path)
300        if os.path.exists(external_deps_path):
301            with open(external_deps_path, 'r') as json_file:
302                data_dic = json.load(json_file)
303                if not data_dic:
304                    LOG.error("data_dic is empty.")
305                    return []
306                test_part = testpart[0]
307                if test_part in data_dic.keys():
308                    external_deps_part_list = data_dic.get(test_part)
309                    LOG.info("external_deps_part_list = %s" % external_deps_part_list)
310                    return external_deps_part_list
311                else:
312                    LOG.error("%s is not in part deps info json." % test_part)
313        else:
314            LOG.error("Part deps info %s is not exist." % external_deps_path)
315        return []
316
317    def check_xts_config_match(self, options, prefix_name, xts_suite_file):
318        # 如果xts测试指定了-tp,只有部件名与moduleInfo中part一致的文件才会加入最终执行的队列
319        if options.testpart != [] and options.testpart[0] != self.get_part_name_test_file(xts_suite_file):
320            return False
321        # 如果xts测试指定了-ts,只有完全匹配的文件才会加入最终执行的队列
322        if options.testsuit != "":
323            testsuit_list = options.testsuit.split(";")
324            for suite_item in testsuit_list:
325                if suite_item == prefix_name:
326                    return True
327            return False
328        return True
329
330    def get_xts_test_files(self, xts_test_case_path, options):
331        LOG.info("xts test case path: " + xts_test_case_path)
332        xts_suit_file_dic = copy.deepcopy(TESTFILE_TYPE_DATA_DIC)
333        if not os.path.exists(xts_test_case_path):
334            LOG.error("xts %s is not exist." % xts_test_case_path)
335            return xts_suit_file_dic
336        # 获取XTS测试用例输出目录下面的所有文件路径列表
337        xts_suite_file_list = get_file_list_by_postfix(xts_test_case_path)
338        for xts_suite_file in xts_suite_file_list:
339            file_name = os.path.basename(xts_suite_file)
340            prefix_name, suffix_name = os.path.splitext(file_name)
341            if not self.check_xts_config_match(options, prefix_name, xts_suite_file):
342                continue
343            if suffix_name == "":
344                if file_name == "HatsOpenPosixTest":
345                    xts_suit_file_dic.get("LTPPosix").append(xts_suite_file)
346                else:
347                    xts_suit_file_dic.get("CXX").append(xts_suite_file)
348            elif suffix_name == ".hap":
349                if self.get_hap_test_driver(xts_suite_file) == "OHJSUnitTest":
350                    xts_suit_file_dic.get("OHJST").append(xts_suite_file)
351                if self.get_hap_test_driver(xts_suite_file) == "JSUnitTest":
352                    xts_suit_file_dic.get("JST").append(xts_suite_file)
353        return xts_suit_file_dic
354