1#!/usr/bin/env python3
2# coding=utf-8
3
4#
5# Copyright (c) 2021 Huawei Device Co., Ltd.
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17#
18import json
19import os
20import shutil
21import platform
22import subprocess
23
24##############################################################################
25##############################################################################
26
# Names exported by a star-import of this module; only DeviceShell is listed,
# so get_package_name has to be imported by name.
__all__ = ["DeviceShell"]
28
29import zipfile
30
# Quote character wrapped around the remote command in DeviceShell.shell():
# a double quote on Windows hosts, a single quote everywhere else.
QUOTATION_MARKS = "\"" if platform.system() == 'Windows' else "'"

# Name of the device-connector executable every command line starts with.
HDC_TOOLS = "hdc"
37
38
39##############################################################################
40##############################################################################
41
42
def get_package_name(hap_filepath):
    """Return the ``package`` value declared in a HAP file's ``config.json``.

    The HAP file is opened as a zip archive and ``config.json`` is read from
    its root. The top-level sections of that JSON document are scanned in
    order and the first non-empty ``package`` entry is returned.

    Args:
        hap_filepath: Path to the ``.hap`` archive.

    Returns:
        The package name as a string, or ``""`` when the file does not exist,
        the archive has no ``config.json``, the member cannot be read, or no
        section declares a package.

    Raises:
        zipfile.BadZipFile: If ``hap_filepath`` is not a valid zip archive.
        ValueError: If ``config.json`` is not valid JSON
            (``json.JSONDecodeError`` is a subclass).
    """
    if not os.path.exists(hap_filepath):
        print("file %s not exists" % hap_filepath)
        return ""

    # Read the single member we need straight from the archive. This avoids
    # unpacking the whole HAP into a sibling "<name>.bak" directory, so there
    # is nothing to clean up and nothing is left behind when an error occurs.
    # The ``with`` block guarantees the archive handle is closed.
    try:
        with zipfile.ZipFile(hap_filepath) as zf_desc:
            raw_profile = zf_desc.read("config.json")
    except KeyError:
        # The archive has no config.json at its root.
        return ""
    except RuntimeError:
        # Raised by zipfile for e.g. encrypted members.
        print("Unzip error: ", hap_filepath)
        return ""

    load_dict = json.loads(raw_profile)
    if not isinstance(load_dict, dict):
        return ""

    for profile in load_dict.values():
        # Only mapping-like sections can carry a "package" key.
        if not isinstance(profile, dict):
            continue
        package_name = profile.get("package")
        if package_name:
            return package_name

    # Always hand back a string, even when no section declares a package.
    return ""
80
81##############################################################################
82##############################################################################
83
84
class DeviceShell:
    """Build and run ``hdc`` command lines against a single device.

    ``conn_type`` selects between two command dialects used throughout the
    class: when truthy, the ``-t <sn>`` / ``target mount`` / ``file send`` /
    ``file recv`` forms are used; when falsy, the ``-H/-P/-s`` /
    ``remount`` / ``push`` / ``pull`` forms are used.

    Note that constructing an instance immediately runs commands on the
    device (see ``init_device``).
    """

    def __init__(self, conn_type, remote_ip="", device_sn="",
                 repote_port="", name=""):
        """Store the connection settings and initialise the device.

        Args:
            conn_type: Truthy to use the ``-t`` style device selector,
                falsy to use the ``-H/-P/-s`` style selector.
            remote_ip: Remote host; only used when ``conn_type`` is falsy.
            device_sn: Device serial number / connect key.
            repote_port: Remote port; only used when ``conn_type`` is falsy.
                (The misspelled name is kept so keyword callers keep working.)
            name: Free-form device name, stored as ``self.name``.
        """
        self.conn_type = conn_type
        self.device_sn = device_sn
        self.name = name
        self.test_path = "data/test"
        if conn_type:
            self.device_params = self.get_device_hdc_para(
                device_sn
            )
        else:
            self.device_params = self.get_device_para(
                remote_ip, repote_port, device_sn)
        self.init_device()

    @classmethod
    def get_device_para(cls, remote_ip="", remote_port="",
                        device_sn=""):
        """Return the device-selection arguments for the ``-H/-P`` dialect.

        Returns ``""`` when nothing is given, ``-s <sn>`` when only a serial
        number is given, ``-H <ip> -P <port>`` when both host and port are
        given, and ``-H <ip> -P <port> -t <sn>`` when all three are given.
        """
        if not remote_ip or not remote_port:
            # Without a complete host/port pair only the serial can be used.
            return "-s %s" % device_sn if device_sn else ""
        if not device_sn:
            return "-H %s -P %s" % (remote_ip, remote_port)
        return "-H %s -P %s -t %s" % (remote_ip, remote_port, device_sn)

    @classmethod
    def get_device_hdc_para(cls, device_sn=""):
        """Return ``-t <sn>`` for a given serial number, or ``""`` if none.

        An empty or whitespace-only serial yields an empty string so that no
        dangling ``-t`` (which would swallow the next argument of the
        command line) is emitted.
        """
        if not device_sn or not device_sn.strip():
            return ""
        return "-t %s" % device_sn

    @classmethod
    def execute_command(cls, command, print_flag=True, timeout=900):
        """Run ``command`` through the shell and report success.

        Args:
            command: Full command line as a single string.
            print_flag: When true, echo the command before running it.
            timeout: Seconds to wait before the call is aborted.

        Returns:
            True if the command exited with status 0; False on a non-zero
            exit status or if any exception (including a timeout) occurred.
        """
        try:
            if print_flag:
                print("command: " + command)
            if subprocess.call(command, shell=True, timeout=timeout) == 0:
                print("results: successed")
                return True
        except Exception as error:
            # Best-effort boundary: report the problem and fall through to
            # the common failure path instead of propagating.
            print("Exception: %s" % str(error))
        print("results: failed")
        return False

    @classmethod
    def execute_command_with_output(cls, command, print_flag=True):
        """Run ``command`` through the shell and return its standard output.

        Standard error is captured and discarded. The output is decoded as
        UTF-8 with undecodable bytes ignored.

        Args:
            command: Full command line as a single string.
            print_flag: When true, echo the command before running it.

        Returns:
            The decoded standard output; always a ``str``, ``""`` when the
            command printed nothing.
        """
        if print_flag:
            print("command: " + command)

        # The context manager closes both pipes and reaps the process even
        # if communicate() raises.
        with subprocess.Popen(command,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              shell=True) as proc:
            data, _ = proc.communicate()

        if isinstance(data, bytes):
            return data.decode('utf-8', 'ignore')
        return data if data else ""

    @classmethod
    def check_path_legal(cls, path):
        """Wrap ``path`` in double quotes if it contains a space.

        Paths without spaces, and empty/None values, are returned unchanged.
        """
        if path and " " in path:
            return "\"%s\"" % path
        return path

    def remount(self):
        """Issue the remount command matching the connection dialect."""
        if self.conn_type:
            remount = "target mount"
        else:
            remount = "remount"
        command = "%s %s %s" % (HDC_TOOLS, self.device_params, remount)
        self.execute_command(command)

    def init_device(self):
        """Remount the device and recreate an empty, world-writable test dir.

        Runs, in order: remount, ``rm -rf``, ``mkdir -p`` and ``chmod 777 -R``
        on ``self.test_path``, then remounts ``/system`` read-write.
        """
        self.remount()
        self.shell('rm -rf %s' % self.test_path)
        self.shell('mkdir -p %s' % self.test_path)
        self.shell('chmod 777 -R %s' % self.test_path)
        self.shell("mount -o rw,remount,rw /%s" % "system")

    def unlock_screen(self):
        """Run ``svc power stayon true`` on the device."""
        self.shell("svc power stayon true")

    def unlock_device(self):
        """Send key event 82 and run ``wm dismiss-keyguard`` on the device."""
        self.shell("input keyevent 82")
        self.shell("wm dismiss-keyguard")

    def push_file(self, srcpath, despath):
        """Copy ``srcpath`` on the host to ``despath`` on the device.

        The paths are inserted into the command line as given; callers must
        quote paths containing spaces themselves (see ``check_path_legal``).

        Returns:
            True if the transfer command exited with status 0, else False.
        """
        if self.conn_type:
            push_args = "file send"
        else:
            push_args = "push"
        command = "%s %s %s %s %s" % (
            HDC_TOOLS,
            self.device_params,
            push_args,
            srcpath,
            despath)
        return self.execute_command(command)

    def pull_file(self, srcpath, despath):
        """Copy ``srcpath`` on the device to ``despath`` on the host.

        The paths are inserted into the command line as given; callers must
        quote paths containing spaces themselves (see ``check_path_legal``).

        Returns:
            True if the transfer command exited with status 0, else False.
        """
        if self.conn_type:
            pull_args = "file recv"
        else:
            pull_args = "pull"
        command = "%s %s %s %s %s" % (
            HDC_TOOLS,
            self.device_params,
            pull_args,
            srcpath,
            despath)
        return self.execute_command(command)

    def lock_screen(self):
        """Run ``svc power stayon false`` on the device."""
        self.shell("svc power stayon false")

    def disable_keyguard(self):
        """Call ``unlock_screen`` followed by ``unlock_device``."""
        self.unlock_screen()
        self.unlock_device()

    def shell(self, command=""):
        """Run ``command`` in the device shell.

        The command is wrapped in the platform-specific quote character
        before being appended to the ``hdc ... shell`` command line.

        Returns:
            True if the command line exited with status 0, else False.
        """
        return self.execute_command("%s %s shell %s%s%s" % (
            HDC_TOOLS,
            self.device_params,
            QUOTATION_MARKS,
            command,
            QUOTATION_MARKS))

    def shell_with_output(self, command=""):
        """Run ``command`` in the device shell and return its output.

        The command is quoted the same way as in ``shell``.

        Returns:
            The decoded standard output of the command line as a ``str``.
        """
        return self.execute_command_with_output("%s %s shell %s%s%s" % (
            HDC_TOOLS,
            self.device_params,
            QUOTATION_MARKS,
            command,
            QUOTATION_MARKS))
234