#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright (c) 2024 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Description: MISC action words.
"""

import asyncio
import logging
import os
import stat
import subprocess
import threading

from aw.websocket import WebSocket


class Utils(object):
    """Miscellaneous helpers: message ids, debugger-server round trips and hdc log handling."""

    @classmethod
    def message_id_generator(cls):
        """Yield consecutive integer message ids, starting at 1 and never ending."""
        message_id = 1
        while True:
            yield message_id
            message_id += 1

    @classmethod
    def get_custom_protocols(cls):
        """Return a new list holding the names of the custom protocols."""
        return [
            "removeBreakpointsByUrl",
            "setMixedDebugEnabled",
            "replyNativeCalling",
            "getPossibleAndSetBreakpointByUrl",
            "dropFrame",
            "setNativeRange",
            "resetSingleStepper",
            "callFunctionOn",
            "smartStepInto",
        ]

    @classmethod
    async def communicate_with_debugger_server(cls, instance_id, to_send_queue, received_queue, command, message_id):
        """
        Assemble and send the command, then return the response.

        The message id is stored into ``command['id']`` (the passed dict is modified in place),
        the command is sent through ``to_send_queue`` and the reply is awaited from
        ``received_queue``.

        :param instance_id: identifier handed to the WebSocket helpers.
        :param to_send_queue: queue used to send the command.
        :param received_queue: queue the response is read from.
        :param command: dict-like command; its ``'id'`` entry is overwritten.
        :param message_id: id to attach to the command.
        :return: whatever ``WebSocket.recv_msg_of_debugger_server`` returns.
        """
        command['id'] = message_id
        await WebSocket.send_msg_to_debugger_server(instance_id, to_send_queue, command)
        response = await WebSocket.recv_msg_of_debugger_server(instance_id, received_queue)
        return response

    @classmethod
    async def async_wait_timeout(cls, task, timeout=3):
        """
        Await ``task`` for at most ``timeout`` seconds.

        :param task: awaitable to wait for.
        :param timeout: timeout passed to ``asyncio.wait_for``.
        :return: the result of ``task``, or ``None`` when the wait timed out
                 (the timeout is logged, not raised).
        """
        try:
            return await asyncio.wait_for(task, timeout)
        except asyncio.TimeoutError:
            logging.error('await timeout!')
            return None

    @classmethod
    def _decode_output(cls, raw):
        """Decode captured subprocess bytes for logging and comparison, never raising on bad bytes."""
        return raw.decode('utf-8', errors='replace').strip()

    @classmethod
    def _run_hdc(cls, cmd, log_stdout=False):
        """
        Log ``cmd``, run it with captured output and require a zero exit status.

        :param cmd: command line as a list of strings.
        :param log_stdout: when True, the decoded stdout is logged before the status check.
        :return: the ``subprocess.CompletedProcess`` of the command.
        :raises AssertionError: if the command exits with a non-zero status.
        """
        logging.info(' '.join(cmd))
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if log_stdout:
            logging.info(cls._decode_output(result.stdout))
        # Raised explicitly instead of using ``assert`` so the check survives ``python -O``.
        if result.returncode != 0:
            raise AssertionError(
                f"command '{' '.join(cmd)}' failed with exit code {result.returncode}: "
                f"{cls._decode_output(result.stderr)}"
            )
        return result

    @classmethod
    def hdc_target_mount(cls):
        """
        Run ``hdc target mount`` and require its output to be ``Mount finish``.

        The exit status is not checked; only the stripped stdout is compared.

        :raises AssertionError: if the command output is not ``Mount finish``.
        """
        mount_cmd = ['hdc', 'target', 'mount']
        logging.info(' '.join(mount_cmd))
        mount_result = subprocess.run(mount_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output = cls._decode_output(mount_result.stdout)
        logging.info(output)
        if output != 'Mount finish':
            raise AssertionError(f"'{' '.join(mount_cmd)}' did not report 'Mount finish': {output!r}")

    @classmethod
    def clear_fault_log(cls):
        """
        Remove everything under ``/data/log/faultlog/faultlogger/`` through ``hdc shell rm``.

        :raises AssertionError: if the command exits with a non-zero status.
        """
        cls._run_hdc(['hdc', 'shell', 'rm', '/data/log/faultlog/faultlogger/*'])

    @classmethod
    def save_fault_log(cls, log_path):
        """
        Receive ``/data/log/faultlog/faultlogger/`` into ``log_path`` through ``hdc file recv``.

        :param log_path: local destination directory; created when missing.
        :raises AssertionError: if the command exits with a non-zero status.
        """
        os.makedirs(log_path, exist_ok=True)
        cls._run_hdc(['hdc', 'file', 'recv', '/data/log/faultlog/faultlogger/', log_path], log_stdout=True)

    @classmethod
    def save_hilog(cls, log_path, file_name, debug_on=False):
        """
        Configure hilog and stream its output into ``log_path/file_name`` on a background thread.

        :param log_path: local directory for the log file; created when missing.
        :param file_name: name of the log file inside ``log_path``.
        :param debug_on: when True, the extra debug configuration commands are issued first.
        :return: tuple ``(hilog_process, write_thread)``. The thread runs until the process
                 output ends or the caller closes ``hilog_process.stdout``; it closes the
                 log file when it finishes.
        :raises AssertionError: if any configuration command exits with a non-zero status.
        """
        os.makedirs(log_path, exist_ok=True)

        # config the hilog
        cls._run_hdc(['hdc', 'shell', 'hilog', '-r'])

        if debug_on:
            debug_commands = (
                ['hdc', 'shell', 'hilog', '-G', '16M'],
                ['hdc', 'shell', 'hilog', '-Q', 'pidoff'],
                ['hdc', 'shell', 'hilog', '-Q', 'domainoff'],
                ['hdc', 'shell', 'hilog', '-b', 'd'],
                ['hdc', 'shell', 'setprop', 'persist.sys.hilog.debug.on', 'true'],
                ['hdc', 'shell', 'hilog', '-b', 'DEBUG'],
            )
            for cmd in debug_commands:
                cls._run_hdc(cmd)

        # os.path.join keeps the path valid on every platform (a literal backslash is only a
        # separator on Windows). O_TRUNC drops stale content left by an earlier, longer log.
        log_file_path = os.path.join(log_path, file_name)
        log_file = os.fdopen(os.open(log_file_path,
                                     os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                                     stat.S_IRUSR | stat.S_IWUSR), 'wb')

        # create a sub-process to receive the hilog
        # The file is opened first so that a failure to open it cannot leave an orphan process.
        try:
            hilog_process = subprocess.Popen(['hdc', 'shell', 'hilog'],
                                             stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        except OSError:
            log_file.close()
            raise

        def write():
            try:
                for line in iter(hilog_process.stdout.readline, b''):
                    log_file.write(line)
                    log_file.flush()
            except ValueError:
                # Reading from a stdout that the caller already closed raises ValueError.
                logging.info('hilog stream is closed.')
            finally:
                # Close on every exit path so an unexpected error cannot leak the handle.
                log_file.close()

        write_thread = threading.Thread(target=write)
        write_thread.start()

        return hilog_process, write_thread