#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# Copyright (c) 2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import ABC, abstractmethod
import argparse
import glob
import json
import os
import re
import shutil
import stat
import subprocess
import sys
import threading
# Per-case execution status codes, stored per tool in CaseManager.crash_dict.
SUCCESS_STATUS = 0
CRASH_STATUS = -1
TIMEOUT_STATUS = -2
# Git repository that is cloned (or pulled, when already present) to obtain
# the workload test cases.
WORKLOAD_URL = 'https://gitee.com/xliu-huanwei/ark-workload.git'
# Benchmark directory; it is put on sys.path and joined with the benchmark
# test case directory while the working directory is the script directory.
BENCHMARK_PATH = '../benchmark'
# File name, looked up in the script directory, of the JSON skip list.
SKIP_TEST = 'skip_tests.json'
# Base name of the generated report; '.html' is appended when it is written.
DISCREPANCY_REPORT = 'discrepancy_report'
# Header of the discrepancy report; CaseManager.generate_discrepancy_report
# appends one entry per test case after it.
# NOTE(review): the report is saved with an '.html' suffix but this template
# holds no HTML tags - presumably the markup was lost; confirm against the
# upstream file before relying on the rendered output.
HTML_CONTENT = \
"""
Instruction Discrepancy Report
Instruction Discrepancy Report
No |
Case Path |
es2abc Instruction Number |
v8 Instruction Number |
es2abc/v8 Ratio |
"""
def is_file(parser, arg):
    """Argparse ``type`` helper: return the absolute path of an existing file.

    Args:
        parser: object whose ``error`` method reports an invalid argument.
        arg: path given on the command line.

    Returns:
        The absolute form of ``arg``.
    """
    if os.path.isfile(arg):
        return os.path.abspath(arg)
    parser.error(f'[ERROR]: The file "{arg}" does not exist!')
    # Only reached when ``parser.error`` returns instead of exiting.
    return os.path.abspath(arg)
def is_directory(parser, arg):
    """Argparse ``type`` helper: return the absolute path of an existing directory.

    Args:
        parser: object whose ``error`` method reports an invalid argument.
        arg: path given on the command line.

    Returns:
        The absolute form of ``arg``.
    """
    if os.path.isdir(arg):
        return os.path.abspath(arg)
    parser.error(f'[ERROR]: The directory "{arg}" does not exist!')
    # Only reached when ``parser.error`` returns instead of exiting.
    return os.path.abspath(arg)
def check_timeout(val):
    """Argparse ``type`` helper: convert ``val`` to a strictly positive int.

    Raises:
        argparse.ArgumentTypeError: if the converted value is zero or negative.
        ValueError: if ``val`` cannot be converted by ``int``.
    """
    timeout = int(val)
    if timeout > 0:
        return timeout
    raise argparse.ArgumentTypeError(f'[ERROR]: {timeout} is an invalid timeout value')
def get_args():
    """Build the command line parser and return the parsed arguments.

    The namespace carries ``d8_path``, ``es2abc_path``, ``timeout``, ``case``
    and ``case_dir``; path options are validated and made absolute while
    parsing.
    """
    parser = argparse.ArgumentParser(
        description="Generate a report on the difference between the number of v8 and es2abc bytecode instructions.")

    # Required paths of the two executables, registered in the same order.
    executable_options = (
        (('-v8', '-d8', '--d8_path'), 'd8_path', 'Path to the V8 executable d8'),
        (('-es2abc', '--es2abc_path'), 'es2abc_path', 'Path to the executable program es2abc'),
    )
    for option_strings, destination, help_text in executable_options:
        parser.add_argument(*option_strings, dest=destination, required=True, help=help_text,
                            type=lambda arg: is_file(parser, arg))

    parser.add_argument('--timeout', dest='timeout', default=180, type=check_timeout,
                        help='Time limits for use case execution (In seconds)')
    parser.add_argument('--add_case', dest='case', nargs='+',
                        type=lambda arg: is_file(parser, arg),
                        help='Add the file path of a single test case to be executed')
    parser.add_argument('--add_case_dir', dest='case_dir', nargs='+',
                        type=lambda arg: is_directory(parser, arg),
                        help='Add the directory where the test cases are to be executed')

    return parser.parse_args()
def assemble_command(command_list: list = None, executable_program_path: str = None,
                     file_path: str = None, parameters: list = None):
    """Create a command list or patch an existing one.

    The layout is ``[executable, file, *parameters]``.

    Args:
        command_list: existing command to update in place, or None to build one.
        executable_program_path: value for slot 0 (kept as is when falsy on update).
        file_path: value for slot 1 (kept as is when falsy on update).
        parameters: extra arguments; only used when a new list is built.

    Returns:
        The new list, or the very same ``command_list`` object after patching.
    """
    if command_list is not None:
        # Update mode: overwrite only the slots that were supplied.
        if executable_program_path:
            command_list[0] = executable_program_path
        if file_path:
            command_list[1] = file_path
        return command_list

    new_command = [executable_program_path, file_path]
    if parameters:
        new_command.extend(parameters)
    return new_command
class CaseManager:
    """Collect the test cases and turn the runner results into a report.

    Attributes:
        test_root: absolute directory that contains this script.
        args: parsed command line namespace.
        case_list: sorted absolute paths of the cases to execute.
        skip_tests_info: path of the optional JSON skip list.
        report_path: report path without the '.html' suffix.
        crash_dict: ``{file_path: [d8 status, es2abc status]}``.
    """

    def __init__(self, args, skip_test_flag=True):
        self.test_root = os.path.dirname(os.path.abspath(__file__))
        self.args = args
        self.case_list = []
        self.skip_tests_info = os.path.join(self.test_root, SKIP_TEST)
        self.report_path = os.path.join(self.test_root, DISCREPANCY_REPORT)

        self.get_test_case()
        if skip_test_flag and os.path.exists(self.skip_tests_info):
            self.skip_cases()
        self.case_list.sort()

        # Creating dictionary: {file_path : [d8 status, es2abc status]}
        self.crash_dict = {file_path: [SUCCESS_STATUS, SUCCESS_STATUS] for file_path in self.case_list}

    @staticmethod
    def _to_int(value):
        """Return ``int(value)``, or 0 when the value is not a valid integer."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    @staticmethod
    def _render_instruction_number(instruction_number, status):
        """Return the report text for one instruction count.

        NOTE(review): crashed and timed-out counts go through a bare "{}"
        format, so they render exactly like successful ones; presumably
        highlighting markup was intended here - confirm.
        """
        if status in (CRASH_STATUS, TIMEOUT_STATUS):
            return "{}".format(instruction_number)
        return instruction_number

    def get_test_case(self):
        """Fill ``case_list`` from the command line, the workload repository
        and the benchmark suite, without duplicates."""
        # Set mirror of case_list so the duplicate check is O(1).
        known_cases = set(self.case_list)

        def add_case(path):
            abs_file_path = os.path.abspath(path)
            if abs_file_path not in known_cases:  # make sure no duplicate case
                known_cases.add(abs_file_path)
                self.case_list.append(abs_file_path)

        def add_directory_to_case_list(case_dir, extension='js', recursive=True):
            if not os.path.isdir(case_dir):
                print(f'[ERROR]: add_directory_to_case_list failed! {case_dir} does not exist!')
                return False
            glob_expression = os.path.join(case_dir, f'**/*.{extension}')
            for file in glob.glob(glob_expression, recursive=recursive):
                add_case(file)
            return True

        for case in self.args.case or ():
            add_case(case)
        for case_dir in self.args.case_dir or ():
            add_directory_to_case_list(case_dir)

        # The benchmark helpers and BENCHMARK_PATH are relative to the script
        # directory; always restore the caller's directory, even on failure.
        cur_dir = os.getcwd()
        os.chdir(self.test_root)
        try:
            # add workload cases
            case_dir_path = self.pull_cases_from_repo(WORKLOAD_URL, 'test_cases/ark-workload')
            if case_dir_path:
                print('[INFO]: pull workload cases Success!')
                case_dir_path = os.path.join(case_dir_path, 'weekly_workload', 'js')
                add_directory_to_case_list(case_dir_path)

            # add benchmark cases
            sys.path.insert(0, BENCHMARK_PATH)
            from utils import DEFAULT_TESTCASES_DIR, pull_cases, clear_folder_shutil
            benchmark_case_path = os.path.join(BENCHMARK_PATH, DEFAULT_TESTCASES_DIR)
            clear_folder_shutil(benchmark_case_path)
            pull_benchmark_cases_success = pull_cases()
            if pull_benchmark_cases_success:
                print('[INFO]: pull benchmark cases Success!')
                add_directory_to_case_list(benchmark_case_path)
        finally:
            os.chdir(cur_dir)

    def git_clone(self, git_url, code_dir, pull=False):
        """Clone ``git_url`` into ``code_dir``, or run ``git pull`` inside it.

        Returns:
            True when git exits with status 0, False otherwise.
        """
        if pull:
            cmd, work_dir = ['git', 'pull'], code_dir
        else:
            cmd, work_dir = ['git', 'clone', git_url, code_dir], None
        # ``cwd`` scopes the directory change to the child process only.
        completed = subprocess.run(cmd, cwd=work_dir, check=False)
        if completed.returncode:
            print(f"\n[ERROR]: git clone or pull '{git_url}' Failed!")
            return False
        return True

    def pull_cases_from_repo(self, case_url, case_dir):
        """Clone or update ``case_url`` under the script directory.

        Returns:
            The absolute checkout directory, or None when git failed.
        """
        dir_path = os.path.join(self.test_root, case_dir)
        # An existing directory is updated instead of cloned again.
        already_present = os.path.exists(dir_path)
        if not self.git_clone(case_url, dir_path, already_present):
            return None
        return dir_path

    def skip_cases(self):
        """Drop every case listed in the skip list JSON from ``case_list``.

        The JSON is a list of objects whose ``files`` entry holds paths
        relative to the script directory.
        """
        with open(self.skip_tests_info, 'r', encoding='utf-8') as f:
            data = json.load(f)
        skipped = {
            os.path.abspath(os.path.join(self.test_root, case))
            for reason_files_dict in data
            for case in reason_files_dict['files']
        }
        self.case_list = [case for case in self.case_list if case not in skipped]

    def calculate_ratio_for_discrepancy_report(self, d8_instruction_number, es2abc_instruction_number):
        """Return es2abc/d8 as a percentage string with two decimals.

        Returns 'Invalid Ratio' when either count is zero or negative.
        """
        if d8_instruction_number <= 0 or es2abc_instruction_number <= 0:
            return 'Invalid Ratio'
        ratio = (float(es2abc_instruction_number) / d8_instruction_number) * 100
        return f'{ratio:.2f}%'

    def generate_discrepancy_report(self, d8_output, es2abc_output):
        """Write ``<report_path>.html`` from the two runners' results.

        Args:
            d8_output: list of ``(file_path, instruction_number)`` from d8.
            es2abc_output: list of ``(file_path, instruction_number)`` from
                es2abc, in the same case order as ``d8_output``.
        """
        # Build the report locally; the HTML_CONTENT template is left
        # untouched so repeated calls do not accumulate rows.
        fragments = [HTML_CONTENT]
        for case_number, (d8_item, es2abc_item) in enumerate(zip(d8_output, es2abc_output), start=1):
            d8_instruction_number = self._render_instruction_number(
                d8_item[1], self.crash_dict[d8_item[0]][0])
            es2abc_instruction_number = self._render_instruction_number(
                es2abc_item[1], self.crash_dict[es2abc_item[0]][1])
            case_path = os.path.relpath(d8_item[0], self.test_root)
            # An unparsable count becomes 0 and is reported as 'Invalid Ratio'
            # instead of aborting the whole report.
            ratio = self.calculate_ratio_for_discrepancy_report(
                self._to_int(d8_item[1]), self._to_int(es2abc_item[1]))
            cells = (case_number, case_path, es2abc_instruction_number, d8_instruction_number, ratio)
            fragments.append("\n" + "".join(f"{cell} |\n" for cell in cells))
        # NOTE(review): closing part of the report; only a line break is
        # visible here - presumably closing markup is missing, confirm.
        fragments.append("\n")

        # O_TRUNC discards a previous report; the file is owner read/write only.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        mode = stat.S_IWUSR | stat.S_IRUSR
        with os.fdopen(os.open(self.report_path + '.html', flags, mode), 'w', encoding='utf-8') as f:
            f.write("".join(fragments))
class Runner(ABC):
    """Abstract base class of the per-tool runners.

    Attributes:
        output: results collected by ``run``; starts empty.
        command_list: command used to start the tool.
        case_manager: CaseManager providing the cases to process.
    """

    def __init__(self, command_list: list, case_manager: CaseManager):
        self.case_manager = case_manager
        self.command_list = command_list
        self.output = []

    @abstractmethod
    def run(self):
        """Process the cases; must be implemented by every subclass."""
class D8Runner(Runner):
    """Count the d8 bytecode instructions of every case.

    ``output`` receives one ``(file_path, instruction_number)`` tuple per
    case, in ``case_manager.case_list`` order.
    """

    def run(self):
        """Run d8 on a temporary '.mjs' copy of each case and record the count."""
        for file_path in self.case_manager.case_list:
            # Swap only the file suffix: str.replace would also rewrite any
            # '.js' that occurs in a directory name or earlier in the file name.
            # Presumably the '.mjs' suffix makes d8 load the case as a module
            # - confirm.
            # NOTE(review): a real '<stem>.mjs' next to the case would be
            # overwritten and then removed.
            d8_case_path = os.path.splitext(file_path)[0] + '.mjs'
            shutil.copyfile(file_path, d8_case_path)
            try:
                instruction_number = RecognizeInstructionMethod.recognize_d8_bytecode_instruction(
                    self.command_list, d8_case_path, self.case_manager)
            finally:
                # Never leave the temporary copy behind, even on an error.
                if os.path.exists(d8_case_path):
                    os.remove(d8_case_path)
            self.output.append((file_path, instruction_number))
class ES2ABCRunner(Runner):
    """Count the es2abc bytecode instructions of every case.

    ``output`` receives one ``(file_path, instruction_number)`` tuple per
    case, in ``case_manager.case_list`` order.
    """

    def __init__(self, command_list, case_manager):
        super().__init__(command_list, case_manager)

    def run(self):
        """Run es2abc on each case and record the reported count."""
        manager = self.case_manager
        recognize = RecognizeInstructionMethod.recognize_es2abc_bytecode_instruction
        for case_path in manager.case_list:
            count = recognize(self.command_list, case_path, manager)
            self.output.append((case_path, count))
class RecognizeInstructionMethod:
    """Run one tool on one case and extract its bytecode instruction count."""

    @staticmethod
    def _run_case(command_list, timeout):
        """Run ``command_list`` and capture its standard output.

        Returns:
            ``(decoded stdout, return code)``, or None when the timeout
            expired. ``subprocess.run`` kills and reaps the child in that
            case, so no zombie process or open pipe is left behind.
        """
        try:
            completed = subprocess.run(command_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                       timeout=timeout, check=False)
        except subprocess.TimeoutExpired:
            return None
        return completed.stdout.decode('utf-8', errors='ignore'), completed.returncode

    @staticmethod
    def _original_case_path(mjs_path, crash_dict):
        """Map the temporary '.mjs' path back to its key in ``crash_dict``."""
        stem, suffix = os.path.splitext(mjs_path)
        if suffix == '.mjs' and (stem + '.js') in crash_dict:
            return stem + '.js'
        # Fall back to the whole-path substitution so copies named by a
        # whole-path '.js' -> '.mjs' replacement are still resolved.
        return mjs_path.replace('.mjs', '.js')

    @staticmethod
    def recognize_d8_bytecode_instruction(command_list, file_path, case_manager):
        """Return the number of bytecode lines d8 prints for ``file_path``.

        Args:
            command_list: d8 command; slot 1 is set to ``file_path``.
            file_path: the '.mjs' copy of the case.
            case_manager: provides ``args.timeout`` and ``crash_dict``; the
                d8 status of the case is updated on crash or timeout.

        Returns:
            The number of matching output lines, or -1 on timeout.
        """
        original_file_path = RecognizeInstructionMethod._original_case_path(
            file_path, case_manager.crash_dict)
        command_list = assemble_command(command_list, file_path=file_path)
        result = RecognizeInstructionMethod._run_case(command_list, case_manager.args.timeout)
        if result is None:
            print(f"[WARNING]: v8 - Killed! Timeout: {file_path}")
            case_manager.crash_dict[original_file_path][0] = TIMEOUT_STATUS
            return -1  # Script execution timeout sets the instruction number to -1

        decoded_output, return_code = result
        if return_code != 0:
            # A crash is recorded, but whatever was printed is still counted.
            case_manager.crash_dict[original_file_path][0] = CRASH_STATUS
            print(f'[WARNING]: v8 - Crashed! {file_path}')

        instruction_pattern = r'0x[0-9a-fA-F]+ @\s+\d+\s+:\s+([0-9a-fA-F\s]+)\s+(.*)$'
        return sum(1 for _ in re.finditer(instruction_pattern, decoded_output, re.MULTILINE))

    @staticmethod
    def recognize_es2abc_bytecode_instruction(command_list, file_path, case_manager):
        """Return the ``instructions_number`` value es2abc prints for ``file_path``.

        Args:
            command_list: es2abc command; slot 1 is set to ``file_path``.
            file_path: the case to compile.
            case_manager: provides ``args.timeout`` and ``crash_dict``; the
                es2abc status of the case is updated on crash or timeout.

        Returns:
            The count as an int, 0 when it is missing or unparsable, or -1
            on timeout.
        """
        command_list = assemble_command(command_list, file_path=file_path)
        result = RecognizeInstructionMethod._run_case(command_list, case_manager.args.timeout)
        if result is None:
            print(f"[WARNING]: es2abc - Killed! Timeout: {file_path}")
            case_manager.crash_dict[file_path][1] = TIMEOUT_STATUS
            return -1  # Script execution timeout sets the instructions number to -1

        decoded_output, return_code = result
        if return_code != 0:
            case_manager.crash_dict[file_path][1] = CRASH_STATUS
            print(f'[WARNING]: es2abc - Crashed! {file_path}')

        for line in decoded_output.split('\n'):
            if 'instructions_number:' not in line:
                continue
            # Only the first matching line is considered.
            raw_number = line.split('instructions_number:')[-1].strip()
            try:
                # Return an int, consistent with every other return path.
                return int(raw_number)
            except ValueError:
                print(f'[WARNING]: es2abc - Unrecognized instruction number "{raw_number}": {file_path}')
                return 0
        return 0
def main():
    """Parse the arguments, run both tools over all cases and write the report.

    d8 and es2abc are driven from two threads that run side by side; nothing
    is executed or reported when there are no cases.
    """
    args = get_args()

    case_manager = None
    d8_command = assemble_command(executable_program_path=args.d8_path, parameters=['--print-bytecode'])
    es2abc_command = assemble_command(
        executable_program_path=args.es2abc_path,
        parameters=['--module', '--dump-size-stat', '--output=/dev/null'])

    case_manager = CaseManager(args)
    d8_runner = D8Runner(d8_command, case_manager)
    es2abc_runner = ES2ABCRunner(es2abc_command, case_manager)

    if not case_manager.case_list:
        return

    workers = [threading.Thread(target=runner.run) for runner in (d8_runner, es2abc_runner)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    case_manager.generate_discrepancy_report(d8_runner.output, es2abc_runner.output)
# Run the comparison only when this file is executed as a script.
if __name__ == "__main__":
    main()