1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Copyright (c) 2024 Huawei Device Co., Ltd.
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9    http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16
17Description: Python HeapProfiler Domain Interfaces
18"""
19
20import json
21
22from aw import communicate_with_debugger_server
23from aw.cdp import heap_profiler
24from aw.types import ProtocolType
25from aw.api.protocol_api import ProtocolImpl
26
27
28class HeapProfilerImpl(ProtocolImpl):
29
30    def __init__(self, id_generator, websocket):
31        super().__init__(id_generator, websocket)
32        self.dispatch_table = {"startTrackingHeapObjects": (self.start_tracking_heap_objects, ProtocolType.send),
33                               "stopTrackingHeapObjects": (self.stop_tracking_heap_objects, ProtocolType.send),
34                               "takeHeapSnapshot": (self.take_heap_snapshot, ProtocolType.send)}
35
36    async def start_tracking_heap_objects(self, message_id, connection, params):
37        response = await communicate_with_debugger_server(connection.instance_id,
38                                                          connection.send_msg_queue,
39                                                          connection.received_msg_queue,
40                                                          heap_profiler.start_tracking_heap_objects(params), message_id)
41        response = json.loads(response)
42        assert response == {"id": message_id, "result": {}}
43        return response
44
45    async def stop_tracking_heap_objects(self, message_id, connection, params):
46        response = await communicate_with_debugger_server(connection.instance_id,
47                                                          connection.send_msg_queue,
48                                                          connection.received_msg_queue,
49                                                          heap_profiler.stop_tracking_heap_objects(), message_id)
50        while response.startswith('{"method":"HeapProfiler.lastSeenObjectId"'):
51            response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id,
52                                                                        connection.received_msg_queue)
53        assert r'\"location_fields\":[\"object_index\",\"script_id\",\"line\",\"column\"]' in response
54        pre_response = response
55        while response.startswith('{"method":"HeapProfiler.addHeapSnapshotChunk"') or \
56            response.startswith('{"method":"HeapProfiler.lastSeenObjectId"'):
57            if response.startswith('{"method":"HeapProfiler.addHeapSnapshotChunk"'):
58                pre_response = response
59            response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id,
60                                                                        connection.received_msg_queue)
61        assert pre_response.endswith(r'\n]\n}\n"}}')
62        response = json.loads(response)
63        assert response == {"id": message_id, "result": {}}
64        return response
65
66    async def take_heap_snapshot(self, message_id, connection, params):
67        response = await communicate_with_debugger_server(connection.instance_id,
68                                                          connection.send_msg_queue,
69                                                          connection.received_msg_queue,
70                                                          heap_profiler.take_heap_snapshot(), message_id)
71        assert r'\"location_fields\":[\"object_index\",\"script_id\",\"line\",\"column\"]' in response
72        pre_response = response
73        while response.startswith('{"method":"HeapProfiler.addHeapSnapshotChunk"'):
74            pre_response = response
75            response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id,
76                                                                        connection.received_msg_queue)
77        assert pre_response.endswith(r'\n]\n}\n"}}')
78        response = json.loads(response)
79        assert response == {"id": message_id, "result": {}}
80        return response