1#-*- coding:utf-8 -*- 2import uuid 3import sys 4import subprocess 5import os 6import serial 7 8from core.base import BaseApp, dec_stepmsg 9from util.file_locker import FileLock 10from util.log_info import logger 11from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout 12from aw.Telnet.TelnetClient import TelConnect 13from aw.Common.Constant import CONSTANT 14from aw.Download.Download import * 15from aw.Common.Common import getHostIp, copyFile, copyDirectory 16from aw.ExtractFile.ExtractFile import * 17from aw.poweronoff.serial_power_on_off import usbPowerOnOff 18from threading import Thread 19 20lock_suffix = CONSTANT.File.LOCK_SUFFIX #通过文件锁实现并发下载 21suc_file = CONSTANT.File.SUC_FILE #通过本文件是区分版本是否成功下载 22failed_file = CONSTANT.File.FAILED_FILE #通过本文件是标记文件下载失败 23READ_MAXTIMEOUT = 20 24READ_TIMEOUT = 30 25READ_MINITIMEOUT = 5 26uboot_finish = 'hisilicon #' 27cmd_finish = ' #' 28error_str_list = ['Unknown', '发送起始帧失败', '发送头帧失败'] 29 30class liteOsUpgrade_L1_shequ(BaseApp): 31 ''' 32 @author: w00278233 33 ''' 34 35 def __init__(self, param_file): 36 super().__init__(param_file) 37 self.param_List = ["deploy_com", 38 "usb_port", 39 "upgrade_upgradeLocation"] 40 41 @dec_stepmsg("hongmeng L1 flash") 42 def excute(self): 43 ''' 44 #=================================================================================== 45 # @Method: excute(self) 46 # @Precondition: none 47 # @Func: 升级执行入口 48 # @PostStatus: none 49 # @eg: excute() 50 # @return: True or Flase 51 #=================================================================================== 52 ''' 53 step_index = self.params_dict.get("step_list").index("liteOsUpgrade_L1_shequ_app") 54 55 # 执行下载 56 try: 57 if not self.download(): 58 CONSTANT.ENVERRMESSAGE = "image download fail" 59 logger.printLog(CONSTANT.ENVERRMESSAGE) 60 return False 61 except Exception as e: 62 raise e 63 64 65 # 执行升级 66 try: 67 if not self.upgrade(): 68 CONSTANT.ENVERRMESSAGE = "board upgrade fail" 69 logger.printLog(CONSTANT.ENVERRMESSAGE) 70 return False 71 return True 72 except Exception as e: 73 raise e 74 75 @dec_stepmsg("download") 76 @timeout(1800) 77 def download(self): 78 ''' 79 #=================================================================================== 80 # @Method: download(self) 81 # @Precondition: none 82 # @Func: 构建下载到本地的路径,执行相应包的下载 83 # @PostStatus: none 84 # @eg: download() 85 # @return: True or Flase 86 #=================================================================================== 87 ''' 88 global version_savepath, version_name 89 dir_path = CONSTANT.Path.getDirPath() 90 if self.params_dict.get("pbiid"): 91 version_path = self.params_dict.get("pbiid") 92 version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(self.params_dict.get("pbiid")) + "FASTBOOT")) 93 version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name) 94 else: 95 version_path = self.params_dict.get("upgrade_upgradeLocation") 96 version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, (self.params_dict.get("upgrade_upgradeLocation")))) 97 version_savepath = os.path.join(dir_path, version_name) 98 99 if self.params_dict.get("isDownload") == "True": 100 logger.printLog("不需要做下载,直接返回") 101 return True 102 103 #执行img下载 104 import hashlib 105 save_file_str = version_path.replace("/", "").replace("\\", "") 106 save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest() 107 logger.info("download hash value:%s" % (save_file_name)) 108 save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt")) 109 if not self.excutedown(version_path, os.path.join(version_savepath, "img"), save_path_file, False): 110 logger.error("download img fail") 111 return False 112 113 #保存本地版本路径给devicetest去版本路径下取用例 114 saveVersion(save_path_file, os.path.join(version_savepath, "img")) 115 116 return True 117 118 def excutedown(self, source_path, download_dir, suc_mark, is_file): 119 ''' 120 #=================================================================================== 121 # @Method: excutedown(source_path, download_dir, is_file) 122 # @Precondition: none 123 # @Func: 执行下载动作 124 # @PostStatus: none 125 # @Param: source_path:资源文件路径 126 # download_dir:文件下载到本地的文件夹路径 127 # is_file:是否是文件 128 # 129 # @eg: excutedown("xxxx", "D:\\local\\image", Flase, os_method) 130 # @return: True or Flase 131 #=================================================================================== 132 ''' 133 failed_mark = os.path.join(download_dir, failed_file) 134 lock_path = os.path.join(download_dir, lock_suffix) 135 file_lock = FileLock() 136 137 if isDownLoadSuccess(download_dir, suc_mark, failed_mark): 138 return True 139 try: 140 nowtime = get_now_time_str_info() 141 logger.printLog("%s Downloading, please wait" % nowtime) 142 file_lock.lockFile(lock_path) 143 ret = "" 144 logger.info("Get lock. Start to ") 145 if self.params_dict.get("bt_enable") and self.params_dict.get("bt_enable") == "True": 146 ret = downloadByBitComet(source_path, download_dir, os_method) 147 elif source_path.startswith('\\\\'): 148 ret = downloadByCopy(source_path, download_dir, is_file) 149 elif self.params_dict.get("pbiid"): 150 ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT", self.params_dict.get("pbiid")) 151 elif source_path.startswith("http"): 152 ret = downloadFileFromDevCloud(source_path, "", "", download_dir) 153 154 if source_path.endswith(".zip"): 155 zip_name = os.path.basename(source_path) 156 ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir) 157 if source_path.endswith(".tar.gz") or (source_path.startswith("http") and ("file_id=" in source_path)): 158 if source_path.startswith("http") and ("file_id=" in source_path): 159 if source_path.endswith(".tar.gz"): 160 zip_name = source_path.split('=')[-1] 161 else: 162 zip_name = "out.tar.gz" 163 else: 164 zip_name = os.path.basename(source_path) 165 ret = unTarFile(os.path.join(download_dir, zip_name), download_dir) 166 nowtime = get_now_time_str_info() 167 logger.printLog("%s download to %s end" % (nowtime, download_dir)) 168 169 if not ret: 170 with open(failed_mark, "a+") as fp: 171 fp.write("") 172 return ret 173 except Exception as e: 174 logger.printLog(e) 175 raise Exception(e) 176 finally: 177 file_lock.releaseFile() 178 179 180 @dec_stepmsg("upgrade") 181 #@timeout(900) 182 def upgrade(self): 183 ''' 184 #=================================================================================== 185 # @Method: upgrade(self) 186 # @Precondition: none 187 # @Func: 升级相关业务逻辑 188 # @PostStatus: none 189 # @eg: upgrade() 190 # @return: True or Flase 191 #=================================================================================== 192 ''' 193 logger.printLog('开始升级') 194 deploy_com = self.params_dict.get("deploy_com") 195 usb_port = self.params_dict.get("usb_port") 196 baudrate = self.params_dict.get("baudrate") 197 #芯片类型,根据芯片类型获取对应的刷机命令 198 flash_type = self.params_dict.get("flash_type") 199 burn_usbport = self.params_dict.get("hiburn_usbport") 200 device_ip = self.params_dict.get("Device_IP") 201 device_netmask = self.params_dict.get("Device_Netmask") 202 device_gatewayip = self.params_dict.get("Device_GatewayIP") 203 chip_version = self.params_dict.get('chip_version') 204 chip_version = chip_version.lower() if chip_version else chip_version 205 206 if not deploy_com: 207 logger.error("deploy_com is NULL !!") 208 return False 209 if not burn_usbport: 210 logger.error("hiburn_usbport is NULL !!") 211 return False 212 if not baudrate: 213 baudrate = 115200 214 scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__))))) 215 #升级需要的工具归档 216 toolworkspace = CONSTANT.OSType.getworkspace() 217 hiburntoolpath = os.path.join(toolworkspace, "HiBurnCmdLine", "usb%s_tool" % burn_usbport) 218 logger.info("hiburn tool path is: %s" % hiburntoolpath) 219 if not os.path.exists(hiburntoolpath): 220 if not burn_usbport: 221 logger.error("hiburn_usbport is NULL !!") 222 return False 223 os.makedirs(hiburntoolpath) 224 toolpath = os.path.join(scriptpath, "resource", "HiBurnCmdLine.zip") 225 logger.info("copy %s to %s" % (toolpath, hiburntoolpath)) 226 copyFile(toolpath, hiburntoolpath) 227 zip_name = os.path.basename(toolpath) 228 ret = extractZipFile(os.path.join(hiburntoolpath, zip_name), hiburntoolpath) 229 if ret: 230 logger.info("unzip to %s succ" % (hiburntoolpath)) 231 #修改burn.config中的usb口 232 configpath = os.path.join(hiburntoolpath, "config", "burn.config") 233 all_data = "" 234 with open(configpath, "r", encoding="utf-8") as cf: 235 for line in cf: 236 if "usbDeviceNumber=" in line: 237 old_str = line 238 line = line.replace(old_str, "usbDeviceNumber=%s\r\n" % burn_usbport) 239 logger.info("replace line: %s " % line) 240 all_data += line 241 with open(configpath, "w", encoding="utf-8") as wf: 242 wf.write(all_data) 243 else: 244 logger.error("%s is not exit" % hiburntoolpath) 245 return False 246 #将升级需要的文件拷贝到镜像里面 247 local_image_path = os.path.join(version_savepath, "img") 248 old_xml_path = os.path.join(scriptpath, "resource", "L1", flash_type, "usb-burn.xml") 249 xml_path = os.path.join(local_image_path, "usb-burn.xml") 250 if chip_version == 'hi3518': 251 old_xml_path = os.path.join(scriptpath, "resource", "L1", flash_type, "usb-burn-jffs2.xml") 252 elif chip_version == 'hi3516': 253 old_xml_path = os.path.join(scriptpath, "resource", "L1", flash_type, "usb-burn-vfat.xml") 254 copyFile(old_xml_path, xml_path) 255 if flash_type.lower() == "ev300": 256 chip_type = "Hi3518EV300" 257 ubootpath = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "u-boot-hi3518ev300.bin") 258 elif flash_type.lower() == "dv300": 259 chip_type = "Hi3516DV300" 260 ubootpath = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "u-boot-hi3516dv300.bin") 261 else: 262 logger.error("flash_type is : %s " % flash_type) 263 return False 264 copyFile(ubootpath, local_image_path) 265 scriptfile = os.path.join(scriptpath, "resource", "L1", f'{flash_type.lower()}', "update.txt") 266 logger.info(f'scriptfile:{scriptfile}') 267 268 current_path = os.getcwd() 269 logger.info("before excute hiburn,current path is: %s" % current_path) 270 os.chdir(hiburntoolpath) 271 logger.info("excute hiburn path is: %s" % os.getcwd()) 272 flash_uboot_xml = os.path.join(scriptpath, "resource", "L1", flash_type.lower(), "flash_fastboot.xml") 273 cmd = ".\jre\\bin\java -jar hiburn.jar --erase -n %s -m serial %s -x %s" % (chip_type, deploy_com.upper(), flash_uboot_xml) 274 self.eraseDevice(cmd, usb_port) 275 276 retry = 0 277 while retry < 3: 278 #usb刷机 279 cmd = ".\jre\\bin\java -jar hiburn.jar --burn -n %s -m USBBootrom -x %s" % (chip_type, xml_path) 280 logger.info("cmd is: %s" % cmd) 281 ret, outpri = subprocess.getstatusoutput(cmd) 282 logger.info("usb upgrade result: %s " % ret) 283 logger.info("print console: %s " % outpri) 284 if ret != 0: 285 if ret == 4 and retry < 2: 286 time.sleep(10) 287 retry = retry + 1 288 logger.info('flash fail,so flash once again') 289 continue 290 logger.info(ret) 291 logger.error("hiburn usb upgrade failed!!") 292 return False 293 retry = retry + 3 294 os.chdir(current_path) 295 logger.info("hiburn upgrade end, check board status") 296 time.sleep(5) 297 try: 298 reset_count = 0 299 logger.info("打开serial") 300 ser = serial.Serial(deploy_com, int(baudrate), timeout=0) 301 while reset_count < 2: 302 if ser.is_open == False: 303 ser.open() 304 logger.info("get device status") 305 board_type = getBoardType(ser) 306 if board_type == "uboot": 307 with open(scriptfile, "r") as fp: 308 lines = fp.readlines() 309 for line in lines: 310 if not line: 311 logger.info("cmd is: %s " % line) 312 continue 313 if "reset" in line: 314 ret = sendCmd(ser, line, READ_MAXTIMEOUT) 315 continue 316 ret = sendCmd(ser, line, READ_MINITIMEOUT) 317 board_type = getBoardType(ser) 318 if board_type != "OHOS": 319 if reset_count < 2: 320 logger.info('after reset;the device status is error,reset device again,reset_count:%d' % reset_count) 321 reset_count += 1 322 time.sleep(20) 323 continue 324 logger.error("upgrade fail") 325 return False 326 if flash_type.lower() == "dv300": 327 logger.info("upgrade success") 328 init_cmd = "toybox ifconfig eth0 %s netmask %s gateway %s \r" % (device_ip, device_netmask, device_gatewayip) 329 sendCmd(ser, init_cmd, READ_MINITIMEOUT) 330 sendCmd(ser, 'toybox ifconfig\r', READ_MINITIMEOUT) 331 return True 332 elif flash_type.lower() == "ev300": 333 logger.info("setup wifi") 334 cmd = 'toybox ls\r' 335 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 336 if not "sdcard" in ret.lower(): 337 cmd = 'toybox mkdir /sdcard\r' 338 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 339 if "error:" in ret.lower(): 340 logger.error("toybox mkdir /sdcard fail") 341 return False 342 cmd = 'toybox mount /dev/mmcblk0p0 /sdcard vfat\r' 343 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 344 cmd = 'toybox ls /sdcard\r' 345 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 346 cmd = 'cd /sdcard/wpa\r' 347 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 348 cmd = './wpa_supplicant -i wlan0 -c wpa_supplicant.conf & \r' 349 ret = sendCmd(ser, cmd, READ_MAXTIMEOUT) 350 if "error:" in ret.lower(): 351 logger.error("setup wifi fail") 352 return False 353 cmd = 'toybox ifconfig\r' 354 ret = sendCmd(ser, cmd, READ_MINITIMEOUT) 355 if "error:" in ret.lower(): 356 logger.error("toybox ifconfig fail") 357 return False 358 logger.info("upgrade success") 359 return True 360 else: 361 logger.info('before reset;the device status is error,reset device again,reset_count:%d'%reset_count) 362 reset_count += 1 363 time.sleep(20) 364 continue 365 366 except Exception as e: 367 logger.info(e) 368 return False 369 finally: 370 ser.close() 371 logger.info("close serial") 372 373 def eraseDevice(self, cmd, usb_port): 374 ''' 375 ret 暂时没有作用,先使用out 376 ''' 377 erase_retry = 0 378 while erase_retry < 3: 379 # 通电 380 PowerOnByThread(usb_port) 381 logger.info("cmd is: %s" % cmd) 382 ret, outpri = subprocess.getstatusoutput(cmd) 383 logger.info("flash fastboot result: %s " % ret) 384 logger.info("print console: %s " % outpri) 385 is_earse_again = any([True if item in outpri else False for item in error_str_list]) 386 if ret == 0 and erase_retry < 2 and not is_earse_again : 387 logger.info('檫除成功'.center(20,'*')) 388 break 389 elif is_earse_again: 390 logger.info('檫除存在问题 重新上下电 重新檫除') 391 erase_retry += 1 392 time.sleep(5) 393 continue 394 else: 395 logger.info('other error') 396 return False 397 else: 398 return False 399 400def PowerOnByThread(usb_port,wait_time=10): 401 thread = Thread(target=boardPowerOn, args=[usb_port, wait_time]) 402 thread.start() 403 logger.info("thread board power on start") 404 405 406def boardPowerOn(usb_port, waittime): 407 logger.info("board power on start") 408 time.sleep(waittime) 409 410 #对端口下电 411 if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "off"): 412 logger.error("board power off failed") 413 return False 414 415 #对端口上电 416 if not usbPowerOnOff('127.0.0.1', '7788', usb_port, "on"): 417 logger.error("board power on failed") 418 return False 419 logger.info("board power on end") 420 421 422def getBoardType(ser): 423 ret = sendCmd(ser, '\r', READ_TIMEOUT) 424 if 'HMOS' in ret or 'OHOS' in ret or '$' in ret: 425 ostype = 'OHOS' 426 elif 'hisilicon' in ret: 427 ostype = 'uboot' 428 elif ' #' in ret: 429 ostype = 'linux' 430 else: 431 ostype = 'bootrom' 432 logger.info("board type is: %s" % ostype) 433 return ostype 434 435def sendCmd(ser, cmd, timeout): 436 logger.info("cmd is: %s " % cmd) 437 ser.write((cmd + '\n').encode()) 438 time.sleep(0.5) 439 ret = '' 440 i = 0 441 while True: 442 out = ser.read(ser.inWaiting()) 443 if not out: 444 break 445 if i > 2: 446 break 447 ret = ret + out.decode(encoding="utf-8", errors="ignore") 448 time.sleep(timeout) 449 i = i + 1 450 logger.info("result is: %s " % ret) 451 return ret 452 453 454 455if __name__ == "__main__": 456 param_file = sys.argv[1] 457 if not param_file: 458 logger.printLog("Missing params file") 459 sys.exit(-1) 460 try: 461 uphandle = liteOsUpgrade_L1(param_file) 462 uphandle._excuteApp() 463 except Exception as e: 464 logger.printLog(e) 465 sys.exit(-1) 466