1#!/usr/bin/env python3 2# coding=utf-8 3 4# 5# Copyright (c) 2021 Huawei Device Co., Ltd. 6# Licensed under the Apache License, Version 2.0 (the "License"); 7# you may not use this file except in compliance with the License. 8# You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, software 13# distributed under the License is distributed on an "AS IS" BASIS, 14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15# See the License for the specific language governing permissions and 16# limitations under the License. 17# 18import json 19import os 20import shutil 21import platform 22import subprocess 23 24############################################################################## 25############################################################################## 26 27__all__ = ["DeviceShell"] 28 29import zipfile 30 31if platform.system() != 'Windows': 32 QUOTATION_MARKS = "'" 33else: 34 QUOTATION_MARKS = "\"" 35 36HDC_TOOLS = "hdc" 37 38 39############################################################################## 40############################################################################## 41 42 43def get_package_name(hap_filepath): 44 package_name = "" 45 46 if os.path.exists(hap_filepath): 47 filename = os.path.basename(hap_filepath) 48 49 # unzip the hap file 50 hap_bak_path = os.path.abspath(os.path.join( 51 os.path.dirname(hap_filepath), 52 "%s.bak" % filename)) 53 zf_desc = zipfile.ZipFile(hap_filepath) 54 try: 55 zf_desc.extractall(path=hap_bak_path) 56 except RuntimeError as error: 57 print("Unzip error: ", hap_bak_path) 58 zf_desc.close() 59 60 # verify config.json file 61 app_profile_path = os.path.join(hap_bak_path, "config.json") 62 if os.path.isfile(app_profile_path): 63 load_dict = {} 64 with open(app_profile_path, 'r') as json_file: 65 load_dict = json.load(json_file) 66 profile_list = load_dict.values() 67 for profile in profile_list: 68 package_name = profile.get("package") 69 if not package_name: 70 continue 71 break 72 73 # delete hap_bak_path 74 if os.path.exists(hap_bak_path): 75 shutil.rmtree(hap_bak_path) 76 else: 77 print("file %s not exists" % hap_filepath) 78 79 return package_name 80 81############################################################################## 82############################################################################## 83 84 85class DeviceShell: 86 def __init__(self, conn_type, remote_ip="", device_sn="", repote_port="", name=""): 87 self.conn_type = conn_type 88 self.device_sn = device_sn 89 self.name = name 90 self.test_path = "data/test" 91 if conn_type: 92 self.device_params = self.get_device_hdc_para( 93 device_sn 94 ) 95 else: 96 self.device_params = self.get_device_para( 97 remote_ip, repote_port, device_sn) 98 self.init_device() 99 100 @classmethod 101 def get_device_para(cls, remote_ip="", remote_port="", 102 device_sn=""): 103 if "" == remote_ip or "" == remote_port: 104 if "" == device_sn: 105 device_para = "" 106 else: 107 device_para = "-s %s" % device_sn 108 else: 109 if "" == device_sn: 110 device_para = "-H %s -P %s" % (remote_ip, remote_port) 111 else: 112 device_para = "-H %s -P %s -t %s" % ( 113 remote_ip, remote_port, device_sn) 114 return device_para 115 116 @classmethod 117 def get_device_hdc_para(cls, device_sn=""): 118 if " " == device_sn: 119 device_para = "" 120 else: 121 device_para = "-t %s" % device_sn 122 123 return device_para 124 125 @classmethod 126 def execute_command(cls, command, print_flag=True, timeout=900): 127 try: 128 if print_flag: 129 print("command: " + command) 130 if subprocess.call(command, shell=True, timeout=timeout) == 0: 131 print("results: successed") 132 return True 133 except Exception as error: 134 print("Exception: %s" % str(error)) 135 print("results: failed") 136 return False 137 138 @classmethod 139 def execute_command_with_output(cls, command, print_flag=True): 140 if print_flag: 141 print("command: " + command) 142 143 proc = subprocess.Popen(command, 144 stdout=subprocess.PIPE, 145 stderr=subprocess.PIPE, 146 shell=True) 147 148 result = "" 149 try: 150 data, _ = proc.communicate() 151 if isinstance(data, bytes): 152 result = data.decode('utf-8', 'ignore') 153 finally: 154 proc.stdout.close() 155 proc.stderr.close() 156 return result if result else data 157 158 @classmethod 159 def check_path_legal(cls, path): 160 if path and " " in path: 161 return "\"%s\"" % path 162 return path 163 164 def remount(self): 165 if self.conn_type: 166 remount = "target mount" 167 else: 168 remount = "remount" 169 command = "%s %s %s" % (HDC_TOOLS, self.device_params, remount) 170 self.execute_command(command) 171 172 def init_device(self): 173 self.remount() 174 self.shell('rm -rf %s' % self.test_path) 175 self.shell('mkdir -p %s' % self.test_path) 176 self.shell('chmod 777 -R %s' % self.test_path) 177 self.shell("mount -o rw,remount,rw /%s" % "system") 178 179 def unlock_screen(self): 180 self.shell("svc power stayon true") 181 182 def unlock_device(self): 183 self.shell("input keyevent 82") 184 self.shell("wm dismiss-keyguard") 185 186 def push_file(self, srcpath, despath): 187 if self.conn_type: 188 push_args = "file send" 189 else: 190 push_args = "push" 191 command = "%s %s %s %s %s" % ( 192 HDC_TOOLS, 193 self.device_params, 194 push_args, 195 srcpath, 196 despath) 197 return self.execute_command(command) 198 199 def pull_file(self, srcpath, despath): 200 if self.conn_type: 201 pull_args = "file recv" 202 else: 203 pull_args = "pull" 204 command = "%s %s %s %s %s" % ( 205 HDC_TOOLS, 206 self.device_params, 207 pull_args, 208 srcpath, 209 despath) 210 return self.execute_command(command) 211 212 def lock_screen(self): 213 self.shell("svc power stayon false") 214 215 def disable_keyguard(self): 216 self.unlock_screen() 217 self.unlock_device() 218 219 def shell(self, command=""): 220 return self.execute_command("%s %s shell %s%s%s" % ( 221 HDC_TOOLS, 222 self.device_params, 223 QUOTATION_MARKS, 224 command, 225 QUOTATION_MARKS)) 226 227 def shell_with_output(self, command=""): 228 return self.execute_command_with_output("%s %s shell %s%s%s" % ( 229 HDC_TOOLS, 230 self.device_params, 231 QUOTATION_MARKS, 232 command, 233 QUOTATION_MARKS)) 234