1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3""" 4Copyright (c) 2024 Huawei Device Co., Ltd. 5Licensed under the Apache License, Version 2.0 (the "License"); 6you may not use this file except in compliance with the License. 7You may obtain a copy of the License at 8 9 http://www.apache.org/licenses/LICENSE-2.0 10 11Unless required by applicable law or agreed to in writing, software 12distributed under the License is distributed on an "AS IS" BASIS, 13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14See the License for the specific language governing permissions and 15limitations under the License. 16 17Description: Python HeapProfiler Domain Interfaces 18""" 19 20import json 21 22from aw import communicate_with_debugger_server 23from aw.cdp import heap_profiler 24from aw.types import ProtocolType 25from aw.api.protocol_api import ProtocolImpl 26 27 28class HeapProfilerImpl(ProtocolImpl): 29 30 def __init__(self, id_generator, websocket): 31 super().__init__(id_generator, websocket) 32 self.dispatch_table = {"startTrackingHeapObjects": (self.start_tracking_heap_objects, ProtocolType.send), 33 "stopTrackingHeapObjects": (self.stop_tracking_heap_objects, ProtocolType.send), 34 "takeHeapSnapshot": (self.take_heap_snapshot, ProtocolType.send)} 35 36 async def start_tracking_heap_objects(self, message_id, connection, params): 37 response = await communicate_with_debugger_server(connection.instance_id, 38 connection.send_msg_queue, 39 connection.received_msg_queue, 40 heap_profiler.start_tracking_heap_objects(params), message_id) 41 response = json.loads(response) 42 assert response == {"id": message_id, "result": {}} 43 return response 44 45 async def stop_tracking_heap_objects(self, message_id, connection, params): 46 response = await communicate_with_debugger_server(connection.instance_id, 47 connection.send_msg_queue, 48 connection.received_msg_queue, 49 heap_profiler.stop_tracking_heap_objects(), message_id) 50 while response.startswith('{"method":"HeapProfiler.lastSeenObjectId"'): 51 response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, 52 connection.received_msg_queue) 53 assert r'\"location_fields\":[\"object_index\",\"script_id\",\"line\",\"column\"]' in response 54 pre_response = response 55 while response.startswith('{"method":"HeapProfiler.addHeapSnapshotChunk"') or \ 56 response.startswith('{"method":"HeapProfiler.lastSeenObjectId"'): 57 if response.startswith('{"method":"HeapProfiler.addHeapSnapshotChunk"'): 58 pre_response = response 59 response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, 60 connection.received_msg_queue) 61 assert pre_response.endswith(r'\n]\n}\n"}}') 62 response = json.loads(response) 63 assert response == {"id": message_id, "result": {}} 64 return response 65 66 async def take_heap_snapshot(self, message_id, connection, params): 67 response = await communicate_with_debugger_server(connection.instance_id, 68 connection.send_msg_queue, 69 connection.received_msg_queue, 70 heap_profiler.take_heap_snapshot(), message_id) 71 assert r'\"location_fields\":[\"object_index\",\"script_id\",\"line\",\"column\"]' in response 72 pre_response = response 73 while response.startswith('{"method":"HeapProfiler.addHeapSnapshotChunk"'): 74 pre_response = response 75 response = await self.websocket.recv_msg_of_debugger_server(connection.instance_id, 76 connection.received_msg_queue) 77 assert pre_response.endswith(r'\n]\n}\n"}}') 78 response = json.loads(response) 79 assert response == {"id": message_id, "result": {}} 80 return response