#!/usr/bin/env python3 # -*- coding: utf-8 -*- # # Copyright (c) 2023 Huawei Device Co., Ltd. # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import stat import time import zipfile import numpy as np import pandas as pd import performance_build import performance_config from pylab import mpl class MailHelper(): def __init__(self): mpl.rcParams['font.sans-serif'] = ['Microsoft YaHei'] self.pic_table = {} self.head_line = [] self.mail_msg = '' self.logs_file = {} self.failed_prjs = '' self.data_count_in_line_graph = 10 MailHelper.remove_files() @staticmethod def remove_files(): if os.path.exists(performance_config.MailPicConfig.html_file_path): os.remove(performance_config.MailPicConfig.html_file_path) if os.path.exists(performance_config.MailPicConfig.attach_path): os.remove(performance_config.MailPicConfig.attach_path) if not os.path.exists(performance_config.MailPicConfig.mail_data_path): os.mkdir(performance_config.MailPicConfig.mail_data_path) for build_mode in range(2): for log_type in range(3): pic_path = MailHelper.find_in_double_map(build_mode, log_type, performance_config.MailPicConfig.mail_pic_name, "mail_pic_name") if not pic_path: return if os.path.exists(pic_path): os.remove(pic_path) @staticmethod def find_in_double_map(key_1, key_2, double_dic, error_table_name): it_1 = double_dic.get(key_1) if it_1: it_2 = it_1.get(key_2) if not it_2: print(f"Can not find key_2: {key_2} in {error_table_name}, please check") return it_2 else: print(f"Can not find key_1: {key_1} in {error_table_name}, please check") return it_1 def add_msg(self, msg): self.mail_msg += msg def add_failed_project(self, prj_name, build_mode, log_type): info = '-'.join((prj_name, build_mode, log_type)) + '
' if self.failed_prjs == '': self.failed_prjs = 'failed projects:
' self.failed_prjs += info def add_logs_file(self, filename, buffer): self.logs_file[filename] = buffer def add_pic_data(self, prj_name, is_debug, data_list): build_mode = performance_config.BuildMode.DEBUG if is_debug else performance_config.BuildMode.RELEASE it_1 = self.pic_table.get(prj_name) prj_table = None if it_1: prj_table = it_1.get(build_mode) if not prj_table: it_1[build_mode] = [np.nan, np.nan, np.nan] else: self.pic_table[prj_name] = {build_mode:[np.nan, np.nan, np.nan]} self.head_line.append(prj_name) self.time_index = [time.strftime("%Y/%m/%d", time.localtime())] prj_table = self.pic_table[prj_name][build_mode] if len(data_list) == 1: # size prj_table[2] = data_list[0] else: # time prj_table[0] = data_list[0] prj_table[1] = data_list[1] def create_msg_file(self): with os.fdopen( os.open(performance_config.MailPicConfig.html_file_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IRWXU | stat.S_IRWXO | stat.S_IRWXG), 'w', encoding='utf-8' ) as msg_file: if self.failed_prjs == '': self.failed_prjs = 'All Build Succeed
' self.mail_msg = performance_config.get_html_prefix().format(self.failed_prjs) + \ self.mail_msg + \ performance_config.get_html_suffix() msg_file.write(self.mail_msg) def create_logs_file(self): mail_zip = zipfile.ZipFile(performance_config.MailPicConfig.attach_path, 'w') for file in self.logs_file: mail_zip.writestr(file, self.logs_file.get(file)) mail_zip.close() @staticmethod def draw_pic(df, pic_name, title_name, y_label): ax = df.plot( linestyle='-', linewidth=2, marker='o', markersize=6, markeredgecolor='black', title=title_name, ylabel=y_label, grid=True, rot=30 ) i = 0 y_min, y_max = ax.get_ylim() y_range = y_max - y_min for idx, data in df.iterrows(): for k in data: if not pd.isna(k): pos_y = k + y_range / 50 if i % 2 == 0 else k - y_range / 15 ax.text(i - 0.1, pos_y, k) i += 1 ax.get_figure().savefig(pic_name, bbox_inches='tight') def create_pic(self): for build_mode in range(2): for log_type in range(3): title_name = MailHelper.find_in_double_map(build_mode, log_type, performance_config.MailPicConfig.mail_pic_table_label, "mail_pic_table_label") if not title_name: continue pic_name = MailHelper.find_in_double_map(build_mode, log_type, performance_config.MailPicConfig.mail_pic_name, "mail_pic_name") if not pic_name: continue df = None csv_filename = MailHelper.find_in_double_map(build_mode, log_type, performance_config.MailPicConfig.mail_pic_table_name, "pic_table_name") if not csv_filename: continue if os.path.exists(csv_filename): df = pd.read_csv(csv_filename, index_col=0) else: df = pd.DataFrame(columns=self.head_line) dic = {} for prj_name in self.pic_table: dic[prj_name] = [self.pic_table[prj_name][build_mode][log_type]] if len(dic) > 0: df_inserted = pd.DataFrame(dic, index=self.time_index) df = df._append(df_inserted) df_length = len(df) if df_length > self.data_count_in_line_graph: df = df[(df_length - self.data_count_in_line_graph) : df_length] df.to_csv(csv_filename) y_label = 'build time (s)' if log_type < performance_config.LogType.SIZE else 'size (Byte)' self.draw_pic(df, pic_name, title_name, y_label) def create_mail_files(self): self.create_msg_file() self.create_pic() self.create_logs_file() class PerformanceEntry(): def __init__(self) -> None: self.mail_helper = MailHelper() def run(self): self.init() # You can control which project you need to test here, the first param is the key in performance_config.py projects = performance_config.Config.run_list for prj in projects: self.start_test(prj) self.create_mail_files() def start_test(self, index): config = performance_config.get_config(index) if not config: return performance_build.run(config, self.mail_helper) def create_mail_files(self): if performance_config.Config.send_mail: self.mail_helper.create_mail_files() def init(self): self.mail_helper = MailHelper() if __name__ == '__main__': strat_time = time.time() PerformanceEntry().run() print("All test finished at %s\ntotal cost: %ds" % (time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), time.time() - strat_time))