1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Copyright (c) 2024 Huawei Device Co., Ltd.
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9    http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16
17Description: Python Debugger Domain Interfaces
18"""
19
20import json
21import logging
22
23from aw import communicate_with_debugger_server, Utils
24from aw.cdp import debugger
25from aw.types import ThreadConnectionInfo, ProtocolType
26from aw.api.protocol_api import ProtocolImpl
27
28
class DebuggerImpl(ProtocolImpl):
    """Debugger-domain protocol interfaces.

    Every public handler is registered in ``self.dispatch_table`` under its
    protocol name, paired with a ``ProtocolType`` marker:

    * ``ProtocolType.send`` handlers have the signature
      ``(message_id, connection, params)``; they send one command and validate
      the reply with ``assert``.
    * ``ProtocolType.recv`` handlers have the signature ``(connection, params)``;
      they wait for one message pushed by the debugger server.

    ``connection`` must expose ``instance_id``, ``send_msg_queue`` and
    ``received_msg_queue`` (the attributes read by the helpers below).
    Validation failures surface as ``AssertionError``.
    NOTE(review): the dispatch table is presumably consumed by ``ProtocolImpl``;
    confirm against the base class.
    """

    # Raw-message prefixes that ``disable`` skips while waiting for its own reply.
    _DISABLE_SKIPPED_PREFIXES = ('{"method":"Debugger.resumed"',
                                 '{"method":"HeapProfiler.lastSeenObjectId"',
                                 '{"method":"HeapProfiler.heapStatsUpdate"')

    def __init__(self, id_generator, websocket):
        """Forward both arguments to the base class and register the handlers."""
        super().__init__(id_generator, websocket)
        self.dispatch_table = {"enable": (self.enable, ProtocolType.send),
                               "disable": (self.disable, ProtocolType.send),
                               "resume": (self.resume, ProtocolType.send),
                               "pause": (self.pause, ProtocolType.send),
                               "removeBreakpointsByUrl": (self.remove_breakpoints_by_url, ProtocolType.send),
                               "getPossibleAndSetBreakpointsByUrl": (self.get_possible_and_set_breakpoints_by_url,
                                                                     ProtocolType.send),
                               "stepOver": (self.step_over, ProtocolType.send),
                               "stepInto": (self.step_into, ProtocolType.send),
                               "stepOut": (self.step_out, ProtocolType.send),
                               "setPauseOnExceptions": (self.set_pause_on_exceptions, ProtocolType.send),
                               "evaluateOnCallFrame": (self.evaluate_on_call_frame, ProtocolType.send),
                               "smartStepInto": (self.smart_step_into, ProtocolType.send),
                               "setMixedDebugEnabled": (self.set_mixed_debug_enabled, ProtocolType.send),
                               "replyNativeCalling": (self.reply_native_calling, ProtocolType.send),
                               "dropFrame": (self.drop_frame, ProtocolType.send),
                               "scriptParsed": (self.script_parsed, ProtocolType.recv),
                               "paused": (self.paused, ProtocolType.recv),
                               "nativeCalling": (self.native_calling, ProtocolType.recv)}

    # ------------------------------------------------------------------
    # Private helpers shared by the protocol handlers
    # ------------------------------------------------------------------

    async def _debugger_request(self, connection, command, message_id):
        """Send ``command`` over ``connection`` and return the first raw message received."""
        return await communicate_with_debugger_server(connection.instance_id,
                                                      connection.send_msg_queue,
                                                      connection.received_msg_queue,
                                                      command, message_id)

    async def _debugger_recv(self, connection):
        """Return the next raw message received on ``connection``."""
        return await self.websocket.recv_msg_of_debugger_server(connection.instance_id,
                                                                connection.received_msg_queue)

    async def _request_expecting_empty_result(self, connection, command, message_id):
        """Send ``command`` and assert that the reply is ``{"id": message_id, "result": {}}``."""
        response = json.loads(await self._debugger_request(connection, command, message_id))
        assert response == {"id": message_id, "result": {}}, \
            'unexpected reply to request {}: {}'.format(message_id, response)
        return response

    async def _request_expecting_resumed(self, connection, command, message_id):
        """Send ``command``; expect a ``Debugger.resumed`` event followed by an empty result."""
        event = json.loads(await self._debugger_request(connection, command, message_id))
        assert event == {"method": "Debugger.resumed", "params": {}}, \
            'expected Debugger.resumed event, got: {}'.format(event)
        response = json.loads(await self._debugger_recv(connection))
        assert response == {"id": message_id, "result": {}}, \
            'unexpected reply to request {}: {}'.format(message_id, response)
        return response

    # ------------------------------------------------------------------
    # Connection management
    # ------------------------------------------------------------------

    async def connect_to_debugger_server(self, pid, is_main=True):
        """Wait for an ``addInstance`` message and build the matching connection.

        For the main thread (``is_main=True``) a ``{"type": "connected"}``
        message is sent first, and the instance must have ``instanceId == 0``
        and ``tid == pid``.  Otherwise the instance must have a non-zero
        ``instanceId``, a ``tid`` different from ``pid`` and a ``name``
        containing ``'workerThread_'``.

        Returns:
            ThreadConnectionInfo built from the instance id reported by
            ``self.websocket.get_instance()`` and that instance's send/receive queues.

        Raises:
            AssertionError: if the ``addInstance`` message fails any check above.
        """
        if is_main:
            await self.websocket.send_msg_to_connect_server({"type": "connected"})
        response = json.loads(await self.websocket.recv_msg_of_connect_server())
        assert response['type'] == 'addInstance', \
            'expected addInstance message, got: {}'.format(response)
        if is_main:
            if response['instanceId'] != 0:
                # logging.error() returns None, so it must not be used as the assert
                # message itself; log first, then fail with a readable message.
                logging.error('InstanceId of the main thread is not 0')
            assert response['instanceId'] == 0, 'InstanceId of the main thread is not 0'
            assert response['tid'] == pid, \
                'tid of the main thread does not match pid {}: {}'.format(pid, response)
        else:
            assert response['instanceId'] != 0, \
                'instanceId of a non-main thread must not be 0: {}'.format(response)
            assert response['tid'] != pid, \
                'tid of a non-main thread must differ from pid {}: {}'.format(pid, response)
            assert 'workerThread_' in response['name'], \
                "instance name does not contain 'workerThread_': {}".format(response)
        instance_id = await self.websocket.get_instance()
        return ThreadConnectionInfo(instance_id,
                                    self.websocket.to_send_msg_queues[instance_id],
                                    self.websocket.received_msg_queues[instance_id])

    async def destroy_instance(self):
        """Wait for a ``destroyInstance`` message from the connect server and return it parsed."""
        response = json.loads(await self.websocket.recv_msg_of_connect_server())
        assert response['type'] == 'destroyInstance', \
            'expected destroyInstance message, got: {}'.format(response)
        return response

    # ------------------------------------------------------------------
    # ProtocolType.send handlers (``params`` is unused where the command takes none)
    # ------------------------------------------------------------------

    async def enable(self, message_id, connection, params):
        """Send ``Debugger.enable``, skipping ``Debugger.scriptParsed`` events before the reply.

        Returns the parsed reply, which must carry ``debuggerId`` ``"0"`` and the
        protocol list from ``Utils.get_custom_protocols()``.
        """
        response = await self._debugger_request(connection, debugger.enable(), message_id)
        while response.startswith('{"method":"Debugger.scriptParsed"'):
            response = await self._debugger_recv(connection)
        response = json.loads(response)
        assert response == {"id": message_id,
                            "result": {"debuggerId": "0", "protocols": Utils.get_custom_protocols()}}, \
            'unexpected reply to Debugger.enable: {}'.format(response)
        return response

    async def disable(self, message_id, connection, params):
        """Send ``Debugger.disable``, skipping resumed/heap-profiler events before the reply."""
        response = await self._debugger_request(connection, debugger.disable(), message_id)
        while response.startswith(self._DISABLE_SKIPPED_PREFIXES):
            response = await self._debugger_recv(connection)
        response = json.loads(response)
        assert response == {"id": message_id, "result": {}}, \
            'unexpected reply to Debugger.disable: {}'.format(response)
        return response

    async def resume(self, message_id, connection, params):
        """Send ``debugger.resume()``; expect a resumed event, then an empty result."""
        return await self._request_expecting_resumed(connection, debugger.resume(), message_id)

    async def pause(self, message_id, connection, params):
        """Send ``debugger.pause()`` and expect an empty result."""
        return await self._request_expecting_empty_result(connection, debugger.pause(), message_id)

    async def remove_breakpoints_by_url(self, message_id, connection, params):
        """Send ``debugger.remove_breakpoints_by_url(params)`` and expect an empty result."""
        return await self._request_expecting_empty_result(connection,
                                                          debugger.remove_breakpoints_by_url(params),
                                                          message_id)

    async def get_possible_and_set_breakpoints_by_url(self, message_id, connection, params):
        """Send ``debugger.get_possible_and_set_breakpoint_by_url(params)``.

        Only the reply ``id`` is checked; the parsed reply is returned for the
        caller to inspect.
        """
        command = debugger.get_possible_and_set_breakpoint_by_url(params)
        response = json.loads(await self._debugger_request(connection, command, message_id))
        assert response['id'] == message_id, \
            'reply id does not match request {}: {}'.format(message_id, response)
        return response

    async def step_over(self, message_id, connection, params):
        """Send ``debugger.step_over()``; expect a resumed event, then an empty result."""
        return await self._request_expecting_resumed(connection, debugger.step_over(), message_id)

    async def step_out(self, message_id, connection, params):
        """Send ``debugger.step_out()``; expect a resumed event, then an empty result."""
        return await self._request_expecting_resumed(connection, debugger.step_out(), message_id)

    async def step_into(self, message_id, connection, params):
        """Send ``debugger.step_into()``; expect a resumed event, then an empty result."""
        return await self._request_expecting_resumed(connection, debugger.step_into(), message_id)

    async def set_pause_on_exceptions(self, message_id, connection, params):
        """Send ``debugger.set_pause_on_exceptions(params)`` and expect an empty result."""
        return await self._request_expecting_empty_result(connection,
                                                          debugger.set_pause_on_exceptions(params),
                                                          message_id)

    async def evaluate_on_call_frame(self, message_id, connection, params):
        """Send ``debugger.evaluate_on_call_frame(params)``.

        Only the reply ``id`` is checked; the parsed reply is returned for the
        caller to inspect.
        """
        command = debugger.evaluate_on_call_frame(params)
        response = json.loads(await self._debugger_request(connection, command, message_id))
        assert response['id'] == message_id, \
            'reply id does not match request {}: {}'.format(message_id, response)
        return response

    async def smart_step_into(self, message_id, connection, params):
        """Send ``debugger.smart_step_into(params)`` and expect an empty result."""
        return await self._request_expecting_empty_result(connection,
                                                          debugger.smart_step_into(params),
                                                          message_id)

    async def set_mixed_debug_enabled(self, message_id, connection, params):
        """Send ``debugger.set_mixed_debug_enabled(params)`` and expect an empty result."""
        return await self._request_expecting_empty_result(connection,
                                                          debugger.set_mixed_debug_enabled(params),
                                                          message_id)

    async def reply_native_calling(self, message_id, connection, params):
        """Send ``debugger.reply_native_calling(params)``; expect a resumed event, then an empty result."""
        return await self._request_expecting_resumed(connection,
                                                     debugger.reply_native_calling(params),
                                                     message_id)

    async def drop_frame(self, message_id, connection, params):
        """Send ``debugger.drop_frame(params)`` and expect an empty result."""
        return await self._request_expecting_empty_result(connection,
                                                          debugger.drop_frame(params),
                                                          message_id)

    # ------------------------------------------------------------------
    # ProtocolType.recv handlers
    # ------------------------------------------------------------------

    async def script_parsed(self, connection, params):
        """Return the next message on ``connection``, parsed; its method is not validated."""
        return json.loads(await self._debugger_recv(connection))

    async def paused(self, connection, params):
        """Return the next message on ``connection``, which must be ``Debugger.paused``."""
        response = json.loads(await self._debugger_recv(connection))
        assert response['method'] == 'Debugger.paused', \
            'expected Debugger.paused event, got: {}'.format(response)
        return response

    async def native_calling(self, connection, params):
        """Return the next message on ``connection``, which must be ``Debugger.nativeCalling``."""
        response = json.loads(await self._debugger_recv(connection))
        assert response['method'] == 'Debugger.nativeCalling', \
            'expected Debugger.nativeCalling event, got: {}'.format(response)
        return response