#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import subprocess
import sys
import time
import traceback
import zipfile
import json5
import performance_config
class PerformanceBuild():
def __init__(self, config_input, mail_obj):
self.config = None
self.first_line_in_avg_excel = ""
self.preview_all_time_dic = {}
self.preview_avg_time_dic = {}
self.time_avg_dic = {}
self.all_time_dic = {}
self.size_avg_dic = {}
self.all_size_dic = {}
self.mail_helper = None
self.mail_msg = ''
self.mail_helper = mail_obj
self.config = config_input
self.prj_name = ''
self.timeout = 1800
self.error_log_str = ''
@staticmethod
def append_into_dic(key, value, dic):
if key not in dic:
dic[key] = []
dic[key].append(value)
@staticmethod
def add_code(code_path, start_pos, end_pos, code_str, lines):
with open(code_path, 'r+', encoding='UTF-8') as modified_file:
content = modified_file.read()
add_str_end_pos = content.find(end_pos)
if add_str_end_pos == -1:
print(f'Can not find code : {end_pos} in {code_path}, please check config')
return
add_str_start_pos = content.find(start_pos)
if add_str_start_pos == -1:
if lines == 0:
return
add_str_start_pos = add_str_end_pos
content_add = ""
for i in range(lines, 0, -1):
if "%d" in code_str:
content_add = content_add + code_str % i
else:
content_add = content_add + code_str
content = content[:add_str_start_pos] + content_add + content[add_str_end_pos:]
modified_file.seek(0)
modified_file.write(content)
modified_file.truncate()
@staticmethod
def add_row(context):
return rf'
{context}
'
@staticmethod
def add_td(context):
return rf'{context} | '
@staticmethod
def add_th(context):
return rf'{context} | '
@staticmethod
def test_type_title(context):
return rf'{context} |
'
@staticmethod
def app_title(context):
return rf'{context} | '
def start(self):
self.init()
self.start_test()
self.write_mail_msg()
os.chdir(self.config.project_path)
def init(self):
if self.config.ide == performance_config.IdeType.DevEco:
os.environ['path'] = self.config.node_js_path + ";" + os.environ['path']
os.chdir(self.config.project_path)
os.environ['path'] = os.path.join(self.config.jbr_path, "bin") + ";" + os.environ['path']
os.environ['JAVA_HOME'] = self.config.jbr_path
self.config.cmd_prefix = os.path.join(self.config.project_path, self.config.cmd_prefix)
self.config.debug_package_path = os.path.join(self.config.project_path, self.config.debug_package_path)
self.config.release_package_path = os.path.join(self.config.project_path, self.config.release_package_path)
self.config.incremental_code_path = os.path.join(self.config.project_path, self.config.incremental_code_path)
self.config.json5_path = os.path.join(self.config.project_path, self.config.json5_path)
if self.config.developing_test_data_path:
self.config.build_times = 3
else:
cmd = [self.config.node_js_path, self.config.cmd_prefix, "--stop-daemon"]
print(f'cmd: {cmd}')
subprocess.Popen(cmd, stderr=sys.stderr,
stdout=sys.stdout).communicate(timeout=self.timeout)
def add_incremental_code(self, lines):
PerformanceBuild.add_code(self.config.incremental_code_path,
self.config.incremental_code_start_pos,
self.config.incremental_code_end_pos,
self.config.incremental_code_str,
lines)
def revert_incremental_code(self):
self.add_incremental_code(0)
def reset(self):
self.first_line_in_avg_excel = ""
self.preview_all_time_dic = {}
self.preview_avg_time_dic = {}
self.time_avg_dic = {}
self.all_time_dic = {}
self.size_avg_dic = {}
self.all_size_dic = {}
self.error_log_str = ''
self.revert_incremental_code()
def clean_project(self):
if not self.config.developing_test_data_path:
cmd = [self.config.node_js_path, self.config.cmd_prefix, "clean"]
print(f'cmd: {cmd}')
subprocess.Popen(cmd, stderr=sys.stderr,
stdout=sys.stdout).communicate(timeout=self.timeout)
def get_bytecode_size(self, is_debug):
if self.config.developing_test_data_path:
# test data for size
PerformanceBuild.append_into_dic("ets/mudules.abc rawSize", 44444, self.all_size_dic)
PerformanceBuild.append_into_dic("ets/mudules.abc Compress_size", 33333, self.all_size_dic)
PerformanceBuild.append_into_dic("ets/mudules2.abc rawSize", 44444, self.all_size_dic)
PerformanceBuild.append_into_dic("ets/mudules2.abc Compress_size", 33333, self.all_size_dic)
return
package_path = self.config.debug_package_path if is_debug else self.config.release_package_path
package = zipfile.ZipFile(package_path)
extension_name = ".abc" if self.config.ide == performance_config.IdeType.DevEco else ".dex"
for info in package.infolist():
if info.filename.endswith(extension_name):
name_str1 = info.filename + " rawSize"
name_str2 = info.filename + " compress_size"
PerformanceBuild.append_into_dic(name_str1, info.file_size, self.all_size_dic)
PerformanceBuild.append_into_dic(name_str2, info.compress_size, self.all_size_dic)
def get_build_time(self, report_path, time_dic):
event_obj = None
with open(report_path, 'r+', encoding='UTF-8') as report:
event_obj = json5.load(report)['events']
if not event_obj:
raise Exception('Open report json failed')
found_error = False
for node in event_obj:
if node['head']['type'] == "log" and node['additional']['logType'] == 'error':
self.error_log_str = self.error_log_str + node['head']['name']
found_error = True
if found_error:
continue
build_time = 0
task_name = node['head']['name']
if node['head']['type'] == "mark":
if node['additional']['markType'] == 'history':
build_time = (node['body']['endTime'] - node['body']['startTime']) / 1000000000
task_name = "total build cost"
else:
continue
elif node['head']['type'] == "continual":
build_time = node['additional']['totalTime'] / 1000000000
else:
continue
PerformanceBuild.append_into_dic(task_name, build_time, time_dic)
if found_error:
raise Exception('Build Failed')
def collect_preview_build_data(self, report_path):
self.get_build_time(report_path, self.preview_all_time_dic)
def collect_build_data(self, is_debug, report_path):
self.get_build_time(report_path, self.all_time_dic)
self.get_bytecode_size(is_debug)
def get_report_path(self, cmd_suffix):
reports_before = []
report_dir = '.hvigor/report'
if os.path.exists(report_dir):
reports_before = os.listdir(report_dir)
cmd = [self.config.node_js_path, self.config.cmd_prefix, *cmd_suffix]
print(f'cmd: {cmd}')
subprocess.Popen(cmd, stderr=sys.stderr,
stdout=sys.stdout).communicate(timeout=self.timeout)
report_path = (set(os.listdir(report_dir)) - set(reports_before)).pop()
return os.path.join(report_dir, report_path)
def preview_build(self, is_debug):
cache_path = os.path.join(self.config.project_path, *self.config.preview_cache_path)
if os.path.exists(cache_path):
shutil.rmtree(cache_path)
cmd_suffix = self.config.preview_debug_suffix if is_debug else self.config.preview_release_suffix
report_path = self.get_report_path(cmd_suffix)
self.collect_preview_build_data(report_path)
def build(self, is_debug):
cmd_suffix = self.config.cmd_debug_suffix if is_debug else self.config.cmd_release_suffix
report_path = self.get_report_path(cmd_suffix)
self.collect_build_data(is_debug, report_path)
def start_build(self, is_debug):
if self.config.developing_test_data_path:
# test data
self.collect_build_data(is_debug, os.path.join(os.path.dirname(__file__),
self.config.developing_test_data_path))
return True
self.build(is_debug)
return True
def get_millisecond(self, time_string):
if self.config.ide != performance_config.IdeType.DevEco and not self.config.developing_test_data_path:
return int(time_string)
else:
cost_time = 0
res = time_string.split(" min ")
target_str = ""
if len(res) > 1:
cost_time = int(res[0]) * 60000
target_str = res[1]
else:
target_str = res[0]
res = target_str.split(" s ")
if len(res) > 1:
cost_time = cost_time + int(res[0]) * 1000
target_str = res[1]
else:
target_str = res[0]
res = target_str.split(" ms")
if len(res) > 1:
cost_time = cost_time + int(res[0])
return cost_time
def cal_preview_build_avg_time(self):
self.first_line_in_avg_excel = self.first_line_in_avg_excel + "\n"
for key in self.preview_all_time_dic:
task_count = len(self.preview_all_time_dic[key])
has_task = True
if task_count != 2 * self.config.build_times:
if task_count == self.config.build_times:
has_task = False
else:
continue
# average of first build
sum_build_time = 0
for i in range(0, self.config.build_times):
index = i * 2
if not has_task:
self.preview_all_time_dic[key].insert(index + 1, 0)
sum_build_time = sum_build_time + self.preview_all_time_dic[key][index]
cost = round(sum_build_time / self.config.build_times, 2)
PerformanceBuild.append_into_dic(key, cost, self.preview_avg_time_dic)
# average of incremental build
sum_build_time = 0
for i in range(1, len(self.preview_all_time_dic[key]), 2):
sum_build_time = sum_build_time + self.preview_all_time_dic[key][i]
cost = round(sum_build_time / self.config.build_times, 2)
PerformanceBuild.append_into_dic(key, cost, self.preview_avg_time_dic)
def cal_incremental_avg_time(self, all_time_dic, avg_time_dic):
self.first_line_in_avg_excel = self.first_line_in_avg_excel + "\n"
for key in all_time_dic:
task_count = len(all_time_dic[key])
has_task = True
if task_count != 2 * self.config.build_times:
if task_count == self.config.build_times:
has_task = False
else:
continue
# average of first build
sum_build_time = 0
for i in range(0, self.config.build_times):
index = i * 2
if not has_task:
all_time_dic[key].insert(index + 1, 0)
sum_build_time = sum_build_time + all_time_dic[key][index]
cost = round(sum_build_time / self.config.build_times, 2)
PerformanceBuild.append_into_dic(key, cost, avg_time_dic)
# average of incremental build
sum_build_time = 0
for i in range(1, len(all_time_dic[key]), 2):
sum_build_time = sum_build_time + all_time_dic[key][i]
cost = round(sum_build_time / self.config.build_times, 2)
PerformanceBuild.append_into_dic(key, cost, avg_time_dic)
def cal_incremental_avg_size(self):
total_raw_size = []
total_compressed_size = []
for i in range(0, self.config.build_times * 2):
total_raw_size.append(0)
total_compressed_size.append(0)
for key in self.all_size_dic:
if "raw" in key:
total_raw_size[i] += self.all_size_dic[key][i]
else:
total_compressed_size[i] += self.all_size_dic[key][i]
self.all_size_dic["total_raw_size"] = total_raw_size
self.all_size_dic["total_compressed_size"] = total_compressed_size
for key in self.all_size_dic:
# sizes should be the same, just check
full_first_size = self.all_size_dic[key][0]
for i in range(0, len(self.all_size_dic[key]), 2):
if full_first_size != self.all_size_dic[key][i]:
full_first_size = -1
break
PerformanceBuild.append_into_dic(key, full_first_size, self.size_avg_dic)
incremental_first_size = self.all_size_dic[key][1]
for i in range(1, len(self.all_size_dic[key]), 2):
if incremental_first_size != self.all_size_dic[key][i]:
incremental_first_size = -1
break
PerformanceBuild.append_into_dic(key, incremental_first_size, self.size_avg_dic)
def cal_incremental_avg(self):
if self.config.preview_build:
self.cal_incremental_avg_time(self.preview_all_time_dic, self.preview_avg_time_dic)
self.cal_incremental_avg_time(self.all_time_dic, self.time_avg_dic)
self.cal_incremental_avg_size()
def add_preview_build_time_pic_data(self, dic, is_debug):
for key in dic:
if "total" in key:
preview_build_time = dic[key][0]
break
self.mail_helper.add_pic_data(self.config.name, is_debug, [preview_build_time], 2)
def add_time_pic_data(self, dic, is_debug):
for key in dic:
if "total" in key:
full_time = dic[key][0]
incremental_time = dic[key][1]
break
self.mail_helper.add_pic_data(self.config.name, is_debug, [full_time, incremental_time], 0)
def add_size_pic_data(self, dic, is_debug):
for key in dic:
full_size = dic[key][0]
self.mail_helper.add_pic_data(self.config.name, is_debug, [full_size], 3)
def write_mail_files(self, dic):
if not hasattr(self.config, 'show_time_detail_filter'):
return ''
msg = ''
rows = ''
first_row = ""
first_line_res = self.first_line_in_avg_excel.replace("\n", "").split(",")
for i in first_line_res:
first_row += PerformanceBuild.add_th(i)
rows += PerformanceBuild.add_row(first_row)
show_dic = []
for k in self.config.show_time_detail_filter:
if k in dic:
show_dic.append(k)
for key in show_dic:
content_row = PerformanceBuild.add_th(key)
for v in dic[key]:
content_row += PerformanceBuild.add_td(f'{v} s')
rows += PerformanceBuild.add_row(content_row)
msg += rows
return msg
def write_from_dic(self, file_path, first_line, dic):
content_list = []
if first_line:
content_list.append(first_line)
for key in dic:
content_list.append(key)
for v in dic[key]:
content_list.append(",")
content_list.append(str(v))
content_list.append("\n")
content = "".join(content_list)
self.mail_helper.add_logs_file(file_path, content.encode())
def write_logs_from_dic(self, path_prefix, log_filename, source_dic, need_first_line):
file_path = self.config.output_split.join((path_prefix, log_filename))
file_path = os.path.join(self.prj_name, file_path)
first_line = self.first_line_in_avg_excel if need_first_line else None
self.write_from_dic(file_path, first_line, source_dic)
return
def generate_full_and_incremental_results(self, is_debug):
path_prefix = self.config.output_split.join(
(self.config.ide_filename[self.config.ide - 1],
self.config.debug_or_release[0 if is_debug else 1],
self.config.build_type_of_log[0])
)
temp_mail_msg = ""
# write all build time log
self.write_logs_from_dic(path_prefix, self.config.log_filename[2], self.all_time_dic, False)
# write avg build time, html msg and picture data
self.write_logs_from_dic(path_prefix, self.config.log_filename[3], self.time_avg_dic, True)
temp_mail_msg += self.write_mail_files(self.time_avg_dic)
# write preview avg build time
if self.config.preview_build:
self.add_preview_build_time_pic_data(self.preview_avg_time_dic, is_debug)
self.add_time_pic_data(self.time_avg_dic, is_debug)
# write all size of abc log
self.write_logs_from_dic(path_prefix, self.config.log_filename[0], self.all_size_dic, False)
# write avg abc size, html msg and picture data
self.write_logs_from_dic(path_prefix, self.config.log_filename[1], self.size_avg_dic, True)
self.add_size_pic_data(self.size_avg_dic, is_debug)
# write html message
if self.config.send_mail and hasattr(self.config, 'show_time_detail_filter'):
temp_mail_msg = '' + \
PerformanceBuild.app_title(self.config.name + (' Debug' if is_debug else ' Release')) + \
temp_mail_msg + '
'
self.mail_msg += temp_mail_msg
def error_handle(self, is_debug, log_type):
build_mode = 'Debug' if is_debug else 'Release'
log_type_str = 'full_build' if log_type == performance_config.LogType.FULL else 'incremental_build'
self.mail_helper.add_failed_project(self.prj_name, build_mode, log_type_str)
save_name = build_mode + '_' + os.path.basename(self.config.error_filename)
print(self.error_log_str)
self.mail_helper.add_logs_file(os.path.join(self.prj_name, save_name),
self.error_log_str)
def full_and_incremental_build(self, is_debug):
log_type = performance_config.LogType.FULL
try:
self.reset()
self.prj_name = os.path.basename(self.config.project_path)
self.first_line_in_avg_excel = self.first_line_in_avg_excel + ",first build,incremental build"
for i in range(self.config.build_times):
self.clean_project()
print(f"fullbuild: {'Debug' if is_debug else 'Release'}, {i + 1}/{self.config.build_times}")
log_type = performance_config.LogType.FULL
if self.config.preview_build:
self.preview_build(is_debug)
self.start_build(is_debug)
self.add_incremental_code(1)
print(f"incremental: {'Debug' if is_debug else 'Release'}, {i + 1}/{self.config.build_times}")
log_type = performance_config.LogType.INCREMENTAL
self.start_build(is_debug)
self.revert_incremental_code()
self.cal_incremental_avg()
self.generate_full_and_incremental_results(is_debug)
except Exception as e:
err_msg = traceback.format_exc()
self.error_log_str = f'error:\n{self.error_log_str}\n{err_msg}'
self.error_handle(is_debug, log_type)
def start_test(self):
self.full_and_incremental_build(True)
self.full_and_incremental_build(False)
self.reset()
def write_mail_msg(self):
if self.config.send_mail:
self.mail_helper.add_msg(self.mail_msg)
def run(config_input, mail_obj):
start_time = time.time()
PerformanceBuild(config_input, mail_obj).start()
print("Test [%s] finished at: %s\n" \
"total cost: %ds"
% (os.path.basename(config_input.project_path),
time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
time.time() - start_time))