1#!/usr/bin/env python3
2# coding: utf-8
3
4"""
5Copyright (c) 2023 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17
18Description: utils for test suite
19"""
20
21import gzip
22import logging
23import os
24import re
25import shutil
26import subprocess
27import sys
28import time
29import zipfile
30
31from PIL import Image
32
33import options
34
35
def get_log_level(arg_log_level):
    """Translate a textual log level into the matching ``logging`` constant.

    Recognised names are 'debug', 'info', 'warn' and 'error'; any other
    value falls back to ``logging.ERROR``.
    """
    known_levels = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warn': logging.WARN,
        'error': logging.ERROR,
    }
    # Unknown names use error as the default log level.
    return known_levels.get(arg_log_level, logging.ERROR)
47
48
def init_logger(log_level, log_file):
    """Configure root logging to write to ``log_file`` and record the command line.

    The level comes from ``get_log_level(log_level)`` and the file encoding
    from ``get_encoding()``.
    """
    log_format = '[%(asctime)s %(filename)s:%(lineno)d]: [%(levelname)s] %(message)s'
    logging.basicConfig(
        filename=log_file,
        level=get_log_level(log_level),
        encoding=get_encoding(),
        format=log_format,
    )
    # Record how the test run was started.
    logging.info("Test command:")
    command_line = " ".join(sys.argv)
    logging.info(command_line)
56
57
def get_encoding():
    """Return 'utf-8' on Windows, otherwise the filesystem encoding."""
    return 'utf-8' if is_windows() else sys.getfilesystemencoding()
63
64
def check_zip_file(file_path):
    """Return True if ``file_path`` is a zip archive that can be opened.

    Any exception raised while checking is printed and reported as False.
    """
    try:
        if not zipfile.is_zipfile(file_path):
            return False
        # Opening the archive confirms it can actually be read as a zip.
        with zipfile.ZipFile(file_path, 'r'):
            return True
    except Exception as error:
        print(error)
        return False
75
76
def check_gzip_file(file_path):
    """Return True if one byte can be read from ``file_path`` as gzip data.

    Any exception raised while opening or reading is printed and reported
    as False.
    """
    try:
        with gzip.open(file_path, 'rb') as archive:
            # Reading a single byte forces the gzip header to be parsed.
            archive.read(1)
    except Exception as error:
        print(error)
        return False
    else:
        return True
85
86
def is_windows():
    """Return True when ``sys.platform`` is 'win32' or 'cygwin'."""
    return sys.platform in ('win32', 'cygwin')
89
90
def is_mac():
    """Return True when ``sys.platform`` is 'darwin'."""
    platform_name = sys.platform
    return platform_name == 'darwin'
93
94
def is_linux():
    """Return True when ``sys.platform`` is 'linux'."""
    platform_name = sys.platform
    return platform_name == 'linux'
97
98
def get_time_string():
    """Return the current local time formatted as ``YYYYmmdd-HHMMSS``."""
    timestamp_format = '%Y%m%d-%H%M%S'
    return time.strftime(timestamp_format, time.localtime())
101
102
def is_esmodule(hap_type):
    """Return True when 'stage' is contained in ``hap_type``.

    Stage hap types are built as esmodule; js, fa and compatible 8 hap types
    are built as js_bundle.
    """
    uses_stage_model = 'stage' in hap_type
    return uses_stage_model
107
108
def is_file_timestamps_same(file_a, file_b):
    """Return True when both files have exactly the same modification time."""
    return os.path.getmtime(file_a) == os.path.getmtime(file_b)
113
114
def is_file_name_same(file_a, file_b):
    """Return True when both paths end in the same base file name."""
    return os.path.basename(file_a) == os.path.basename(file_b)
119
120
def add_executable_permission(file_path):
    """Add the execute bits (0o111) to the file's mode, keeping all other bits."""
    mode_with_exec = os.stat(file_path).st_mode | 0o111
    os.chmod(file_path, mode_with_exec)
125
126
def replace_file_content(file_path, old_content, new_content):
    """Replace every occurrence of ``old_content`` in the file with ``new_content``.

    The file is read and rewritten in place as UTF-8 text.
    """
    with open(file_path, 'r+', encoding='utf-8') as handle:
        updated_text = handle.read().replace(old_content, new_content)
        # Rewrite from the start and cut off any leftover of the longer old text.
        handle.seek(0)
        handle.write(updated_text)
        handle.truncate()
134
135
def run_cmd(cmd):
    """Run ``cmd`` without a shell and return the completed process.

    stderr is redirected into stdout, so ``result.stdout`` holds the combined
    text output and ``result.stderr`` is always ``None``.

    :param cmd: argument list passed to ``subprocess.run``.
    :return: the ``subprocess.CompletedProcess`` of the finished command.
    """
    logging.debug(f'cmd: {cmd}')
    result = subprocess.run(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    logging.debug(f'cmd stdout: {result.stdout}')
    # result.stderr is always None here (merged into stdout), so logging it
    # only produced a bogus error line per command. Report real failures instead.
    if result.returncode != 0:
        logging.error(f'cmd {cmd} exited with code {result.returncode}: {result.stdout}')
    return result
142
143
def move_picture(task, image_name):
    """Move ``{image_name}.jpeg`` from the working directory into the task's picture folder.

    The destination is ``pictures/{task.name}`` next to this file; both
    directory levels are created when missing and an existing picture with
    the same name is removed first.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))

    pictures_root = os.path.join(script_dir, 'pictures')
    if not os.path.exists(pictures_root):
        os.mkdir(pictures_root)

    task_picture_dir = os.path.join(script_dir, f'pictures/{task.name}')
    if not os.path.exists(task_picture_dir):
        os.mkdir(task_picture_dir)

    # Remove a previous screenshot so the move does not hit an existing file.
    stale_picture = os.path.join(task_picture_dir, f'{image_name}.jpeg')
    if os.path.exists(stale_picture):
        os.remove(stale_picture)

    shutil.move(f'{image_name}.jpeg', task_picture_dir)
158
159
def get_running_screenshot(task, image_name, is_debug, module=''):
    """Install the signed package via hdc, start it and store a screenshot.

    Steps: send wake-up and swipe input to the device, install the signed
    output, install missing HSP dependencies and retry if the first install
    reported them, start the ability, take a snapshot, pull it to the working
    directory, stop and uninstall the bundle, then file the picture with
    ``move_picture``.

    :param task: task object providing paths, bundle/ability names and backup info.
    :param image_name: base name (without extension) of the screenshot file.
    :param is_debug: selects the debug or release HSP outputs for dependency installs.
    :param module: module kind used to resolve the module and output paths.
    """
    logging.debug(f'Getting runtime screenshot of {task.name}')
    # Wake the device and send touch-move input (presumably to unlock the screen - TODO confirm).
    run_cmd(['hdc', 'shell', 'power-shell', 'wakeup;power-shell', 'setmode 602'])
    run_cmd(['hdc', 'shell', 'uinput', '-T', '-m', '420', '1000', '420',
             '400;uinput', '-T', '-m', '420', '400', '420', '1000'])

    module_path = get_module_path(task, module)
    signed_output = get_output_path(task, module, options.OutputType.signed)
    build_path = os.path.join(task.path, *module_path, *task.build_path)
    out_path = os.path.join(build_path, *signed_output)

    install_result = run_cmd(['hdc', 'install', f'{out_path}'])
    install_output = install_result.stdout

    # After importing Hsp, Hap needs to install the Hsp package first before installing the Hap package.
    not_hsp_error_message = 'Failed to install the HAP or HSP because the dependent module does not exist'
    not_out_hsp_error_message = 'outHsp does not exist'
    missing_hsp = not_hsp_error_message in install_output
    missing_external_hsp = not_out_hsp_error_message in install_output

    if missing_hsp:
        if is_debug:
            hsp_output_path = task.backup_info.hsp_signed_output_debug
        else:
            hsp_output_path = task.backup_info.hsp_signed_output_release
        run_cmd(['hdc', 'install', f'{hsp_output_path}'])
        time.sleep(3)

    if missing_external_hsp:
        if is_debug:
            external_hsp_output_path = task.backup_info.external_hsp_signed_output_debug
        else:
            external_hsp_output_path = task.backup_info.external_hsp_signed_output_release
        run_cmd(['hdc', 'install', f'{external_hsp_output_path}'])
        time.sleep(3)

    # Retry the original package once its dependencies have been installed.
    if missing_hsp or missing_external_hsp:
        run_cmd(['hdc', 'install', f'{out_path}'])

    run_cmd(['hdc', 'shell', 'aa', 'start', '-a', f'{task.ability_name}', '-b', f'{task.bundle_name}'])
    time.sleep(3)

    screen_path = f'/data/local/tmp/{image_name}.jpeg'
    run_cmd(['hdc', 'shell', 'snapshot_display', '-f', f'{screen_path}'])
    time.sleep(3)

    # Pull the screenshot to the working directory, then clean up the device.
    run_cmd(['hdc', 'file', 'recv', f'{screen_path}', f'{image_name}.jpeg'])
    run_cmd(['hdc', 'shell', 'aa', 'force-stop', f'{task.bundle_name}'])
    run_cmd(['hdc', 'shell', 'bm', 'uninstall', '-n', f'{task.bundle_name}'])

    move_picture(task, image_name)
201
202
def compare_screenshot(runtime_picture_path, picture_reference_path, threshold=0.95):
    """Return True when both images match on at least ``threshold`` of their pixels.

    Both images are converted to RGB and shrunk in place to fit within
    256x256 before an exact pixel-by-pixel comparison.

    :param runtime_picture_path: path of the screenshot captured at runtime.
    :param picture_reference_path: path of the reference screenshot.
    :param threshold: minimum fraction of identical pixels for a match.
    :return: True if the images are similar enough; False otherwise, including
        when an image cannot be opened or the two thumbnails differ in size.
    """
    try:
        runtime_picture = Image.open(runtime_picture_path).convert('RGB')
        reference_picture = Image.open(picture_reference_path).convert('RGB')
    except Exception:
        # Either file may be the one that failed, so name both.
        logging.error(f'open image {runtime_picture_path} or {picture_reference_path} failed')
        return False
    runtime_picture.thumbnail((256, 256))
    reference_picture.thumbnail((256, 256))

    # thumbnail() keeps the aspect ratio, so differently shaped images end up
    # with different sizes. Indexing both with one coordinate range would then
    # raise IndexError or silently skip part of the larger image.
    if runtime_picture.size != reference_picture.size:
        logging.error(f'image size mismatch: {runtime_picture.size} vs {reference_picture.size}')
        return False

    width, height = runtime_picture.size
    total_pixels = width * height
    if total_pixels == 0:
        # Nothing to compare; avoid dividing by zero.
        return False

    runtime_pixel = runtime_picture.load()
    reference_pixel = reference_picture.load()
    similar_pixels = sum(
        1
        for x in range(width)
        for y in range(height)
        if runtime_pixel[x, y] == reference_pixel[x, y]
    )

    return similar_pixels / total_pixels >= threshold
231
232
def verify_runtime(task, picture_name):
    """Compare the task's runtime screenshot with its reference picture.

    Both files are looked up next to this file, under ``pictures`` and
    ``pictures_reference``. Returns True on a match; otherwise logs an error
    and returns False.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    actual_picture = os.path.join(script_dir, f'pictures/{task.name}/{picture_name}.jpeg')
    expected_picture = os.path.join(script_dir, f'pictures_reference/{task.name}/{picture_name}.jpeg')

    if compare_screenshot(actual_picture, expected_picture, threshold=0.95):
        return True

    logging.error(f'{task.name} get error when running')
    return False
243
244
def add_content_to_file(file_path, head_content, tail_content):
    """Prepend ``head_content`` and/or append ``tail_content`` to a UTF-8 text file.

    Logs an error and leaves the file untouched when both contents are empty.
    """
    if not (head_content or tail_content):
        logging.error('Both head_content and tail_content are missing,please check!')
        return

    with open(file_path, 'r+', encoding='utf-8') as handle:
        existing_text = handle.read()
        handle.seek(0)
        # Write head, original text and tail in order, skipping empty parts.
        for piece in (head_content, existing_text, tail_content):
            if piece:
                handle.write(piece)
        handle.truncate()
259
260
def remove_content_from_file(file_path, head_content, tail_content):
    """Strip ``head_content`` from the start and ``tail_content`` from the end of a file.

    A part that is not found is only reported at debug level. Logs an error
    and leaves the file untouched when both contents are empty.
    """
    if not (head_content or tail_content):
        logging.error('Both head_content and tail_content are missing,please check!')
        return

    with open(file_path, 'r+', encoding='utf-8') as handle:
        remaining_text = handle.read()

        if head_content:
            if remaining_text.startswith(head_content):
                remaining_text = remaining_text[len(head_content):]
            else:
                logging.debug(f'Cannot find the head content to remove in {file_path}')

        if tail_content:
            if remaining_text.endswith(tail_content):
                remaining_text = remaining_text[:len(remaining_text) - len(tail_content)]
            else:
                logging.debug(f'Cannot find the tail content to remove in {file_path}')

        handle.seek(0)
        handle.write(remaining_text)
        handle.truncate()
281
282
def extract_library_names(import_statement):
    """Return the module path of the first ``import { ... } from '...';`` statement.

    Only single-quoted paths followed by a semicolon are recognised. Raises
    ``IndexError`` when the text contains no such statement.
    """
    import_regex = re.compile(r"import\s+{[^}]+}\s+from\s+'([^']+)';")
    library_paths = import_regex.findall(import_statement)
    return library_paths[0]
288
289
def get_module_name(task, module=''):
    """Return the task's module name for ``module``, defaulting to the Hap module.

    'Har' and 'BytecodeHar' both resolve to ``task.har_module``.
    """
    names_by_kind = dict(
        Hap=task.hap_module,
        Har=task.har_module,
        BytecodeHar=task.har_module,
        Hsp=task.hsp_module,
        Cpp=task.cpp_module,
    )
    if module in names_by_kind:
        return names_by_kind[module]
    return names_by_kind['Hap']
300
301
def get_module_path(task, module=''):
    """Return the task's module path for ``module``, defaulting to the Hap module path.

    'Har' and 'BytecodeHar' both resolve to ``task.har_module_path``.
    """
    paths_by_kind = {
        'Hap': task.hap_module_path,
        'Har': task.har_module_path,
        'BytecodeHar': task.har_module_path,
        'Hsp': task.hsp_module_path,
        'Cpp': task.cpp_module_path,
    }
    try:
        return paths_by_kind[module]
    except KeyError:
        # Unknown module kinds fall back to the Hap entry.
        return paths_by_kind['Hap']
312
313
def get_output_path_unsigned(task, module=''):
    """Return the unsigned output path for ``module``, defaulting to the Hap one."""
    unsigned_outputs = dict(
        Hap=task.hap_output_path,
        Hsp=task.hsp_output_path,
        Cpp=task.cpp_output_path,
    )
    return unsigned_outputs[module] if module in unsigned_outputs else unsigned_outputs['Hap']
321
322
def get_output_path_signed(task, module=''):
    """Return the signed output path for ``module``, defaulting to the Hap one."""
    signed_outputs = {
        "Hap": task.hap_output_path_signed,
        "Hsp": task.hsp_output_path_signed,
        "Cpp": task.cpp_output_path_signed,
    }
    if module not in signed_outputs:
        return signed_outputs['Hap']
    return signed_outputs[module]
330
331
def get_output_path_har(task, module=''):
    """Return the har output path for ``module``, defaulting to the Har one.

    'Har' and 'BytecodeHar' both resolve to ``task.har_output_path_har``.
    """
    har_outputs = dict(
        Har=task.har_output_path_har,
        BytecodeHar=task.har_output_path_har,
        Hsp=task.hsp_output_path_har,
    )
    try:
        return har_outputs[module]
    except KeyError:
        # Unknown module kinds fall back to the Har entry.
        return har_outputs['Har']
339
340
def get_output_path(task, module, output_type):
    """Return the output path of ``module`` for the requested output type.

    Unsigned and signed types use their own lookups; every other type uses
    the har lookup.
    """
    if output_type == options.OutputType.unsigned:
        resolve = get_output_path_unsigned
    elif output_type == options.OutputType.signed:
        resolve = get_output_path_signed
    else:
        resolve = get_output_path_har
    return resolve(task, module)
348
349
def get_cache_extension(task_type):
    """Return the cache file extension for ``task_type``.

    'stage' gives '.protoBin', 'fa' or 'compatible8' gives '.temp.abc',
    'js' gives '.abc'; the checks run in that order and anything else
    gives an empty string.
    """
    if 'stage' in task_type:
        return '.protoBin'
    if 'fa' in task_type or 'compatible8' in task_type:
        return '.temp.abc'
    if 'js' in task_type:
        return '.abc'
    return ''
360
361
def file_contains_specified_fields(file_path, fields):
    """Return True when any single line of the file contains ``fields``.

    Logs an error and returns False when the file does not exist.
    """
    if not os.path.exists(file_path):
        logging.error(f"File {file_path} doesn't exist")
        return False
    with open(file_path, 'r', encoding='utf-8') as handle:
        # Stops reading at the first matching line.
        return any(fields in line for line in handle)
373