#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from glob import glob
import argparse
import copy
import multiprocessing
import os
import stat
import subprocess
import sys
WORKLOAD_PATH = 'test_cases/ark-workload/weekly_workload'
PARSE_COMMAND_PATH = '../parse_command'
SIZE_PERCENTAGE_REPORT = 'size_percentage_report.html'
HTML_CONTENT = \
"""
Size Percentage Report
Size Percentage Report
No |
Case Path |
ABC Size |
Percentage of Sections |
"""
def is_directory(parser, arg):
if not os.path.isdir(arg):
parser.error("The directory '%s' does not exist" % arg)
return os.path.abspath(arg)
def is_file(parser, arg):
if not os.path.isfile(arg):
parser.error("The file '%s' does not exist" % arg)
return os.path.abspath(arg)
def check_timeout(value):
ivalue = int(value)
if ivalue <= 0:
raise argparse.ArgumentTypeError("%s is an invalid timeout value" % value)
return ivalue
def get_args():
description = "Generate statistics on the percentage of size corresponding to each case."
parser = argparse.ArgumentParser(description=description)
parser.add_argument('--es2abc-path', dest='es2abc_path', type=lambda arg : is_file(parser, arg),
help='Path to the executable program es2abc', required=True)
parser.add_argument('--no-progress', action='store_false', dest='progress', default=True,
help='Don\'t show progress bar')
parser.add_argument('--timeout', type=check_timeout, dest='timeout', default=180,
help='Time limits for use case execution (In seconds)')
return parser.parse_args()
class Test:
def __init__(self, test_path, flags):
self.test_path = test_path
self.test_case_name = self.test_path.split('/')[-1]
self.flags = flags
self.output = None
def __lt__(self, other):
return self.test_path < other.test_path
def run(self, runner):
test_abc_name = self.test_case_name[:self.test_case_name.rfind('.')] + '.abc'
test_abc_path = os.path.join(os.path.dirname(self.test_path), test_abc_name)
cmd = runner.cmd_prefix + [runner.es2abc_path]
cmd.extend(self.flags)
cmd.extend(["--output=" + test_abc_path])
cmd.append(self.test_path)
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
out, err = process.communicate(timeout=runner.args.timeout)
self.output = out.decode("utf-8", errors="ignore") + err.decode("utf-8", errors="ignore")
if os.path.exists(test_abc_path):
self.abc_size = os.path.getsize(test_abc_path)
self.percentage_of_sections = self.output
os.remove(test_abc_path)
else:
self.abc_size = 0
self.percentage_of_sections = 'NULL'
self.test_case_name = os.path.relpath(self.test_path, runner.test_root)
return self
class Runner:
def __init__(self, args):
self.args = args
self.test_root = os.path.dirname(os.path.abspath(__file__))
self.es2abc_path = args.es2abc_path
self.size_percentage_statistics_file = os.path.join(self.test_root, SIZE_PERCENTAGE_REPORT)
self.case_list = [] # store abs case_path
self.tests = [] # store Test obj
self.cmd_prefix = []
self.results = []
def wrap_cases(self, flags, func=Test):
self.tests += list(map(lambda f: func(f, flags), self.case_list))
self.case_list.clear()
def add_case(self, case_path, extension, flags):
abs_case_path = os.path.abspath(case_path)
if abs_case_path not in self.case_list and abs_case_path.endswith(extension):
self.case_list.append(case_path)
self.wrap_cases(flags)
def add_directory(self, directory, extension, flags):
glob_expression = os.path.join(os.path.abspath(directory), "**/*%s" % (extension))
cases = glob(glob_expression, recursive=True)
for case in cases:
self.add_case(case, extension, flags)
def run_test(self, test):
return test.run(self)
def run(self):
pool = multiprocessing.Pool()
result_iter = pool.imap_unordered(self.run_test, self.tests, chunksize=32)
pool.close()
if self.args.progress:
from tqdm import tqdm
result_iter = tqdm(result_iter, total=len(self.tests))
self.results = [result for result in result_iter]
self.results.sort()
pool.join()
self.generate_size_percentage_report()
def format_sections_for_percentage_report(self, output, percentage_of_sections_list):
lines = output.split('\n')
start_flag = False
for line in lines:
if start_flag:
if 'percent' in line:
[item_section_and_item_size, percentage] = line.split(',')
[item_section, item_size] = item_section_and_item_size.split(':')
percentage = percentage.split(':')[-1].strip()
percentage_of_sections_list.append([item_section.strip(), item_size.strip(), \
percentage.strip()])
if 'Panda file size statistic:' in line:
start_flag = True
def generate_size_percentage_report(self):
global HTML_CONTENT
for number, result in enumerate(self.results, 1):
percentage_of_sections_list = []
self.format_sections_for_percentage_report(result.percentage_of_sections, percentage_of_sections_list)
percentage_of_sections_list.sort(key=lambda percentage_of_section: int(percentage_of_section[1]),
reverse=True)
html_content_of_result = f"""
{number} |
{result.test_case_name} |
{result.abc_size} |
"""
HTML_CONTENT = "{}{}".format(HTML_CONTENT, html_content_of_result)
header = [''] * 3
if len(percentage_of_sections_list):
header = ['section', 'size', 'percentage']
html_content_of_percentage = f"""
{header[0]} |
{header[1]} |
{header[2]} |
"""
HTML_CONTENT = "{}{}".format(HTML_CONTENT, html_content_of_percentage)
total_size = 0
for [item_section, item_size, percentage] in percentage_of_sections_list:
html_content_of_sections = f"""
{item_section} |
{item_size} |
{percentage} |
"""
HTML_CONTENT = "{}{}".format(HTML_CONTENT, html_content_of_sections)
total_size += int(item_size)
html_content_of_total_size = f"""
total size |
{total_size} |
|
"""
HTML_CONTENT = "{}{}".format(HTML_CONTENT, html_content_of_total_size)
HTML_CONTENT = "{} |
".format(HTML_CONTENT)
else:
html_content_of_not_available = f"N/A | "
HTML_CONTENT = "{}{}".format(HTML_CONTENT, html_content_of_not_available)
html_content_of_end_tag = "
"
HTML_CONTENT = "{}{}".format(HTML_CONTENT, html_content_of_end_tag)
flags = os.O_RDWR | os.O_CREAT
mode = stat.S_IWUSR | stat.S_IRUSR
with os.fdopen(os.open(self.size_percentage_statistics_file, flags, mode), 'w') as report:
report.truncate()
report.write(HTML_CONTENT)
def main():
args = get_args()
runner = Runner(args)
# Get CaseManager from parse_command
cur_dir = os.getcwd()
os.chdir(runner.test_root)
sys.path.insert(0, PARSE_COMMAND_PATH)
from parse_command import CaseManager
os.chdir(cur_dir)
# Add benchmark cases and workload cases (js)
args_for_case_manager = copy.copy(args)
setattr(args_for_case_manager, 'case', None)
setattr(args_for_case_manager, 'case_dir', None)
case_manager = CaseManager(args_for_case_manager, skip_test_flag=False)
for case_path in case_manager.case_list:
runner.add_case(case_path, '.js', ['--module', '--dump-file-item-size'])
# Add workload cases (ts)
ts_workload_path = os.path.join(case_manager.test_root, WORKLOAD_PATH, 'ts')
runner.add_directory(ts_workload_path, '.ts', ['--module', '--dump-file-item-size'])
runner.run()
if __name__ == "__main__":
main()