#!/usr/bin/env python3
# coding: utf-8

"""
Copyright (c) 2023 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Description: utils for test suite
"""

import gzip
import logging
import os
import re
import shutil
import subprocess
import sys
import time
import zipfile

from PIL import Image

import options


def get_log_level(arg_log_level):
    """Translate a textual log level into the matching ``logging`` constant.

    Recognised values are 'debug', 'info', 'warn' and 'error'; anything else
    falls back to ``logging.ERROR``.
    """
    log_level_dict = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warn': logging.WARN,
        'error': logging.ERROR
    }
    # use error as default log level
    return log_level_dict.get(arg_log_level, logging.ERROR)


def init_logger(log_level, log_file):
    """Configure the root logger to write to ``log_file`` and log the command line.

    ``log_level`` is resolved through :func:`get_log_level` and the file
    encoding through :func:`get_encoding`.
    """
    logging.basicConfig(filename=log_file,
                        level=get_log_level(log_level),
                        encoding=get_encoding(),
                        format='[%(asctime)s %(filename)s:%(lineno)d]: [%(levelname)s] %(message)s')
    logging.info("Test command:")
    logging.info(" ".join(sys.argv))


def get_encoding():
    """Return 'utf-8' on Windows, otherwise the file system encoding."""
    if is_windows():
        return 'utf-8'
    return sys.getfilesystemencoding()


def check_zip_file(file_path):
    """Return True when ``file_path`` can be opened as a zip archive.

    Any exception raised while probing the file is printed and reported as
    False (best-effort check).
    """
    try:
        if not zipfile.is_zipfile(file_path):
            return False
        with zipfile.ZipFile(file_path, 'r'):
            return True
    except Exception as e:
        print(e)
        return False


def check_gzip_file(file_path):
    """Return True when one byte can be read from ``file_path`` as gzip data.

    Any exception raised while reading is printed and reported as False.
    """
    try:
        with gzip.open(file_path, 'rb') as gzfile:
            gzfile.read(1)
    except Exception as e:
        print(e)
        return False
    return True


def is_windows():
    """Return True when ``sys.platform`` is 'win32' or 'cygwin'."""
    return sys.platform in ('win32', 'cygwin')


def is_mac():
    """Return True when ``sys.platform`` is 'darwin'."""
    return sys.platform == 'darwin'


def is_linux():
    """Return True when ``sys.platform`` is 'linux'."""
    return sys.platform == 'linux'


def get_time_string():
    """Return the current local time formatted as ``YYYYmmdd-HHMMSS``."""
    return time.strftime('%Y%m%d-%H%M%S')


def is_esmodule(hap_type):
    """Return True when ``hap_type`` contains 'stage'."""
    # if hap_type is stage, it's esmodule.
    # if hap_type is js, fa, compatible 8, it's js_bundle
    return 'stage' in hap_type


def is_file_timestamps_same(file_a, file_b):
    """Return True when both files have the same modification time."""
    file_a_mtime = os.stat(file_a).st_mtime
    file_b_mtime = os.stat(file_b).st_mtime
    return file_a_mtime == file_b_mtime


def is_file_name_same(file_a, file_b):
    """Return True when both paths share the same base name."""
    return os.path.basename(file_a) == os.path.basename(file_b)


def add_executable_permission(file_path):
    """Add the execute bit for user, group and others to ``file_path``."""
    current_mode = os.stat(file_path).st_mode
    os.chmod(file_path, current_mode | 0o111)


def replace_file_content(file_path, old_content, new_content):
    """Replace every occurrence of ``old_content`` in the file, in place."""
    with open(file_path, 'r+', encoding='utf-8') as file:
        content = file.read().replace(old_content, new_content)
        file.seek(0)
        file.write(content)
        # The new text may be shorter than the old one; drop the leftover tail.
        file.truncate()


def run_cmd(cmd):
    """Run ``cmd`` (an argument list, no shell) and return the CompletedProcess.

    stderr is merged into stdout, so ``result.stdout`` holds the combined
    output as text and ``result.stderr`` is always None. The output is logged
    at debug level; a non-zero exit code is logged as an error.
    """
    logging.debug(f'cmd: {cmd}')
    result = subprocess.run(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    logging.debug(f'cmd stdout: {result.stdout}')
    # stderr is redirected into stdout above, so there is no separate stream
    # to report; only flag commands that actually failed.
    if result.returncode != 0:
        logging.error(f'cmd exited with code {result.returncode}: {cmd}')
    return result


def move_picture(task, image_name):
    """Move ``<image_name>.jpeg`` from the working directory into ``pictures/<task.name>``.

    The destination directory lives next to this file and is created when
    missing. An existing picture with the same name is removed first.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    pic_save_path = os.path.join(script_dir, f'pictures/{task.name}')
    os.makedirs(pic_save_path, exist_ok=True)

    pic_file_path = os.path.join(pic_save_path, f'{image_name}.jpeg')
    if os.path.exists(pic_file_path):
        os.remove(pic_file_path)

    shutil.move(f'{image_name}.jpeg', pic_save_path)


def get_running_screenshot(task, image_name, is_debug, module=''):
    """Install the signed package, start it, take a screenshot and store it.

    The screenshot is pulled from the device as ``<image_name>.jpeg`` and
    handed to :func:`move_picture`. When the first install reports a missing
    Hsp dependency, the backed-up Hsp package (debug or release, depending on
    ``is_debug``) is installed first and the install is retried.
    """
    logging.debug(f'Getting runtime screenshot of {task.name}')
    # NOTE(review): the ';' embedded in the arguments presumably chains two
    # commands in the device shell - confirm before changing these lists.
    run_cmd(['hdc', 'shell', 'power-shell', 'wakeup;power-shell', 'setmode 602'])
    run_cmd(['hdc', 'shell', 'uinput', '-T', '-m', '420', '1000', '420',
             '400;uinput', '-T', '-m', '420', '400', '420', '1000'])

    module_path = get_module_path(task, module)
    output_path_signed = get_output_path(task, module, options.OutputType.signed)
    build_path = os.path.join(task.path, *module_path, *task.build_path)
    out_path = os.path.join(build_path, *output_path_signed)

    result = run_cmd(['hdc', 'install', f'{out_path}'])
    # After importing Hsp, Hap needs to install the Hsp package first before installing the Hap package.
    not_hsp_error_message = 'Failed to install the HAP or HSP because the dependent module does not exist'
    not_out_hsp_error_message = 'outHsp does not exist'
    hsp_missing = not_hsp_error_message in result.stdout
    external_hsp_missing = not_out_hsp_error_message in result.stdout

    if hsp_missing:
        hsp_output_path = task.backup_info.hsp_signed_output_debug if is_debug \
            else task.backup_info.hsp_signed_output_release
        run_cmd(['hdc', 'install', f'{hsp_output_path}'])
        time.sleep(3)
    if external_hsp_missing:
        external_hsp_output_path = task.backup_info.external_hsp_signed_output_debug if is_debug \
            else task.backup_info.external_hsp_signed_output_release
        run_cmd(['hdc', 'install', f'{external_hsp_output_path}'])
        time.sleep(3)

    # Retry the original install once its dependency has been installed.
    if hsp_missing or external_hsp_missing:
        run_cmd(['hdc', 'install', f'{out_path}'])

    run_cmd(['hdc', 'shell', 'aa', 'start', '-a', f'{task.ability_name}', '-b', f'{task.bundle_name}'])
    time.sleep(3)

    screen_path = f'/data/local/tmp/{image_name}.jpeg'
    run_cmd(['hdc', 'shell', 'snapshot_display', '-f', f'{screen_path}'])
    time.sleep(3)

    run_cmd(['hdc', 'file', 'recv', f'{screen_path}', f'{image_name}.jpeg'])
    run_cmd(['hdc', 'shell', 'aa', 'force-stop', f'{task.bundle_name}'])
    run_cmd(['hdc', 'shell', 'bm', 'uninstall', '-n', f'{task.bundle_name}'])

    move_picture(task, image_name)


def _open_rgb_image(image_path):
    """Return the image at ``image_path`` converted to RGB, or None on failure."""
    try:
        # convert() returns an independent copy, so the file can be closed here.
        with Image.open(image_path) as image_file:
            return image_file.convert('RGB')
    except Exception:
        logging.error(f'open image {image_path} failed')
        return None


def compare_screenshot(runtime_picture_path, picture_reference_path, threshold=0.95):
    """Return True when the two pictures are similar enough.

    Both pictures are converted to RGB and shrunk to fit in 256x256; the
    similarity is the share of pixels that are exactly equal. Returns False
    when either picture cannot be opened, when the shrunk pictures differ in
    size, or when the similarity is below ``threshold``.
    """
    runtime_picture = _open_rgb_image(runtime_picture_path)
    if runtime_picture is None:
        return False
    reference_picture = _open_rgb_image(picture_reference_path)
    if reference_picture is None:
        return False

    runtime_picture.thumbnail((256, 256))
    reference_picture.thumbnail((256, 256))

    # Pixels can only be compared one-to-one on equally sized pictures;
    # indexing the reference with the runtime size would otherwise raise
    # IndexError or silently ignore part of the reference.
    if runtime_picture.size != reference_picture.size:
        logging.error(f'image size mismatch: {runtime_picture_path} is {runtime_picture.size}, '
                      f'{picture_reference_path} is {reference_picture.size}')
        return False

    runtime_pixel = runtime_picture.load()
    reference_pixel = reference_picture.load()
    width, height = runtime_picture.size

    similar_pixels = 0
    total_pixels = width * height

    for x in range(width):
        for y in range(height):
            if runtime_pixel[x, y] == reference_pixel[x, y]:
                similar_pixels += 1

    similarity = similar_pixels / total_pixels
    return similarity >= threshold


def verify_runtime(task, picture_name):
    """Compare the task's runtime screenshot with its reference picture.

    Looks for ``pictures/<task.name>/<picture_name>.jpeg`` and
    ``pictures_reference/<task.name>/<picture_name>.jpeg`` next to this file.
    Logs an error and returns False when they do not match.
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    pic_path = os.path.join(script_dir, f'pictures/{task.name}/{picture_name}.jpeg')
    pic_path_reference = os.path.join(script_dir, f'pictures_reference/{task.name}/{picture_name}.jpeg')
    passed = compare_screenshot(pic_path, pic_path_reference, threshold=0.95)
    if not passed:
        logging.error(f'{task.name} get error when running')
        return False
    return True


def add_content_to_file(file_path, head_content, tail_content):
    """Prepend ``head_content`` and/or append ``tail_content`` to a file in place.

    Logs an error and leaves the file untouched when both are empty.
    """
    if not head_content and not tail_content:
        logging.error('Both head_content and tail_content are missing,please check!')
        return

    with open(file_path, 'r+', encoding='utf-8') as file:
        old_content = file.read()
        file.seek(0)
        if head_content:
            file.write(head_content)
        file.write(old_content)
        if tail_content:
            file.write(tail_content)
        file.truncate()


def remove_content_from_file(file_path, head_content, tail_content):
    """Strip ``head_content`` from the start and ``tail_content`` from the end of a file.

    A piece that is requested but not found is reported at debug level and
    skipped. Logs an error and leaves the file untouched when both are empty.
    """
    if not head_content and not tail_content:
        logging.error('Both head_content and tail_content are missing,please check!')
        return

    with open(file_path, 'r+', encoding='utf-8') as file:
        old_content = file.read()
        if head_content and old_content.startswith(head_content):
            old_content = old_content[len(head_content):]
        elif head_content:
            logging.debug(f'Cannot find the head content to remove in {file_path}')

        if tail_content and old_content.endswith(tail_content):
            old_content = old_content[:-len(tail_content)]
        elif tail_content:
            logging.debug(f'Cannot find the tail content to remove in {file_path}')

        file.seek(0)
        file.write(old_content)
        file.truncate()


def extract_library_names(import_statement):
    """Return the module specifier of the first ``import {...} from '...';`` found.

    Raises:
        IndexError: when ``import_statement`` contains no such import.
    """
    pattern = r"import\s+{[^}]+}\s+from\s+'([^']+)';"
    matches = re.findall(pattern, import_statement)

    return matches[0]


def get_module_name(task, module=''):
    """Return the task's module name for ``module``; unknown values use the Hap entry."""
    module_mapping = {
        'Hap': task.hap_module,
        'Har': task.har_module,
        'BytecodeHar': task.har_module,
        'Hsp': task.hsp_module,
        'Cpp': task.cpp_module
    }

    return module_mapping.get(module, module_mapping['Hap'])


def get_module_path(task, module=''):
    """Return the task's module path for ``module``; unknown values use the Hap entry."""
    module_mapping = {
        'Hap': task.hap_module_path,
        'Har': task.har_module_path,
        'BytecodeHar': task.har_module_path,
        'Hsp': task.hsp_module_path,
        'Cpp': task.cpp_module_path
    }

    return module_mapping.get(module, module_mapping['Hap'])


def get_output_path_unsigned(task, module=''):
    """Return the unsigned output path for ``module``; unknown values use the Hap entry."""
    output_path_mapping = {
        "Hap": task.hap_output_path,
        "Hsp": task.hsp_output_path,
        "Cpp": task.cpp_output_path
    }
    return output_path_mapping.get(module, output_path_mapping['Hap'])


def get_output_path_signed(task, module=''):
    """Return the signed output path for ``module``; unknown values use the Hap entry."""
    output_path_mapping = {
        "Hap": task.hap_output_path_signed,
        "Hsp": task.hsp_output_path_signed,
        "Cpp": task.cpp_output_path_signed
    }
    return output_path_mapping.get(module, output_path_mapping['Hap'])


def get_output_path_har(task, module=''):
    """Return the har output path for ``module``; unknown values use the Har entry."""
    output_path_mapping = {
        "Har": task.har_output_path_har,
        "BytecodeHar": task.har_output_path_har,
        "Hsp": task.hsp_output_path_har
    }
    return output_path_mapping.get(module, output_path_mapping['Har'])


def get_output_path(task, module, output_type):
    """Return the output path of ``module`` for the given ``options.OutputType``.

    Any ``output_type`` other than unsigned or signed is resolved through
    :func:`get_output_path_har`.
    """
    if output_type == options.OutputType.unsigned:
        return get_output_path_unsigned(task, module)
    if output_type == options.OutputType.signed:
        return get_output_path_signed(task, module)
    return get_output_path_har(task, module)


def get_cache_extension(task_type):
    """Return the cache file extension for ``task_type``.

    'stage' maps to '.protoBin', 'fa' or 'compatible8' to '.temp.abc', 'js'
    to '.abc'; checks are substring tests applied in that order. Returns an
    empty string when nothing matches.
    """
    if 'stage' in task_type:
        return '.protoBin'
    if 'fa' in task_type or 'compatible8' in task_type:
        return '.temp.abc'
    if 'js' in task_type:
        return '.abc'
    return ''


def file_contains_specified_fields(file_path, fields):
    """Return True when any line of the file contains ``fields``.

    Logs an error and returns False when the file does not exist.
    """
    if not os.path.exists(file_path):
        logging.error(f"File {file_path} doesn't exist")
        return False
    with open(file_path, 'r', encoding='utf-8') as file:
        return any(fields in line for line in file)