1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18
19import os
20import shutil
21import xml.etree.ElementTree as ElementTree
22from xdevice import platform_logger
23from xdevice import DeviceTestType
24from core.constants import ConfigFileConst
25
26LOG = platform_logger("ResourceManager")
27
28
29##############################################################################
30##############################################################################
31
32class ResourceManager(object):
33    def __init__(self):
34        pass
35
36    @staticmethod
37    def get_resource_xml_file_path(test_suit_file_path):
38        current_dir = os.path.dirname(test_suit_file_path)
39        while True:
40            if current_dir.endswith(os.sep + "tests"):
41                current_dir = ""
42                break
43            if current_dir == "/" or current_dir.endswith(":\\"):
44                current_dir = ""
45                break
46            if os.path.exists(os.path.join(current_dir, "resource")):
47                break
48            current_dir = os.path.dirname(current_dir)
49
50        if current_dir != "":
51            xml_filepath = os.path.join(
52                current_dir,
53                "resource",
54                ConfigFileConst.RESOURCECONFIG_FILEPATH)
55            if not os.path.exists(xml_filepath):
56                xml_filepath = os.path.join(
57                    current_dir,
58                    "resource",
59                    ConfigFileConst.CASE_RESOURCE_FILEPATH)
60        else:
61            xml_filepath = ""
62        LOG.info("xml_filepath = %s" % xml_filepath)
63        return xml_filepath
64
65    @staticmethod
66    def find_node_by_target(file_path, targe_tname):
67        node = None
68        try:
69            if os.path.exists(file_path):
70                tree = ElementTree.parse(file_path)
71                root = tree.getroot()
72                targets = root.iter("target")
73                for target in targets:
74                    curr_dic = target.attrib
75                    if curr_dic.get("name") == targe_tname or \
76                            targe_tname.startswith(curr_dic.get("name")):
77                        node = target
78                        break
79        except ElementTree.ParseError as xml_exception:
80            LOG.error("resource_test.xml parsing failed." +
81                      xml_exception.args)
82        return node
83
84    @classmethod
85    def _get_file_name_extension(cls, filepath):
86        _, fullname = os.path.split(filepath)
87        filename, ext = os.path.splitext(fullname)
88        LOG.debug("file path:{}".format(filepath))
89        return filename, ext
90
91    @classmethod
92    def get_dir_name(cls, dir_path):
93        dir_name = ""
94        if os.path.isdir(dir_path) and dir_path[-1] != ".":
95            dir_name_list = dir_path.rstrip(os.sep).split(os.sep)
96            if len(dir_name_list) > 1:
97                dir_name = dir_name_list[-1]
98        return dir_name
99
100    @classmethod
101    def get_env_data(cls, environment_list):
102        env_data_dic = {}
103        device_name = ""
104        option_dic = {}
105
106        for item in environment_list:
107            if "type" in item.keys():
108                if device_name != "":
109                    temp_dic = option_dic.copy()
110                    env_data_dic[device_name] = temp_dic
111                    device_name = ""
112                    option_dic.clear()
113                device_name = item["type"]
114
115            if "name" in item.keys():
116                name = item["name"]
117                value = item["value"]
118                option_dic[name] = value
119
120        if device_name != "":
121            temp_dic = option_dic.copy()
122            env_data_dic[device_name] = temp_dic
123            device_name = ""
124            option_dic.clear()
125        LOG.debug("get environment data finish")
126        return env_data_dic
127
128    @classmethod
129    def get_nodeattrib_data(cls, data_dic):
130        curr_timeout = ""
131        if "nodeattrib" in data_dic.keys():
132            LOG.info("++++++++++++++nodeattrib+++++++++++++++")
133            nodeattrib_list = data_dic["nodeattrib"]
134            if len(nodeattrib_list) != 0:
135                node_item_dic = nodeattrib_list[0]
136                if "timeout" in node_item_dic:
137                    curr_timeout = node_item_dic["timeout"]
138        return curr_timeout
139
140    def get_resource_data_dic(self, testsuit_filepath):
141        resource_dir = ""
142        data_dic = {}
143
144        target_name, _ = self._get_file_name_extension(testsuit_filepath)
145        xml_filepath = self.get_resource_xml_file_path(testsuit_filepath)
146        if not os.path.exists(xml_filepath):
147            return data_dic, resource_dir
148
149        data_dic = self.get_resource_data(xml_filepath, target_name)
150        resource_dir = os.path.abspath(os.path.dirname(xml_filepath))
151        return data_dic, resource_dir
152
153    def get_resource_data(self, xml_filepath, target_name):
154        data_dic = {}
155        if os.path.exists(xml_filepath):
156            data_dic = self._parse_resource_test_xml_file(
157                xml_filepath, target_name)
158        return data_dic
159
160    ##########################################################################
161    ##########################################################################
162
163    def process_resource_file(self, resource_dir, preparer_list, device):
164        for item in preparer_list:
165            if "name" not in item.keys():
166                continue
167
168            if item["name"] == "push":
169                push_value = item["value"]
170                find_key = "->"
171                pos = push_value.find(find_key)
172                src = os.path.join(resource_dir, push_value[0:pos].strip())
173                dst = push_value[pos + len(find_key):len(push_value)].strip()
174                src = src.replace("/", os.sep)
175                dir_name = self.get_dir_name(src)
176                if dir_name != "":
177                    dst = dst.rstrip("/") + "/" + dir_name
178                device.execute_shell_command("mkdir -p %s" % dst)
179                device.push_file(src, dst)
180            elif item["name"] == "pull":
181                push_value = item["value"]
182                find_key = "->"
183                pos = push_value.find(find_key)
184                src = os.path.join(resource_dir, push_value[0:pos].strip())
185                dst = push_value[pos + len(find_key):len(push_value)].strip()
186                device.pull_file(src, dst)
187            elif item["name"] == "shell":
188                command = item["value"].strip()
189                device.execute_shell_command(command)
190            else:
191                command = item["name"] + " " + item["value"]
192                command = command.strip()
193                device.connector_command(command)
194
195    def lite_process_resource_file(self, resource_dir, preparer_list):
196        for item in preparer_list:
197            if "name" not in item.keys():
198                continue
199
200            if item["name"] == "push":
201                copy_value = item["value"]
202                find_key = "->"
203                pos = copy_value.find(find_key)
204                src = os.path.join(resource_dir, copy_value[0:pos].strip())
205                dst = copy_value[pos + len(find_key):len(copy_value)].strip()
206                shutil.copy(src, dst)
207
208            elif item["name"] == "pull":
209                copy_value = item["value"]
210                find_key = "->"
211                pos = copy_value.find(find_key)
212                src = os.path.join(resource_dir, copy_value[0:pos].strip())
213                dst = copy_value[pos + len(find_key):len(copy_value)].strip()
214                shutil.copyfile(dst, src)
215            else:
216                command = item["name"] + " " + item["value"]
217                command = command.strip()
218                self.lite_device.execute_command_with_timeout(command, case_type=DeviceTestType.lite_cpp_test)
219
220    def get_environment_data(self, data_dic):
221        env_data_dic = {}
222        if "environment" in data_dic.keys():
223            LOG.info("++++++++++++++environment+++++++++++++++")
224            environment_list = data_dic["environment"]
225            env_data_dic = self.get_env_data(environment_list)
226        return env_data_dic
227
228    def process_preparer_data(self, data_dic, resource_dir, device):
229        if "preparer" in data_dic.keys():
230            LOG.info("++++++++++++++preparer+++++++++++++++")
231            preparer_list = data_dic["preparer"]
232            self.process_resource_file(resource_dir, preparer_list, device)
233        return
234
235    def lite_process_preparer_data(self, data_dic, resource_dir):
236        if "preparer" in data_dic.keys():
237            LOG.info("++++++++++++++preparer+++++++++++++++")
238            preparer_list = data_dic["preparer"]
239            self.lite_process_resource_file(resource_dir, preparer_list)
240        return
241
242    def process_cleaner_data(self, data_dic, resource_dir, device):
243        if "cleaner" in data_dic.keys():
244            LOG.info("++++++++++++++cleaner+++++++++++++++")
245            cleaner_list = data_dic["cleaner"]
246            self.process_resource_file(resource_dir, cleaner_list, device)
247        return
248
249    def _parse_resource_test_xml_file(self, filepath, targetname):
250        data_dic = {}
251
252        node = self.find_node_by_target(filepath, targetname)
253        if node:
254            target_attrib_list = []
255            target_attrib_list.append(node.attrib)
256            environment_data_list = []
257            env_node = node.find("environment")
258            if env_node:
259                environment_data_list.append(env_node.attrib)
260                for element in env_node.findall("device"):
261                    environment_data_list.append(element.attrib)
262                    for option_element in element.findall("option"):
263                        environment_data_list.append(option_element.attrib)
264
265            preparer_data_list = []
266            pre_node = node.find("preparer")
267            if pre_node:
268                preparer_data_list.append(pre_node.attrib)
269                for element in pre_node.findall("option"):
270                    preparer_data_list.append(element.attrib)
271
272            cleaner_data_list = []
273            clr_node = node.find("cleaner")
274            if clr_node:
275                cleaner_data_list.append(clr_node.attrib)
276                for element in clr_node.findall("option"):
277                    cleaner_data_list.append(element.attrib)
278
279            data_dic["nodeattrib"] = target_attrib_list
280            data_dic["environment"] = environment_data_list
281            data_dic["preparer"] = preparer_data_list
282            data_dic["cleaner"] = cleaner_data_list
283
284        return data_dic
285
286##############################################################################
287##############################################################################
288