#!/usr/bin/env python3
# coding=utf-8

#
# Copyright (c) 2020-2022 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import os
import shutil
import xml.etree.ElementTree as ElementTree
from xdevice import platform_logger
from xdevice import DeviceTestType
from core.constants import ConfigFileConst

LOG = platform_logger("ResourceManager")


##############################################################################
##############################################################################

class ResourceManager(object):
    """Locate and parse a test suite's resource XML and apply its actions.

    The XML is searched for a ``<target>`` node matching the test suite name;
    its ``<environment>``, ``<preparer>`` and ``<cleaner>`` children are turned
    into lists of attribute dictionaries which the ``process_*`` methods then
    execute (push / pull / shell / connector commands).
    """

    # Separator between source and destination in push/pull option values.
    _TRANSFER_SEPARATOR = "->"

    def __init__(self):
        pass

    @staticmethod
    def get_resource_xml_file_path(test_suit_file_path):
        """Return the resource XML path for a test suite file, or "".

        Walks up from the directory of ``test_suit_file_path`` until a
        directory containing a ``resource`` entry is found. The search gives
        up (returning "") when it reaches a directory ending in ``/tests``,
        the filesystem root, a Windows drive root, or the top of a relative
        path.
        """
        current_dir = os.path.dirname(test_suit_file_path)
        while True:
            if current_dir.endswith(os.sep + "tests"):
                current_dir = ""
                break
            # "" is reached when a relative path runs out of components;
            # os.path.dirname("") is "" again, so it must terminate the walk.
            if current_dir in ("", "/") or current_dir.endswith(":\\"):
                current_dir = ""
                break
            if os.path.exists(os.path.join(current_dir, "resource")):
                break
            parent_dir = os.path.dirname(current_dir)
            if parent_dir == current_dir:
                # No further parent (e.g. an unusual root form): stop instead
                # of spinning forever on the same directory.
                current_dir = ""
                break
            current_dir = parent_dir

        xml_filepath = ""
        if current_dir != "":
            resource_root = os.path.join(current_dir, "resource")
            xml_filepath = os.path.join(
                resource_root, ConfigFileConst.RESOURCECONFIG_FILEPATH)
            if not os.path.exists(xml_filepath):
                # Fall back to the per-case resource file name.
                xml_filepath = os.path.join(
                    resource_root, ConfigFileConst.CASE_RESOURCE_FILEPATH)
        LOG.info("xml_filepath = %s" % xml_filepath)
        return xml_filepath

    @staticmethod
    def find_node_by_target(file_path, targe_tname):
        """Return the first ``<target>`` element matching ``targe_tname``.

        A target matches when its ``name`` attribute equals ``targe_tname``
        or is a prefix of it. Targets without a ``name`` attribute are
        skipped. Returns None when the file does not exist, cannot be parsed,
        or contains no matching target.
        """
        node = None
        try:
            if os.path.exists(file_path):
                root = ElementTree.parse(file_path).getroot()
                for target in root.iter("target"):
                    name = target.attrib.get("name")
                    if name is None:
                        # str.startswith(None) would raise TypeError.
                        continue
                    if name == targe_tname or targe_tname.startswith(name):
                        node = target
                        break
        except ElementTree.ParseError as xml_exception:
            LOG.error("resource_test.xml parsing failed. %s"
                      % str(xml_exception))
        return node

    @classmethod
    def _get_file_name_extension(cls, filepath):
        """Return ``(name, extension)`` of the last component of filepath."""
        _, fullname = os.path.split(filepath)
        filename, ext = os.path.splitext(fullname)
        LOG.debug("file path:{}".format(filepath))
        return filename, ext

    @classmethod
    def get_dir_name(cls, dir_path):
        """Return the last component of an existing directory path.

        Returns "" when ``dir_path`` is not a directory, ends with ".", or
        consists of a single path component.
        """
        dir_name = ""
        if os.path.isdir(dir_path) and dir_path[-1] != ".":
            components = dir_path.rstrip(os.sep).split(os.sep)
            if len(components) > 1:
                dir_name = components[-1]
        return dir_name

    @classmethod
    def get_env_data(cls, environment_list):
        """Group option name/value pairs by the device type preceding them.

        ``environment_list`` is a flat list of attribute dictionaries. An
        item with a "type" key starts a new device; items with a "name" key
        add an option (a missing "value" is stored as "") to the device
        currently being collected. Returns ``{device_type: {name: value}}``.
        """
        env_data_dic = {}
        device_name = ""
        option_dic = {}

        for item in environment_list:
            if "type" in item:
                if device_name != "":
                    # Close the previous device before starting the next one.
                    env_data_dic[device_name] = option_dic
                    option_dic = {}
                device_name = item["type"]

            if "name" in item:
                option_dic[item["name"]] = item.get("value", "")

        if device_name != "":
            env_data_dic[device_name] = option_dic
        LOG.debug("get environment data finish")
        return env_data_dic

    @classmethod
    def get_nodeattrib_data(cls, data_dic):
        """Return the "timeout" attribute of the target node, or ""."""
        curr_timeout = ""
        if "nodeattrib" in data_dic:
            LOG.info("++++++++++++++nodeattrib+++++++++++++++")
            nodeattrib_list = data_dic["nodeattrib"]
            if len(nodeattrib_list) != 0:
                curr_timeout = nodeattrib_list[0].get("timeout", "")
        return curr_timeout

    def get_resource_data_dic(self, testsuit_filepath):
        """Return ``(data_dic, resource_dir)`` for a test suite file.

        Both are empty (``{}`` and "") when no resource XML file is found.
        """
        resource_dir = ""
        data_dic = {}

        target_name, _ = self._get_file_name_extension(testsuit_filepath)
        xml_filepath = self.get_resource_xml_file_path(testsuit_filepath)
        if not os.path.exists(xml_filepath):
            return data_dic, resource_dir

        data_dic = self.get_resource_data(xml_filepath, target_name)
        resource_dir = os.path.abspath(os.path.dirname(xml_filepath))
        return data_dic, resource_dir

    def get_resource_data(self, xml_filepath, target_name):
        """Parse ``xml_filepath`` for ``target_name``; {} if file is absent."""
        data_dic = {}
        if os.path.exists(xml_filepath):
            data_dic = self._parse_resource_test_xml_file(
                xml_filepath, target_name)
        return data_dic

    ##########################################################################
    ##########################################################################

    @classmethod
    def _split_transfer_value(cls, resource_dir, transfer_value):
        """Split a "src -> dst" option value into ``(src, dst)``.

        ``src`` is joined onto ``resource_dir``; both parts are stripped.
        Returns None (after logging an error) when the separator is missing,
        because slicing around a failed ``find`` would yield garbage paths.
        """
        src_part, separator, dst_part = transfer_value.partition(
            cls._TRANSFER_SEPARATOR)
        if not separator:
            LOG.error("invalid option value '%s', expected 'src -> dst'"
                      % transfer_value)
            return None
        return os.path.join(resource_dir, src_part.strip()), dst_part.strip()

    def process_resource_file(self, resource_dir, preparer_list, device):
        """Execute preparer/cleaner options against ``device``.

        Items without a "name" key are ignored. "push" and "pull" expect a
        "src -> dst" value, "shell" runs the value as a shell command, and
        any other name is sent as "<name> <value>" via
        ``device.connector_command``. Malformed push/pull values are logged
        and skipped.
        """
        for item in preparer_list:
            if "name" not in item:
                continue

            name = item["name"]
            value = item.get("value", "")
            if name == "push":
                paths = self._split_transfer_value(resource_dir, value)
                if paths is None:
                    continue
                src, dst = paths
                src = src.replace("/", os.sep)
                dir_name = self.get_dir_name(src)
                if dir_name != "":
                    # Pushing a directory: recreate it under the destination.
                    dst = dst.rstrip("/") + "/" + dir_name
                # NOTE(review): the original indentation was lost; mkdir is
                # assumed to run for every push, not only for directories.
                device.execute_shell_command("mkdir -p %s" % dst)
                device.push_file(src, dst)
            elif name == "pull":
                paths = self._split_transfer_value(resource_dir, value)
                if paths is None:
                    continue
                src, dst = paths
                device.pull_file(src, dst)
            elif name == "shell":
                device.execute_shell_command(value.strip())
            else:
                command = (name + " " + value).strip()
                device.connector_command(command)

    def lite_process_resource_file(self, resource_dir, preparer_list):
        """Execute preparer options locally for lite devices.

        "push" copies src to dst with ``shutil.copy``; "pull" copies dst back
        to src with ``shutil.copyfile``; any other name is run as
        "<name> <value>" on ``self.lite_device``. The class itself never sets
        ``lite_device``, so when it is absent the command is logged and
        skipped rather than raising AttributeError.
        """
        for item in preparer_list:
            if "name" not in item:
                continue

            name = item["name"]
            value = item.get("value", "")
            if name == "push":
                paths = self._split_transfer_value(resource_dir, value)
                if paths is None:
                    continue
                src, dst = paths
                shutil.copy(src, dst)
            elif name == "pull":
                paths = self._split_transfer_value(resource_dir, value)
                if paths is None:
                    continue
                src, dst = paths
                shutil.copyfile(dst, src)
            else:
                command = (name + " " + value).strip()
                lite_device = getattr(self, "lite_device", None)
                if lite_device is None:
                    LOG.error("lite_device is not set, skip command: %s"
                              % command)
                    continue
                lite_device.execute_command_with_timeout(
                    command, case_type=DeviceTestType.lite_cpp_test)

    def get_environment_data(self, data_dic):
        """Return the per-device option mapping from ``data_dic``, or {}."""
        env_data_dic = {}
        if "environment" in data_dic:
            LOG.info("++++++++++++++environment+++++++++++++++")
            environment_list = data_dic["environment"]
            env_data_dic = self.get_env_data(environment_list)
        return env_data_dic

    def process_preparer_data(self, data_dic, resource_dir, device):
        """Run the "preparer" options of ``data_dic`` against ``device``."""
        if "preparer" in data_dic:
            LOG.info("++++++++++++++preparer+++++++++++++++")
            preparer_list = data_dic["preparer"]
            self.process_resource_file(resource_dir, preparer_list, device)

    def lite_process_preparer_data(self, data_dic, resource_dir):
        """Run the "preparer" options of ``data_dic`` for a lite device."""
        if "preparer" in data_dic:
            LOG.info("++++++++++++++preparer+++++++++++++++")
            preparer_list = data_dic["preparer"]
            self.lite_process_resource_file(resource_dir, preparer_list)

    def process_cleaner_data(self, data_dic, resource_dir, device):
        """Run the "cleaner" options of ``data_dic`` against ``device``."""
        if "cleaner" in data_dic:
            LOG.info("++++++++++++++cleaner+++++++++++++++")
            cleaner_list = data_dic["cleaner"]
            self.process_resource_file(resource_dir, cleaner_list, device)

    @staticmethod
    def _collect_option_attribs(target_node, section_tag):
        """Return the attribs of a section node followed by its options.

        Returns [] when ``target_node`` has no ``section_tag`` child.
        """
        section = target_node.find(section_tag)
        if section is None:
            return []
        attribs = [section.attrib]
        attribs.extend(
            option.attrib for option in section.findall("option"))
        return attribs

    def _parse_resource_test_xml_file(self, filepath, targetname):
        """Parse the target node of ``filepath`` into a dict of attrib lists.

        Returns {} when no matching target exists; otherwise a dictionary
        with the keys "nodeattrib", "environment", "preparer" and "cleaner".
        """
        data_dic = {}

        node = self.find_node_by_target(filepath, targetname)
        # Compare against None: an Element with no children is falsy, so a
        # plain truth test would discard childless but valid nodes.
        if node is not None:
            environment_data_list = []
            env_node = node.find("environment")
            if env_node is not None:
                environment_data_list.append(env_node.attrib)
                for device_element in env_node.findall("device"):
                    environment_data_list.append(device_element.attrib)
                    environment_data_list.extend(
                        option.attrib
                        for option in device_element.findall("option"))

            data_dic["nodeattrib"] = [node.attrib]
            data_dic["environment"] = environment_data_list
            data_dic["preparer"] = self._collect_option_attribs(
                node, "preparer")
            data_dic["cleaner"] = self._collect_option_attribs(
                node, "cleaner")

        return data_dic

##############################################################################
##############################################################################