1#-*- coding:utf-8 -*-
import os
import subprocess
import sys
import time
import uuid
from threading import Thread

import serial

from core.base import BaseApp, dec_stepmsg
from util.file_locker import FileLock
from util.log_info import logger
from util.time_info import get_now_time_str_info, get_now_time_info, Timeout, timeout
from aw.Telnet.TelnetClient import TelConnect
from aw.Common.Constant import CONSTANT
from aw.Download.Download import *
from aw.Common.Common import getHostIp, copyFile, copyDirectory
from aw.ExtractFile.ExtractFile import *
from aw.poweronoff.serial_power_on_off import usbPowerOnOff
19
# Marker file names taken from the shared project constants.
lock_suffix = CONSTANT.File.LOCK_SUFFIX  # lock file used to coordinate concurrent downloads
suc_file = CONSTANT.File.SUC_FILE        # marker telling whether a version was downloaded successfully
failed_file = CONSTANT.File.FAILED_FILE  # marker flagging a failed download

# Delays in seconds handed to sendCmd(), which sleeps this long between reads.
READ_MINITIMEOUT = 5
READ_MAXTIMEOUT = 20
READ_TIMEOUT = 30

# Console prompt strings (presumably marking the end of a command's output -- TODO confirm).
uboot_finish = 'hisilicon #'
cmd_finish = ' #'

# Substrings of the HiBurn console output that make eraseDevice() retry the erase.
error_str_list = ['Unknown', '发送起始帧失败', '发送头帧失败']
29
class liteOsUpgrade_L1_shequ(BaseApp):
    '''
    Deployment step that downloads an L1 image and flashes it onto a
    Hi3518EV300 / Hi3516DV300 board with the HiBurn command-line tool, then
    checks over the serial console that the board boots.

    @author: w00278233
    '''

    def __init__(self, param_file):
        '''
        Initialise the step.

        @Param: param_file: parameter file forwarded to BaseApp.__init__
        '''
        super().__init__(param_file)
        # Parameter names this step declares (presumably validated by BaseApp
        # -- TODO confirm).
        self.param_List = ["deploy_com",
                           "usb_port",
                           "upgrade_upgradeLocation"]

    @dec_stepmsg("hongmeng L1 flash")
    def excute(self):
        '''
        Entry point of the step: download the image, then flash the board.

        On failure CONSTANT.ENVERRMESSAGE is set to a short reason and logged.
        Exceptions raised by download() / upgrade() propagate unchanged.

        @return: True when both phases succeed, otherwise False
        '''
        if not self.download():
            CONSTANT.ENVERRMESSAGE = "image download fail"
            logger.printLog(CONSTANT.ENVERRMESSAGE)
            return False

        if not self.upgrade():
            CONSTANT.ENVERRMESSAGE = "board upgrade fail"
            logger.printLog(CONSTANT.ENVERRMESSAGE)
            return False
        return True

    @dec_stepmsg("download")
    @timeout(1800)
    def download(self):
        '''
        Work out the local directory for the image and download it there.

        Publishes the module globals ``version_savepath`` and ``version_name``;
        upgrade() and excutedown() read ``version_savepath``.

        @return: True on success (or when "isDownload" is "True", which skips
                 the download), False when the source location is missing or
                 the download fails
        '''
        global version_savepath, version_name
        import hashlib

        dir_path = CONSTANT.Path.getDirPath()
        pbiid = self.params_dict.get("pbiid")
        if pbiid:
            version_path = pbiid
            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, str(pbiid) + "FASTBOOT"))
            version_savepath = os.path.join(dir_path, self.params_dict.get("flash_type"), version_name)
        else:
            version_path = self.params_dict.get("upgrade_upgradeLocation")
            if not version_path:
                # uuid5() would otherwise fail with an opaque TypeError.
                logger.error("upgrade_upgradeLocation is NULL !!")
                return False
            version_name = str(uuid.uuid5(uuid.NAMESPACE_URL, version_path))
            version_savepath = os.path.join(dir_path, version_name)

        if self.params_dict.get("isDownload") == "True":
            logger.printLog("不需要做下载,直接返回")
            return True

        # Download the image; the record file name is a SHA-1 of the source
        # location with its path separators removed.
        save_file_str = version_path.replace("/", "").replace("\\", "")
        save_file_name = hashlib.sha1(save_file_str.encode("utf-8")).hexdigest()
        logger.info("download hash value:%s" % (save_file_name))
        save_path_file = os.path.join(dir_path, "record", "%s%s" % (save_file_name, ".txt"))
        image_dir = os.path.join(version_savepath, "img")
        if not self.excutedown(version_path, image_dir, save_path_file, False):
            logger.error("download img fail")
            return False

        # Record the local version path so devicetest can fetch test cases from it.
        saveVersion(save_path_file, image_dir)

        return True

    def excutedown(self, source_path, download_dir, suc_mark, is_file):
        '''
        Download ``source_path`` into ``download_dir`` under a file lock and
        unpack it when it is a .zip / .tar.gz archive.

        @Param: source_path:  location of the resource (UNC path, http URL or pbiid)
                download_dir: local directory the resource is downloaded to
                suc_mark:     record file passed to isDownLoadSuccess()
                is_file:      whether source_path is a single file (only used
                              by the copy download)
        @return: True when the version is already downloaded; otherwise the
                 result of the last download / extract call (falsy on failure,
                 in which case the failed-marker file is created)
        @raise:  any exception from the download helpers, after logging it
        '''
        failed_mark = os.path.join(download_dir, failed_file)
        lock_path = os.path.join(download_dir, lock_suffix)
        file_lock = FileLock()

        if isDownLoadSuccess(download_dir, suc_mark, failed_mark):
            return True
        try:
            nowtime = get_now_time_str_info()
            logger.printLog("%s Downloading, please wait" % nowtime)
            file_lock.lockFile(lock_path)
            ret = ""
            logger.info("Get lock. Start to ")
            if self.params_dict.get("bt_enable") == "True":
                # NOTE(review): os_method is not defined anywhere in this file;
                # unless one of the star imports provides it this branch raises
                # NameError -- confirm what should be passed here.
                ret = downloadByBitComet(source_path, download_dir, os_method)
            elif source_path.startswith('\\\\'):
                ret = downloadByCopy(source_path, download_dir, is_file)
            elif self.params_dict.get("pbiid"):
                ret = downlaodByDownloadTool(version_savepath, self.params_dict.get("version_type"), "FASTBOOT", self.params_dict.get("pbiid"))
            elif source_path.startswith("http"):
                ret = downloadFileFromDevCloud(source_path, "", "", download_dir)

            if source_path.endswith(".zip"):
                zip_name = os.path.basename(source_path)
                ret = extractZipFile(os.path.join(download_dir, zip_name), download_dir)

            is_tar = source_path.endswith(".tar.gz")
            has_file_id = source_path.startswith("http") and ("file_id=" in source_path)
            if is_tar or has_file_id:
                if not has_file_id:
                    zip_name = os.path.basename(source_path)
                elif is_tar:
                    # The archive name is whatever follows the last '='.
                    zip_name = source_path.split('=')[-1]
                else:
                    zip_name = "out.tar.gz"
                ret = unTarFile(os.path.join(download_dir, zip_name), download_dir)
            nowtime = get_now_time_str_info()
            logger.printLog("%s download to %s end" % (nowtime, download_dir))

            if not ret:
                # Leave an (empty) marker so the failure is visible on disk.
                with open(failed_mark, "a+") as fp:
                    fp.write("")
            return ret
        except Exception as e:
            logger.printLog(e)
            # Re-raise as-is so the original type and traceback survive.
            raise
        finally:
            file_lock.releaseFile()

    @dec_stepmsg("upgrade")
    #@timeout(900)
    def upgrade(self):
        '''
        Flash the downloaded image onto the board and verify that it boots.

        Reads deploy_com, usb_port, baudrate (default 115200), flash_type
        ("ev300" or "dv300", case-insensitive), hiburn_usbport, chip_version
        and the Device_* network settings from params_dict, and the module
        global ``version_savepath`` set by download().

        @return: True when the board reaches the OHOS prompt and the
                 post-boot setup succeeds, otherwise False
        '''
        logger.printLog('开始升级')
        params = self.params_dict
        deploy_com = params.get("deploy_com")
        usb_port = params.get("usb_port")
        baudrate = params.get("baudrate") or 115200
        # Chip family; selects the flashing resources and commands.
        flash_type = params.get("flash_type")
        burn_usbport = params.get("hiburn_usbport")
        device_ip = params.get("Device_IP")
        device_netmask = params.get("Device_Netmask")
        device_gatewayip = params.get("Device_GatewayIP")
        chip_version = params.get('chip_version')
        chip_version = chip_version.lower() if chip_version else chip_version

        if not deploy_com:
            logger.error("deploy_com is NULL !!")
            return False
        if not burn_usbport:
            logger.error("hiburn_usbport is NULL !!")
            return False
        if not flash_type:
            logger.error("flash_type is NULL !!")
            return False

        # Fail fast on an unsupported chip before touching the file system.
        flash_dir = flash_type.lower()
        chip_info = {
            "ev300": ("Hi3518EV300", "u-boot-hi3518ev300.bin"),
            "dv300": ("Hi3516DV300", "u-boot-hi3516dv300.bin"),
        }.get(flash_dir)
        if chip_info is None:
            logger.error("flash_type is : %s " % flash_type)
            return False
        chip_type, uboot_name = chip_info

        scriptpath = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(os.path.dirname(__file__)))))
        resource_dir = os.path.join(scriptpath, "resource", "L1")

        # Per-USB-port copy of the HiBurn tool, unpacked on first use.
        toolworkspace = CONSTANT.OSType.getworkspace()
        hiburntoolpath = os.path.join(toolworkspace, "HiBurnCmdLine", "usb%s_tool" % burn_usbport)
        logger.info("hiburn tool path is: %s" % hiburntoolpath)
        if not os.path.exists(hiburntoolpath):
            if not self._prepareHiburnTool(scriptpath, hiburntoolpath, burn_usbport):
                return False

        # Copy the files needed for flashing next to the image.
        local_image_path = os.path.join(version_savepath, "img")
        xml_name = {
            'hi3518': "usb-burn-jffs2.xml",
            'hi3516': "usb-burn-vfat.xml",
        }.get(chip_version, "usb-burn.xml")
        xml_path = os.path.join(local_image_path, "usb-burn.xml")
        # NOTE: the xml directory uses flash_type as given (not lower-cased),
        # exactly as before.
        copyFile(os.path.join(resource_dir, flash_type, xml_name), xml_path)
        copyFile(os.path.join(resource_dir, flash_dir, uboot_name), local_image_path)
        scriptfile = os.path.join(resource_dir, flash_dir, "update.txt")
        logger.info(f'scriptfile:{scriptfile}')
        flash_uboot_xml = os.path.join(resource_dir, flash_dir, "flash_fastboot.xml")

        if not self._runHiburn(hiburntoolpath, chip_type, deploy_com, usb_port,
                               flash_uboot_xml, xml_path):
            return False

        logger.info("hiburn upgrade end, check board status")
        time.sleep(5)
        return self._checkBoardAfterFlash(deploy_com, baudrate, scriptfile, flash_dir,
                                          device_ip, device_netmask, device_gatewayip)

    def _prepareHiburnTool(self, scriptpath, hiburntoolpath, burn_usbport):
        '''Unpack HiBurnCmdLine.zip into hiburntoolpath and set its USB port in burn.config.'''
        os.makedirs(hiburntoolpath)
        toolpath = os.path.join(scriptpath, "resource", "HiBurnCmdLine.zip")
        logger.info("copy %s to %s" % (toolpath, hiburntoolpath))
        copyFile(toolpath, hiburntoolpath)
        zip_name = os.path.basename(toolpath)
        if not extractZipFile(os.path.join(hiburntoolpath, zip_name), hiburntoolpath):
            logger.error("%s is not exit" % hiburntoolpath)
            return False
        logger.info("unzip to %s succ" % (hiburntoolpath))

        # Point burn.config at the requested USB port.
        configpath = os.path.join(hiburntoolpath, "config", "burn.config")
        new_lines = []
        with open(configpath, "r", encoding="utf-8") as cf:
            for line in cf:
                if "usbDeviceNumber=" in line:
                    # NOTE(review): "\r\n" written in text mode becomes
                    # "\r\r\n" on Windows; kept byte-for-byte as before.
                    line = "usbDeviceNumber=%s\r\n" % burn_usbport
                    logger.info("replace line: %s " % line)
                new_lines.append(line)
        with open(configpath, "w", encoding="utf-8") as wf:
            wf.write("".join(new_lines))
        return True

    def _runHiburn(self, hiburntoolpath, chip_type, deploy_com, usb_port, flash_uboot_xml, xml_path):
        '''Erase the flash over serial, then burn the image over USB (up to 3 tries on exit code 4).'''
        current_path = os.getcwd()
        logger.info("before excute hiburn,current path is: %s" % current_path)
        # The commands below use paths relative to the tool directory.
        os.chdir(hiburntoolpath)
        try:
            logger.info("excute hiburn path is: %s" % os.getcwd())
            cmd = ".\\jre\\bin\\java -jar hiburn.jar --erase -n %s -m serial %s -x %s" % (chip_type, deploy_com.upper(), flash_uboot_xml)
            # NOTE(review): the erase result is ignored, as before; confirm
            # whether a failed erase should abort the upgrade.
            self.eraseDevice(cmd, usb_port)

            # Flash over USB.
            cmd = ".\\jre\\bin\\java -jar hiburn.jar --burn -n %s -m USBBootrom -x %s" % (chip_type, xml_path)
            for attempt in range(3):
                logger.info("cmd is: %s" % cmd)
                ret, outpri = subprocess.getstatusoutput(cmd)
                logger.info("usb upgrade result: %s " % ret)
                logger.info("print console: %s " % outpri)
                if ret == 0:
                    return True
                if ret == 4 and attempt < 2:
                    time.sleep(10)
                    logger.info('flash fail,so flash once again')
                    continue
                logger.info(ret)
                logger.error("hiburn usb upgrade failed!!")
                return False
            return False
        finally:
            # Restore the working directory on every path, including failures.
            os.chdir(current_path)

    def _checkBoardAfterFlash(self, deploy_com, baudrate, scriptfile, flash_dir,
                              device_ip, device_netmask, device_gatewayip):
        '''Run the update script from the uboot prompt and set up the booted board; two tries.'''
        ser = None
        try:
            logger.info("打开serial")
            ser = serial.Serial(deploy_com, int(baudrate), timeout=0)
            for reset_count in range(2):
                if not ser.is_open:
                    ser.open()
                logger.info("get device status")
                if getBoardType(ser) != "uboot":
                    logger.info('before reset;the device status is error,reset device again,reset_count:%d'%reset_count)
                    time.sleep(20)
                    continue

                with open(scriptfile, "r") as fp:
                    lines = fp.readlines()
                for line in lines:
                    # readlines() keeps the newline, so test the stripped text.
                    if not line.strip():
                        continue
                    # A line containing "reset" gets the longer read delay.
                    delay = READ_MAXTIMEOUT if "reset" in line else READ_MINITIMEOUT
                    sendCmd(ser, line, delay)

                if getBoardType(ser) != "OHOS":
                    logger.info('after reset;the device status is error,reset device again,reset_count:%d' % reset_count)
                    time.sleep(20)
                    continue

                if flash_dir == "dv300":
                    return self._setupEthernet(ser, device_ip, device_netmask, device_gatewayip)
                return self._setupWifi(ser)

            # Both attempts used up without reaching the OHOS prompt.
            logger.error("upgrade fail")
            return False
        except Exception as e:
            logger.error(e)
            return False
        finally:
            # ser stays None when opening the port itself failed.
            if ser is not None:
                ser.close()
                logger.info("close serial")

    def _setupEthernet(self, ser, device_ip, device_netmask, device_gatewayip):
        '''dv300: configure eth0 with the given address settings and print the result.'''
        logger.info("upgrade success")
        init_cmd = "toybox ifconfig eth0 %s netmask %s gateway %s \r" % (device_ip, device_netmask, device_gatewayip)
        sendCmd(ser, init_cmd, READ_MINITIMEOUT)
        sendCmd(ser, 'toybox ifconfig\r', READ_MINITIMEOUT)
        return True

    def _setupWifi(self, ser):
        '''ev300: mount /sdcard and start wpa_supplicant from /sdcard/wpa.'''
        logger.info("setup wifi")
        ret = sendCmd(ser, 'toybox ls\r', READ_MINITIMEOUT)
        if "sdcard" not in ret.lower():
            ret = sendCmd(ser, 'toybox mkdir /sdcard\r', READ_MINITIMEOUT)
            if "error:" in ret.lower():
                logger.error("toybox mkdir /sdcard fail")
                return False
        sendCmd(ser, 'toybox mount /dev/mmcblk0p0 /sdcard vfat\r', READ_MINITIMEOUT)
        sendCmd(ser, 'toybox ls /sdcard\r', READ_MINITIMEOUT)
        sendCmd(ser, 'cd /sdcard/wpa\r', READ_MINITIMEOUT)
        ret = sendCmd(ser, './wpa_supplicant -i wlan0 -c wpa_supplicant.conf & \r', READ_MAXTIMEOUT)
        if "error:" in ret.lower():
            logger.error("setup wifi fail")
            return False
        ret = sendCmd(ser, 'toybox ifconfig\r', READ_MINITIMEOUT)
        if "error:" in ret.lower():
            logger.error("toybox ifconfig fail")
            return False
        logger.info("upgrade success")
        return True

    def eraseDevice(self, cmd, usb_port):
        '''
        Run the HiBurn erase command, power-cycling the board in a background
        thread, with up to three attempts.

        An attempt is retried when the console output contains one of
        ``error_str_list``; otherwise the exit status decides.

        @Param: cmd:      erase command line to execute
                usb_port: USB port handed to PowerOnByThread()
        @return: True when the erase succeeded, otherwise False
        '''
        for _ in range(3):
            # Power-cycle the board while the erase command is running.
            PowerOnByThread(usb_port)
            logger.info("cmd is: %s" % cmd)
            ret, outpri = subprocess.getstatusoutput(cmd)
            logger.info("flash fastboot result: %s " % ret)
            logger.info("print console: %s " % outpri)
            if any(item in outpri for item in error_str_list):
                logger.info('檫除存在问题 重新上下电 重新檫除')
                time.sleep(5)
                continue
            if ret == 0:
                logger.info('檫除成功'.center(20,'*'))
                return True
            logger.info('other error')
            return False
        return False
399
def PowerOnByThread(usb_port,wait_time=10):
    '''
    Start boardPowerOn(usb_port, wait_time) in a background thread and return
    immediately; the thread is neither joined nor returned.
    '''
    Thread(target=boardPowerOn, args=(usb_port, wait_time)).start()
    logger.info("thread board power on start")
404
405
def boardPowerOn(usb_port, waittime):
    '''
    Wait ``waittime`` seconds, then power the USB port off and back on through
    usbPowerOnOff() at 127.0.0.1:7788.

    @Param: usb_port: port to power-cycle
            waittime: delay in seconds before the power cycle starts
    @return: True when both switching steps succeed, False as soon as one fails
    '''
    logger.info("board power on start")
    time.sleep(waittime)

    # Power the port off first, then on again.
    for state in ("off", "on"):
        if not usbPowerOnOff('127.0.0.1', '7788', usb_port, state):
            logger.error("board power %s failed" % state)
            return False
    logger.info("board power on end")
    return True
420
421
def getBoardType(ser):
    '''
    Send a bare carriage return over ``ser`` and classify the board from the
    text that comes back.

    The checks run in order and the first match wins: 'HMOS', 'OHOS' or '$'
    -> 'OHOS'; 'hisilicon' -> 'uboot'; ' #' -> 'linux'; anything else ->
    'bootrom'.

    @return: one of 'OHOS', 'uboot', 'linux', 'bootrom'
    '''
    reply = sendCmd(ser, '\r', READ_TIMEOUT)
    signatures = (
        (('HMOS', 'OHOS', '$'), 'OHOS'),
        (('hisilicon',), 'uboot'),
        ((' #',), 'linux'),
    )
    detected = 'bootrom'
    for markers, name in signatures:
        if any(marker in reply for marker in markers):
            detected = name
            break
    logger.info("board type is: %s" % detected)
    return detected
434
def sendCmd(ser, cmd, timeout):
    '''
    Write ``cmd`` (plus a trailing newline) to the serial port and collect the
    reply.

    The port is polled up to four times; polling stops early on an empty
    read. Every chunk that is read is kept, and the bytes are decoded once at
    the end so a UTF-8 character split across two reads is not lost.

    @Param: ser:     open serial port object (write / read / inWaiting)
            cmd:     text to send
            timeout: seconds to sleep after each non-empty read before
                     polling again (a delay between reads, not a deadline)
    @return: the decoded reply, '' when nothing was received
    '''
    logger.info("cmd is: %s " % cmd)
    ser.write((cmd + '\n').encode())
    time.sleep(0.5)

    max_reads = 4
    received = bytearray()
    for attempt in range(max_reads):
        out = ser.read(ser.inWaiting())
        if not out:
            break
        received += out
        # No point in waiting after the final read.
        if attempt < max_reads - 1:
            time.sleep(timeout)

    ret = received.decode(encoding="utf-8", errors="ignore")
    logger.info("result is: %s " % ret)
    return ret
452
453
454
if __name__ == "__main__":
    # The params file path is the first command-line argument; check the
    # argument count first so a missing argument reports cleanly.
    if len(sys.argv) < 2 or not sys.argv[1]:
        logger.printLog("Missing params file")
        sys.exit(-1)
    param_file = sys.argv[1]
    try:
        # Use the class actually defined in this module.
        uphandle = liteOsUpgrade_L1_shequ(param_file)
        uphandle._excuteApp()
    except Exception as e:
        logger.printLog(e)
        sys.exit(-1)
466