1''' 2# Copyright (c) 2023 Huawei Device Co., Ltd. 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6 7# http://www.apache.org/licenses/LICENSE-2.0 8 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14''' 15 16import datetime 17import json 18import os 19import re 20import shutil 21import stat 22import subprocess 23import sys 24import time 25 26import paramiko 27 28import LinuxContains 29 30 31install_fail_dict = {} 32install_success_dict = {} 33total_hap_num = 0 34hap_install_success = 0 35hap_install_fail = 0 36special_num = 0 37special_hap_list = [] 38depend_config_map = dict 39 40CONFIG_PATH='D:\\window_manager_config.xml' 41FINGER_PRINT = '8E93863FC32EE238060BF69A9B37E2608FFFB21F93C862DD511CBAC9F30024B5' 42repeat_time = 0 43snapshot = 1 44 45 46def exec_cmd(cmd): 47 f = os.popen(f"hdc shell \"{cmd}\"") 48 # print(cmd) 49 text = f.read() 50 f.close() 51 52 53 return text 54 55 56def exec_cmd_path(cmd, path): 57 f = os.popen(f"hdc shell \"{cmd}\" >> {path}") 58 # print(cmd) 59 text = f.read() 60 f.close() 61 return text 62 63 64def exec_cmd_simple(cmd): 65 f = os.popen(cmd) 66 # print(cmd) 67 text = f.read() 68 f.close() 69 return text 70 71def exists(file_path): 72 return os.path.exists(file_path) 73 74 75def get_haps(local_dir): 76 files = os.listdir(local_dir) 77 hap_list = [] 78 for file in files: 79 if "_signed" in file: 80 hap_list.append(file) 81 return hap_list 82 83 84def install_hap(hap, hap_path, base_dir): 85 # print("install " + hap_path + "\\" + hap) 86 if not exists(os.path.join(hap_path, hap)): 87 return 88 install_res = exec_cmd_simple(f"hdc install -r {hap_path}/{hap}")) 89 # print(install_res) 90 if not install_res.__contains__("msg:install bundle successfully."): 91 install_res = install_res.replace("\nAppMod finish\n\n", "") 92 install_fail_dict[hap] = install_res 93 else: 94 exec_cmd_simple(f"echo install {hap} success! >> {base_dir}/auto_test.log") 95 install_success_dict[hap] = install_res 96 time.sleep(2) 97 98 99def install_depend_haps(curr_haps, hap_path, base_dir): 100 for hap in curr_haps: 101 if not depend_config_map.get(hap): 102 continue 103 depend_haps = depend_config_map.get(hap) 104 for depend_hap in depend_haps: 105 install_hap(depend_hap, hap_path, base_dir) 106 install_depend_haps(depend_haps, hap_path, base_dir) 107 108 109def get_test_bundle(begin_bundles): 110 end_bundles = get_all_bundles() 111 return list(set(end_bundles) - set(begin_bundles)) 112 113 114def install_haps(local_hap, hap_path, base_dir): 115 for hap in local_hap: 116 install_hap(hap, hap_path, base_dir) 117 118 119def start_log(): 120 cmd_clean = "rm -r /data/log/hilog/*" 121 cmd_start = "hilog -w start -l 1M" 122 exec_cmd(cmd_clean) 123 exec_cmd(cmd_start) 124 125 126def stop_log(): 127 cmd_stop = "hilog -w stop" 128 exec_cmd(cmd_stop) 129 130 131def recv_log(local_log): 132 # 输出日志结果 133 file_path = local_log 134 cmd = f"hdc file recv /data/log/hilog/ {file_path}" 135 exec_cmd_simple(cmd) 136 137 138def test_install_hap_with_error_snapshot(uninstall_bundles, base_dir): 139 with open(LinuxContains.FA_MODAL_AND_LOWER_CASE_LIST,'r',encoding='utf-8') as fp: 140 FA_Python_Obj=json.load(fp) 141 is_stage_model=0 142 for key,value in FA_Python_Obj.items(): 143 for bundle_name in uninstall_bundles: 144 # print("key: "+key) 145 bundle_name=bundle_name.strip() 146 if bundle_name==key: 147 is_stage_model=is_stage_model+1 148 if is_stage_model == 0: 149 cmd = r"hdc shell aa test -b {} -p com.example.entry_test -m entry_test -s unittest /ets/TestRunner/OpenHarmonyTestRunner -s timeout 30000000 " 150 else: 151 cmd= r"hdc shell aa test -b {} -p com.example.entry_test -m entry_test -s unittest OpenHarmonyTestRunner -s timeout 30000000 " 152 path = f"{base_dir}/auto_test.log" 153 if len(uninstall_bundles) != 1: 154 exec_cmd_simple("echo uninstall_bundles:{0}, >> {1}".format(uninstall_bundles, path)) 155 for bundle in uninstall_bundles: 156 bundle = bundle.strip() 157 exec_cmd_simple("echo ================start {0} ui test================{1} >> {2}".format(bundle, 158 datetime.datetime.now().strftime( 159 '%Y-%m-%d %H:%M:%S'), 160 path)) 161 # print("test " + bundle) 162 tmp_cmd = cmd.format(bundle) 163 # print(tmp_cmd) 164 p = subprocess.Popen(tmp_cmd, shell=True, close_fds=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE, 165 stderr=subprocess.PIPE, encoding="utf-8") 166 current = "-1" 167 auto_log = open(f"{base_dir}/auto_test.log", mode='a') 168 while True: 169 line = p.stdout.readline() 170 auto_log.writelines(line) 171 if line and "current" in line: 172 nums = re.findall(r"\d+", line) 173 if (len(nums) == 0): 174 current = "0" 175 else: 176 current = nums[0] 177 if line and "stack" in line: 178 exec_cmd_simple( 179 f"hdc shell snapshot_display -f /data/snapshot/{bundle}_{current}.jpeg") 180 if line == '' and p.poll() != None: 181 break 182 exec_cmd_simple(f"hdc file recv /data/snapshot/. {base_dir}/snapshot") 183 exec_cmd_simple("hdc shell rm -rf /data/snapshot/*") 184 auto_log.flush() 185 auto_log.close() 186 time.sleep(5) 187 188 189def test_install_hap(test_bundles, base_dir): 190 cmd = r"aa test -b {} -p com.example.entry_test -m entry_test -s unittest OpenHarmonyTestRunner -s timeout 30000000" 191 path = f"{base_dir}/auto_test.log" 192 for bundle in test_bundles: 193 bundle = bundle.strip() 194 if bundle == 'ohos.samples.launcher': 195 cmd_launcher = 'hdc shell aa start -b ohos.samples.launcher -a MainAbility' 196 os.system(cmd_launcher) 197 exec_cmd_simple(f"echo ================start {bundle} ui test================{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} >> {path}") 198 # print("test " + bundle) 199 tmp_cmd = cmd.format(bundle) 200 exec_cmd_path(tmp_cmd, path) 201 time.sleep(5) 202 203 204def uninstall_bundles(install_bundles): 205 for bundle in install_bundles: 206 # print("uninstall " + bundle) 207 uninstall_res = exec_cmd_simple(f"hdc uninstall {bundle}") 208 209 210def clear_dir(dir_path): 211 if os.path.exists(dir_path): 212 shutil.rmtree(dir_path) 213 os.makedirs(dir_path) 214 215 216def clear_file(file_path): 217 if os.path.exists(file_path): 218 os.remove(file_path) 219 os.system(r"type nul>{}".format(file_path)) 220 221 222def get_all_bundles(): 223 bundles = exec_cmd("bm dump -a") 224 bundles = bundles.splitlines() 225 del bundles[0] 226 return bundles 227 228 229def batch_install(haps, base_dir): 230 start_log() 231 exec_cmd_simple("hdc shell power-shell setmode 602") 232 # exec_cmd_simple("hdc shell setenforce 0") 233 exec_cmd_simple("hdc shell param set persist.ace.testmode.enabled 1") 234 exec_cmd_simple("hdc shell mkdir /data/snapshot") 235 limit = 2 236 count = 0 237 time = 0 238 cur_batch_hap = [] 239 special_haps = LinuxContains.SPECIAL_HAP.split(";") 240 target_paths = LinuxContains.TARGET_PATH.split(";") 241 begin_bundles = get_all_bundles() 242 while time <= repeat_time: 243 for hap in haps: 244 245 isTargetPath = False 246 for target_path in target_paths: 247 if hap.startswith(target_path): 248 isTargetPath = True 249 if not isTargetPath: 250 continue 251 252 isSpecialSkip = False 253 for special_hap in special_haps: 254 if special_hap in hap: 255 global special_num 256 isSpecialSkip = True 257 special_hap_list.append(hap) 258 special_num = special_num + 1 259 break 260 if isSpecialSkip: 261 continue 262 263 cur_batch_hap.append(hap) 264 count = count + 1 265 if count == limit: 266 install_haps(cur_batch_hap, LinuxContains.SIGN_HAP_PATH, base_dir) 267 test_bundles = get_test_bundle(begin_bundles) 268 install_depend_haps(cur_batch_hap, LinuxContains.SIGN_HAP_PATH, base_dir) 269 all_install_bundles = get_test_bundle(begin_bundles) 270 if snapshot == 0: 271 test_install_hap(test_bundles, base_dir) 272 else: 273 test_install_hap_with_error_snapshot(test_bundles, base_dir) 274 count = 0 275 uninstall_bundles(all_install_bundles) 276 cur_batch_hap.clear() 277 if len(cur_batch_hap) != 0: 278 install_haps(cur_batch_hap, LinuxContains.SIGN_HAP_PATH, base_dir) 279 test_bundles = get_test_bundle(begin_bundles) 280 install_depend_haps(cur_batch_hap, LinuxContains.SIGN_HAP_PATH, base_dir) 281 all_install_bundles = get_test_bundle(begin_bundles) 282 if snapshot == 0: 283 test_install_hap(test_bundles, base_dir) 284 else: 285 test_install_hap_with_error_snapshot(test_bundles, base_dir) 286 uninstall_bundles(all_install_bundles) 287 cur_batch_hap.clear() 288 time += 1 289 stop_log() 290 recv_log(base_dir + "/") 291 292 293def handle_test_log(base_dir): 294 file = open(f"{base_dir}/auto_test.log", encoding='utf-8', errors='ignore') 295 p_num = 0 296 fail_num = 0 297 success_num = 0 298 test_exp_num = 0 299 test_pass_num = 0 300 test_fail_num = 0 301 test_error_num = 0 302 died_num = 0 303 fail_dict = [] 304 died_dict = [] 305 curr_name = "" 306 has_result = 1 307 for x in file: 308 if x.startswith(r"================start"): 309 if (has_result == 0): 310 fail_num = fail_num + 1 311 fail_dict.append(curr_name) 312 else: 313 has_result = 0 314 p_num = p_num + 1 315 curr_name = x.split(" ")[1] 316 if x.startswith(r"OHOS_REPORT_RESULT"): 317 has_result = 1 318 nums = re.findall(r"\d+", x) 319 if len(nums) == 4 or len(nums) == 5: 320 if nums[0] == nums[3] and int(nums[0]) != 0: 321 success_num = success_num + 1 322 else: 323 fail_num = fail_num + 1 324 fail_dict.append(curr_name) 325 test_exp_num = test_exp_num + int(nums[0]) 326 test_fail_num = test_fail_num + int(nums[1]) 327 test_error_num = test_error_num + int(nums[2]) 328 test_pass_num = test_pass_num + int(nums[3]) 329 else: 330 fail_num = fail_num + 1 331 fail_dict.append(curr_name) 332 elif x.__contains__("App died"): 333 has_result = 1 334 died_num = died_num + 1 335 died_dict.append(curr_name) 336 337 file.close() 338 339 error_log = open(f"{base_dir}/auto_test.log", mode='a') 340 error_log.writelines( 341 f"共完成测试项目 {int(p_num + special_num / 2)}个,成功{success_num}个,失败{fail_num}个,异常中止(App died){died_num}个,特殊应用跳过{int(special_num / 2)}个\n") 342 343 error_log.writelines( 344 f"共完成测试用例 {test_exp_num}个,成功{test_pass_num}个,失败{test_fail_num}个,错误{test_error_num}个\n") 345 print(f"successNum:{success_num} failNum:{fail_num} diedNum:{died_num} specialNum:{int(special_num / 2)} testNum:{test_exp_num} testSuccessNum:{test_pass_num} testFailNum:{test_fail_num} testErrorNum:{test_error_num}") 346 if len(fail_dict) > 0: 347 error_log.writelines("失败工程BundleName如下:\n") 348 for x in fail_dict: 349 error_log.writelines(" " + x + "\n") 350 if len(died_dict) > 0: 351 error_log.writelines("异常中止(App died)工程BundleName如下:\n") 352 for x in died_dict: 353 error_log.writelines(" " + x + "\n") 354 error_log.writelines(f"安装失败项目数量:{len(install_fail_dict)}\n") 355 for i in install_fail_dict: 356 error_log.writelines(f'{i} : {install_fail_dict[i]}') 357 if len(special_hap_list) > 0: 358 error_log.writelines(f"特殊安装跳过Hap数量:{special_num}\n") 359 for i in special_hap_list: 360 error_log.writelines(i + "\n") 361 error_log.flush() 362 error_log.close() 363 if fail_num != 0 or died_num != 0 or len(install_fail_dict) != 0: 364 print ('test failed !!') 365 else: 366 return ('test success !!') 367 368def init_out(): 369 base_dir = sys.path[0] 370 out_path = f"{base_dir}/ui_test_out/{datetime.datetime.now().strftime('%Y%m%d')}") 371 print(out_path) 372 clear_dir(out_path) 373 for log_dir in ['errorLog','successLog','SampleSignHap','auto_test.log','hilog','snapshot']: 374 clear_dir(f"{out_path}/{log_dir}") 375 return out_path 376 377 378def add_permission(bundle_name, finger_print): 379 KEY_NAME = 'install_list' 380 f = open(LinuxContains.INSTALL_LIST_CAPABILITY, 'r') 381 if bundle_name in f.read(): 382 with open(LinuxContains.INSTALL_LIST_CAPABILITY, "r", encoding="utf-8") as f: 383 old_data = json.load(f) 384 # print(bundle_name + " 已存在,需要修改权限") 385 for i in old_data[KEY_NAME]: 386 if i['bundleName'] == bundle_name: 387 i['app_signature'][0] = finger_print 388 with open(LinuxContains.INSTALL_LIST_CAPABILITY, "w", encoding="utf-8") as f: 389 json.dump(old_data, f) 390 391 else: 392 # 追加权限 393 permission_fields = { 394 "bundleName": bundle_name, 395 "app_signature": [finger_print], 396 "allowAppUsePrivilegeExtension": True 397 } 398 with open(LinuxContains.INSTALL_LIST_CAPABILITY, "r", encoding="utf-8") as f: 399 old_data = json.load(f) 400 # 这里就是那个列表List 401 old_data[KEY_NAME].append(permission_fields) 402 # print(bundle_name + " 不存在,需要追加权限") 403 with open(LinuxContains.INSTALL_LIST_CAPABILITY, "w", encoding="utf-8") as f: 404 json.dump(old_data, f) 405 406 407def pull_list(): 408 cmd_pull = "hdc shell mount -o rw,remount / & hdc file recv /system/etc/app/install_list_capability.json" 409 os.system(cmd_pull) 410 411 412def push_list(): 413 cmd_push = "hdc shell mount -o rw,remount / & hdc file send install_list_capability.json /system/etc/app/install_list_capability.json & hdc shell chmod 777 /system/etc/app/install_list_capability.json & hdc shell reboot" 414 # print(cmd_push) 415 os.system(cmd_push) 416 time.sleep(40) # 等待重启 417 cmd_unlock = 'hdc shell power-shell wakeup & hdc shell uinput -T -m 100 200 100 900 600 & hdc shell power-shell setmode 602' # 亮屏并解锁屏幕 418 # print("设置屏幕常亮") 419 # print(cmd_unlock) 420 os.system(cmd_unlock) 421 422 423def load_config_to_dict(config_file): 424 with open(config_file, encoding='utf-8') as a: 425 # 读取文件 426 global depend_config_map 427 depend_config_map = json.load(a) 428 # print(type (depend_config_map)) 429 430 431def replace_enable(file,old_str,new_str): 432 with open(file, "r", encoding="utf-8") as f1,open("%s.bak" % file, "w", encoding="utf-8") as f2: 433 for line in f1: 434 if old_str in line: 435 line = line.replace(old_str, new_str) 436 f2.write(line) 437 os.remove(file) 438 os.rename("%s.bak" % file, file) 439 440 441def pull_config(): 442 cmd=f'hdc shell mount -o rw,remount / & hdc file recv system/etc/window/resources/window_manager_config.xml {LinuxContains.SAVE_XML_PATH}' 443 os.system(cmd) 444 445 446def push_config(): 447 cmd=f'hdc file send {LinuxContains.SAVE_XML_PATH}/window_manager_config.xml system/etc/window/resources/window_manager_config.xml' 448 449 450if __name__ == '__main__': 451 load_config_to_dict(LinuxContains.COMBIN_CONFIG) 452 # print(depend_config_map) 453 454 # 创建ui_test_out文件夹及子文件夹 455 out_base = init_out() 456 # pull_config() 457 # replace_enable(CONFIG_PATH,'decor enable="false"','decor enable="true"') 458 # push_config() 459 460 # 拉install_list_capability.json 461 pull_list() 462 463 # 特殊安装应用,添加权限 464 with open(LinuxContains.SPECIAL_LIST,'r',encoding='utf-8') as fp: 465 python_obj=json.load(fp) 466 for key,value in python_obj.items(): 467 add_permission(value[0],FINGER_PRINT) 468 469 # 推install_list_capability.json,重启解锁 470 push_list() 471 # sftp_from_remote("{0}\\SampleSignHap".format(out_base), "{0}\\errorLog".format(out_base), 472 # "{0}\\successLog".format(out_base)) 473 haps = get_haps(LinuxContains.SIGN_HAP_PATH) 474 total_hap_num = len(haps) / 2 475 batch_install(haps, out_base) 476 handle_test_log(out_base)