# encoding=utf-8

'''
======================================================================================
Copyright (C) 2015-2020, Huawei Technologies Co., HUTAF xDevice
========================================================================================
@FileName: telnet_client.py
@Function: telnet operation to os
@Author:
@Date:
======================================================================================
'''

import os
import platform
import telnetlib
import time

from util.log_info import logger


class TelConnect:
    """Thin wrapper around ``telnetlib.Telnet`` for driving a board console.

    Commands are written as bytes; replies are read until a caller-supplied
    end tag (a ``str``) shows up or the timeout expires, and are handed back
    decoded as UTF-8 text.
    """

    def __init__(self, telip, telport):
        """Open the telnet connection.

        Args:
            telip: Host name or IP address to connect to.
            telport: Port number; anything ``int()`` accepts.
        """
        logger.info("init telnet %s:%s" % (telip, telport))
        self._host = telip
        self._port = int(telport)
        self.__timeout = 10
        self.device = telnetlib.Telnet(self._host, port=self._port,
                                       timeout=self.__timeout)

    @staticmethod
    def _to_bytes(value):
        """Return *value* as bytes, UTF-8 encoding it when it is a str."""
        return value.encode('utf-8') if isinstance(value, str) else value

    @staticmethod
    def _to_text(value):
        """Return *value* as str, UTF-8 decoding it when it is bytes."""
        if isinstance(value, bytes):
            return value.decode('utf-8', 'ignore')
        return value

    def tellogin(self, username, password, endwaittag, timeout):
        """Log in on the console unless the prompt is already available.

        Args:
            username: Login name (str).
            password: Login password (str).
            endwaittag: Prompt text that marks a ready console.
            timeout: Seconds to wait for each reply.

        Returns:
            True when no login was needed or the login succeeded,
            False when either the user name or the password step failed.
        """
        logger.info("telnet login")
        # Telnet.read_until() only accepts bytes, and the reply is bytes too;
        # normalise both sides so the membership test compares text to text.
        tag_text = self._to_text(endwaittag)
        raw = self.device.read_until(self._to_bytes(endwaittag), timeout)
        banner = raw.decode('utf-8', 'ignore')
        logger.info("cmd result: %s" % banner)

        # An empty reply is treated the same as seeing the prompt.
        if not banner or tag_text in banner:
            logger.info("Noneed login")
            return True

        if not self.sendCmdAndCheckResult(username.encode('utf-8'),
                                          'Password:', timeout):
            logger.error("telnet login Failed!!")
            return False
        if not self.sendCmdAndCheckResult(password.encode('utf-8'),
                                          tag_text, timeout):
            logger.error("telnet login Failed!!")
            return False
        # Report success explicitly instead of falling through with None.
        return True

    def sendCmd(self, send_cmd, endtag, timeout):
        """Write *send_cmd* and read the reply up to *endtag*.

        Args:
            send_cmd: Bytes to write, including any line terminator.
            endtag: Text (str) to read until.
            timeout: Seconds to wait for *endtag*.

        Returns:
            The reply decoded as UTF-8 (undecodable bytes dropped), or
            False when writing or reading raised an exception.
        """
        try:
            logger.info(send_cmd)
            self.device.write(send_cmd)
            reply = self.device.read_until(endtag.encode('utf-8'), timeout)
            return reply.decode('utf-8', 'ignore')
        except Exception as e:
            # Callers rely on a False return rather than an exception.
            logger.error(e)
            return False

    def sendEnterCmd(self, endtag, timeout):
        """Send a bare line break up to four times until *endtag* appears.

        Args:
            endtag: Prompt text expected in the reply.
            timeout: Accepted for interface compatibility; each attempt
                waits a fixed 1 second.

        Returns:
            True as soon as a reply contains *endtag*; False when all
            attempts miss it or a send fails.
        """
        try:
            logger.info("send enter to board")
            for _ in range(4):
                reply = self.sendCmd(b'\r\n', endtag, 1)
                if reply is False:
                    # The send itself failed; sendCmd already logged why.
                    return False
                if endtag in reply:
                    return True
                logger.info("retry send Enter")
            return False
        except Exception as e:
            logger.error(e)
            return False

    def _send_and_check(self, cmd, endtag, timeout, suctag=None):
        """Send *cmd* plus a newline and judge the reply.

        The reply passes when it contains *endtag* and either carries no
        "error:" marker (case-insensitive) or, if *suctag* is given,
        contains *suctag*.
        """
        try:
            reply = self.sendCmd(cmd + b'\n', endtag, timeout)
            logger.info("result: %s" % reply)
            if reply is not False and endtag in reply:
                if suctag is not None and suctag in reply:
                    return True
                if "error:" not in reply.lower():
                    return True
            logger.error("cmd not end or cmd failed, please check board !!!")
            return False
        except Exception as e:
            logger.error(e)
            return False

    def sendCmdAndCheckResult(self, cmd, endtag, timeout):
        """Run *cmd* (bytes) and check that it finished without an error.

        Returns:
            True when the reply contains *endtag* and no "error:" marker,
            otherwise False.
        """
        return self._send_and_check(cmd, endtag, timeout)

    def sendCmdAndCheckSucTag(self, cmd, endtag, suctag, timeout):
        """Run *cmd* (bytes) and check the reply for a success tag.

        Returns:
            True when the reply contains *endtag* and either contains
            *suctag* or carries no "error:" marker, otherwise False.
        """
        return self._send_and_check(cmd, endtag, timeout, suctag)

    def sendUpgradeCmd(self, sendcmd, endwaittag, suctag, timeout):
        """Run an upgrade command given as text.

        Args:
            sendcmd: Command text (str); encoded as UTF-8 before sending.
            endwaittag: Prompt text that ends the reply.
            suctag: Optional success tag; when falsy only the error check
                is applied.
            timeout: Seconds to wait for the reply.

        Returns:
            True or False as reported by the underlying check.
        """
        logger.info("cmd is: %s" % sendcmd)
        payload = sendcmd.encode('utf-8')
        if suctag:
            return self.sendCmdAndCheckSucTag(payload, endwaittag, suctag,
                                              timeout)
        return self.sendCmdAndCheckResult(payload, endwaittag, timeout)

    def sendResetCmd(self, endtag, timeout):
        """Send ``reset`` and then wait for the prompt to come back.

        Returns:
            True when the prompt reappears, False otherwise.
        """
        logger.info("send cmd reset")
        try:
            # Written directly instead of through sendCmd, so no reply is
            # awaited for the reset itself; the prompt is then polled for
            # with sendEnterCmd.
            self.device.write(b'reset\n')
            if not self.sendEnterCmd(endtag, timeout):
                logger.error("send Enter fail")
                return False
            return True
        except Exception as e:
            logger.error(e)
            return False

    def getBoardType(self, endtag, timeout):
        """Classify the board from its reply to a bare line break.

        Returns:
            'OHOS' when the reply mentions HMOS or OHOS, 'uboot' when it
            mentions hisilicon, 'linux' when it contains ' #', and
            'bootrom' for anything else.
        """
        # NOTE(review): sendCmd returns False on failure, in which case the
        # membership tests below raise TypeError - confirm callers expect it.
        reply = self.sendCmd(b'\r\n', endtag, timeout)
        if 'HMOS' in reply or 'OHOS' in reply:
            ostype = 'OHOS'
        elif 'hisilicon' in reply:
            ostype = 'uboot'
        elif ' #' in reply:
            ostype = 'linux'
        else:
            ostype = 'bootrom'
        logger.info("board type is: %s" % ostype)
        return ostype

    def close(self):
        """Close the telnet connection."""
        self.device.close()