1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2022 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18 19import re 20import telnetlib 21import time 22import os 23import threading 24 25from xdevice import DeviceOsType 26from xdevice import DeviceProperties 27from xdevice import ConfigConst 28from xdevice import DeviceLabelType 29from xdevice import ModeType 30from xdevice import IDevice 31from xdevice import platform_logger 32from xdevice import DeviceAllocationState 33from xdevice import Plugin 34from xdevice import exec_cmd 35from xdevice import convert_serial 36from xdevice import convert_mac 37from xdevice import convert_ip 38from xdevice import check_mode 39from xdevice import Platform 40 41from ohos.constants import ComType 42from ohos.environment.dmlib_lite import LiteHelper 43from ohos.error import ErrorMessage 44from ohos.exception import LiteDeviceConnectError 45from ohos.exception import LiteDeviceTimeout 46from ohos.exception import LiteParamError 47 48LOG = platform_logger("DeviceLite") 49TIMEOUT = 90 50RETRY_ATTEMPTS = 0 51HDC = "litehdc.exe" 52DEFAULT_BAUD_RATE = 115200 53 54 55def get_hdc_path(): 56 from xdevice import Variables 57 user_path = os.path.join(Variables.exec_dir, "resource/tools") 58 top_user_path = os.path.join(Variables.top_dir, "config") 59 config_path = os.path.join(Variables.res_dir, "config") 60 paths = [user_path, top_user_path, config_path] 61 62 file_path = "" 63 for path in paths: 64 if os.path.exists(os.path.abspath(os.path.join( 65 path, HDC))): 66 file_path = os.path.abspath(os.path.join( 67 path, HDC)) 68 break 69 70 if os.path.exists(file_path): 71 return file_path 72 else: 73 raise LiteParamError(ErrorMessage.Common.Code_0301006) 74 75 76def parse_available_com(com_str): 77 com_str = com_str.replace("\r", " ") 78 com_list = com_str.split("\n") 79 for index, item in enumerate(com_list): 80 com_list[index] = item.strip().strip(b'\x00'.decode()) 81 return com_list 82 83 84def perform_device_action(func): 85 def device_action(self, *args, **kwargs): 86 if not self.get_recover_state(): 87 LOG.debug("Device %s %s is false" % (self.device_sn, 88 ConfigConst.recover_state)) 89 return "", "", "" 90 91 tmp = int(kwargs.get("retry", RETRY_ATTEMPTS)) 92 retry = tmp + 1 if tmp > 0 else 1 93 exception = None 94 for num in range(retry): 95 try: 96 result = func(self, *args, **kwargs) 97 return result 98 except LiteDeviceTimeout as error: 99 LOG.error(error) 100 exception = error 101 if num: 102 self.recover_device() 103 except Exception as error: 104 LOG.error(error) 105 exception = error 106 raise exception 107 108 return device_action 109 110 111@Plugin(type=Plugin.DEVICE, id=DeviceOsType.lite) 112class DeviceLite(IDevice): 113 """ 114 Class representing a device lite device. 115 116 Each object of this class represents one device lite device in xDevice. 117 118 Attributes: 119 device_connect_type: A string that's the type of lite device 120 """ 121 device_os_type = DeviceOsType.lite 122 device_allocation_state = DeviceAllocationState.available 123 124 def __init__(self): 125 self.device_sn = "" 126 self.label = "" 127 self.device_connect_type = "" 128 self.device_kernel = "" 129 self.device = None 130 self.ifconfig = None 131 self.device_id = None 132 self.extend_value = {} 133 self.device_lock = threading.RLock() 134 self.test_platform = Platform.ohos 135 self.device_props = {} 136 self.device_description = {} 137 138 def init_description(self): 139 if self.device_description: 140 return 141 desc = { 142 DeviceProperties.sn: convert_serial(self.device_sn), 143 DeviceProperties.model: self.label, 144 DeviceProperties.type_: self.label, 145 DeviceProperties.platform: self.test_platform, 146 DeviceProperties.version: "", 147 DeviceProperties.others: self.device_props 148 } 149 self.device_description.update(desc) 150 151 def __set_serial__(self, device=None): 152 for item in device: 153 if "ip" in item.keys() and "port" in item.keys(): 154 self.device_sn = "remote_%s_%s" % \ 155 (item.get("ip"), item.get("port")) 156 break 157 elif "type" in item.keys() and "com" in item.keys(): 158 self.device_sn = "local_%s" % item.get("com") 159 break 160 161 def __get_serial__(self): 162 return self.device_sn 163 164 def get(self, key=None, default=None): 165 if not key: 166 return default 167 value = getattr(self, key, None) 168 if value: 169 return value 170 else: 171 return self.extend_value.get(key, default) 172 173 def update_device_props(self, props): 174 if self.device_props or not isinstance(props, dict): 175 return 176 self.device_props.update(props) 177 178 def __set_device_kernel__(self, kernel_type=""): 179 self.device_kernel = kernel_type 180 181 def __get_device_kernel__(self): 182 return self.device_kernel 183 184 @staticmethod 185 def _check_watchgt(device): 186 for item in device: 187 if "label" not in item.keys(): 188 raise LiteParamError(ErrorMessage.Config.Code_0302027) 189 if "com" not in item.keys() or ("com" in item.keys() and 190 not item.get("com")): 191 raise LiteParamError(ErrorMessage.Config.Code_0302028) 192 else: 193 hdc = get_hdc_path() 194 result = exec_cmd([hdc]) 195 com_list = parse_available_com(result) 196 if item.get("com").upper() in com_list: 197 return True 198 else: 199 raise LiteParamError(ErrorMessage.Config.Code_0302029) 200 201 @staticmethod 202 def _check_wifiiot_config(device): 203 com_type_set = set() 204 for item in device: 205 if "label" not in item.keys(): 206 if "com" not in item.keys() or ("com" in item.keys() and 207 not item.get("com")): 208 raise LiteParamError(ErrorMessage.Config.Code_0302030) 209 210 if "type" not in item.keys() or ("type" not in item.keys() and 211 not item.get("type")): 212 raise LiteParamError(ErrorMessage.Config.Code_0302031) 213 else: 214 com_type_set.add(item.get("type")) 215 if len(com_type_set) < 2: 216 raise LiteParamError(ErrorMessage.Config.Code_0302032) 217 218 @staticmethod 219 def _check_ipcamera_local(device): 220 for item in device: 221 if "label" not in item.keys(): 222 if "com" not in item.keys() or ("com" in item.keys() and 223 not item.get("com")): 224 raise LiteParamError(ErrorMessage.Config.Code_0302033) 225 226 @staticmethod 227 def _check_ipcamera_remote(device=None): 228 for item in device: 229 if "label" not in item.keys(): 230 if "port" in item.keys() and item.get("port") and not item.get( 231 "port").isnumeric(): 232 raise LiteParamError(ErrorMessage.Config.Code_0302034) 233 elif "port" not in item.keys(): 234 raise LiteParamError(ErrorMessage.Config.Code_0302035) 235 236 def __check_config__(self, device=None): 237 self.set_connect_type(device) 238 if self.label == DeviceLabelType.wifiiot: 239 self._check_wifiiot_config(device) 240 elif self.label == DeviceLabelType.ipcamera and \ 241 self.device_connect_type == "local": 242 self._check_ipcamera_local(device) 243 elif self.label == DeviceLabelType.ipcamera and \ 244 self.device_connect_type == "remote": 245 self._check_ipcamera_remote(device) 246 elif self.label == DeviceLabelType.watch_gt: 247 self._check_watchgt(device) 248 249 def set_connect_type(self, device): 250 pattern = r'^((25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(25[0-5]|2[0-4]\d|[' \ 251 r'01]?\d\d?)$' 252 for item in device: 253 if "label" in item.keys(): 254 self.label = item.get("label") 255 if "com" in item.keys(): 256 self.device_connect_type = "local" 257 if "ip" in item.keys(): 258 if re.match(pattern, item.get("ip")): 259 self.device_connect_type = "remote" 260 else: 261 raise LiteParamError(ErrorMessage.Config.Code_0302025) 262 if not self.label: 263 raise LiteParamError(ErrorMessage.Config.Code_0302026) 264 else: 265 if self.label != DeviceLabelType.wifiiot and \ 266 self.label != DeviceLabelType.ipcamera and \ 267 self.label != DeviceLabelType.watch_gt: 268 raise LiteParamError(ErrorMessage.Config.Code_0302020) 269 if not self.device_connect_type: 270 raise LiteParamError(ErrorMessage.Config.Code_0302021) 271 272 def __init_device__(self, device): 273 self.__check_config__(device) 274 self.__set_serial__(device) 275 if self.device_connect_type == "remote": 276 self.device = CONTROLLER_DICT.get("remote")(device) 277 else: 278 self.device = CONTROLLER_DICT.get("local")(device) 279 280 self.ifconfig = device[1].get("ifconfig") 281 if self.ifconfig: 282 self.connect() 283 self.execute_command_with_timeout(command='\r', timeout=5) 284 self.execute_command_with_timeout(command=self.ifconfig, timeout=5) 285 286 def connect(self): 287 """ 288 Connect the device 289 290 """ 291 try: 292 self.device.connect() 293 except LiteDeviceConnectError as _: 294 if check_mode(ModeType.decc): 295 LOG.debug("Set device %s recover state to false" % 296 self.device_sn) 297 self.device_allocation_state = DeviceAllocationState.unusable 298 self.set_recover_state(False) 299 raise 300 301 @perform_device_action 302 def execute_command_with_timeout(self, command="", case_type="", 303 timeout=TIMEOUT, **kwargs): 304 """Executes command on the device. 305 306 Args: 307 command: the command to execute 308 case_type: CTest or CppTest 309 timeout: timeout for read result 310 **kwargs: receiver - parser handler input 311 312 Returns: 313 (filter_result, status, error_message) 314 315 filter_result: command execution result 316 status: true or false 317 error_message: command execution error message 318 """ 319 receiver = kwargs.get("receiver", None) 320 if self.device_connect_type == "remote": 321 LOG.info("%s execute command shell %s with timeout %ss" % 322 (convert_serial(self.__get_serial__()), command, 323 str(timeout))) 324 filter_result, status, error_message = \ 325 self.device.execute_command_with_timeout( 326 command=command, 327 timeout=timeout, 328 receiver=receiver) 329 elif self.device_connect_type == "agent": 330 filter_result, status, error_message = \ 331 self.device.execute_command_with_timeout( 332 command=command, 333 case_type=case_type, 334 timeout=timeout, 335 receiver=receiver, type="cmd") 336 else: 337 filter_result, status, error_message = \ 338 self.device.execute_command_with_timeout( 339 command=command, 340 case_type=case_type, 341 timeout=timeout, 342 receiver=receiver) 343 if not receiver: 344 LOG.debug("{} execute result:{}".format(convert_serial(self.__get_serial__()), 345 convert_mac(filter_result.replace("BS", "")))) 346 if not status: 347 LOG.debug("{} error_message:{}".format(convert_serial(self.__get_serial__()), 348 convert_mac(error_message.replace("BS", "")))) 349 350 return filter_result.replace("BS", ""), status, error_message 351 352 def recover_device(self): 353 self.reboot() 354 355 def reboot(self): 356 self.connect() 357 filter_result, status, error_message = self. \ 358 execute_command_with_timeout(command="reset", timeout=30) 359 if not filter_result: 360 if check_mode(ModeType.decc): 361 LOG.debug("Set device %s recover state to false" % 362 self.device_sn) 363 self.device_allocation_state = DeviceAllocationState.unusable 364 self.set_recover_state(False) 365 if self.ifconfig: 366 enter_result, _, _ = self.execute_command_with_timeout( 367 command='\r', 368 timeout=15) 369 if " #" in enter_result or "OHOS #" in enter_result: 370 LOG.info("Reset device %s success" % self.device_sn) 371 self.execute_command_with_timeout(command=self.ifconfig, 372 timeout=5) 373 elif "hisilicon #" in enter_result: 374 LOG.info("Reset device %s fail" % self.device_sn) 375 376 ifconfig_result, _, _ = self.execute_command_with_timeout( 377 command="ifconfig", 378 timeout=5) 379 380 def close(self): 381 """ 382 Close the telnet connection with device server or close the local 383 serial 384 """ 385 self.device.close() 386 387 def set_recover_state(self, state): 388 with self.device_lock: 389 setattr(self, ConfigConst.recover_state, state) 390 391 def get_recover_state(self, default_state=True): 392 with self.device_lock: 393 state = getattr(self, ConfigConst.recover_state, default_state) 394 return state 395 396 397class RemoteController: 398 """ 399 Class representing a device lite remote device. 400 Each object of this class represents one device lite remote device 401 in xDevice. 402 """ 403 404 def __init__(self, device): 405 self.host = device[1].get("ip") 406 self.port = int(device[1].get("port")) 407 self.telnet = None 408 409 def connect(self): 410 """ 411 Connect the device server 412 """ 413 try: 414 if self.telnet: 415 return self.telnet 416 self.telnet = telnetlib.Telnet(self.host, self.port, 417 timeout=TIMEOUT) 418 except Exception as error: 419 raise LiteDeviceConnectError(ErrorMessage.Device.Code_0303008.format( 420 convert_ip(self.host), self.port, str(error))) from error 421 time.sleep(2) 422 self.telnet.set_debuglevel(0) 423 return self.telnet 424 425 def execute_command_with_timeout(self, command="", timeout=TIMEOUT, 426 receiver=None): 427 """ 428 Executes command on the device. 429 430 Parameters: 431 command: the command to execute 432 timeout: timeout for read result 433 receiver: parser handler 434 """ 435 return LiteHelper.execute_remote_cmd_with_timeout( 436 self.telnet, command, timeout, receiver) 437 438 def close(self): 439 """ 440 Close the telnet connection with device server 441 """ 442 try: 443 if not self.telnet: 444 return 445 self.telnet.close() 446 self.telnet = None 447 except ConnectionError as _: 448 error_message = "Remote device is disconnected abnormally" 449 LOG.error(error_message, error_no="00401") 450 451 452class LocalController: 453 def __init__(self, device): 454 """ 455 Init Local device. 456 Parameters: 457 device: local device 458 """ 459 self.com_dict = {} 460 for item in device: 461 if "com" in item.keys(): 462 if "type" in item.keys() and ComType.cmd_com == item.get( 463 "type"): 464 self.com_dict[ComType.cmd_com] = ComController(item) 465 elif "type" in item.keys() and ComType.deploy_com == item.get( 466 "type"): 467 self.com_dict[ComType.deploy_com] = ComController(item) 468 469 def connect(self, key=ComType.cmd_com): 470 """ 471 Open serial. 472 """ 473 self.com_dict.get(key).connect() 474 475 def close(self, key=ComType.cmd_com): 476 """ 477 Close serial. 478 """ 479 if self.com_dict and self.com_dict.get(key): 480 self.com_dict.get(key).close() 481 482 def execute_command_with_timeout(self, **kwargs): 483 """ 484 Execute command on the serial and read all the output from the serial. 485 """ 486 args = kwargs 487 key = args.get("key", ComType.cmd_com) 488 command = args.get("command", None) 489 case_type = args.get("case_type", "") 490 receiver = args.get("receiver", None) 491 timeout = args.get("timeout", TIMEOUT) 492 return self.com_dict.get(key).execute_command_with_timeout( 493 command=command, case_type=case_type, 494 timeout=timeout, receiver=receiver) 495 496 497class ComController: 498 def __init__(self, device): 499 """ 500 Init serial. 501 Parameters: 502 device: local com 503 """ 504 self.is_open = False 505 self.com = None 506 self.serial_port = device.get("com", None) 507 self.baud_rate = int(device.get("baud_rate", DEFAULT_BAUD_RATE)) 508 self.timeout = int(device.get("timeout", TIMEOUT)) 509 self.usb_port = device.get("usb_port", None) 510 511 def connect(self): 512 """ 513 Open serial. 514 """ 515 try: 516 if not self.is_open: 517 import serial 518 self.com = serial.Serial(self.serial_port, 519 baudrate=self.baud_rate, 520 timeout=self.timeout) 521 self.is_open = True 522 return self.com 523 except Exception as error: 524 raise LiteDeviceConnectError(ErrorMessage.Device.Code_0303009.format( 525 self.serial_port, str(error))) from error 526 return None 527 528 def close(self): 529 """ 530 Close serial. 531 """ 532 try: 533 if not self.com: 534 return 535 if self.is_open: 536 self.com.close() 537 self.is_open = False 538 except ConnectionError as _: 539 error_message = "Local device is disconnected abnormally" 540 LOG.error(error_message, error_no="00401") 541 542 def execute_command_with_timeout(self, **kwargs): 543 """ 544 Execute command on the serial and read all the output from the serial. 545 """ 546 return LiteHelper.execute_local_cmd_with_timeout(self.com, **kwargs) 547 548 def execute_command(self, command): 549 """ 550 Execute command on the serial and read all the output from the serial. 551 """ 552 LiteHelper.execute_local_command(self.com, command) 553 554 555CONTROLLER_DICT = { 556 "local": LocalController, 557 "remote": RemoteController, 558} 559