1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4"""
5Copyright (c) 2023-2024 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17
18Description: run regress test case
19"""
20import argparse
21import dataclasses
22import datetime
23import json
24import logging
25import multiprocessing
26import os
27import platform
28import re
29import shutil
30import stat
31import subprocess
32import sys
33from abc import ABC
34from typing import Optional, List, Type, Dict, Set, Tuple, Callable
35from os.path import dirname, join
36from pathlib import Path
37import xml.etree.cElementTree as XTree
38from enum import Enum, auto
39
40from regress_test_config import RegressTestConfig
41
# Matches the text "//", at least one whitespace character and "Environment Variables:",
# capturing the remainder of that line as group 1.
# NOTE(review): the code that applies this pattern is outside this chunk.
ENV_PATTERN = re.compile(r"//\s+Environment Variables:(.*)")
43
44
def init_log_file(args):
    """Configure the root logger to write to the file named by ``args.out_log``.

    The record format comes from ``RegressTestConfig.DEFAULT_LOG_FORMAT`` and
    the threshold is ``logging.INFO``.
    """
    log_settings = {
        "filename": args.out_log,
        "format": RegressTestConfig.DEFAULT_LOG_FORMAT,
        "level": logging.INFO,
    }
    logging.basicConfig(**log_settings)
47
48
def parse_args():
    """Declare the runner's command-line options and parse the process arguments.

    Returns:
        The ``argparse.Namespace`` produced by ``ArgumentParser.parse_args()``.
    """
    # Each entry is (option strings, keyword arguments for add_argument);
    # the order below is the order in which the options are registered.
    option_table = [
        (('--test-dir',), dict(metavar='DIR', help='Directory to test ')),
        (('--test-file',), dict(metavar='FILE', help='File to test')),
        (('--test-list',), dict(metavar='FILE', dest="test_list", default=None,
                                help='File with list of tests to run')),
        (('--ignore-list',), dict(metavar='FILE', dest="ignore_list", default=None,
                                  help='File with known failed tests list')),
        (('--timeout',), dict(default=RegressTestConfig.DEFAULT_TIMEOUT, type=int,
                              help='Set a custom test timeout in seconds !!!\n')),
        (('--processes',), dict(default=RegressTestConfig.DEFAULT_PROCESSES, type=int,
                                help='set number of processes to use. Default value: 1\n')),
        (('--merge-abc-binary',), dict(help="merge-abc's binary tool")),
        (('--ark-tool',), dict(help="ark's binary tool")),
        (('--ark-aot-tool',), dict(help="ark_aot's binary tool")),
        (('--ark-aot',), dict(default=False, action='store_true',
                              help="runs in ark-aot mode")),
        (('--run-pgo',), dict(default=False, action='store_true',
                              help="runs in pgo mode")),
        (('--enable-litecg',), dict(default=False, action='store_true',
                                    help="runs in litecg mode")),
        (('--ark-frontend-binary',), dict(help="ark frontend conversion binary tool")),
        (('--stub-path',), dict(help="stub file for run in AOT modes")),
        (('--LD_LIBRARY_PATH', '--libs-dir'), dict(dest='ld_library_path', default=None,
                                                   help='LD_LIBRARY_PATH')),
        (('--icu-path',), dict(dest='icu_path', help='icu-data-path')),
        (('--out-dir',), dict(default=None, help='target out dir')),
        (('--force-clone',), dict(action="store_true", default=False,
                                  help='Force to clone tests folder')),
        (('--ark-arch',), dict(default=RegressTestConfig.DEFAULT_ARK_ARCH,
                               required=False,
                               nargs='?', choices=RegressTestConfig.ARK_ARCH_LIST, type=str)),
        (('--ark-arch-root',), dict(default=RegressTestConfig.DEFAULT_ARK_ARCH,
                                    required=False,
                                    help="the root path for qemu-aarch64 or qemu-arm")),
        (('--disable-force-gc',), dict(action='store_true',
                                       help="Run regress tests with close force-gc")),
    ]

    parser = argparse.ArgumentParser()
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)
    return parser.parse_args()
98
99
def check_ark_frontend_binary(args) -> bool:
    """Tell whether ``args.ark_frontend_binary`` was supplied.

    Returns:
        True when the attribute is not None; otherwise prints a hint and returns False.
    """
    if args.ark_frontend_binary is not None:
        return True
    output('ark_frontend_binary is required, please add this parameter')
    return False
105
106
def check_frontend_library(args) -> bool:
    """Resolve ``args.ark_frontend_binary`` to the absolute path of an existing file.

    The value is tried relative to the current working directory first and
    relative to ``RegressTestConfig.TEST_TOOL_FILE_DIR`` second; the first
    existing candidate replaces ``args.ark_frontend_binary``.

    Returns:
        True when a candidate exists; otherwise prints a message and returns False.
    """
    candidates = (
        os.path.join(str(os.getcwd()), str(args.ark_frontend_binary)),
        os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ark_frontend_binary),
    )
    for candidate in candidates:
        if os.path.exists(candidate):
            args.ark_frontend_binary = os.path.abspath(candidate)
            return True
    output('entered ark_frontend_binary does not exist. please confirm')
    return False
118
119
def check_ark_tool(args) -> bool:
    """Validate ``args.ark_tool`` and replace it with an absolute path.

    The tool is looked up relative to the current working directory, then
    relative to ``RegressTestConfig.TEST_TOOL_FILE_DIR``.

    Returns:
        True on success; False (after printing the reason) when the option is
        missing or neither candidate path exists.
    """
    if args.ark_tool is None:
        output('ark_tool is required, please add this parameter')
        return False

    from_cwd = os.path.join(str(os.getcwd()), str(args.ark_tool))
    from_tool_dir = os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ark_tool)

    if os.path.exists(from_cwd):
        resolved = from_cwd
    elif os.path.exists(from_tool_dir):
        resolved = from_tool_dir
    else:
        output('entered ark_tool does not exist. please confirm')
        return False

    args.ark_tool = os.path.abspath(resolved)
    return True
135
136
def check_ark_aot(args) -> bool:
    """Validate the AOT-related options.

    When ``args.ark_aot`` is set, ``args.ark_aot_tool`` must name an existing
    file, looked up relative to the current working directory and then relative
    to ``RegressTestConfig.TEST_TOOL_FILE_DIR``; it is replaced with the
    absolute path of the first candidate that exists. When ``args.ark_aot`` is
    not set, ``args.run_pgo`` must not be set either.

    Returns:
        True when the combination is valid; otherwise prints the reason and
        returns False.
    """
    if not args.ark_aot:
        if args.run_pgo:
            output('pgo mode cannot be used without aot')
            return False
        return True

    if args.ark_aot_tool is None:
        # Without this guard os.path.join() below raises TypeError on None
        # instead of telling the user which option is missing.
        output('ark_aot_tool is required in ark-aot mode, please add this parameter')
        return False

    current_ark_aot_tool = os.path.join(str(os.getcwd()), str(args.ark_aot_tool))
    test_tool_ark_aot_tool = os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ark_aot_tool)
    if os.path.exists(current_ark_aot_tool):
        resolved = current_ark_aot_tool
    elif os.path.exists(test_tool_ark_aot_tool):
        resolved = test_tool_ark_aot_tool
    else:
        output(f'entered ark_aot_tool "{args.ark_aot_tool}" does not exist. Please check')
        return False
    args.ark_aot_tool = os.path.abspath(resolved)
    return True
152
153
def check_stub_path(args) -> bool:
    """Validate the optional ``args.stub_path`` and make it absolute.

    An empty or missing value is accepted as is. Otherwise the path, taken
    relative to the current working directory, must exist.

    Returns:
        True when no stub path was given or it exists; otherwise prints a
        message and returns False.
    """
    if not args.stub_path:
        return True

    candidate = os.path.join(str(os.getcwd()), str(args.stub_path))
    if os.path.exists(candidate):
        args.stub_path = os.path.abspath(args.stub_path)
        return True

    output(f'entered stub-file "{args.stub_path}" does not exist. Please check')
    return False
163
164
def is_ignore_file_present(ignore_path: str) -> bool:
    """Return whether ``ignore_path`` exists, printing a message when it does not."""
    found = os.path.exists(ignore_path)
    if not found:
        output(f"Cannot find ignore list '{ignore_path}'")
    return found
170
171
def check_ignore_list(args) -> bool:
    """Validate the optional ``args.ignore_list`` file.

    A relative value is re-based onto ``RegressTestConfig.TEST_TOOL_FILE_DIR``
    (and stored back into ``args.ignore_list``) before the existence check.

    Returns:
        True when no ignore list was given or the file exists, False otherwise.
    """
    if not args.ignore_list:
        return True
    if not os.path.isabs(args.ignore_list):
        args.ignore_list = str(os.path.join(RegressTestConfig.TEST_TOOL_FILE_DIR, args.ignore_list))
    return is_ignore_file_present(args.ignore_list)
179
180
def check_args(args):
    """Validate the parsed arguments and derive the output locations.

    The individual checks run in a fixed order and stop at the first failure.
    On success, library paths are made absolute, defaults from
    ``RegressTestConfig`` are filled in, and the ``regress_out_dir``,
    ``out_result``, ``junit_report``, ``out_log`` and ``test_case_out_dir``
    attributes are added to ``args``.

    Returns:
        True when every check passed, False otherwise.
    """
    validators = (
        check_ark_frontend_binary,
        check_frontend_library,
        check_ark_tool,
        check_ark_aot,
        check_stub_path,
        check_ignore_list,
    )
    # all() over a generator stops at the first validator that fails
    if not all(validate(args) for validate in validators):
        return False

    if args.ld_library_path is None:
        args.ld_library_path = RegressTestConfig.DEFAULT_LIBS_DIR
    else:
        cwd = str(os.getcwd())
        absolute_libs = [
            os.path.abspath(os.path.join(cwd, str(entry)))
            for entry in args.ld_library_path.split(":")
        ]
        args.ld_library_path = ":".join(absolute_libs)

    if args.icu_path is None:
        args.icu_path = RegressTestConfig.ICU_PATH

    if args.out_dir is None:
        args.out_dir = RegressTestConfig.PROJECT_BASE_OUT_DIR
    else:
        args.out_dir = os.path.abspath(os.path.join(RegressTestConfig.CURRENT_PATH, args.out_dir))
    if not args.out_dir.endswith("/"):
        args.out_dir = f"{args.out_dir}/"

    regress_root = os.path.join(args.out_dir, "regresstest")
    args.regress_out_dir = regress_root
    args.out_result = os.path.join(regress_root, 'result.txt')
    args.junit_report = os.path.join(regress_root, 'report.xml')
    args.out_log = os.path.join(regress_root, 'test.log')
    args.test_case_out_dir = os.path.join(regress_root, RegressTestConfig.REGRESS_GIT_REPO)
    return True
213
214
def remove_dir(path):
    """Delete the directory tree at ``path``; do nothing when the path does not exist."""
    if not os.path.exists(path):
        return
    shutil.rmtree(path)
218
219
def output(msg):
    """Print ``msg`` to standard output and record it in the log at INFO level."""
    text = str(msg)
    print(text)
    logging.info(text)
223
224
def output_debug(msg):
    """Record ``msg`` in the log at DEBUG level (nothing is printed)."""
    text = str(msg)
    logging.debug(text)
227
228
def get_extra_error_message(ret_code: int) -> str:
    """Translate a process return code into a short human-readable note.

    Returns:
        An empty string for 0, a fixed description for the known codes
        (-6, -4, -11, 255) and ``'Unknown Error: <code>'`` for anything else.
    """
    known_codes = {
        0: '',
        -6: 'Aborted (core dumped)',
        -4: 'Aborted (core dumped)',
        -11: 'Segmentation fault (core dumped)',
        255: '(uncaught error)',
    }
    try:
        return known_codes[ret_code]
    except KeyError:
        return f'Unknown Error: {ret_code!s}'
239
240
@dataclasses.dataclass
class StepResult:
    """Outcome of running one command for one test."""
    step_name: str  # a copy of the step name
    is_passed: bool = False  # True if passed, any other state is False
    command: List[str] = dataclasses.field(default_factory=list)  # command to run
    return_code: int = -1
    stdout: Optional[str] = None  # present only if there is some output
    stderr: Optional[str] = None  # can be present only if is_passed == False
    fileinfo: Optional[str] = None  # content of fileinfo file if present

    def report(self) -> str:
        """Render the step as a multi-line, tab-indented text fragment."""
        command_line = " ".join(str(part) for part in self.command)
        lines: List[str] = [
            f"{self.step_name}:",
            f"\tCommand: {command_line}",
            f"\treturn code={self.return_code}",
            f"\toutput='{self.stdout or ''}'",
            f"\terrors='{self.stderr or ''}'",
        ]
        # The file-info section is only emitted when there is content for it
        if self.fileinfo:
            lines.append(f"\tFileInfo:\n{self.fileinfo}")
        return "\n".join(lines)
264
265
@dataclasses.dataclass
class TestReport:
    """State of a single test together with the results of the steps run for it."""
    src_path: str  # full path to the source test
    test_id: str = ""  # path starting from regresstest
    out_path: str = ""  # full path to intermediate files up to folder
    passed: bool = False  # False if the test has not started or failed
    is_skipped: bool = False  # True if the test has found in the skipped (excluded) list
    is_ignored: bool = False  # True if the test has found in the ignored list
    steps: List[StepResult] = dataclasses.field(default_factory=list)  # list of results

    def report(self) -> str:
        """Return the test id followed by every step report, or "" when ``steps`` is None."""
        if self.steps is None:
            return ""
        header = f"{self.test_id}:"
        step_lines = [f"\t{step.report()}" for step in self.steps]
        return "\n".join([header, *step_lines])
283
284
class RegressTestStep(ABC):
    """Common base of the pipeline steps: keeps the arguments, the step name and timing."""
    # The most recently created step object of the class (set by the subclasses' run())
    step_obj: Optional['RegressTestStep'] = None

    def __init__(self, args, name):
        output(f"--- Start step {name} ---")
        self.name: str = name
        self.args = args
        # Timing is filled in by _start() / _end()
        self.__start: Optional[datetime.datetime] = None
        self.__end: Optional[datetime.datetime] = None
        self.__duration: Optional[datetime.timedelta] = None

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        """Entry point of a step; this base implementation does nothing and returns None."""
        return None

    def get_duration(self) -> datetime.timedelta:
        """Return the measured duration; exit the process with code 1 if none was recorded."""
        if self.__duration is not None:
            return self.__duration
        output(f"Step {self.name} not started or not completed")
        sys.exit(1)

    def _start(self):
        """Remember the moment the step started."""
        self.__start = datetime.datetime.now()

    def _end(self):
        """Remember the moment the step ended and compute the duration."""
        finished_at = datetime.datetime.now()
        self.__end = finished_at
        self.__duration = finished_at - self.__start
312
313
class RegressTestRepoPrepare(RegressTestStep):
    """Step that prepares the test repository and builds the list of tests to run."""

    def __init__(self, args):
        RegressTestStep.__init__(self, args, "Repo preparation")
        self.test_list: List[str] = self.read_test_list(args.test_list)

    @staticmethod
    def read_test_list(test_list_name: Optional[str]) -> List[str]:
        """Read the ``--test-list`` file, resolved relative to this script's directory.

        Returns:
            The file content split on newlines, or an empty list when no name was given.
            Exits the process with code 1 when the file does not exist.
        """
        if test_list_name is None:
            return []
        filename = join(dirname(__file__), test_list_name)
        if not Path(filename).exists():
            output(f"File {filename} set as --test-list value cannot be found")
            # sys.exit() rather than exit(): the latter is injected by the site
            # module and is not guaranteed to be available.
            sys.exit(1)
        with open(filename, 'r') as stream:
            return stream.read().split("\n")

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        """Prepare the repository and append a TestReport for every test that is not skipped.

        Args:
            args: parsed command-line arguments.
            test_reports: list to extend; a new list is created when None.

        Returns:
            The list of test reports.
        """
        repo = RegressTestRepoPrepare(args)
        RegressTestRepoPrepare.step_obj = repo
        repo._start()

        repo.run_regress_test_prepare()
        repo.prepare_clean_data()
        repo.get_test_case()
        test_list = repo.get_regress_test_files()
        # A set gives a constant-time membership test for every discovered file
        skip_set = set(repo.get_skip_test_cases())
        if test_reports is None:
            test_reports = []
        for test in test_list:
            shorten = Utils.get_inside_path(test)
            if shorten in skip_set:
                continue
            test_id = f"regresstest/ark-regress/{shorten}"
            test_reports.append(TestReport(src_path=test, test_id=test_id))

        repo._end()
        return test_reports

    # NOTE: the os.getcwd() defaults of the git helpers below are evaluated once,
    # when the class body is executed, not at call time.
    @staticmethod
    def git_checkout(checkout_options, check_out_dir=os.getcwd()):
        """Run ``git checkout <checkout_options>`` in ``check_out_dir``.

        Returns:
            True when git exited with 0, False (after printing an error) otherwise.
        """
        cmds = ['git', 'checkout', checkout_options]
        with subprocess.Popen(cmds, cwd=check_out_dir) as proc:
            ret = proc.wait()
        if ret:
            output(f"\n error: git checkout '{checkout_options}' failed.")
            return False
        return True

    @staticmethod
    def git_pull(check_out_dir=os.getcwd()):
        """Run ``git pull --rebase`` in ``check_out_dir``; the exit code is ignored."""
        cmds = ['git', 'pull', '--rebase']
        with subprocess.Popen(cmds, cwd=check_out_dir) as proc:
            proc.wait()

    @staticmethod
    def git_clean(clean_dir=os.getcwd()):
        """Run ``git checkout -- .`` in ``clean_dir``; the exit code is ignored."""
        cmds = ['git', 'checkout', '--', '.']
        with subprocess.Popen(cmds, cwd=clean_dir) as proc:
            proc.wait()

    @staticmethod
    def git_clone(git_url, code_dir):
        """Clone ``git_url`` into ``code_dir``, retrying up to DEFAULT_RETRIES times.

        Returns:
            True as soon as one attempt succeeds. Exits the process with code 1
            when every attempt failed.
        """
        cmds = ['git', 'clone', git_url, code_dir]
        retries = RegressTestConfig.DEFAULT_RETRIES
        while retries > 0:
            with subprocess.Popen(cmds) as proc:
                ret = proc.wait()
            if not ret:
                return True
            # Decrement first so that the message reports the attempts really left
            retries -= 1
            output(f"\n Error: Cloning '{git_url}' failed. Retry remaining '{retries}' times")
        sys.exit(1)

    @staticmethod
    def get_skip_test_cases() -> List[str]:
        """Return the entries read from ``RegressTestConfig.SKIP_LIST_FILE``."""
        return Utils.read_skip_list(RegressTestConfig.SKIP_LIST_FILE)

    def get_test_case(self):
        """Clone the test repository and check out the pinned hash if no ``.git`` folder exists.

        Returns:
            The result of the checkout after a fresh clone, True when a clone already exists.
        """
        if not os.path.isdir(os.path.join(RegressTestConfig.REGRESS_TEST_CASE_DIR, '.git')):
            self.git_clone(RegressTestConfig.REGRESS_GIT_URL, RegressTestConfig.REGRESS_TEST_CASE_DIR)
            return self.git_checkout(RegressTestConfig.REGRESS_GIT_HASH, RegressTestConfig.REGRESS_TEST_CASE_DIR)
        return True

    def prepare_clean_data(self):
        """Discard local changes, pull, and check out the pinned hash in the test case directory."""
        self.git_clean(RegressTestConfig.REGRESS_TEST_CASE_DIR)
        self.git_pull(RegressTestConfig.REGRESS_TEST_CASE_DIR)
        self.git_checkout(RegressTestConfig.REGRESS_GIT_HASH, RegressTestConfig.REGRESS_TEST_CASE_DIR)

    def run_regress_test_prepare(self):
        """Create the output and test case directories (wiping them first with --force-clone)
        and set up the log file."""
        if self.args.force_clone:
            remove_dir(self.args.regress_out_dir)
            remove_dir(RegressTestConfig.REGRESS_TEST_CASE_DIR)
        os.makedirs(self.args.regress_out_dir, exist_ok=True)
        os.makedirs(RegressTestConfig.REGRESS_TEST_CASE_DIR, exist_ok=True)
        init_log_file(self.args)

    def get_regress_test_files(self) -> List[str]:
        """Collect the test files selected by ``--test-file`` / ``--test-dir``.

        Returns:
            A single-element list for ``--test-file``; otherwise every ``.js`` / ``.mjs``
            file found under the chosen directory, skipping any directory whose path
            contains ``.git``.
        """
        result: List[str] = []
        if self.args.test_file is not None and len(self.args.test_file) > 0:
            test_file_list = os.path.join(RegressTestConfig.REGRESS_TEST_CASE_DIR, self.args.test_file)
            result.append(str(test_file_list))
            return result
        if self.args.test_dir is not None and len(self.args.test_dir) > 0:
            test_file_list = os.path.join(RegressTestConfig.REGRESS_TEST_CASE_DIR, self.args.test_dir)
        else:
            test_file_list = RegressTestConfig.REGRESS_TEST_CASE_DIR
        for dir_path, _, filenames in os.walk(test_file_list):
            if ".git" in dir_path:
                continue
            for filename in filenames:
                if filename.endswith((".js", ".mjs")):
                    result.append(str(os.path.join(dir_path, filename)))
        return result
430
431
class RegressTestCompile(RegressTestStep):
    """Step that compiles every test source into an ``.abc`` file with the frontend binary."""

    def __init__(self, args, test_reports: List[TestReport]):
        RegressTestStep.__init__(self, args, "Regress test compilation")
        self.out_dir = args.out_dir
        self.test_reports = test_reports
        # Intermediate files of a test go to the directory part of <out_dir>/<test_id>
        for test in self.test_reports:
            test.out_path = os.path.dirname(os.path.join(self.out_dir, test.test_id))

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        """Compile all tests in ``test_reports``; exit the process when the list is None.

        Returns:
            The results produced by ``gen_abc_files``.
        """
        if test_reports is None:
            output("No tests loaded")
            # sys.exit() rather than exit(): the latter is injected by the site
            # module and is not guaranteed to be available.
            sys.exit(-1)
        test_prepare = RegressTestCompile(args, test_reports)
        RegressTestCompile.step_obj = test_prepare
        test_prepare._start()
        test_reports = test_prepare.gen_abc_files()
        test_prepare._end()
        return test_reports

    @staticmethod
    def create_files_info(test_report: TestReport) -> Tuple[str, str]:
        """Write the ``<name>-filesInfo.txt`` file consumed by the frontend binary.

        The file gets one line for ``RegressTestConfig.REGRESS_TEST_TOOL_DIR`` and one
        for the test source.

        Returns:
            A tuple of the file path and the written lines joined with newlines.
        """
        src_files_info = [
            RegressTestConfig.REGRESS_TEST_TOOL_DIR,
            test_report.src_path
        ]
        file_info_content: List[str] = []
        file_info_path = str(os.path.join(
            test_report.out_path,
            f"{Utils.get_file_only_name(test_report.src_path)}-filesInfo.txt"))
        os.makedirs(test_report.out_path, exist_ok=True)
        # O_TRUNC is required: wrapping an already opened descriptor does not truncate
        # the file, so a leftover file from a previous run could keep stale trailing bytes.
        with os.fdopen(
                os.open(file_info_path, flags=os.O_RDWR | os.O_CREAT | os.O_TRUNC,
                        mode=stat.S_IRUSR | stat.S_IWUSR),
                mode="w+", encoding="utf-8"
        ) as fp:
            for src_file_info in src_files_info:
                line = f"{src_file_info};{Utils.get_file_only_name(src_file_info)};esm;xxx;yyy\n"
                file_info_content.append(line)
                fp.write(line)
        return file_info_path, "\n".join(file_info_content)

    def gen_abc_files(self) -> List[TestReport]:
        """Run ``gen_abc_file`` for every report in a process pool; result order is not preserved."""
        with multiprocessing.Pool(processes=self.args.processes) as pool:
            results = pool.imap_unordered(self.gen_abc_file, self.test_reports)
            results = list(results)
            pool.close()
            pool.join()

        return results

    def gen_abc_file(self, test_report: TestReport) -> Optional[TestReport]:
        """Compile one test and record the step result in its report.

        Returns:
            The updated report, or None when the source is the test tool file itself.
        """
        if test_report.src_path == RegressTestConfig.REGRESS_TEST_TOOL_DIR:
            return None
        file_info_path, file_info_content = self.create_files_info(test_report)
        out_file = change_extension(test_report.src_path, '.out')
        expect_file_exists = os.path.exists(out_file)
        output_file = change_extension(
            os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id)),
            ".abc")
        command = [
            self.args.ark_frontend_binary,
            f"@{file_info_path}",
            "--merge-abc",
            "--module",
            f'--output={output_file}'
        ]
        step_result = StepResult(self.name, command=command, fileinfo=file_info_content)
        Utils.exec_command(command, test_report.test_id, step_result, self.args.timeout,
                           lambda rt, _, _2: get_extra_error_message(rt))
        test_report.steps.append(step_result)
        test_report.passed = step_result.is_passed
        # Keep the expected-output file next to the compiled artifacts
        if expect_file_exists:
            out_file_path = os.path.join(test_report.out_path, change_extension(test_report.test_id, '.out'))
            shutil.copy(str(out_file), str(out_file_path))
        return test_report
507
508
class RegressTestPgo(RegressTestStep):
    """Step that runs every compiled test with the PGO profiler enabled to produce ``.ap`` files."""

    def __init__(self, args):
        RegressTestStep.__init__(self, args, "Regress Test PGO ")

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        """Create the step object, time it and return the reports updated by ``generate_aps``."""
        pgo = RegressTestPgo(args)
        RegressTestPgo.step_obj = pgo
        pgo._start()
        test_reports = pgo.generate_aps(test_reports)
        pgo._end()
        return test_reports

    def get_test_ap_cmd(self, test_report: TestReport) -> List[str]:
        """Build the profiling command line for one test.

        Also exports ``LD_LIBRARY_PATH`` into this process's environment.
        """
        abc_path = os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id))
        abc_file = change_extension(abc_path, ".abc")
        ap_file = change_extension(abc_file, ".ap")
        entry_point = Utils.get_file_only_name(RegressTestConfig.TEST_TOOL_FILE_JS_NAME)
        os.environ["LD_LIBRARY_PATH"] = self.args.ld_library_path

        # The second entry of ARK_ARCH_LIST is run through qemu-aarch64
        launcher: List[str] = []
        if self.args.ark_arch == RegressTestConfig.ARK_ARCH_LIST[1]:
            launcher = ["qemu-aarch64", "-L", self.args.ark_arch_root]

        # NOTE(review): this triple ends in "-gn" while RegressTestAot passes
        # "aarch64-unknown-linux-gnu" - confirm whether that is intended.
        return [
            *launcher,
            self.args.ark_tool,
            "--log-level=info",
            f"--icu-data-path={self.args.icu_path}",
            "--compiler-target-triple=aarch64-unknown-linux-gn",
            "--enable-pgo-profiler=true",
            "--compiler-opt-inlining=true",
            f"--compiler-pgo-profiler-path={ap_file}",
            "--asm-interpreter=true",
            f"--entry-point={entry_point}",
            f"{abc_file}",
        ]

    def generate_ap(self, test_report: Optional[TestReport]) -> Optional[TestReport]:
        """Run the profiling command for one test; reports that are None or failed pass through."""
        if test_report is None or not test_report.passed:
            return test_report
        command = self.get_test_ap_cmd(test_report)
        step = StepResult(self.name, command=command)
        Utils.exec_command(command, test_report.test_id, step, self.args.timeout,
                           lambda rt, _, _2: get_extra_error_message(rt))
        test_report.steps.append(step)
        test_report.passed = step.is_passed
        return test_report

    def generate_aps(self, test_reports: List[TestReport]) -> List[TestReport]:
        """Run ``generate_ap`` over all reports in a process pool; result order is not preserved."""
        with multiprocessing.Pool(processes=self.args.processes) as pool:
            finished = list(pool.imap_unordered(self.generate_ap, test_reports))
            pool.close()
            pool.join()
        return finished
568
569
class Utils:
    """Stateless helpers for path handling, command execution and reading list files."""
    # Default marker used by get_inside_path()
    ark_regress = "ark-regress"

    @staticmethod
    def get_file_only_name(full_file_name: str) -> str:
        """Return the last path component without its extension."""
        _, file_name = os.path.split(full_file_name)
        only_name, _ = os.path.splitext(file_name)
        return only_name

    @staticmethod
    def get_file_name(full_file_name: str) -> str:
        """Return the last path component, extension included."""
        _, file_name = os.path.split(full_file_name)
        return file_name

    @staticmethod
    def mk_dst_dir(file, src_dir, dist_dir):
        """Create the directory of ``file`` with ``src_dir`` replaced by ``dist_dir``.

        Only the part of ``file`` starting at the last occurrence of ``src_dir`` is used.
        """
        idx = file.rfind(src_dir)
        fpath, _ = os.path.split(file[idx:])
        fpath = fpath.replace(src_dir, dist_dir)
        os.makedirs(fpath, exist_ok=True)

    @staticmethod
    def get_inside_path(file_path: str, marker: Optional[str] = None) -> str:
        """Return the part of ``file_path`` that follows ``marker`` and one separator character.

        ``marker`` defaults to ``Utils.ark_regress``. The path is returned unchanged
        when it does not contain the marker.
        """
        if marker is None:
            marker = Utils.ark_regress
        index = file_path.find(marker)
        if index > -1:
            return file_path[index + len(marker) + 1:]
        return file_path

    @staticmethod
    def exec_command(cmd_args, test_id: str, step_result: StepResult, timeout=RegressTestConfig.DEFAULT_TIMEOUT,
                     get_extra_error_msg: Optional[Callable[[int, str, str], str]] = None) -> None:
        """Run a command and store its outcome in ``step_result``.

        Args:
            cmd_args: command line as a list of arguments.
            test_id: test identifier used in the printed messages.
            step_result: object that receives return code, stdout, stderr and pass flag.
            timeout: seconds to wait for the command before it is killed.
            get_extra_error_msg: optional callback ``(return_code, stdout, stderr)``
                whose result is put in front of the captured stderr.

        The step passes only when the command exits with 0. A timeout or any other
        exception is recorded as return code -1 with an explanatory stderr text.
        """
        code_format = 'utf-8'
        if platform.system() == "Windows":
            code_format = 'gbk'
        cmd_string = "\n\t".join([str(arg).strip() for arg in cmd_args if arg is not None])
        msg_cmd = "\n".join([
            f"Run command:\n{cmd_string}",
            f"Env: {os.environ.get('LD_LIBRARY_PATH')}"
        ])
        msg_result = f"TEST ({step_result.step_name.strip()}): {test_id}"
        try:
            with subprocess.Popen(cmd_args, stderr=subprocess.PIPE, stdout=subprocess.PIPE, close_fds=True,
                                  start_new_session=True) as process:
                try:
                    output_res, errs = process.communicate(timeout=timeout)
                except subprocess.TimeoutExpired:
                    # Leaving the with block waits for the child to exit, so the child
                    # has to be killed here; otherwise the timeout is never enforced.
                    process.kill()
                    raise
                ret_code = process.poll()
                step_result.return_code = ret_code
                stderr = str(errs.decode(code_format, 'ignore').strip())
                stdout = str(output_res.decode(code_format, 'ignore').strip())
                extra_message = get_extra_error_msg(ret_code, stdout, stderr) if get_extra_error_msg is not None else ""
                step_result.stderr = f"{extra_message + '. ' if extra_message else '' }{stderr if stderr else ''}"
                step_result.stdout = stdout
                if ret_code == 0:
                    msg_result = f"{msg_result} PASSED"
                    step_result.is_passed = True
                else:
                    msg_result = f"{msg_result} FAILED"
        except subprocess.TimeoutExpired:
            step_result.return_code = -1
            step_result.stderr = f"Timeout: timed out after {timeout} seconds"
            msg_result = f"{msg_result} FAILED"
        except Exception as exc:
            step_result.return_code = -1
            step_result.stderr = f"Unknown error: {exc}"
            msg_result = f"{msg_result} FAILED"
        # The command line is printed only for failures; for passes it goes to the debug log
        if step_result.is_passed:
            output(msg_result)
            output_debug(msg_cmd)
        else:
            output(f"{msg_result}\n{step_result.stderr}\n{msg_cmd}")

    @staticmethod
    def read_skip_list(skip_list_path: str) -> List[str]:
        """Read a JSON list of groups and return the concatenation of their ``"files"`` lists."""
        skip_tests_list = []
        with os.fdopen(os.open(skip_list_path, os.O_RDONLY, stat.S_IRUSR), "r") as file_object:
            json_data = json.load(file_object)
            for key in json_data:
                skip_tests_list.extend(key["files"])
        return skip_tests_list

    @staticmethod
    def read_file_as_str(file_name: str) -> str:
        """Return the file content with every line stripped and the lines joined by newlines."""
        with os.fdopen(os.open(file_name, os.O_RDONLY, stat.S_IRUSR), "r") as file_object:
            content = [line.strip() for line in file_object.readlines()]
        return "\n".join(content)
658
659
class RegressTestAot(RegressTestStep):
    """Step that runs the AOT compiler on every compiled test."""

    def __init__(self, args):
        RegressTestStep.__init__(self, args, "Regress Test AOT mode")

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        """Create the step object, time it and return the reports updated by ``compile_aots``."""
        aot = RegressTestAot(args)
        RegressTestAot.step_obj = aot
        aot._start()
        test_reports = aot.compile_aots(test_reports)
        aot._end()
        return test_reports

    def get_test_aot_cmd(self, test_report: TestReport) -> List[str]:
        """Build the AOT compiler command line for one test.

        Also exports ``LD_LIBRARY_PATH`` into this process's environment.
        """
        abc_path = os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id))
        abc_file = change_extension(abc_path, ".abc")
        ap_file = change_extension(abc_file, ".ap")
        aot_file = change_extension(abc_file, "")
        os.environ["LD_LIBRARY_PATH"] = self.args.ld_library_path

        # The second entry of ARK_ARCH_LIST is run through qemu-aarch64
        if self.args.ark_arch == RegressTestConfig.ARK_ARCH_LIST[1]:
            aot_cmd = [
                "qemu-aarch64",
                "-L",
                self.args.ark_arch_root,
                self.args.ark_aot_tool,
                "--compiler-target-triple=aarch64-unknown-linux-gnu",
                f"--aot-file={aot_file}",
            ]
        else:
            aot_cmd = [self.args.ark_aot_tool, f"--aot-file={aot_file}"]

        if self.args.run_pgo:
            aot_cmd += [
                "--compiler-opt-loop-peeling=true",
                "--compiler-fast-compile=false",
                "--compiler-opt-track-field=true",
                "--compiler-opt-inlining=true",
                "--compiler-max-inline-bytecodes=45",
                "--compiler-opt-level=2",
                f"--compiler-pgo-profiler-path={ap_file}",
            ]
        if self.args.enable_litecg:
            aot_cmd.append("--compiler-enable-litecg=true")

        # The input file always comes last
        aot_cmd.append(f"{abc_file}")
        return aot_cmd

    def compile_aot(self, test_report: Optional[TestReport]) -> Optional[TestReport]:
        """Run the AOT compiler for one test; reports that are None or failed pass through."""
        if test_report is None or not test_report.passed:
            return test_report
        command = self.get_test_aot_cmd(test_report)
        step = StepResult(self.name, command=command)
        Utils.exec_command(command, test_report.test_id, step, self.args.timeout,
                           lambda rt, _, _2: get_extra_error_message(rt))
        test_report.steps.append(step)
        test_report.passed = step.is_passed
        return test_report

    def compile_aots(self, test_reports: List[TestReport]) -> List[TestReport]:
        """Run ``compile_aot`` over all reports in a process pool; result order is not preserved."""
        with multiprocessing.Pool(processes=self.args.processes) as pool:
            finished = list(pool.imap_unordered(self.compile_aot, test_reports))
            pool.close()
            pool.join()
        return finished
737
738
class RegressOption(Enum):
    """Names of the per-test option groups; each maps to one runtime flag prefix."""
    # Explicit values, identical to the ones auto() assigns (counting from 1)
    NO_FORCE_GC = 1
    ELEMENTS_KIND = 2
742
743
def get_regress_groups() -> Dict[RegressOption, List[str]]:
    """Load the test groups from ``RegressTestConfig.REGRESS_TEST_OPTIONS``.

    The file is a JSON list of objects with a "name" (a ``RegressOption``
    member name) and a "files" entry.

    :return: mapping from option to the "files" value of its group.
    :raises KeyError: if a group name is not a ``RegressOption`` member.
    """
    options_fd = os.open(RegressTestConfig.REGRESS_TEST_OPTIONS, os.O_RDONLY, stat.S_IRUSR)
    with os.fdopen(options_fd, "r") as options_file:
        entries = json.load(options_file)
    return {RegressOption[entry["name"]]: entry["files"] for entry in entries}
750
751
def get_test_options(test: str, test_groups: Dict[RegressOption, List[str]],
                     regress_option: RegressOption) -> List[str]:
    """Build the command-line flag for one regress option of one test.

    The flag is ``<option>=false`` when the test is listed in the option's
    group and ``<option>=true`` otherwise.

    :param test: test name as it appears in the group lists.
    :param test_groups: mapping from option to the tests listed for it.
    :param regress_option: option to produce the flag for.
    :return: a one-element list holding the flag, or an empty list when the
        option has no associated command-line flag.
    """
    opt_values: Dict[RegressOption, str] = {
        RegressOption.NO_FORCE_GC: "--enable-force-gc=",
        RegressOption.ELEMENTS_KIND: "--enable-elements-kind="
    }

    flag_prefix = opt_values.get(regress_option)
    if flag_prefix is None:
        # dict.get() never raises KeyError, so an unmapped option has to be
        # detected explicitly; concatenating None would raise TypeError.
        return []

    listed = test in test_groups.get(regress_option, [])
    return [flag_prefix + str(not listed).lower()]
768
769
class RegressTestRun(RegressTestStep):
    """Pipeline step that executes every compiled test and checks its outcome.

    Tests are run in a multiprocessing pool. A test with an existing ``.out``
    file next to its ``.abc`` file is checked against that expected output;
    any other test is judged by its exit code and stderr.
    """

    def __init__(self, args):
        super().__init__(args, "Regress Test Run ")
        # Tests listed per option in the regress options file.
        self.test_groups: Dict[RegressOption, List[str]] = get_regress_groups()

    @staticmethod
    def run(args, test_reports: Optional[List[TestReport]] = None) -> List[TestReport]:
        """Create the step, publish it as ``step_obj`` and run all tests between _start/_end."""
        step = RegressTestRun(args)
        RegressTestRun.step_obj = step
        step._start()
        finished_reports = step.run_test_case_dir(test_reports)
        step._end()
        return finished_reports

    @staticmethod
    def extra_check_with_expect(ret_code: int, test_report: TestReport, expect_file, stdout: str, stderr: str) -> str:
        """Compare a test's output with its expectation file.

        :return: an empty string when the test is accepted, otherwise an error message.
        """
        expected = read_expect_file(expect_file, test_report.src_path)
        # NOTE(review): "/fail/" is searched in the text returned by
        # Utils.read_file_as_str, not in the path itself - confirm this is intended.
        source_text = Utils.read_file_as_str(test_report.src_path)

        output_matches = stdout == expected.strip() or stderr == expected.strip()
        if not output_matches:
            return f'expect: [{expected}]\nbut got: [{stderr}]'

        # Exit code 255 is accepted only for sources containing "/fail/".
        if ret_code == 0 or (ret_code == 255 and "/fail/" in source_text):
            return ""
        return get_extra_error_message(ret_code)

    @staticmethod
    def extra_check_with_assert(ret_code: int, stderr: Optional[str]) -> str:
        """Judge a test without expectation file by its exit code and stderr.

        A zero exit code with non-empty stderr is reported as an error unless
        stderr mentions "[ecmascript] Stack overflow".
        """
        unexpected_stderr = bool(stderr) and "[ecmascript] Stack overflow" not in stderr
        if ret_code == 0 and unexpected_stderr:
            return str(stderr)
        return get_extra_error_message(ret_code)

    def run_test_case_dir(self, test_reports: List[TestReport]) -> List[TestReport]:
        """Run ``run_test_case`` for every report in a worker pool.

        :return: the resulting reports in completion order.
        """
        pool_options = {
            "processes": self.args.processes,
            "initializer": init_worker,
            "initargs": (self.args,),
        }
        with multiprocessing.Pool(**pool_options) as worker_pool:
            finished = list(worker_pool.imap_unordered(self.run_test_case, test_reports))
            worker_pool.close()
            worker_pool.join()

        return finished

    def run_test_case(self, test_report: TestReport) -> Optional[TestReport]:
        """Build the ark command line for one test and execute it.

        :return: the updated report; the report unchanged when there are no
            worker arguments or the test is missing/already failed; ``None``
            for the test tool file itself.
        """
        # Use the arguments installed in this process by init_worker.
        self.args = worker_wrapper_args
        runnable = self.args is not None and test_report is not None and test_report.passed
        if not runnable:
            return test_report
        if test_report.src_path.endswith(RegressTestConfig.TEST_TOOL_FILE_JS_NAME):
            return None

        out_stem = os.path.join(test_report.out_path, Utils.get_file_name(test_report.test_id))
        abc_file = change_extension(out_stem, ".abc")
        aot_file = change_extension(abc_file, "")
        expect_file = change_extension(abc_file, ".out")
        entry_point = Utils.get_file_only_name(RegressTestConfig.TEST_TOOL_FILE_JS_NAME)

        os.environ["LD_LIBRARY_PATH"] = self.args.ld_library_path

        # test environ LC_ALL/TZ
        set_test_environ(test_report.src_path)
        test_name = test_report.test_id.replace('regresstest/ark-regress/', '')

        command = []
        if self.args.ark_arch == RegressTestConfig.ARK_ARCH_LIST[1]:
            # This architecture is executed through qemu with the given sysroot.
            command = ["qemu-aarch64", "-L", self.args.ark_arch_root]
        command += [
            self.args.ark_tool,
            f"--icu-data-path={self.args.icu_path}",
            f"--entry-point={entry_point}",
        ]
        if self.args.ark_aot:
            command += [
                f"--stub-file={self.args.stub_path}",
                f"--aot-file={aot_file}",
            ]
        if self.args.disable_force_gc:
            command.append("--enable-force-gc=false")
        else:
            command += get_test_options(test_name, self.test_groups, RegressOption.NO_FORCE_GC)
        command += get_test_options(test_name, self.test_groups, RegressOption.ELEMENTS_KIND)
        command.append(abc_file)

        return self.run_test_case_file(command, test_report, expect_file)

    def run_test_case_file(self, command, test_report: TestReport, expect_file) -> TestReport:
        """Execute *command*, choosing the check by whether *expect_file* exists."""
        has_expect_file = os.path.exists(expect_file)
        run_step = StepResult(self.name, command=command)
        if not has_expect_file:
            self.run_test_case_with_assert(command, run_step, test_report)
        else:
            self.run_test_case_with_expect(command, run_step, test_report, expect_file)
        test_report.steps.append(run_step)
        test_report.passed = run_step.is_passed
        return test_report

    def run_test_case_with_expect(self, command, step: StepResult, test_report: TestReport, expect_file) -> None:
        """Execute *command* and validate its output against *expect_file*."""

        def check_output(ret_code, out, err):
            return self.extra_check_with_expect(ret_code, test_report, expect_file, out, err)

        Utils.exec_command(command, test_report.test_id, step, self.args.timeout, check_output)

    def run_test_case_with_assert(self, command, step: StepResult, test_report: TestReport) -> None:
        """Execute *command* and validate it by exit code and stderr only."""

        def check_stderr(ret_code, _out, err):
            return self.extra_check_with_assert(ret_code, err)

        Utils.exec_command(command, test_report.test_id, step, self.args.timeout, check_stderr)
870
871
class Stats:
    """Turn the final test reports into result files and console summaries."""

    def __init__(self, args, test_reports: List[TestReport]):
        self.args = args
        self.pass_count = 0
        self.fail_count = 0
        self.test_reports = test_reports
        # Failed reports grouped by the name of the step they failed in.
        self.errors: Dict[str, List[TestReport]] = {}

    def read_ignore_list(self) -> Optional[Set[str]]:
        """Read the known-failure list given by ``args.ignore_list``.

        Blank lines and lines starting with '#' are skipped.

        :return: set of listed test ids, or ``None`` when no list was given.
        """
        if self.args.ignore_list is None:
            return None
        # The list is only read, so open it read-only; read/write access would
        # fail needlessly on a write-protected file.
        with os.fdopen(os.open(self.args.ignore_list, os.O_RDONLY, stat.S_IRUSR), "r") as file_object:
            stripped_lines = (line.strip() for line in file_object)
            return {line for line in stripped_lines if line and not line.startswith('#')}

    def get_new_failures(self) -> Optional[List[TestReport]]:
        """Return failed reports (with at least one step) that are not in the ignore list.

        :return: list of new failures, or ``None`` when no ignore list was given.
        """
        ignore_list = self.read_ignore_list()
        if ignore_list is None:
            return None
        return [
            test_report
            for test_report in self.test_reports
            if test_report and not test_report.passed and test_report.steps
            and test_report.test_id not in ignore_list
        ]

    def statistics(self):
        """Count passes/failures and write the plain-text result and the JUnit XML report.

        PASS/FAIL lines go to ``args.out_result``; the XML goes to
        ``args.junit_report``. ``None`` entries in the reports are skipped.
        """
        root = XTree.Element("testsuite")
        root.set("name", "Regression")

        # The context manager closes the result file even if a report raises.
        with open_write_file(self.args.out_result, False) as result_file:
            for test_report in self.test_reports:
                if test_report is None:
                    continue
                testcase = XTree.SubElement(root, "testcase")
                testcase.set("name", f"{test_report.test_id}")
                if test_report.passed:
                    write_result_file(f"PASS: {test_report.test_id}", result_file)
                    self.pass_count += 1
                    continue

                self.fail_count += 1
                write_result_file(f"FAIL: {test_report.test_id}", result_file)
                # get_new_failures tolerates failed reports without steps, so do
                # the same here instead of raising IndexError on steps[-1].
                step_name = test_report.steps[-1].step_name if test_report.steps else "unknown step"
                self.errors.setdefault(step_name, []).append(test_report)
                XTree.SubElement(testcase, "failure").text = f"<![CDATA[{test_report.report()}]]>"

        root.set("tests", f"{self.pass_count + self.fail_count}")
        root.set("failures", f"{self.fail_count}")

        tree = XTree.ElementTree(root)
        tree.write(self.args.junit_report, xml_declaration=True, encoding="UTF-8")

    def print_result(self, args, steps):
        """Print step durations and append the pass/fail summary to ``args.out_result``.

        :param args: parsed arguments providing ``out_result``.
        :param steps: executed step classes; each exposes its instance as ``step_obj``.
        """
        # The context manager closes the result file even if a step or output() raises.
        with open_write_file(args.out_result, True) as result_file:
            summary_duration = datetime.timedelta()
            for step in steps:
                output(f"Step {step.step_obj.name} - duration {step.step_obj.get_duration()}")
                summary_duration += step.step_obj.get_duration()

            summary_lines = [
                f'\npass count: {self.pass_count}',
                f'fail count: {self.fail_count}',
                f'total count: {self.fail_count + self.pass_count}',
                f'total used time is: {str(summary_duration)}',
            ]
            for msg in summary_lines:
                write_result_file(msg, result_file)
                output(msg)

    def print_failed_tests(self):
        """Print the number of failed tests per step name."""
        output("=== Failed tests ===")
        for key, values in self.errors.items():
            output(f"{key}: {len(values)} tests")
952
953
def change_extension(path, new_ext: str):
    """Return *path* with its extension replaced by *new_ext*.

    A path without an extension simply gets *new_ext* appended; an empty
    *new_ext* removes the extension.
    """
    stem, old_ext = os.path.splitext(path)
    return (stem if old_ext else path) + new_ext
961
962
def get_files_by_ext(start_dir, suffix):
    """Recursively collect the files below *start_dir* whose name ends with *suffix*.

    :return: list of paths, each joined from the walked directory and the file name.
    """
    return [
        os.path.join(folder, name)
        for folder, _subdirs, names in os.walk(start_dir)
        for name in names
        if name.endswith(suffix)
    ]
970
971
def read_expect_file(expect_file, test_case_file):
    """Load the expected output of a test and expand its test-path placeholder.

    Lines starting with '#' are dropped. Every ``*%(basename)s`` in the
    remaining text is replaced by the test path placed under
    ``RegressTestConfig.REGRESS_BASE_TEST_DIR``.

    :param expect_file: path of the expectation file.
    :param test_case_file: path of the test source; leading slashes and every
        'regresstest/' occurrence are removed before it is joined to the base dir.
    :return: the expected output text.
    """
    # The expectation is only read, so open it read-only; read/write access
    # would fail needlessly on a write-protected file.
    with os.fdopen(os.open(expect_file, os.O_RDONLY, stat.S_IRUSR), "r") as file_object:
        expect_output = ''.join(line for line in file_object if not line.strip().startswith('#'))

    relative_test_path = test_case_file.lstrip("/").replace('regresstest/', '')
    test_file_path = os.path.join(RegressTestConfig.REGRESS_BASE_TEST_DIR, relative_test_path)
    return expect_output.replace('*%(basename)s', test_file_path)
983
984
def open_write_file(file_path, append):
    """Open *file_path* for writing, creating it with owner read/write permission.

    :param file_path: path of the file to open.
    :param append: when true, writes are appended to the existing content;
        otherwise existing content is discarded.
    :return: a text file object; the caller is responsible for closing it.
    """
    flags = os.O_RDWR | os.O_CREAT
    # os.fdopen(fd, "w+") does not truncate an already opened descriptor, so
    # truncation must be requested here; without it a shorter rewrite would
    # leave the tail of the previous content in the file.
    flags |= os.O_APPEND if append else os.O_TRUNC
    file_descriptor = os.open(file_path, flags, stat.S_IRUSR | stat.S_IWUSR)
    return os.fdopen(file_descriptor, "w+")
993
994
def open_result_excel(file_path):
    """Open *file_path* in append mode, creating it with owner read/write permission.

    :return: a text file object; the caller is responsible for closing it.
    """
    open_flags = os.O_RDWR | os.O_CREAT | os.O_APPEND
    permissions = stat.S_IRUSR | stat.S_IWUSR
    return os.fdopen(os.open(file_path, open_flags, permissions), "w+")
999
1000
def get_file_source(file):
    """Return the whole content of *file*, decoded as ISO-8859-1."""
    with open(file, mode="r", encoding="ISO-8859-1") as source_file:
        content = source_file.read()
    return content
1004
1005
def set_test_environ(case):
    """Set LC_ALL/TZ in ``os.environ`` as requested by the test source *case*.

    Both variables are removed first. If the file exists and contains a
    ``// Environment Variables: VAR=value ...`` line (``ENV_PATTERN``), every
    listed pair whose name contains 'TZ' or 'LC_ALL' is applied.

    :param case: path of the test source file.
    """
    # Start clean so that a previous test's locale/time zone never leaks into this one.
    os.environ.pop('LC_ALL', None)
    os.environ.pop('TZ', None)
    if not os.path.exists(case):
        return

    env_match = ENV_PATTERN.search(get_file_source(case))
    if not env_match:
        return

    # Apply every pair on the line; stopping after the first one would silently
    # drop e.g. TZ when LC_ALL is listed before it.
    for env_pair in env_match.group(1).split():
        var, separator, value = env_pair.partition('=')
        if not separator:
            # Not a VAR=value token; ignore it rather than abort the test run.
            continue
        if 'TZ' in var:
            os.environ['TZ'] = value
        if 'LC_ALL' in var:
            os.environ['LC_ALL'] = value
1024
1025
# pylint: disable=invalid-name,global-statement
# Parsed command-line arguments shared with pool workers: set by init_worker
# and read by RegressTestRun.run_test_case. Stays None until init_worker runs.
worker_wrapper_args = None


def init_worker(args):
    """Pool initializer: store *args* in the module-level ``worker_wrapper_args``."""
    global worker_wrapper_args
    worker_wrapper_args = args
1033
1034
def write_result_file(msg: str, result_file):
    """Write *msg* followed by a newline to the open file object *result_file*."""
    line = f'{msg}\n'
    result_file.write(line)
1037
1038
def main(args):
    """Run the whole regress pipeline and report the results.

    The steps are: prepare the repository, compile, optionally PGO and AOT
    (when ``args.ark_aot`` is set), then run the tests.

    :param args: parsed command-line arguments.
    :return: 1 when the arguments are invalid, 0 when no ignore list was
        given, otherwise the number of failures missing from the ignore list.
    """
    if not check_args(args):
        return 1
    output("\nStart regresstest........")

    steps: List[Type[RegressTestStep]] = [RegressTestRepoPrepare, RegressTestCompile]
    if args.ark_aot:
        # PGO profiling, when requested, has to precede the AOT compilation.
        steps.extend([RegressTestPgo, RegressTestAot] if args.run_pgo else [RegressTestAot])
    steps.append(RegressTestRun)

    # Each step consumes the reports produced by the previous one.
    test_reports: List[TestReport] = []
    for step in steps:
        test_reports = step.run(args, test_reports)

    stats = Stats(args, test_reports)
    stats.statistics()
    stats.print_result(args, steps)
    stats.print_failed_tests()

    new_failures = stats.get_new_failures()
    if new_failures is None:
        return 0
    if not new_failures:
        output("No new failures have been found")
    else:
        report_lines = [f"Found {len(new_failures)} new failures:"]
        report_lines.extend(f"\t{failure.test_id}" for failure in new_failures)
        output("\n".join(report_lines))
    return len(new_failures)
1072
1073
if __name__ == "__main__":
    # main() returns the number of new failures. Exit statuses are truncated to
    # 8 bits on POSIX, so e.g. 256 new failures would be reported as success;
    # clamp the value so that any failing run exits non-zero.
    sys.exit(min(main(parse_args()), 255))
1076