#-*- coding:utf-8 -*-
'''
Download an L1 image and flash it onto a HiSilicon board (ev300 / dv300).

The flow is: download the image, erase the board over the serial port with
HiBurn, burn the image over USB with HiBurn, then drive the serial console to
boot the board, configure its network and check that the launcher is running.
'''
import hashlib
import uuid
import sys
import subprocess
import os
# ``time`` is imported explicitly (and before the star imports below, so any
# ``time`` they export still takes precedence exactly as it did before).
import time
import serial

from core.base import BaseApp, dec_stepmsg
from util.file_locker import FileLock
from util.log_info import logger
from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout
from aw.Telnet.TelnetClient import TelConnect
from aw.Common.Constant import CONSTANT
from aw.Download.Download import *
from aw.Common.Common import getHostIp, copyFile, copyDirectory
from aw.ExtractFile.ExtractFile import *
from aw.poweronoff.serial_power_on_off import usbPowerOnOff
from threading import Thread

lock_suffix = CONSTANT.File.LOCK_SUFFIX  # lock file used to serialise concurrent downloads
suc_file = CONSTANT.File.SUC_FILE  # marker file: the version was downloaded successfully
failed_file = CONSTANT.File.FAILED_FILE  # marker file: the download failed
READ_MAXTIMEOUT = 5
READ_TIMEOUT = 5
READ_MINITIMEOUT = 2
uboot_finish = 'hisilicon #'
cmd_finish = ' #'
# HiBurn console fragments that mean "erase did not start, power-cycle and retry".
error_str_list = ['Unknown', '发送起始帧失败', '发送头帧失败']
ip_cmd = 'ping 192.168.18.1'


class liteOsUpgrade_L1_shequ(BaseApp):
    '''
    Flash step for L1 boards: download the image, then burn it with HiBurn.

    @author: w00278233
    '''

    def __init__(self, param_file):
        super().__init__(param_file)
        self.param_List = ["deploy_com",
                           "usb_port",
                           "upgrade_upgradeLocation"]

    @dec_stepmsg("hongmeng L1 flash")
    def excute(self):
        '''
        Entry point of the step: download the image, then upgrade the board.

        @return: True when both download and upgrade succeed, otherwise False.
                 Exceptions raised by either phase propagate to the caller.
        '''
        # The result is unused, but the lookup raises if this step is not
        # present in "step_list", so it is kept as-is.
        self.params_dict.get("step_list").index("liteOsUpgrade_L1_shequ_app")

        # download phase
        if not self.download():
            CONSTANT.ENVERRMESSAGE = "image download fail"
            logger.printLog(CONSTANT.ENVERRMESSAGE)
            return False

        # upgrade phase
        if not self.upgrade():
            CONSTANT.ENVERRMESSAGE = "board upgrade fail"
            logger.printLog(CONSTANT.ENVERRMESSAGE)
            return False
        return True

    @dec_stepmsg("download")
    @timeout(1800)
    def download(self):
        '''
        Build the local save path for the image and download it there.

        The save directory name is a uuid5 of the "pbiid" parameter (plus
        "FASTBOOT") when "pbiid" is set, otherwise of "upgrade_upgradeLocation".
        The path is published through the module globals ``version_savepath``
        and ``version_name`` because upgrade() reads ``version_savepath``.

        @return: True on success (or when "isDownload" is "True" and the
                 download is skipped), otherwise False.
        '''
        global version_savepath, version_name
        dir_path = CONSTANT.Path.getDirPath()
        if self.params_dict.get("pbiid"):
            version_path = self.params_dict.get("pbiid")
            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(self.params_dict.get("pbiid")) + "FASTBOOT"))
            version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name)
        else:
            version_path = self.params_dict.get("upgrade_upgradeLocation")
            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, (self.params_dict.get("upgrade_upgradeLocation"))))
            version_savepath = os.path.join(dir_path, version_name)

        if self.params_dict.get("isDownload") == "True":
            logger.printLog("不需要做下载,直接返回")
            return True

        # download the image; the record file name is the sha1 of the source
        # path with every path separator removed
        save_file_str = version_path.replace("/", "").replace("\\", "")
        save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest()
        logger.info("download hash value:%s" % (save_file_name))
        save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt"))
        image_dir = os.path.join(version_savepath, "img")
        if not self.excutedown(version_path, image_dir, save_path_file, False):
            logger.error("download img fail")
            return False

        # record the local version path so devicetest can fetch test cases from it
        saveVersion(save_path_file, image_dir)

        return True

    def excutedown(self, source_path, download_dir, suc_mark, is_file):
        '''
        Download (and, for archives, unpack) one resource under a file lock.

        @param source_path: location of the resource (UNC path, http url, ...)
        @param download_dir: local directory the resource is downloaded into
        @param suc_mark: path of the record file passed to isDownLoadSuccess
        @param is_file: whether source_path is a single file (used by the
                        copy-based download only)
        @return: the result of the last download/extract helper that ran; a
                 falsy value means failure and a failure marker file is created.
        @raise Exception: any error is logged and re-raised wrapped in Exception.
        '''
        failed_mark = os.path.join(download_dir, failed_file)
        lock_path = os.path.join(download_dir, lock_suffix)
        file_lock = FileLock()

        if isDownLoadSuccess(download_dir, suc_mark, failed_mark):
            return True
        try:
            nowtime = get_now_time_str_info()
            logger.printLog("%s Downloading, please wait" % nowtime)
            file_lock.lockFile(lock_path)
            ret = ""
            logger.info("Get lock. Start to ")
            if self.params_dict.get("bt_enable") and self.params_dict.get("bt_enable") == "True":
                # NOTE(review): os_method is not defined anywhere in this file;
                # unless a star import provides it this branch raises
                # NameError. Confirm the intended argument.
                ret = downloadByBitComet(source_path, download_dir, os_method)
            elif source_path.startswith('\\\\'):
                ret = downloadByCopy(source_path, download_dir, is_file)
            elif self.params_dict.get("pbiid"):
                ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT", self.params_dict.get("pbiid"))
            elif source_path.startswith("http"):
                ret = downloadFileFromDevCloud(source_path, "", "", download_dir)

            if source_path.endswith(".zip"):
                zip_name = os.path.basename(source_path)
                ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir)

            is_file_id_url = source_path.startswith("http") and ("file_id=" in source_path)
            if source_path.endswith(".tar.gz") or is_file_id_url:
                if is_file_id_url:
                    # for "file_id=" urls the archive name is whatever follows
                    # the last '=', or "out.tar.gz" when the url does not end
                    # in ".tar.gz"
                    if source_path.endswith(".tar.gz"):
                        zip_name = source_path.split('=')[-1]
                    else:
                        zip_name = "out.tar.gz"
                else:
                    zip_name = os.path.basename(source_path)
                ret = unTarFile(os.path.join(download_dir, zip_name), download_dir)
            nowtime = get_now_time_str_info()
            logger.printLog("%s download to %s end" % (nowtime, download_dir))

            if not ret:
                # create (or touch) the failure marker
                with open(failed_mark, "a+") as fp:
                    fp.write("")
            return ret
        except Exception as e:
            logger.printLog(e)
            raise Exception(e) from e
        finally:
            file_lock.releaseFile()

    @dec_stepmsg("upgrade")
    #@timeout(900)
    def upgrade(self):
        '''
        Burn the downloaded image onto the board and verify that it boots.

        Reads its settings from self.params_dict ("deploy_com", "usb_port",
        "baudrate", "flash_type", "hiburn_usbport", "Device_IP",
        "Device_Netmask", "Device_GatewayIP", "chip_version") and the image
        location from the module global ``version_savepath`` set by download().

        @return: True when the board was flashed and passed the status check,
                 otherwise False.
        '''
        logger.printLog('开始升级')
        deploy_com = self.params_dict.get("deploy_com")
        usb_port = self.params_dict.get("usb_port")
        baudrate = self.params_dict.get("baudrate")
        # flash type selects the resource directory, chip name and u-boot image
        flash_type = self.params_dict.get("flash_type")
        burn_usbport = self.params_dict.get("hiburn_usbport")
        device_ip = self.params_dict.get("Device_IP")
        device_netmask = self.params_dict.get("Device_Netmask")
        device_gatewayip = self.params_dict.get("Device_GatewayIP")
        chip_version = self.params_dict.get('chip_version')
        chip_version = chip_version.lower() if chip_version else chip_version

        if not deploy_com:
            logger.error("deploy_com is NULL !!")
            return False
        if not burn_usbport:
            logger.error("hiburn_usbport is NULL !!")
            return False
        if not flash_type:
            # flash_type is used to build every resource path below
            logger.error("flash_type is NULL !!")
            return False
        if not baudrate:
            baudrate = 115200
        scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))))

        hiburntoolpath = self._prepareHiburnTool(scriptpath, burn_usbport)
        if not hiburntoolpath:
            return False

        # copy the files needed for the upgrade next to the image
        resource_dir = os.path.join(scriptpath, "resource", "L1")
        local_image_path = os.path.join(version_savepath, "img")
        xml_path = os.path.join(local_image_path, "usb-burn.xml")
        if chip_version == 'hi3518':
            old_xml_path = os.path.join(resource_dir, flash_type, "usb-burn-jffs2.xml")
        elif chip_version == 'hi3516':
            old_xml_path = os.path.join(resource_dir, flash_type, "usb-burn-vfat.xml")
        else:
            old_xml_path = os.path.join(resource_dir, flash_type, "usb-burn.xml")
        copyFile(old_xml_path, xml_path)

        flash_dir = os.path.join(resource_dir, flash_type.lower())
        if flash_type.lower() == "ev300":
            chip_type = "Hi3518EV300"
            ubootpath = os.path.join(flash_dir, "u-boot-hi3518ev300.bin")
        elif flash_type.lower() == "dv300":
            chip_type = "Hi3516DV300"
            ubootpath = os.path.join(flash_dir, "u-boot-hi3516dv300.bin")
        else:
            logger.error("flash_type is : %s " % flash_type)
            return False
        copyFile(ubootpath, local_image_path)
        scriptfile = os.path.join(flash_dir, "update.txt")
        logger.info(f'scriptfile:{scriptfile}')

        # HiBurn is started through a path relative to its own directory, so
        # run it from there and always restore the previous working directory.
        current_path = os.getcwd()
        logger.info("before excute hiburn,current path is: %s" % current_path)
        os.chdir(hiburntoolpath)
        try:
            logger.info("excute hiburn path is: %s" % os.getcwd())
            flash_uboot_xml = os.path.join(flash_dir, "flash_fastboot.xml")
            cmd = r".\jre\bin\java -jar hiburn.jar --erase -n %s -m serial %s -x %s" % (chip_type, deploy_com.upper(), flash_uboot_xml)
            if not self.eraseDevice(cmd, usb_port):
                # As before, a failed erase does not abort the upgrade; the
                # USB burn below is still attempted.
                logger.error("erase device failed, continue with usb burn")

            if not self._burnByUsb(chip_type, xml_path):
                return False
        finally:
            os.chdir(current_path)

        logger.info("hiburn upgrade end, check board status")
        time.sleep(5)
        ser = None
        try:
            logger.info("打开serial")
            ser = serial.Serial(deploy_com, int(baudrate), timeout=0)
            if self.bootUpUboot(ser, scriptfile) and self.configureIp(ser, flash_type, device_ip, device_netmask, device_gatewayip) and self.checkStatus(ser):
                return True
            return False
        except Exception as e:
            logger.info(e)
            return False
        finally:
            # ser stays None when opening the port itself failed
            if ser is not None:
                ser.close()
                logger.info("close serial")

    def _prepareHiburnTool(self, scriptpath, burn_usbport):
        '''
        Make sure the per-USB-port copy of the HiBurn command line tool exists.

        On first use the tool archive is copied from the script resources,
        unzipped, and "usbDeviceNumber" in config/burn.config is set to
        burn_usbport.

        @return: the tool directory, or None when unpacking the archive failed.
        '''
        toolworkspace = CONSTANT.OSType.getworkspace()
        hiburntoolpath = os.path.join(toolworkspace, "HiBurnCmdLine", "usb%s_tool" % burn_usbport)
        logger.info("hiburn tool path is: %s" % hiburntoolpath)
        if os.path.exists(hiburntoolpath):
            return hiburntoolpath

        os.makedirs(hiburntoolpath)
        toolpath = os.path.join(scriptpath, "resource", "HiBurnCmdLine.zip")
        logger.info("copy %s to %s" % (toolpath, hiburntoolpath))
        copyFile(toolpath, hiburntoolpath)
        zip_name = os.path.basename(toolpath)
        if not extractZipFile(os.path.join(hiburntoolpath, zip_name), hiburntoolpath):
            logger.error("%s is not exit" % hiburntoolpath)
            return None
        logger.info("unzip to %s succ" % (hiburntoolpath))

        # point burn.config at the requested usb port
        configpath = os.path.join(hiburntoolpath, "config", "burn.config")
        new_lines = []
        with open(configpath, "r", encoding="utf-8") as cf:
            for line in cf:
                if "usbDeviceNumber=" in line:
                    line = "usbDeviceNumber=%s\r\n" % burn_usbport
                    logger.info("replace line: %s " % line)
                new_lines.append(line)
        with open(configpath, "w", encoding="utf-8") as wf:
            wf.write("".join(new_lines))
        return hiburntoolpath

    def _burnByUsb(self, chip_type, xml_path):
        '''
        Burn the image over USB with HiBurn (run from the tool directory).

        Exit status 4 is retried, up to two times, after a 10 second pause;
        any other non-zero status fails immediately.

        @return: True when HiBurn exits with status 0, otherwise False.
        '''
        cmd = r".\jre\bin\java -jar hiburn.jar --burn -n %s -m USBBootrom -x %s" % (chip_type, xml_path)
        retry = 0
        while retry < 3:
            logger.info("cmd is: %s" % cmd)
            ret, outpri = subprocess.getstatusoutput(cmd)
            logger.info("usb upgrade result: %s " % ret)
            logger.info("print console: %s " % outpri)
            if ret == 0:
                return True
            if ret == 4 and retry < 2:
                time.sleep(10)
                retry = retry + 1
                logger.info('flash fail,so flash once again')
                continue
            logger.info(ret)
            logger.error("hiburn usb upgrade failed!!")
            return False
        return False

    def bootUpUboot(self, ser, scriptfile):
        '''
        Boot the board out of U-Boot by replaying the commands in scriptfile.

        Makes at most two attempts. An attempt only runs the script when the
        board reports "uboot"; it succeeds when the board reports "OHOS"
        afterwards. Every failed attempt is followed by a 20 second wait.

        @param ser: serial port connected to the board console
        @param scriptfile: text file with one console command per line
        @return: True when the board ends up in OHOS, otherwise False.
        '''
        reset_count = 0
        while reset_count < 2:
            if not ser.is_open:
                ser.open()
            logger.info("get device status")
            board_type = getBoardType(ser)
            if board_type != "uboot":
                logger.info('before reset;the device status is error,reset device again,reset_count:%d' % reset_count)
                reset_count += 1
                time.sleep(20)
                continue

            with open(scriptfile, "r") as fp:
                lines = fp.readlines()
            for line in lines:
                if not line.strip():
                    # skip blank lines of the script
                    continue
                # "reset" gets the longer read interval
                if "reset" in line:
                    sendCmd(ser, line, READ_MAXTIMEOUT)
                    continue
                sendCmd(ser, line, READ_MINITIMEOUT)

            board_type = getBoardType(ser)
            if board_type == "OHOS":
                return True
            logger.info('after reset;the device status is error,reset device again,reset_count:%d' % reset_count)
            reset_count += 1
            time.sleep(20)
        return False

    def configureIp(self, ser, flash_type, device_ip, device_netmask, device_gatewayip):
        '''
        Configure the board network and confirm it with a ping.

        The configuration is retried (three attempts in total) while the ping
        reports 'Ping: sending ICMP echo request failed'.

        @return: True when the ping gets a reply from 192.168.18.1, otherwise
                 False.
        '''
        rerty_count = 0
        while rerty_count <= 2:
            if flash_type.lower() == "dv300":
                if not self.afterRebootDvConfigure(ser, device_ip, device_netmask, device_gatewayip):
                    return False
            elif flash_type.lower() == "ev300":
                if not self.afterRebootEvConfigure(ser):
                    return False
            time.sleep(5)

            ret = sendCmd(ser, ip_cmd, READ_MINITIMEOUT)
            logger.info(ret)
            if 'Reply from 192.168.18.1' in ret:
                logger.info('ip 配置成功')
                return True
            elif 'Ping: sending ICMP echo request failed' in ret:
                logger.printLog('ip 配置失败')
                logger.info('重新配置ip')
                rerty_count += 1
            else:
                return False
        # every attempt ended with a failed ping
        return False

    def afterRebootDvConfigure(self, ser, device_ip, device_netmask, device_gatewayip):
        '''
        Set the eth0 address of a dv300 board and print the result.

        @return: always True; the console output is not checked.
        '''
        logger.info("dv300 configure ip")
        init_cmd = "ifconfig eth0 %s netmask %s gateway %s \r" % (device_ip, device_netmask, device_gatewayip)
        sendCmd(ser, init_cmd, READ_MINITIMEOUT)
        sendCmd(ser, 'ifconfig\r', READ_MINITIMEOUT)
        return True

    def afterRebootEvConfigure(self, ser):
        '''
        Bring up wifi on an ev300 board: mount the sd card and start
        wpa_supplicant from /sdcard/wpa.

        @return: False when mkdir, wpa_supplicant or ifconfig print "error:",
                 otherwise True.
        '''
        logger.info("ev300 configuring ,setup wifi")
        ret = sendCmd(ser, 'ls\r', READ_MINITIMEOUT)
        if "sdcard" not in ret.lower():
            ret = sendCmd(ser, 'mkdir /sdcard\r', READ_MINITIMEOUT)
            if "error:" in ret.lower():
                logger.error("mkdir /sdcard fail")
                return False
        # the output of these three commands is not checked
        sendCmd(ser, 'mount /dev/mmcblk0p0 /sdcard vfat\r', READ_MINITIMEOUT)
        sendCmd(ser, 'ls /sdcard\r', READ_MINITIMEOUT)
        sendCmd(ser, 'cd /sdcard/wpa\r', READ_MINITIMEOUT)
        ret = sendCmd(ser, 'exec wpa_supplicant -i wlan0 -c wpa_supplicant.conf \r', READ_MAXTIMEOUT)
        if "error:" in ret.lower():
            logger.error("setup wifi fail")
            return False
        ret = sendCmd(ser, 'ifconfig\r', READ_MINITIMEOUT)
        if "error:" in ret.lower():
            logger.error("ifconfig fail")
            return False
        return True

    def eraseDevice(self, cmd, usb_port):
        '''
        Erase the board with HiBurn, power-cycling it in the background.

        Each attempt starts a thread that power-cycles usb_port and then runs
        cmd. The attempt is repeated (three attempts in total) while the
        console output contains one of error_str_list.

        @param cmd: complete HiBurn erase command line
        @param usb_port: power-control port of the board
        @return: True when HiBurn exits with 0 and prints none of the known
                 error strings, otherwise False.
        '''
        erase_retry = 0
        while erase_retry < 3:
            # power on
            PowerOnByThread(usb_port)
            logger.info("cmd is: %s" % cmd)
            ret, outpri = subprocess.getstatusoutput(cmd)
            logger.info("flash fastboot result: %s " % ret)
            logger.info("print console: %s " % outpri)
            is_earse_again = any(item in outpri for item in error_str_list)
            if is_earse_again:
                logger.info('檫除存在问题 重新上下电 重新檫除')
                erase_retry += 1
                time.sleep(5)
                continue
            if ret == 0:
                logger.info('檫除成功'.center(20, '*'))
                return True
            logger.info('other error')
            return False
        return False

    def checkStatus(self, ser):
        '''
        Check twice, three seconds apart, that 'com.huawei.launcher' is listed
        in the output of the "task" console command.

        @return: True when both outputs contain it, otherwise False.
        '''
        ret1 = sendCmd(ser, 'task', READ_TIMEOUT)
        time.sleep(3)
        ret2 = sendCmd(ser, 'task', READ_TIMEOUT)
        if 'com.huawei.launcher' in ret1 and 'com.huawei.launcher' in ret2:
            return True
        logger.printLog('process com.huawei.launcher is not exist')
        return False


def PowerOnByThread(usb_port, wait_time=10):
    '''
    Start boardPowerOn(usb_port, wait_time) in a background thread and return
    immediately.
    '''
    thread = Thread(target=boardPowerOn, args=[usb_port, wait_time])
    thread.start()
    logger.info("thread board power on start")


def boardPowerOn(usb_port, waittime):
    '''
    Wait waittime seconds, then switch usb_port off and on again through the
    power service at 127.0.0.1:7788.

    @return: False when either switch fails, otherwise True.
    '''
    logger.info("board power on start")
    time.sleep(waittime)

    # power the port off
    if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "off"):
        logger.error("board power off failed")
        return False

    # power the port on
    if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "on"):
        logger.error("board power on failed")
        return False
    logger.info("board power on end")
    return True


def getBoardType(ser):
    '''
    Send a carriage return and classify the board from what the console prints.

    @return: 'OHOS' ('HMOS' or 'OHOS' in the output), 'uboot' ('hisilicon'),
             'linux' (' #'), otherwise 'bootrom'.
    '''
    ret = sendCmd(ser, '\r', READ_TIMEOUT)
    if 'HMOS' in ret or 'OHOS' in ret:
        ostype = 'OHOS'
    elif 'hisilicon' in ret:
        ostype = 'uboot'
    elif ' #' in ret:
        ostype = 'linux'
    else:
        ostype = 'bootrom'
    logger.info("board type is: %s" % ostype)
    return ostype


def sendCmd(ser, cmd, timeout):
    '''
    Write cmd (plus a newline) to the serial port and collect the reply.

    After an initial 0.5 second wait the pending bytes are read repeatedly,
    sleeping timeout seconds between reads, until a read returns nothing or
    13 chunks have been collected.

    @param ser: open serial port
    @param cmd: command text
    @param timeout: seconds to sleep between two reads
    @return: the reply decoded as utf-8, undecodable bytes dropped.
    '''
    logger.info("cmd is: %s " % cmd)
    ser.write((cmd + '\n').encode())
    time.sleep(0.5)
    raw = bytearray()
    reads = 0
    while True:
        out = ser.read(ser.in_waiting)
        if not out or reads > 12:
            break
        raw += out
        time.sleep(timeout)
        reads += 1
    # decode once over the whole reply so a multi-byte character that was
    # split across two reads is not lost
    ret = raw.decode(encoding="utf-8", errors="ignore")
    logger.info("result is: %s " % ret)
    return ret


if __name__ == "__main__":
    if len(sys.argv) < 2 or not sys.argv[1]:
        logger.printLog("Missing params file")
        sys.exit(-1)
    param_file = sys.argv[1]
    try:
        uphandle = liteOsUpgrade_L1_shequ(param_file)
        uphandle._excuteApp()
    except Exception as e:
        logger.printLog(e)
        sys.exit(-1)