#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Copyright (c) 2024 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Description: Python Debugger Domain Interfaces
"""

import json
import logging

from aw import communicate_with_debugger_server, Utils
from aw.cdp import debugger
from aw.types import ThreadConnectionInfo, ProtocolType
from aw.api.protocol_api import ProtocolImpl


class DebuggerImpl(ProtocolImpl):
    """Debugger-domain protocol handlers.

    ``dispatch_table`` maps each protocol name to a ``(coroutine, ProtocolType)``
    pair.  Handlers registered as ``ProtocolType.send`` take
    ``(message_id, connection, params)``; handlers registered as
    ``ProtocolType.recv`` take ``(connection, params)``.  Every handler
    validates the server's answer with ``assert`` and returns the decoded JSON.
    """

    # Notification that must precede the reply of every resuming command.
    _RESUMED_EVENT = {"method": "Debugger.resumed", "params": {}}

    # Raw-message prefixes that ``disable`` skips while waiting for its reply.
    _DISABLE_SKIPPED_PREFIXES = ('{"method":"Debugger.resumed"',
                                 '{"method":"HeapProfiler.lastSeenObjectId"',
                                 '{"method":"HeapProfiler.heapStatsUpdate"')

    def __init__(self, id_generator, websocket):
        """Forward both arguments to ``ProtocolImpl`` and register the handlers."""
        super().__init__(id_generator, websocket)
        self.dispatch_table = {"enable": (self.enable, ProtocolType.send),
                               "disable": (self.disable, ProtocolType.send),
                               "resume": (self.resume, ProtocolType.send),
                               "pause": (self.pause, ProtocolType.send),
                               "removeBreakpointsByUrl": (self.remove_breakpoints_by_url, ProtocolType.send),
                               "getPossibleAndSetBreakpointsByUrl": (self.get_possible_and_set_breakpoints_by_url,
                                                                     ProtocolType.send),
                               "stepOver": (self.step_over, ProtocolType.send),
                               "stepInto": (self.step_into, ProtocolType.send),
                               "stepOut": (self.step_out, ProtocolType.send),
                               "setPauseOnExceptions": (self.set_pause_on_exceptions, ProtocolType.send),
                               "evaluateOnCallFrame": (self.evaluate_on_call_frame, ProtocolType.send),
                               "smartStepInto": (self.smart_step_into, ProtocolType.send),
                               "setMixedDebugEnabled": (self.set_mixed_debug_enabled, ProtocolType.send),
                               "replyNativeCalling": (self.reply_native_calling, ProtocolType.send),
                               "dropFrame": (self.drop_frame, ProtocolType.send),
                               "scriptParsed": (self.script_parsed, ProtocolType.recv),
                               "paused": (self.paused, ProtocolType.recv),
                               "nativeCalling": (self.native_calling, ProtocolType.recv)}

    # ------------------------------------------------------------------ #
    # Private helpers shared by the public handlers below.
    # ------------------------------------------------------------------ #

    async def _send_command(self, connection, command, message_id):
        """Pass ``command`` to ``communicate_with_debugger_server`` and return its raw result."""
        return await communicate_with_debugger_server(connection.instance_id,
                                                      connection.send_msg_queue,
                                                      connection.received_msg_queue,
                                                      command, message_id)

    async def _recv_raw(self, connection):
        """Return the next raw message received for ``connection``."""
        return await self.websocket.recv_msg_of_debugger_server(connection.instance_id,
                                                                connection.received_msg_queue)

    async def _recv_event(self, connection, expected_method=None):
        """Receive and decode one message, optionally asserting its ``method`` field."""
        event = json.loads(await self._recv_raw(connection))
        if expected_method is not None:
            assert event['method'] == expected_method
        return event

    async def _call_expecting_empty_result(self, connection, command, message_id):
        """Send ``command`` and assert the reply is ``{"id": message_id, "result": {}}``."""
        response = json.loads(await self._send_command(connection, command, message_id))
        assert response == {"id": message_id, "result": {}}
        return response

    async def _call_expecting_matching_id(self, connection, command, message_id):
        """Send ``command`` and assert only that the reply's ``id`` is ``message_id``."""
        response = json.loads(await self._send_command(connection, command, message_id))
        assert response['id'] == message_id
        return response

    async def _call_expecting_resumed(self, connection, command, message_id):
        """Send a resuming ``command``.

        The first message must be the ``Debugger.resumed`` notification and the
        next one the empty-result reply carrying ``message_id``.
        """
        first_message = await self._send_command(connection, command, message_id)
        assert json.loads(first_message) == self._RESUMED_EVENT
        response = json.loads(await self._recv_raw(connection))
        assert response == {"id": message_id, "result": {}}
        return response

    # ------------------------------------------------------------------ #
    # Connect-server handshake.
    # ------------------------------------------------------------------ #

    async def connect_to_debugger_server(self, pid, is_main=True):
        """Validate the ``addInstance`` message and build the thread's connection.

        For the main thread a ``{"type": "connected"}`` message is sent first and
        the announced instance must have ``instanceId`` 0 and ``tid`` equal to
        ``pid``.  Otherwise the instance must have a non-zero ``instanceId``, a
        ``tid`` different from ``pid`` and a name containing ``workerThread_``.

        :param pid: value the announced ``tid`` is compared against.
        :param is_main: whether the main thread is being connected.
        :return: ``ThreadConnectionInfo`` built from the instance id reported by
            the websocket and that instance's send/receive queues.
        :raises AssertionError: if the ``addInstance`` message is not as expected.
        """
        if is_main:
            await self.websocket.send_msg_to_connect_server({"type": "connected"})
        response = json.loads(await self.websocket.recv_msg_of_connect_server())
        assert response['type'] == 'addInstance'
        if is_main:
            if response['instanceId'] != 0:
                # Log and raise with the same text; using logging.error(...) as the
                # assert message would leave the AssertionError message as None.
                error_text = 'InstanceId of the main thread is not 0'
                logging.error(error_text)
                raise AssertionError(error_text)
            assert response['tid'] == pid
        else:
            assert response['instanceId'] != 0
            assert response['tid'] != pid
            assert 'workerThread_' in response['name']
        instance_id = await self.websocket.get_instance()
        send_queue = self.websocket.to_send_msg_queues[instance_id]
        recv_queue = self.websocket.received_msg_queues[instance_id]
        return ThreadConnectionInfo(instance_id, send_queue, recv_queue)

    async def destroy_instance(self):
        """Receive one connect-server message, assert it is ``destroyInstance`` and return it."""
        response = json.loads(await self.websocket.recv_msg_of_connect_server())
        assert response['type'] == 'destroyInstance'
        return response

    # ------------------------------------------------------------------ #
    # Commands (ProtocolType.send).
    # ------------------------------------------------------------------ #

    async def enable(self, message_id, connection, params):
        """Send ``debugger.enable()`` and return the decoded reply.

        ``Debugger.scriptParsed`` notifications arriving before the reply are
        skipped.  The reply must carry ``debuggerId`` "0" and the protocol list
        returned by ``Utils.get_custom_protocols()``.
        """
        response = await self._send_command(connection, debugger.enable(), message_id)
        while response.startswith('{"method":"Debugger.scriptParsed"'):
            response = await self._recv_raw(connection)
        response = json.loads(response)
        assert response == {"id": message_id, "result": {"debuggerId": "0", "protocols": Utils.get_custom_protocols()}}
        # Returned for consistency with every other command handler.
        return response

    async def disable(self, message_id, connection, params):
        """Send ``debugger.disable()`` and return the decoded empty-result reply.

        ``Debugger.resumed``, ``HeapProfiler.lastSeenObjectId`` and
        ``HeapProfiler.heapStatsUpdate`` notifications arriving before the reply
        are skipped.
        """
        response = await self._send_command(connection, debugger.disable(), message_id)
        while response.startswith(self._DISABLE_SKIPPED_PREFIXES):
            response = await self._recv_raw(connection)
        response = json.loads(response)
        assert response == {"id": message_id, "result": {}}
        return response

    async def resume(self, message_id, connection, params):
        """Send ``debugger.resume()``; expect ``Debugger.resumed`` then an empty result."""
        return await self._call_expecting_resumed(connection, debugger.resume(), message_id)

    async def pause(self, message_id, connection, params):
        """Send ``debugger.pause()`` and expect an empty result."""
        return await self._call_expecting_empty_result(connection, debugger.pause(), message_id)

    async def remove_breakpoints_by_url(self, message_id, connection, params):
        """Send ``debugger.remove_breakpoints_by_url(params)`` and expect an empty result."""
        return await self._call_expecting_empty_result(connection,
                                                       debugger.remove_breakpoints_by_url(params),
                                                       message_id)

    async def get_possible_and_set_breakpoints_by_url(self, message_id, connection, params):
        """Send ``debugger.get_possible_and_set_breakpoint_by_url(params)``; only the reply id is checked."""
        return await self._call_expecting_matching_id(connection,
                                                      debugger.get_possible_and_set_breakpoint_by_url(params),
                                                      message_id)

    async def step_over(self, message_id, connection, params):
        """Send ``debugger.step_over()``; expect ``Debugger.resumed`` then an empty result."""
        return await self._call_expecting_resumed(connection, debugger.step_over(), message_id)

    async def step_out(self, message_id, connection, params):
        """Send ``debugger.step_out()``; expect ``Debugger.resumed`` then an empty result."""
        return await self._call_expecting_resumed(connection, debugger.step_out(), message_id)

    async def step_into(self, message_id, connection, params):
        """Send ``debugger.step_into()``; expect ``Debugger.resumed`` then an empty result."""
        return await self._call_expecting_resumed(connection, debugger.step_into(), message_id)

    async def set_pause_on_exceptions(self, message_id, connection, params):
        """Send ``debugger.set_pause_on_exceptions(params)`` and expect an empty result."""
        return await self._call_expecting_empty_result(connection,
                                                       debugger.set_pause_on_exceptions(params),
                                                       message_id)

    async def evaluate_on_call_frame(self, message_id, connection, params):
        """Send ``debugger.evaluate_on_call_frame(params)``; only the reply id is checked."""
        return await self._call_expecting_matching_id(connection,
                                                      debugger.evaluate_on_call_frame(params),
                                                      message_id)

    async def smart_step_into(self, message_id, connection, params):
        """Send ``debugger.smart_step_into(params)`` and expect an empty result."""
        return await self._call_expecting_empty_result(connection,
                                                       debugger.smart_step_into(params),
                                                       message_id)

    async def set_mixed_debug_enabled(self, message_id, connection, params):
        """Send ``debugger.set_mixed_debug_enabled(params)`` and expect an empty result."""
        return await self._call_expecting_empty_result(connection,
                                                       debugger.set_mixed_debug_enabled(params),
                                                       message_id)

    async def reply_native_calling(self, message_id, connection, params):
        """Send ``debugger.reply_native_calling(params)``; expect ``Debugger.resumed`` then an empty result."""
        return await self._call_expecting_resumed(connection,
                                                  debugger.reply_native_calling(params),
                                                  message_id)

    async def drop_frame(self, message_id, connection, params):
        """Send ``debugger.drop_frame(params)`` and expect an empty result."""
        return await self._call_expecting_empty_result(connection,
                                                       debugger.drop_frame(params),
                                                       message_id)

    # ------------------------------------------------------------------ #
    # Events (ProtocolType.recv).
    # ------------------------------------------------------------------ #

    async def script_parsed(self, connection, params):
        """Receive and return the next decoded message; its ``method`` is not checked."""
        return await self._recv_event(connection)

    async def paused(self, connection, params):
        """Receive the next message and assert it is ``Debugger.paused``."""
        return await self._recv_event(connection, 'Debugger.paused')

    async def native_calling(self, connection, params):
        """Receive the next message and assert it is ``Debugger.nativeCalling``."""
        return await self._recv_event(connection, 'Debugger.nativeCalling')