#!/usr/bin/env python3 # coding: utf-8 """ Copyright (c) 2023 Huawei Device Co., Ltd. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. Description: utils for test suite """ import gzip import logging import os import re import shutil import subprocess import sys import time import zipfile from PIL import Image import options def get_log_level(arg_log_level): log_level_dict = { 'debug': logging.DEBUG, 'info': logging.INFO, 'warn': logging.WARN, 'error': logging.ERROR } if arg_log_level not in log_level_dict.keys(): return logging.ERROR # use error as default log level else: return log_level_dict[arg_log_level] def init_logger(log_level, log_file): logging.basicConfig(filename=log_file, level=get_log_level(log_level), encoding=get_encoding(), format='[%(asctime)s %(filename)s:%(lineno)d]: [%(levelname)s] %(message)s') logging.info("Test command:") logging.info(" ".join(sys.argv)) def get_encoding(): if is_windows(): return 'utf-8' else: return sys.getfilesystemencoding() def check_zip_file(file_path): try: if zipfile.is_zipfile(file_path): with zipfile.ZipFile(file_path, 'r'): return True else: return False except Exception as e: print(e) return False def check_gzip_file(file_path): try: with gzip.open(file_path, 'rb') as gzfile: gzfile.read(1) except Exception as e: print(e) return False return True def is_windows(): return sys.platform == 'win32' or sys.platform == 'cygwin' def is_mac(): return sys.platform == 'darwin' def is_linux(): return sys.platform == 'linux' def get_time_string(): return time.strftime('%Y%m%d-%H%M%S') def is_esmodule(hap_type): # if hap_type is stage, it's esmodule. # if hap_type is js, fa, compatible 8, it's js_bundle return 'stage' in hap_type def is_file_timestamps_same(file_a, file_b): file_a_mtime = os.stat(file_a).st_mtime file_b_mtime = os.stat(file_b).st_mtime return file_a_mtime == file_b_mtime def is_file_name_same(file_a, file_b): file_a_name = os.path.basename(file_a) file_b_name = os.path.basename(file_b) return file_a_name == file_b_name def add_executable_permission(file_path): current_mode = os.stat(file_path).st_mode new_mode = current_mode | 0o111 os.chmod(file_path, new_mode) def replace_file_content(file_path, old_content, new_content): with open(file_path, 'r+', encoding='utf-8') as file: content = file.read() content = content.replace(old_content, new_content) file.seek(0) file.write(content) file.truncate() def run_cmd(cmd): logging.debug(f'cmd: {cmd}') result = subprocess.run(cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) logging.debug(f'cmd stdout: {result.stdout}') logging.error(f'cmd stderr: {result.stderr}') return result def move_picture(task, image_name): pic_save_dic = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'pictures') if not os.path.exists(pic_save_dic): os.mkdir(pic_save_dic) pic_save_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), f'pictures/{task.name}') if not os.path.exists(pic_save_path): os.mkdir(pic_save_path) pic_file_path = os.path.join(pic_save_path, f'{image_name}.jpeg') if os.path.exists(pic_file_path): os.remove(pic_file_path) shutil.move(f'{image_name}.jpeg', pic_save_path) def get_running_screenshot(task, image_name, is_debug, module=''): logging.debug(f'Getting runtime screenshot of {task.name}') run_cmd(['hdc', 'shell', 'power-shell', 'wakeup;power-shell', 'setmode 602']) run_cmd(['hdc', 'shell', 'uinput', '-T', '-m', '420', '1000', '420', '400;uinput', '-T', '-m', '420', '400', '420', '1000']) module_path = get_module_path(task, module) output_path_signed = get_output_path(task, module, options.OutputType.signed) build_path = os.path.join(task.path, *module_path, *task.build_path) out_path = os.path.join(build_path, *output_path_signed) result = run_cmd(['hdc', 'install', f'{out_path}']) # After importing Hsp, Hap needs to install the Hsp package first before installing the Hap package. not_hsp_error_message = 'Failed to install the HAP or HSP because the dependent module does not exist' if not_hsp_error_message in result.stdout: hsp_output_path = task.backup_info.hsp_signed_output_debug if is_debug \ else task.backup_info.hsp_signed_output_release run_cmd(['hdc', 'install', f'{hsp_output_path}']) time.sleep(3) not_out_hsp_error_message = 'outHsp does not exist' if not_out_hsp_error_message in result.stdout: external_hsp_output_path = task.backup_info.external_hsp_signed_output_debug if is_debug \ else task.backup_info.external_hsp_signed_output_release run_cmd(['hdc', 'install', f'{external_hsp_output_path}']) time.sleep(3) if not_hsp_error_message in result.stdout or not_out_hsp_error_message in result.stdout: run_cmd(['hdc', 'install', f'{out_path}']) run_cmd(['hdc', 'shell', 'aa', 'start', '-a', f'{task.ability_name}', '-b', f'{task.bundle_name}']) time.sleep(3) screen_path = f'/data/local/tmp/{image_name}.jpeg' run_cmd(['hdc', 'shell', 'snapshot_display', '-f', f'{screen_path}']) time.sleep(3) run_cmd(['hdc', 'file', 'recv', f'{screen_path}', f'{image_name}.jpeg']) run_cmd(['hdc', 'shell', 'aa', 'force-stop', f'{task.bundle_name}']) run_cmd(['hdc', 'shell', 'bm', 'uninstall', '-n', f'{task.bundle_name}']) move_picture(task, image_name) def compare_screenshot(runtime_picture_path, picture_reference_path, threshold=0.95): try: runtime_picture = Image.open(runtime_picture_path).convert('RGB') picture_reference_path = Image.open(picture_reference_path).convert('RGB') except Exception: logging.error(f'open image {runtime_picture_path} failed') return False runtime_picture.thumbnail((256, 256)) picture_reference_path.thumbnail((256, 256)) runtime_pixel = runtime_picture.load() reference_pixel = picture_reference_path.load() width, height = runtime_picture.size similar_pixels = 0 total_pixels = width * height for x in range(width): for y in range(height): if runtime_pixel[x, y] == reference_pixel[x, y]: similar_pixels += 1 similarity = similar_pixels / total_pixels if similarity >= threshold: return True else: return False def verify_runtime(task, picture_name): pic_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), f'pictures/{task.name}/{picture_name}.jpeg') pic_path_reference = os.path.join(os.path.dirname(os.path.abspath(__file__)), f'pictures_reference/{task.name}/{picture_name}.jpeg') passed = compare_screenshot(pic_path, pic_path_reference, threshold=0.95) if not passed: logging.error(f'{task.name} get error when running') return False return True def add_content_to_file(file_path, head_content, tail_content): if not head_content and not tail_content: logging.error('Both head_content and tail_content are missing,please check!') return with open(file_path, 'r+', encoding='utf-8') as file: old_content = file.read() file.seek(0) if head_content: file.write(head_content) file.write(old_content) if tail_content: file.write(tail_content) file.truncate() def remove_content_from_file(file_path, head_content, tail_content): if not head_content and not tail_content: logging.error('Both head_content and tail_content are missing,please check!') return with open(file_path, 'r+', encoding='utf-8') as file: old_content = file.read() if head_content and old_content.startswith(head_content): old_content = old_content[len(head_content):] elif head_content: logging.debug(f'Cannot find the head content to remove in {file_path}') if tail_content and old_content.endswith(tail_content): old_content = old_content[:-len(tail_content)] elif tail_content: logging.debug(f'Cannot find the tail content to remove in {file_path}') file.seek(0) file.write(old_content) file.truncate() def extract_library_names(import_statement): pattern = r"import\s+{[^}]+}\s+from\s+'([^']+)';" matches = re.findall(pattern, import_statement) return matches[0] def get_module_name(task, module=''): module_mapping = { 'Hap': task.hap_module, 'Har': task.har_module, 'BytecodeHar': task.har_module, 'Hsp': task.hsp_module, 'Cpp': task.cpp_module } return module_mapping.get(module, module_mapping['Hap']) def get_module_path(task, module=''): module_mapping = { 'Hap': task.hap_module_path, 'Har': task.har_module_path, 'BytecodeHar': task.har_module_path, 'Hsp': task.hsp_module_path, 'Cpp': task.cpp_module_path } return module_mapping.get(module, module_mapping['Hap']) def get_output_path_unsigned(task, module=''): output_path_mapping = { "Hap": task.hap_output_path, "Hsp": task.hsp_output_path, "Cpp": task.cpp_output_path } return output_path_mapping.get(module, output_path_mapping['Hap']) def get_output_path_signed(task, module=''): output_path_mapping = { "Hap": task.hap_output_path_signed, "Hsp": task.hsp_output_path_signed, "Cpp": task.cpp_output_path_signed } return output_path_mapping.get(module, output_path_mapping['Hap']) def get_output_path_har(task, module=''): output_path_mapping = { "Har": task.har_output_path_har, "BytecodeHar": task.har_output_path_har, "Hsp": task.hsp_output_path_har } return output_path_mapping.get(module, output_path_mapping['Har']) def get_output_path(task, module, output_type): if output_type == options.OutputType.unsigned: return get_output_path_unsigned(task, module) elif output_type == options.OutputType.signed: return get_output_path_signed(task, module) else: return get_output_path_har(task, module) def get_cache_extension(task_type): cache_extension = '' if 'stage' in task_type: cache_extension = '.protoBin' elif 'fa' in task_type or 'compatible8' in task_type: cache_extension = '.temp.abc' elif 'js' in task_type: cache_extension = '.abc' return cache_extension def file_contains_specified_fields(file_path, fields): if not os.path.exists(file_path): logging.error(f"File {file_path} doesn't exist") return False with open(file_path, 'r', encoding='utf-8') as file: line = file.readline() while line: if fields in line: return True line = file.readline() return False