1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3"""
4Copyright (c) 2024 Huawei Device Co., Ltd.
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9    http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16
17Description: MISC action words.
18"""
19
20import asyncio
21import logging
22import os
23import stat
24import subprocess
25import threading
26
27from aw.websocket import WebSocket
28
29
class Utils(object):
    """Namespace of miscellaneous action words; every member is a classmethod."""

    @classmethod
    def message_id_generator(cls):
        """Yield an endless sequence of message ids: 1, 2, 3, ..."""
        message_id = 1
        while True:
            yield message_id
            message_id += 1

    @classmethod
    def get_custom_protocols(cls):
        """Return a fresh list holding the names of the custom protocols."""
        return ["removeBreakpointsByUrl",
                "setMixedDebugEnabled",
                "replyNativeCalling",
                "getPossibleAndSetBreakpointByUrl",
                "dropFrame",
                "setNativeRange",
                "resetSingleStepper",
                "callFunctionOn",
                "smartStepInto"]

    @classmethod
    async def communicate_with_debugger_server(cls, instance_id, to_send_queue, received_queue, command, message_id):
        """
        Assemble and send the command, then return the response.

        The message id is stored into ``command['id']`` (the caller's dict is
        modified in place), the command is sent to the debugger server through
        ``to_send_queue`` and the next message taken from ``received_queue``
        is returned.
        """
        command['id'] = message_id
        await WebSocket.send_msg_to_debugger_server(instance_id, to_send_queue, command)
        return await WebSocket.recv_msg_of_debugger_server(instance_id, received_queue)

    @classmethod
    async def async_wait_timeout(cls, task, timeout=3):
        """
        Await ``task`` for at most ``timeout`` seconds.

        Returns the result of ``task``. When the timeout expires an error is
        logged and ``None`` is returned instead of raising.
        """
        try:
            return await asyncio.wait_for(task, timeout)
        except asyncio.TimeoutError:
            logging.error('await timeout!')
            return None

    @classmethod
    def _run_hdc(cls, cmd, log_output=False):
        """
        Log ``cmd``, run it with captured output and require exit code 0.

        When ``log_output`` is true the stripped stdout bytes are logged too.
        Returns the ``subprocess.CompletedProcess``; raises ``AssertionError``
        on a non-zero exit code.
        """
        logging.info(' '.join(cmd))
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        if log_output:
            logging.info(result.stdout.strip())
        if result.returncode != 0:
            # Raised explicitly (not via `assert`) so the check survives `python -O`.
            stderr_text = result.stderr.decode('utf-8', errors='replace').strip()
            raise AssertionError(
                f"'{' '.join(cmd)}' exited with code {result.returncode}: {stderr_text}")
        return result

    @classmethod
    def hdc_target_mount(cls):
        """
        Run ``hdc target mount`` and require that it prints ``Mount finish``.

        Raises ``AssertionError`` when the stripped stdout is anything else.
        """
        mount_cmd = ['hdc', 'target', 'mount']
        # NOTE: this message is emitted before the command runs; the text is
        # kept unchanged so existing log output stays the same.
        logging.info('Mount finish: ' + ' '.join(mount_cmd))
        mount_result = subprocess.run(mount_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        logging.info(mount_result.stdout.strip())
        output = mount_result.stdout.decode('utf-8').strip()
        if output != 'Mount finish':
            raise AssertionError(
                f"'{' '.join(mount_cmd)}' printed {output!r}, expected 'Mount finish'")

    @classmethod
    def clear_fault_log(cls):
        """
        Remove everything under ``/data/log/faultlog/faultlogger/`` via ``hdc shell rm``.

        Raises ``AssertionError`` when the command exits with a non-zero code.
        """
        cls._run_hdc(['hdc', 'shell', 'rm', '/data/log/faultlog/faultlogger/*'])

    @classmethod
    def save_fault_log(cls, log_path):
        """
        Fetch ``/data/log/faultlog/faultlogger/`` into ``log_path`` via ``hdc file recv``.

        ``log_path`` is created when missing. Raises ``AssertionError`` when
        the command exits with a non-zero code.
        """
        os.makedirs(log_path, exist_ok=True)
        cls._run_hdc(['hdc', 'file', 'recv', '/data/log/faultlog/faultlogger/', log_path],
                     log_output=True)

    @classmethod
    def save_hilog(cls, log_path, file_name, debug_on=False):
        """
        Start capturing ``hdc shell hilog`` output into ``log_path/file_name``.

        ``log_path`` is created when missing and an existing log file is
        overwritten. With ``debug_on`` a series of extra hilog/setprop
        configuration commands is issued first.

        Returns ``(hilog_process, write_thread)``: the running ``Popen``
        object and the already-started thread that copies its output to the
        file. The thread ends when the stream reaches EOF or is closed.

        Raises ``AssertionError`` when a configuration command exits with a
        non-zero code.
        """
        os.makedirs(log_path, exist_ok=True)

        # config the hilog
        cls._run_hdc(['hdc', 'shell', 'hilog', '-r'])

        if debug_on:
            debug_setup = (
                ['hilog', '-G', '16M'],
                ['hilog', '-Q', 'pidoff'],
                ['hilog', '-Q', 'domainoff'],
                ['hilog', '-b', 'd'],
                ['setprop', 'persist.sys.hilog.debug.on', 'true'],
                ['hilog', '-b', 'DEBUG'],
            )
            for shell_args in debug_setup:
                cls._run_hdc(['hdc', 'shell'] + shell_args)

        # create a sub-process to receive the hilog
        hilog_process = subprocess.Popen(['hdc', 'shell', 'hilog'],
                                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        try:
            # os.path.join keeps the path valid on every host OS; O_TRUNC drops
            # stale content left over from a previous, longer capture.
            log_fd = os.open(os.path.join(log_path, file_name),
                             os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                             stat.S_IRUSR | stat.S_IWUSR)
            log_file = os.fdopen(log_fd, 'wb')
        except OSError:
            # Do not leave the capture process running when the file is unusable.
            hilog_process.kill()
            hilog_process.stdout.close()
            hilog_process.wait()
            raise

        def write():
            # `with` closes the log file on every exit path of the thread.
            with log_file:
                try:
                    for line in iter(hilog_process.stdout.readline, b''):
                        log_file.write(line)
                        log_file.flush()
                except ValueError:
                    # readline() on a stream that was closed elsewhere.
                    logging.info('hilog stream is closed.')

        write_thread = threading.Thread(target=write)
        write_thread.start()

        return hilog_process, write_thread
159