1'''
2#  Copyright (c) 2023 Huawei Device Co., Ltd.
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6
7#      http://www.apache.org/licenses/LICENSE-2.0
8
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14'''
15
16import datetime
17import json
18import os
19import re
20import shutil
21import stat
22import subprocess
23import sys
24import time
25
26import paramiko
27
28import LinuxContains
29
30
# Install results keyed by hap file name; each value is the text returned by
# `hdc install` for that hap.
install_fail_dict = {}
install_success_dict = {}
total_hap_num = 0
hap_install_success = 0
hap_install_fail = 0
# Haps skipped by batch_install because they match a "special" pattern.
special_num = 0
special_hap_list = []
# Maps a hap file name to the list of hap files it depends on. It is replaced
# by load_config_to_dict(); it must start as an empty dict *instance* (not the
# `dict` type) so that `.get()` works before the configuration is loaded.
depend_config_map = {}

CONFIG_PATH = 'D:\\window_manager_config.xml'
FINGER_PRINT = '8E93863FC32EE238060BF69A9B37E2608FFFB21F93C862DD511CBAC9F30024B5'
# batch_install walks the whole hap list repeat_time + 1 times.
repeat_time = 0
# 0 selects test_install_hap; any other value selects
# test_install_hap_with_error_snapshot.
snapshot = 1
44
45
def exec_cmd(cmd):
    """Run ``cmd`` on the device through ``hdc shell`` and return its output.

    Args:
        cmd: Command text; it is wrapped in double quotes and appended to
            ``hdc shell``.

    Returns:
        Everything read from the pipe opened by ``os.popen``.
    """
    with os.popen(f"hdc shell \"{cmd}\"") as pipe:
        return pipe.read()
54
55
def exec_cmd_path(cmd, path):
    """Run ``cmd`` through ``hdc shell`` with its output appended to ``path``.

    Args:
        cmd: Command text, wrapped in double quotes after ``hdc shell``.
        path: File the host shell appends the command output to (``>>``).

    Returns:
        Whatever text is still readable from the pipe; because the output is
        redirected to ``path`` this is normally empty.
    """
    shell_line = f"hdc shell \"{cmd}\" >> {path}"
    with os.popen(shell_line) as pipe:
        leftover = pipe.read()
    return leftover
62
63
def exec_cmd_simple(cmd):
    """Run ``cmd`` in the host shell via ``os.popen`` and return its output.

    Args:
        cmd: Complete command line, passed to the shell unchanged.

    Returns:
        The text read from the command's pipe.
    """
    with os.popen(cmd) as pipe:
        output = pipe.read()
    return output
70
def exists(file_path):
    """Return whether ``file_path`` exists, as reported by ``os.path.exists``."""
    found = os.path.exists(file_path)
    return found
73
74
def get_haps(local_dir):
    """List the entries of ``local_dir`` whose name contains ``_signed``.

    Args:
        local_dir: Directory to scan (not recursive).

    Returns:
        A list of matching entry names, in ``os.listdir`` order.
    """
    return [entry for entry in os.listdir(local_dir) if "_signed" in entry]
82
83
def install_hap(hap, hap_path, base_dir):
    """Install one hap with ``hdc install -r`` and record the outcome.

    Nothing happens when the hap file is missing from ``hap_path``.
    Otherwise the install output is stored in ``install_success_dict`` or
    ``install_fail_dict`` under the hap name, and a success line is appended
    to ``{base_dir}/auto_test.log``.

    Args:
        hap: File name of the hap.
        hap_path: Directory containing the hap.
        base_dir: Output directory holding ``auto_test.log``.
    """
    if not os.path.exists(os.path.join(hap_path, hap)):
        return
    install_res = exec_cmd_simple(f"hdc install -r {hap_path}/{hap}")
    if "msg:install bundle successfully." in install_res:
        exec_cmd_simple(f"echo install {hap} success! >> {base_dir}/auto_test.log")
        install_success_dict[hap] = install_res
    else:
        # Strip the trailing hdc banner so the failure report stays compact.
        install_fail_dict[hap] = install_res.replace("\nAppMod finish\n\n", "")
    # Pause between installs before the next hdc command is issued.
    time.sleep(2)
97
98
def install_depend_haps(curr_haps, hap_path, base_dir):
    """Install the haps that ``curr_haps`` depend on, following the chain.

    Dependencies are looked up in ``depend_config_map``; each dependency list
    is installed and then processed recursively for its own dependencies.

    Args:
        curr_haps: Hap file names whose dependencies should be installed.
        hap_path: Directory containing the hap files.
        base_dir: Output directory passed through to ``install_hap``.
    """
    for hap in curr_haps:
        depend_haps = depend_config_map.get(hap)
        if not depend_haps:
            continue
        for depend_hap in depend_haps:
            install_hap(depend_hap, hap_path, base_dir)
        install_depend_haps(depend_haps, hap_path, base_dir)
107
108
def get_test_bundle(begin_bundles):
    """Return the bundles present on the device now but not in ``begin_bundles``.

    Args:
        begin_bundles: Bundle list captured earlier with ``get_all_bundles``.

    Returns:
        A list (in no particular order) of the bundles added since then.
    """
    current = set(get_all_bundles())
    baseline = set(begin_bundles)
    return list(current - baseline)
112
113
def install_haps(local_hap, hap_path, base_dir):
    """Install every hap named in ``local_hap`` by calling ``install_hap``.

    Args:
        local_hap: Iterable of hap file names.
        hap_path: Directory containing the hap files.
        base_dir: Output directory passed through to ``install_hap``.
    """
    for hap_name in local_hap:
        install_hap(hap_name, hap_path, base_dir)
117
118
def start_log():
    """Remove old hilog files on the device, then start hilog file writing."""
    for device_cmd in ("rm -r /data/log/hilog/*", "hilog -w start -l 1M"):
        exec_cmd(device_cmd)
124
125
def stop_log():
    """Stop hilog file writing on the device (``hilog -w stop``)."""
    exec_cmd("hilog -w stop")
129
130
def recv_log(local_log):
    """Copy the device's ``/data/log/hilog/`` directory to ``local_log``.

    Args:
        local_log: Destination path on the host, passed to ``hdc file recv``.
    """
    # Fetch the collected log output from the device.
    exec_cmd_simple(f"hdc file recv /data/log/hilog/ {local_log}")
136
137
def test_install_hap_with_error_snapshot(uninstall_bundles, base_dir):
    """Run ``aa test`` for each bundle and screenshot the device on stack traces.

    Test output is streamed line by line into ``{base_dir}/auto_test.log``.
    A line containing ``current`` updates the index used in the screenshot
    name, and a line containing ``stack`` triggers ``snapshot_display`` on the
    device. After each bundle the screenshots are pulled into
    ``{base_dir}/snapshot`` and removed from the device.

    Args:
        uninstall_bundles: Bundle names to test (surrounding whitespace is
            stripped).
        base_dir: Output directory holding ``auto_test.log`` and ``snapshot``.
    """
    default_cmd = r"hdc shell aa test -b {} -p com.example.entry_test -m entry_test -s unittest /ets/TestRunner/OpenHarmonyTestRunner -s timeout 30000000 "
    listed_cmd = r"hdc shell aa test -b {} -p com.example.entry_test -m entry_test -s unittest OpenHarmonyTestRunner -s timeout 30000000 "
    with open(LinuxContains.FA_MODAL_AND_LOWER_CASE_LIST, 'r', encoding='utf-8') as fp:
        listed_bundles = json.load(fp)
    # The runner form is chosen once for the whole batch: the short runner
    # name is used as soon as any bundle is a key of the JSON list. Choosing
    # the command up front also keeps `cmd` defined when the list is empty.
    # NOTE(review): a per-bundle choice may be what was intended - confirm.
    any_listed = any(name.strip() in listed_bundles for name in uninstall_bundles)
    cmd = listed_cmd if any_listed else default_cmd

    path = f"{base_dir}/auto_test.log"
    if len(uninstall_bundles) != 1:
        exec_cmd_simple("echo uninstall_bundles:{0}, >> {1}".format(uninstall_bundles, path))
    for bundle in uninstall_bundles:
        bundle = bundle.strip()
        started_at = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        exec_cmd_simple(
            "echo ================start {0} ui test================{1} >> {2}".format(bundle, started_at, path))
        current = "-1"
        # stderr was never read; discarding it instead of piping it avoids a
        # stall once the child has filled the stderr pipe buffer.
        with subprocess.Popen(cmd.format(bundle), shell=True, close_fds=True, stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE, stderr=subprocess.DEVNULL,
                              encoding="utf-8") as p, open(path, mode='a') as auto_log:
            # readline() returns '' only at end of stream, which ends the loop;
            # leaving the `with` block then waits for the process to exit.
            for line in iter(p.stdout.readline, ''):
                auto_log.write(line)
                if "current" in line:
                    nums = re.findall(r"\d+", line)
                    current = nums[0] if nums else "0"
                if "stack" in line:
                    exec_cmd_simple(
                        f"hdc shell snapshot_display -f /data/snapshot/{bundle}_{current}.jpeg")
        exec_cmd_simple(f"hdc file recv /data/snapshot/. {base_dir}/snapshot")
        exec_cmd_simple("hdc shell rm -rf /data/snapshot/*")
        time.sleep(5)
187
188
def test_install_hap(test_bundles, base_dir):
    """Run ``aa test`` for each bundle, appending output to ``auto_test.log``.

    For ``ohos.samples.launcher`` its ``MainAbility`` is started first. A
    timestamped start banner is written to the log before every test run.

    Args:
        test_bundles: Bundle names to test (surrounding whitespace is
            stripped).
        base_dir: Output directory holding ``auto_test.log``.
    """
    runner_template = r"aa test -b {} -p com.example.entry_test -m entry_test -s unittest OpenHarmonyTestRunner -s timeout 30000000"
    log_file = f"{base_dir}/auto_test.log"
    for raw_name in test_bundles:
        bundle = raw_name.strip()
        if bundle == 'ohos.samples.launcher':
            os.system('hdc shell aa start -b ohos.samples.launcher -a MainAbility')
        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        exec_cmd_simple(f"echo ================start {bundle} ui test================{stamp} >> {log_file}")
        exec_cmd_path(runner_template.format(bundle), log_file)
        time.sleep(5)
202
203
def uninstall_bundles(install_bundles):
    """Uninstall every bundle in ``install_bundles`` with ``hdc uninstall``.

    Args:
        install_bundles: Iterable of bundle names.
    """
    for bundle_name in install_bundles:
        exec_cmd_simple(f"hdc uninstall {bundle_name}")
208
209
def clear_dir(dir_path):
    """Leave ``dir_path`` as an empty directory.

    An existing tree is removed first; missing parent directories are created.

    Args:
        dir_path: Directory to (re)create.
    """
    already_there = os.path.exists(dir_path)
    if already_there:
        shutil.rmtree(dir_path)
    os.makedirs(dir_path)
214
215
def clear_file(file_path):
    """Leave ``file_path`` as an empty file, discarding any previous content.

    Args:
        file_path: File to (re)create.
    """
    if os.path.exists(file_path):
        os.remove(file_path)
    # Create the empty file from Python rather than with the Windows-only
    # `type nul>` shell command, so the result is the same on every host.
    with open(file_path, "w"):
        pass
220
221
def get_all_bundles():
    """Return the lines of ``bm dump -a`` from the device, minus the first one.

    Returns:
        A list of the remaining output lines (presumably one bundle name per
        line, with the dropped first line being a header). The list is empty
        when the command prints nothing.
    """
    lines = exec_cmd("bm dump -a").splitlines()
    # Slicing instead of `del lines[0]` tolerates empty output, e.g. when the
    # command returns no text at all.
    return lines[1:]
227
228
def _run_hap_batch(batch_haps, begin_bundles, base_dir):
    """Install, test and uninstall one batch of haps, then empty ``batch_haps``."""
    install_haps(batch_haps, LinuxContains.SIGN_HAP_PATH, base_dir)
    # Bundles that appeared after installing the batch itself are tested.
    test_bundles = get_test_bundle(begin_bundles)
    install_depend_haps(batch_haps, LinuxContains.SIGN_HAP_PATH, base_dir)
    # After the dependencies are installed the difference also contains them;
    # this wider list is only used for uninstalling.
    all_install_bundles = get_test_bundle(begin_bundles)
    if snapshot == 0:
        test_install_hap(test_bundles, base_dir)
    else:
        test_install_hap_with_error_snapshot(test_bundles, base_dir)
    uninstall_bundles(all_install_bundles)
    batch_haps.clear()


def batch_install(haps, base_dir):
    """Install and test ``haps`` in batches, then collect the device logs.

    Haps that do not start with one of the ``LinuxContains.TARGET_PATH``
    prefixes are ignored. Haps containing one of the
    ``LinuxContains.SPECIAL_HAP`` patterns are recorded in
    ``special_hap_list`` / ``special_num`` and skipped. The remaining haps are
    processed two at a time, and the whole list is walked ``repeat_time + 1``
    times.

    Args:
        haps: Hap file names to process.
        base_dir: Output directory for logs and snapshots.
    """
    global special_num
    start_log()
    exec_cmd_simple("hdc shell power-shell setmode 602")
    exec_cmd_simple("hdc shell param set persist.ace.testmode.enabled 1")
    exec_cmd_simple("hdc shell mkdir /data/snapshot")
    limit = 2
    cur_batch_hap = []
    special_haps = LinuxContains.SPECIAL_HAP.split(";")
    target_paths = tuple(LinuxContains.TARGET_PATH.split(";"))
    begin_bundles = get_all_bundles()
    for _ in range(repeat_time + 1):
        for hap in haps:
            if not hap.startswith(target_paths):
                continue
            if any(special_hap in hap for special_hap in special_haps):
                special_hap_list.append(hap)
                special_num = special_num + 1
                continue

            cur_batch_hap.append(hap)
            # The batch length is the counter, so a leftover flush at the end
            # of one pass cannot leave a stale count for the next pass.
            if len(cur_batch_hap) == limit:
                _run_hap_batch(cur_batch_hap, begin_bundles, base_dir)
        if cur_batch_hap:
            _run_hap_batch(cur_batch_hap, begin_bundles, base_dir)
    stop_log()
    recv_log(base_dir + "/")
291
292
def handle_test_log(base_dir):
    """Summarise ``{base_dir}/auto_test.log`` and append the summary to it.

    The log is scanned for three kinds of lines:

    * ``================start <bundle> ...`` opens a new project.
    * ``OHOS_REPORT_RESULT`` carries the result numbers. With 4 or 5 numbers
      the first four are read as expected / failed / errored / passed test
      counts; the project succeeds when expected equals passed and is not 0.
    * a line containing ``App died`` marks the project as aborted.

    A project that produced neither a result nor an ``App died`` line counts
    as failed, including the last project in the log.

    Args:
        base_dir: Output directory holding ``auto_test.log``.

    Returns:
        ``'test success !!'`` when nothing failed, died or failed to install;
        otherwise ``None``. The verdict is printed in both cases.
    """
    p_num = 0
    fail_num = 0
    success_num = 0
    test_exp_num = 0
    test_pass_num = 0
    test_fail_num = 0
    test_error_num = 0
    died_num = 0
    fail_dict = []
    died_dict = []
    curr_name = ""
    # True until the first project starts, so no failure is booked for the
    # text that precedes the first start banner.
    has_result = True
    with open(f"{base_dir}/auto_test.log", encoding='utf-8', errors='ignore') as log_file:
        for line in log_file:
            if line.startswith("================start"):
                if not has_result:
                    # The previous project ended without reporting anything.
                    fail_num += 1
                    fail_dict.append(curr_name)
                has_result = False
                p_num += 1
                curr_name = line.split(" ")[1]
            if line.startswith("OHOS_REPORT_RESULT"):
                has_result = True
                nums = re.findall(r"\d+", line)
                if len(nums) in (4, 5):
                    if nums[0] == nums[3] and int(nums[0]) != 0:
                        success_num += 1
                    else:
                        fail_num += 1
                        fail_dict.append(curr_name)
                    test_exp_num += int(nums[0])
                    test_fail_num += int(nums[1])
                    test_error_num += int(nums[2])
                    test_pass_num += int(nums[3])
                else:
                    fail_num += 1
                    fail_dict.append(curr_name)
            elif "App died" in line:
                has_result = True
                died_num += 1
                died_dict.append(curr_name)

    # The check above only fires on the *next* start banner, so the last
    # project needs it applied here.
    if not has_result:
        fail_num += 1
        fail_dict.append(curr_name)

    # special_num is halved in the report.
    # NOTE(review): presumably because haps come in pairs - confirm.
    skipped_projects = int(special_num / 2)
    with open(f"{base_dir}/auto_test.log", mode='a') as error_log:
        error_log.write(
            f"共完成测试项目 {int(p_num + special_num / 2)}个,成功{success_num}个,失败{fail_num}个,异常中止(App died){died_num}个,特殊应用跳过{skipped_projects}个\n")
        error_log.write(
            f"共完成测试用例 {test_exp_num}个,成功{test_pass_num}个,失败{test_fail_num}个,错误{test_error_num}个\n")
        print(f"successNum:{success_num} failNum:{fail_num} diedNum:{died_num} specialNum:{skipped_projects} testNum:{test_exp_num} testSuccessNum:{test_pass_num} testFailNum:{test_fail_num} testErrorNum:{test_error_num}")
        if fail_dict:
            error_log.write("失败工程BundleName如下:\n")
            for name in fail_dict:
                error_log.write("     " + name + "\n")
        if died_dict:
            error_log.write("异常中止(App died)工程BundleName如下:\n")
            for name in died_dict:
                error_log.write("     " + name + "\n")
        error_log.write(f"安装失败项目数量:{len(install_fail_dict)}\n")
        for hap in install_fail_dict:
            error_log.write(f'{hap} : {install_fail_dict[hap]}')
        if special_hap_list:
            error_log.write(f"特殊安装跳过Hap数量:{special_num}\n")
            for hap in special_hap_list:
                error_log.write(hap + "\n")

    if fail_num != 0 or died_num != 0 or len(install_fail_dict) != 0:
        print('test failed !!')
        return None
    # Print the verdict on success as well (the failure branch already did),
    # while still returning the message as before.
    print('test success !!')
    return 'test success !!'
367
def init_out():
    """Create a fresh, dated output tree under the script directory.

    The tree is ``{sys.path[0]}/ui_test_out/<YYYYMMDD>`` with empty
    ``errorLog``, ``successLog``, ``SampleSignHap``, ``hilog`` and
    ``snapshot`` directories and an empty ``auto_test.log`` file. Any earlier
    tree for the same date is removed.

    Returns:
        The path of the dated output directory.
    """
    base_dir = sys.path[0]
    out_path = f"{base_dir}/ui_test_out/{datetime.datetime.now().strftime('%Y%m%d')}"
    print(out_path)
    clear_dir(out_path)
    for log_dir in ('errorLog', 'successLog', 'SampleSignHap', 'hilog', 'snapshot'):
        clear_dir(f"{out_path}/{log_dir}")
    # auto_test.log is appended to and read as a regular file by the rest of
    # this script, so it must be created as an empty file, not a directory.
    with open(f"{out_path}/auto_test.log", 'w'):
        pass
    return out_path
376
377
def add_permission(bundle_name, finger_print):
    """Register ``bundle_name`` in the local install-list capability JSON.

    The file at ``LinuxContains.INSTALL_LIST_CAPABILITY`` is loaded, updated
    and written back. When an entry with exactly this ``bundleName`` exists,
    its first ``app_signature`` is replaced by ``finger_print``; otherwise a
    new entry with ``allowAppUsePrivilegeExtension`` enabled is appended.

    Args:
        bundle_name: Bundle name to grant the permission to.
        finger_print: Signature fingerprint stored for the bundle.
    """
    KEY_NAME = 'install_list'
    with open(LinuxContains.INSTALL_LIST_CAPABILITY, "r", encoding="utf-8") as f:
        old_data = json.load(f)

    # Match on the parsed bundleName, not on a substring of the raw file text:
    # a substring test also matches longer names that merely contain
    # bundle_name, in which case nothing was updated and nothing was appended.
    found = False
    for entry in old_data[KEY_NAME]:
        if entry.get('bundleName') != bundle_name:
            continue
        found = True
        signatures = entry.get('app_signature')
        if signatures:
            signatures[0] = finger_print
        else:
            entry['app_signature'] = [finger_print]

    if not found:
        # Append a new permission entry to the install list.
        old_data[KEY_NAME].append({
            "bundleName": bundle_name,
            "app_signature": [finger_print],
            "allowAppUsePrivilegeExtension": True
        })

    with open(LinuxContains.INSTALL_LIST_CAPABILITY, "w", encoding="utf-8") as f:
        json.dump(old_data, f)
405
406
def pull_list():
    """Remount the device root read-write and fetch install_list_capability.json.

    The file is received with ``hdc file recv`` without an explicit
    destination argument.
    """
    # Run the commands one after another instead of joining them with `&`:
    # cmd.exe treats `&` as "then", but a POSIX shell runs the left-hand
    # command in the background, so the two steps could overlap.
    for cmd_pull in ("hdc shell mount -o rw,remount /",
                     "hdc file recv /system/etc/app/install_list_capability.json"):
        os.system(cmd_pull)
410
411
def push_list():
    """Send install_list_capability.json to the device, reboot and unlock it.

    After the reboot command the function sleeps 40 seconds, then wakes the
    screen, sends a swipe through ``uinput`` and sets power mode 602.
    """
    # Each step runs as its own command. Joined with `&`, a POSIX shell would
    # start them in the background, so the reboot could overtake the file
    # transfer; cmd.exe already ran them strictly in sequence.
    for cmd_push in ("hdc shell mount -o rw,remount /",
                     "hdc file send install_list_capability.json /system/etc/app/install_list_capability.json",
                     "hdc shell chmod 777 /system/etc/app/install_list_capability.json",
                     "hdc shell reboot"):
        os.system(cmd_push)
    time.sleep(40)  # wait for the device to reboot
    # Wake the screen, unlock it and keep it on.
    for cmd_unlock in ("hdc shell power-shell wakeup",
                       "hdc shell uinput -T -m 100 200 100 900 600",
                       "hdc shell power-shell setmode 602"):
        os.system(cmd_unlock)
421
422
def load_config_to_dict(config_file):
    """Load the JSON in ``config_file`` into the global ``depend_config_map``.

    Args:
        config_file: Path of a UTF-8 encoded JSON file.
    """
    global depend_config_map
    with open(config_file, encoding='utf-8') as config_fp:
        parsed = json.load(config_fp)
    depend_config_map = parsed
429
430
def replace_enable(file,old_str,new_str):
    """Replace every ``old_str`` with ``new_str`` in the text file ``file``.

    The result is written to ``<file>.bak`` first; the original is then
    removed and the ``.bak`` file renamed into its place.

    Args:
        file: Path of the UTF-8 text file to rewrite.
        old_str: Text to look for.
        new_str: Replacement text.
    """
    backup = "%s.bak" % file
    with open(file, "r", encoding="utf-8") as src:
        with open(backup, "w", encoding="utf-8") as dst:
            for text in src:
                dst.write(text.replace(old_str, new_str))
    os.remove(file)
    os.rename(backup, file)
439
440
def pull_config():
    """Remount the device root read-write and fetch window_manager_config.xml.

    The file is received into ``LinuxContains.SAVE_XML_PATH``.
    """
    # Two separate commands: joined with `&`, a POSIX shell would run the
    # remount in the background instead of before the transfer.
    os.system('hdc shell mount -o rw,remount /')
    os.system(f'hdc file recv system/etc/window/resources/window_manager_config.xml {LinuxContains.SAVE_XML_PATH}')
444
445
def push_config():
    """Send the local window_manager_config.xml back to the device.

    The file is read from ``LinuxContains.SAVE_XML_PATH``.
    """
    cmd = f'hdc file send {LinuxContains.SAVE_XML_PATH}/window_manager_config.xml system/etc/window/resources/window_manager_config.xml'
    # The command used to be built but never run, so nothing was sent.
    os.system(cmd)
448
449
if __name__ == '__main__':
    load_config_to_dict(LinuxContains.COMBIN_CONFIG)

    # Create the ui_test_out folder and its sub-folders.
    out_base = init_out()
    # Window-decor configuration step, currently disabled:
    # pull_config()
    # replace_enable(CONFIG_PATH,'decor enable="false"','decor enable="true"')
    # push_config()

    # Pull install_list_capability.json from the device.
    pull_list()

    # Specially-installed apps: add their install permission.
    with open(LinuxContains.SPECIAL_LIST, 'r', encoding='utf-8') as special_fp:
        special_apps = json.load(special_fp)
    for app_entry in special_apps.values():
        add_permission(app_entry[0], FINGER_PRINT)

    # Push install_list_capability.json, reboot and unlock the device.
    push_list()
    # sftp_from_remote("{0}\\SampleSignHap".format(out_base), "{0}\\errorLog".format(out_base),
    #                  "{0}\\successLog".format(out_base))
    haps = get_haps(LinuxContains.SIGN_HAP_PATH)
    total_hap_num = len(haps) / 2
    batch_install(haps, out_base)
    handle_test_log(out_base)