#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Run full and incremental builds of a project and collect time/size data.

The collected build times and package sizes are handed to a mail helper
object as log files, picture data and a mail message.
"""

import os
import shutil
import subprocess
import sys
import time
import traceback
import zipfile

import json5

import performance_config


class PerformanceBuild():
    """Drive repeated full/incremental builds of one project and report them.

    ``config_input`` supplies every path, command and switch used below;
    ``mail_obj`` receives the results (``add_logs_file``, ``add_pic_data``,
    ``add_failed_project``, ``add_msg``).
    """

    def __init__(self, config_input, mail_obj):
        """Store the configuration and mail helper and start with empty results."""
        self.config = config_input
        self.mail_helper = mail_obj
        self.mail_msg = ''
        self.prj_name = ''
        # Passed as ``timeout`` to every build command that is started.
        self.timeout = 1800
        self.error_log_str = ''
        self.first_line_in_avg_excel = ""
        self.preview_all_time_dic = {}
        self.preview_avg_time_dic = {}
        self.time_avg_dic = {}
        self.all_time_dic = {}
        self.size_avg_dic = {}
        self.all_size_dic = {}

    @staticmethod
    def append_into_dic(key, value, dic):
        """Append ``value`` to the list stored under ``key``, creating it if absent."""
        dic.setdefault(key, []).append(value)

    @staticmethod
    def add_code(code_path, start_pos, end_pos, code_str, lines):
        """Rewrite the region of ``code_path`` delimited by two marker strings.

        The text from the first occurrence of ``start_pos`` (inclusive) up to
        the first occurrence of ``end_pos`` (exclusive) is replaced by
        ``lines`` copies of ``code_str``.  When ``code_str`` contains ``%d``
        each copy is formatted with a counter running from ``lines`` down to 1.

        If ``end_pos`` is missing a message is printed and the file is left
        untouched.  If ``start_pos`` is missing the copies are inserted right
        before ``end_pos``; with ``lines == 0`` there is then nothing to do.
        """
        with open(code_path, 'r+', encoding='UTF-8') as modified_file:
            content = modified_file.read()
            add_str_end_pos = content.find(end_pos)
            if add_str_end_pos == -1:
                print(f'Can not find code : {end_pos} in {code_path}, please check config')
                return
            add_str_start_pos = content.find(start_pos)
            if add_str_start_pos == -1:
                if lines == 0:
                    return
                add_str_start_pos = add_str_end_pos

            has_counter = "%d" in code_str
            content_add = "".join(
                code_str % i if has_counter else code_str
                for i in range(lines, 0, -1)
            )
            content = content[:add_str_start_pos] + content_add + content[add_str_end_pos:]

            # Overwrite in place; truncate drops leftovers of a longer old file.
            modified_file.seek(0)
            modified_file.write(content)
            modified_file.truncate()

    # NOTE(review): the five helpers below return their argument unchanged.
    # They look like HTML wrappers (row / cell / header / titles) whose markup
    # is missing - confirm against the upstream file before relying on them.
    @staticmethod
    def add_row(context):
        """Return ``context`` formatted as a table row of the mail message."""
        return rf'{context}'

    @staticmethod
    def add_td(context):
        """Return ``context`` formatted as a table data cell of the mail message."""
        return rf'{context}'

    @staticmethod
    def add_th(context):
        """Return ``context`` formatted as a table header cell of the mail message."""
        return rf'{context}'

    @staticmethod
    def test_type_title(context):
        """Return ``context`` formatted as a test-type title of the mail message."""
        return rf'{context}'

    @staticmethod
    def app_title(context):
        """Return ``context`` formatted as an application title of the mail message."""
        return rf'{context}'

    @staticmethod
    def _prepend_to_path(directory):
        """Put ``directory`` in front of the process-wide PATH variable."""
        # 'PATH' and os.pathsep behave exactly like the former 'path' and ';'
        # on Windows and additionally work on other platforms.
        os.environ['PATH'] = directory + os.pathsep + os.environ['PATH']

    @staticmethod
    def _find_total_entry(dic):
        """Return the value list of the first key containing ``"total"``.

        Raises:
            LookupError: if no key of ``dic`` contains ``"total"``.
        """
        for key, values in dic.items():
            if "total" in key:
                return values
        raise LookupError('No "total" entry found in the collected build data')

    def _run_cmd(self, cmd_suffix):
        """Run the configured build tool with ``cmd_suffix`` and wait for it.

        The exit status is not checked.  ``subprocess.run`` kills the child
        before re-raising ``subprocess.TimeoutExpired`` once ``self.timeout``
        seconds have passed, so no build process is left behind.
        """
        cmd = [self.config.node_js_path, self.config.cmd_prefix, *cmd_suffix]
        print(f'cmd: {cmd}')
        subprocess.run(cmd, stderr=sys.stderr, stdout=sys.stdout, timeout=self.timeout)

    def start(self):
        """Prepare the environment, run all builds and publish the mail message."""
        self.init()
        self.start_test()
        self.write_mail_msg()
        os.chdir(self.config.project_path)

    def init(self):
        """Set up environment variables, working directory and absolute paths.

        With ``developing_test_data_path`` set, ``build_times`` is forced to 3
        and no command is run; otherwise the build tool is invoked with
        ``--stop-daemon``.
        """
        config = self.config
        if config.ide == performance_config.IdeType.DevEco:
            PerformanceBuild._prepend_to_path(config.node_js_path)
        os.chdir(config.project_path)
        PerformanceBuild._prepend_to_path(os.path.join(config.jbr_path, "bin"))
        os.environ['JAVA_HOME'] = config.jbr_path

        # Turn the project-relative settings into paths below project_path.
        config.cmd_prefix = os.path.join(config.project_path, config.cmd_prefix)
        config.debug_package_path = os.path.join(config.project_path, config.debug_package_path)
        config.release_package_path = os.path.join(config.project_path, config.release_package_path)
        config.incremental_code_path = os.path.join(config.project_path, config.incremental_code_path)
        config.json5_path = os.path.join(config.project_path, config.json5_path)

        if config.developing_test_data_path:
            config.build_times = 3
        else:
            self._run_cmd(["--stop-daemon"])

    def add_incremental_code(self, lines):
        """Insert ``lines`` copies of the configured snippet into the source file."""
        PerformanceBuild.add_code(self.config.incremental_code_path,
                                  self.config.incremental_code_start_pos,
                                  self.config.incremental_code_end_pos,
                                  self.config.incremental_code_str,
                                  lines)

    def revert_incremental_code(self):
        """Remove the snippet added by ``add_incremental_code``."""
        self.add_incremental_code(0)

    def reset(self):
        """Clear all collected data and revert the incremental code change."""
        self.first_line_in_avg_excel = ""
        self.preview_all_time_dic = {}
        self.preview_avg_time_dic = {}
        self.time_avg_dic = {}
        self.all_time_dic = {}
        self.size_avg_dic = {}
        self.all_size_dic = {}
        self.error_log_str = ''
        self.revert_incremental_code()

    def clean_project(self):
        """Run the build tool's ``clean`` task unless test data is being used."""
        if not self.config.developing_test_data_path:
            self._run_cmd(["clean"])

    def get_bytecode_size(self, is_debug):
        """Record raw and compressed sizes of the bytecode files in the package.

        With ``developing_test_data_path`` set, fixed sample values are
        recorded instead.  Otherwise every archive member ending in ``.abc``
        (DevEco) or ``.dex`` (other IDEs) of the debug/release package
        contributes two entries to ``self.all_size_dic``.
        """
        if self.config.developing_test_data_path:
            # test data for size
            PerformanceBuild.append_into_dic("ets/mudules.abc rawSize", 44444, self.all_size_dic)
            PerformanceBuild.append_into_dic("ets/mudules.abc Compress_size", 33333, self.all_size_dic)
            PerformanceBuild.append_into_dic("ets/mudules2.abc rawSize", 44444, self.all_size_dic)
            PerformanceBuild.append_into_dic("ets/mudules2.abc Compress_size", 33333, self.all_size_dic)
            return

        package_path = self.config.debug_package_path if is_debug else self.config.release_package_path
        extension_name = ".abc" if self.config.ide == performance_config.IdeType.DevEco else ".dex"
        # The context manager closes the archive even if reading it fails.
        with zipfile.ZipFile(package_path) as package:
            for info in package.infolist():
                if not info.filename.endswith(extension_name):
                    continue
                PerformanceBuild.append_into_dic(info.filename + " rawSize",
                                                 info.file_size, self.all_size_dic)
                PerformanceBuild.append_into_dic(info.filename + " compress_size",
                                                 info.compress_size, self.all_size_dic)

    def get_build_time(self, report_path, time_dic):
        """Extract task durations from a build report into ``time_dic``.

        ``"continual"`` events contribute ``totalTime`` under their own name;
        ``"mark"`` events of mark type ``history`` contribute
        ``endTime - startTime`` under ``"total build cost"``.  Both are divided
        by 1e9 before being stored.  From the first error log event onwards no
        more durations are recorded, but the names of error log events keep
        being appended to ``self.error_log_str``.

        Raises:
            Exception: if the report has no events, or if it contains an
                error log event.
        """
        with open(report_path, 'r', encoding='UTF-8') as report:
            event_obj = json5.load(report)['events']
        if not event_obj:
            raise Exception('Open report json failed')

        found_error = False
        for node in event_obj:
            head = node['head']
            if head['type'] == "log" and node['additional']['logType'] == 'error':
                self.error_log_str = self.error_log_str + head['name']
                found_error = True
            if found_error:
                continue

            if head['type'] == "mark":
                if node['additional']['markType'] != 'history':
                    continue
                build_time = (node['body']['endTime'] - node['body']['startTime']) / 1000000000
                task_name = "total build cost"
            elif head['type'] == "continual":
                build_time = node['additional']['totalTime'] / 1000000000
                task_name = head['name']
            else:
                continue
            PerformanceBuild.append_into_dic(task_name, build_time, time_dic)

        if found_error:
            raise Exception('Build Failed')

    def collect_preview_build_data(self, report_path):
        """Record the durations of a preview build report."""
        self.get_build_time(report_path, self.preview_all_time_dic)

    def collect_build_data(self, is_debug, report_path):
        """Record the durations of a build report and the resulting package sizes."""
        self.get_build_time(report_path, self.all_time_dic)
        self.get_bytecode_size(is_debug)

    def get_report_path(self, cmd_suffix):
        """Run a build and return the path of the report file it created.

        The report is identified as the entry of ``.hvigor/report`` (relative
        to the current working directory) that did not exist before the build.

        Raises:
            FileNotFoundError: if the build left no new entry in the directory.
        """
        report_dir = '.hvigor/report'
        reports_before = os.listdir(report_dir) if os.path.exists(report_dir) else []

        self._run_cmd(cmd_suffix)

        new_reports = set(os.listdir(report_dir)) - set(reports_before)
        if not new_reports:
            raise FileNotFoundError(f'No new build report was generated in {report_dir}')
        return os.path.join(report_dir, new_reports.pop())

    def preview_build(self, is_debug):
        """Delete the preview cache, run a preview build and record its times."""
        cache_path = os.path.join(self.config.project_path, *self.config.preview_cache_path)
        if os.path.exists(cache_path):
            shutil.rmtree(cache_path)
        cmd_suffix = self.config.preview_debug_suffix if is_debug else self.config.preview_release_suffix
        report_path = self.get_report_path(cmd_suffix)
        self.collect_preview_build_data(report_path)

    def build(self, is_debug):
        """Run a debug or release build and record its times and sizes."""
        cmd_suffix = self.config.cmd_debug_suffix if is_debug else self.config.cmd_release_suffix
        report_path = self.get_report_path(cmd_suffix)
        self.collect_build_data(is_debug, report_path)

    def start_build(self, is_debug):
        """Build the project, or load the canned report when test data is configured.

        Always returns ``True``; failures surface as exceptions.
        """
        if self.config.developing_test_data_path:
            # test data
            test_report = os.path.join(os.path.dirname(__file__),
                                       self.config.developing_test_data_path)
            self.collect_build_data(is_debug, test_report)
            return True

        self.build(is_debug)
        return True

    def get_millisecond(self, time_string):
        """Convert a duration string to milliseconds.

        For non-DevEco IDEs without test data the string is parsed as a plain
        integer.  Otherwise a text of the form ``"<m> min <s> s <ms> ms"`` is
        parsed; each of the three parts is optional and a part that does not
        match its separator exactly is ignored.
        """
        if self.config.ide != performance_config.IdeType.DevEco and not self.config.developing_test_data_path:
            return int(time_string)

        cost_time = 0
        remaining = time_string
        for separator, factor in ((" min ", 60000), (" s ", 1000), (" ms", 1)):
            parts = remaining.split(separator)
            if len(parts) > 1:
                cost_time += int(parts[0]) * factor
                remaining = parts[1]
        return cost_time

    def cal_preview_build_avg_time(self):
        """Average the preview build times into ``self.preview_avg_time_dic``."""
        self.cal_incremental_avg_time(self.preview_all_time_dic, self.preview_avg_time_dic)

    def cal_incremental_avg_time(self, all_time_dic, avg_time_dic):
        """Average first-build and incremental-build times per task.

        Each list in ``all_time_dic`` is expected to alternate first-build and
        incremental-build samples (``2 * build_times`` values).  A list with
        exactly ``build_times`` values is padded in place with a ``0`` after
        every sample, i.e. the task counts as absent from incremental builds.
        Lists of any other length are skipped.  Two rounded averages (first,
        incremental) are appended to ``avg_time_dic`` per task, and a newline
        is appended to ``self.first_line_in_avg_excel``.
        """
        self.first_line_in_avg_excel = self.first_line_in_avg_excel + "\n"
        build_times = self.config.build_times
        for key, samples in all_time_dic.items():
            if len(samples) != 2 * build_times:
                if len(samples) != build_times:
                    continue
                # Interleave zeros in place so the padded list is also what
                # gets written to the "all build time" log later on.
                samples[:] = [value for sample in samples for value in (sample, 0)]

            # average of first build
            cost = round(sum(samples[0::2]) / build_times, 2)
            PerformanceBuild.append_into_dic(key, cost, avg_time_dic)
            # average of incremental build
            cost = round(sum(samples[1::2]) / build_times, 2)
            PerformanceBuild.append_into_dic(key, cost, avg_time_dic)

    def cal_incremental_avg_size(self):
        """Add size totals and derive the per-file size summary.

        For every build the sizes are summed into ``total_raw_size`` (keys
        containing ``"raw"``) and ``total_compressed_size`` (all other keys),
        which are added to ``self.all_size_dic``.  Then, per key, two values
        are appended to ``self.size_avg_dic``: the size of the first builds
        and the size of the incremental builds, each replaced by ``-1`` when
        the sizes differ between rounds.
        """
        build_count = self.config.build_times * 2
        total_raw_size = [0] * build_count
        total_compressed_size = [0] * build_count
        for i in range(build_count):
            for key, sizes in self.all_size_dic.items():
                if "raw" in key:
                    total_raw_size[i] += sizes[i]
                else:
                    total_compressed_size[i] += sizes[i]
        self.all_size_dic["total_raw_size"] = total_raw_size
        self.all_size_dic["total_compressed_size"] = total_compressed_size

        for key, sizes in self.all_size_dic.items():
            # sizes should be the same, just check
            full_first_size = sizes[0]
            if any(size != full_first_size for size in sizes[0::2]):
                full_first_size = -1
            PerformanceBuild.append_into_dic(key, full_first_size, self.size_avg_dic)

            incremental_first_size = sizes[1]
            if any(size != incremental_first_size for size in sizes[1::2]):
                incremental_first_size = -1
            PerformanceBuild.append_into_dic(key, incremental_first_size, self.size_avg_dic)

    def cal_incremental_avg(self):
        """Compute all averages: preview times (if enabled), build times, sizes."""
        if self.config.preview_build:
            self.cal_incremental_avg_time(self.preview_all_time_dic, self.preview_avg_time_dic)
        self.cal_incremental_avg_time(self.all_time_dic, self.time_avg_dic)
        self.cal_incremental_avg_size()

    def add_preview_build_time_pic_data(self, dic, is_debug):
        """Hand the average total preview build time to the mail helper.

        Raises:
            LookupError: if ``dic`` has no key containing ``"total"``.
        """
        preview_build_time = PerformanceBuild._find_total_entry(dic)[0]
        self.mail_helper.add_pic_data(self.config.name, is_debug, [preview_build_time], 2)

    def add_time_pic_data(self, dic, is_debug):
        """Hand the average total full and incremental build times to the mail helper.

        Raises:
            LookupError: if ``dic`` has no key containing ``"total"``.
        """
        total_times = PerformanceBuild._find_total_entry(dic)
        full_time = total_times[0]
        incremental_time = total_times[1]
        self.mail_helper.add_pic_data(self.config.name, is_debug, [full_time, incremental_time], 0)

    def add_size_pic_data(self, dic, is_debug):
        """Hand the first value of the last entry of ``dic`` to the mail helper.

        Raises:
            LookupError: if ``dic`` is empty.
        """
        keys = list(dic)
        if not keys:
            raise LookupError('No size data collected')
        # The last inserted key is used, as the original loop over all keys did.
        full_size = dic[keys[-1]][0]
        self.mail_helper.add_pic_data(self.config.name, is_debug, [full_size], 3)

    def write_mail_files(self, dic):
        """Build the table rows of the mail message for the configured tasks.

        Returns an empty string when the configuration has no
        ``show_time_detail_filter``.  Otherwise returns a header row derived
        from ``self.first_line_in_avg_excel`` followed by one row per filter
        entry that is present in ``dic``, in filter order.
        """
        if not hasattr(self.config, 'show_time_detail_filter'):
            return ''

        column_titles = self.first_line_in_avg_excel.replace("\n", "").split(",")
        header_cells = "".join(PerformanceBuild.add_th(title) for title in column_titles)
        rows = [PerformanceBuild.add_row(header_cells)]

        for key in self.config.show_time_detail_filter:
            if key not in dic:
                continue
            cells = [PerformanceBuild.add_th(key)]
            cells.extend(PerformanceBuild.add_td(f'{v} s') for v in dic[key])
            rows.append(PerformanceBuild.add_row("".join(cells)))
        return "".join(rows)

    def write_from_dic(self, file_path, first_line, dic):
        """Serialise ``dic`` as comma separated lines and register it as a log file.

        Each entry becomes ``key,v1,v2,...`` followed by a newline;
        ``first_line`` (if truthy) is emitted verbatim in front.  The encoded
        text is passed to ``mail_helper.add_logs_file``.
        """
        content_list = [first_line] if first_line else []
        for key, values in dic.items():
            content_list.append(",".join([key, *(str(v) for v in values)]))
            content_list.append("\n")
        content = "".join(content_list)
        self.mail_helper.add_logs_file(file_path, content.encode())

    def write_logs_from_dic(self, path_prefix, log_filename, source_dic, need_first_line):
        """Register ``source_dic`` as a log file below the project's directory.

        The file name is ``path_prefix`` and ``log_filename`` joined by
        ``config.output_split``; the header line is included only when
        ``need_first_line`` is true.
        """
        file_name = self.config.output_split.join((path_prefix, log_filename))
        file_path = os.path.join(self.prj_name, file_name)
        first_line = self.first_line_in_avg_excel if need_first_line else None
        self.write_from_dic(file_path, first_line, source_dic)

    def generate_full_and_incremental_results(self, is_debug):
        """Publish logs, picture data and the mail message of one build mode."""
        path_prefix = self.config.output_split.join(
            (self.config.ide_filename[self.config.ide - 1],
             self.config.debug_or_release[0 if is_debug else 1],
             self.config.build_type_of_log[0])
        )
        temp_mail_msg = ""
        # write all build time log
        self.write_logs_from_dic(path_prefix, self.config.log_filename[2], self.all_time_dic, False)
        # write avg build time, html msg and picture data
        self.write_logs_from_dic(path_prefix, self.config.log_filename[3], self.time_avg_dic, True)
        temp_mail_msg += self.write_mail_files(self.time_avg_dic)
        # write preview avg build time
        if self.config.preview_build:
            self.add_preview_build_time_pic_data(self.preview_avg_time_dic, is_debug)
        self.add_time_pic_data(self.time_avg_dic, is_debug)
        # write all size of abc log
        self.write_logs_from_dic(path_prefix, self.config.log_filename[0], self.all_size_dic, False)
        # write avg abc size, html msg and picture data
        self.write_logs_from_dic(path_prefix, self.config.log_filename[1], self.size_avg_dic, True)
        self.add_size_pic_data(self.size_avg_dic, is_debug)

        # write html message
        if self.config.send_mail and hasattr(self.config, 'show_time_detail_filter'):
            # NOTE(review): the two literals wrapping the message look like
            # stripped HTML (opening/closing table markup) - confirm upstream.
            temp_mail_msg = '' + \
                PerformanceBuild.app_title(self.config.name + (' Debug' if is_debug else ' Release')) + \
                temp_mail_msg + '\n'
            self.mail_msg += temp_mail_msg

    def error_handle(self, is_debug, log_type):
        """Report a failed build to the mail helper and attach the error log."""
        build_mode = 'Debug' if is_debug else 'Release'
        if log_type == performance_config.LogType.FULL:
            log_type_str = 'full_build'
        else:
            log_type_str = 'incremental_build'
        self.mail_helper.add_failed_project(self.prj_name, build_mode, log_type_str)
        save_name = build_mode + '_' + os.path.basename(self.config.error_filename)
        print(self.error_log_str)
        self.mail_helper.add_logs_file(os.path.join(self.prj_name, save_name), self.error_log_str)

    def full_and_incremental_build(self, is_debug):
        """Run ``build_times`` rounds of clean/full/incremental builds and report.

        Any exception is caught, recorded together with its traceback in
        ``self.error_log_str`` and reported through ``error_handle``, tagged
        with the build phase that was running.
        """
        log_type = performance_config.LogType.FULL
        mode_name = 'Debug' if is_debug else 'Release'
        try:
            self.reset()
            self.prj_name = os.path.basename(self.config.project_path)
            self.first_line_in_avg_excel = self.first_line_in_avg_excel + ",first build,incremental build"
            for i in range(self.config.build_times):
                self.clean_project()
                print(f"fullbuild: {mode_name}, {i + 1}/{self.config.build_times}")
                log_type = performance_config.LogType.FULL
                if self.config.preview_build:
                    self.preview_build(is_debug)
                self.start_build(is_debug)

                self.add_incremental_code(1)
                print(f"incremental: {mode_name}, {i + 1}/{self.config.build_times}")
                log_type = performance_config.LogType.INCREMENTAL
                self.start_build(is_debug)
                self.revert_incremental_code()
            self.cal_incremental_avg()
            self.generate_full_and_incremental_results(is_debug)
        except Exception:
            # Top-level boundary: a failing project is reported, not fatal.
            err_msg = traceback.format_exc()
            self.error_log_str = f'error:\n{self.error_log_str}\n{err_msg}'
            self.error_handle(is_debug, log_type)

    def start_test(self):
        """Run the debug and then the release measurement and clean up."""
        self.full_and_incremental_build(True)
        self.full_and_incremental_build(False)
        self.reset()

    def write_mail_msg(self):
        """Pass the accumulated mail message on when mail sending is enabled."""
        if self.config.send_mail:
            self.mail_helper.add_msg(self.mail_msg)


def run(config_input, mail_obj):
    """Run the performance test for one project and print the elapsed time."""
    start_time = time.time()
    PerformanceBuild(config_input, mail_obj).start()
    print("Test [%s] finished at: %s\n"
          "total cost: %ds"
          % (os.path.basename(config_input.project_path),
             time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
             time.time() - start_time))