xref: /arkcompiler/ets_frontend/test262/utils.py (revision 3af6ab5f)
1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3
4"""
5Copyright (c) 2021-2024 Huawei Device Co., Ltd.
6Licensed under the Apache License, Version 2.0 (the "License");
7you may not use this file except in compliance with the License.
8You may obtain a copy of the License at
9
10    http://www.apache.org/licenses/LICENSE-2.0
11
12Unless required by applicable law or agreed to in writing, software
13distributed under the License is distributed on an "AS IS" BASIS,
14WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15See the License for the specific language governing permissions and
16limitations under the License.
17
18Description: Implement the public interface in the 262 use case
19"""
20
21import os
22import signal
23import sys
24import subprocess
25import datetime
26import time
27import shutil
28import platform
29import re
30from config import DEFAULT_TIMEOUT, DEFAULT_RETRIES
31
32TERM_NORMAL = '\033[0m'
33TERM_YELLOW = '\033[1;33m'
34TERM_BLUE = '\033[1;34m'
35TERM_RED = '\033[1;31m'
36TERM_FUCHSIA = '\033[1;35m'
37
38ABC_EXT = ".abc"
39TEMP_ABC_EXT = ".temp.abc"
40TXT_EXT = ".txt"
41TEMP_TXT_EXT = ".temp.txt"
42
43
44def output(retcode, msg):
45    if retcode == 0:
46        if msg != '':
47            print(str(msg))
48    elif retcode == -6:
49        sys.stderr.write("Aborted (core dumped)")
50    elif retcode == -4:
51        sys.stderr.write("Aborted (core dumped)")
52    elif retcode == -11:
53        sys.stderr.write("Segmentation fault (core dumped)")
54    elif msg != '':
55        sys.stderr.write(str(msg))
56    else:
57        sys.stderr.write("Unknown Error: " + str(retcode))
58
59
60def filter_arm_specific_errors(errs_str):
61    list_errs = []
62    for err in errs_str.split("\n"):
63        if err:
64            if ("memset will be used instead" not in err and
65                "This is the expected behaviour if you are running under QEMU" not in err and
66                "Can't connect to server" not in err):
67                list_errs.append(err)
68
69    if len(list_errs) != 0:
70        output(1, " ".join(list_errs))
71        return False
72
73    return True
74
75
76def exec_command(cmd_args, timeout=DEFAULT_TIMEOUT, custom_cwd=None):
77    proc = subprocess.Popen(cmd_args,
78                            stderr=subprocess.PIPE,
79                            stdout=subprocess.PIPE,
80                            close_fds=True,
81                            start_new_session=True,
82                            cwd=custom_cwd)
83    cmd_string = " ".join(cmd_args)
84    code_format = 'utf-8'
85    if platform.system() == "Windows":
86        code_format = 'gbk'
87
88    try:
89        (output_res, errs) = proc.communicate(timeout=timeout)
90        ret_code = proc.poll()
91
92        errs_str = errs.decode(code_format, 'ignore')
93        if filter_arm_specific_errors(errs_str):
94            errs = None
95        else:
96            return 1
97
98        if ret_code and ret_code != 1:
99            code = ret_code
100            msg = f"Command {cmd_string}: \n"
101            msg += f"error: {errs_str}"
102        else:
103            code = 0
104            msg = str(output_res.decode(code_format, 'ignore'))
105
106    except subprocess.TimeoutExpired:
107        proc.kill()
108        proc.terminate()
109        os.kill(proc.pid, signal.SIGTERM)
110        code = 1
111        msg = f"Timeout:'{cmd_string}' timed out after' {str(timeout)} seconds"
112    except Exception as err:
113        code = 1
114        msg = f"{cmd_string}: unknown error: {str(err)}"
115    output(code, msg)
116    return code
117
118
119def print_command(cmd_args):
120    sys.stderr.write("\n")
121    sys.stderr.write(" ".join(cmd_args))
122    sys.stderr.write("\n")
123
124
125# for debug use, to keep aot file
126def run_command(cmd_args):
127    return subprocess.run(" ".join(cmd_args))
128
129
130def get_formated_path(path):
131    # In the case of Windows, it is necessary to convert ' \\' to '/', otherwise there will be a crash or the file cannot be found
132    # Maintain consistent interface path with DevEco Studio
133    if platform.system() == "Windows":
134        return path.replace("\\", "/")
135    return path
136
137
138def current_time():
139    return datetime.datetime.now().strftime('%m-%d %H:%M:%S.%f')
140
141
142class Logging():
143    def __init__(self):
144        self.is_logging = True
145
146    def debug(self, info):
147        if self.is_logging:
148            print(
149                f'{current_time()} D:>>>  {TERM_BLUE}{str(info)}{TERM_NORMAL}')
150
151    def info(self, info):
152        if self.is_logging:
153            if len(str(info)) > 100:
154                print(f'{current_time()} I:>>> \n{str(info)} ')
155            else:
156                print(f'{current_time()} I:>>>    {str(info)} ')
157
158
159LOGGING = Logging()
160
161
162class Command():
163    def __init__(self, cmd):
164        self.cmd = cmd
165
166    def run(self):
167        LOGGING.debug("command: " + self.cmd)
168        out = os.popen(self.cmd).read()
169        LOGGING.info(out)
170        return out
171
172
173def run_cmd(command):
174    cmd = Command(command)
175    return cmd.run()
176
177
178class CommandCwd():
179    def __init__(self, cmds, cwd):
180        self.cmds = cmds
181        self.cwd = cwd
182
183    def run(self):
184        cmd = " ".join(self.cmds)
185        LOGGING.debug("command: " + cmd + " | " + "dir: " + self.cwd)
186        if platform.system() == "Windows" :
187            proc = subprocess.Popen(self.cmds, cwd=self.cwd ,shell=True)
188        else :
189            proc = subprocess.Popen(self.cmds, cwd=self.cwd)
190        return proc.wait()
191
192
193def run_cmd_cwd(commands, cwd=os.getcwd()):
194    cmd = CommandCwd(commands, cwd)
195    return cmd.run()
196
197
198def sleep(duration):
199    LOGGING.debug("sleeping %d" % duration)
200    time.sleep(duration)
201
202
203def write_file(save_file, result):
204    LOGGING.debug(f"write file:{save_file}")
205    with open(save_file, "a+") as file:
206        file.write(result + "\n")
207        file.flush()
208
209
210def remove_dir(path):
211    if os.path.exists(path):
212        shutil.rmtree(path)
213
214
215def remove_file(file):
216    if os.path.exists(file):
217        os.remove(file)
218
219
220def mkdir(path):
221    if not os.path.exists(path):
222        os.makedirs(path)
223
224
225def report_command(cmd_type, cmd, env=None):
226    sys.stderr.write(f'{TERM_BLUE}{cmd_type}{TERM_NORMAL}\n')
227    if env is not None:
228        sys.stderr.write(''.join(f'{TERM_BLUE}{var}={val} \\{TERM_NORMAL}\n'
229                                 for var, val in sorted(env.items())))
230    cmd_str = (f'{TERM_NORMAL}\n\t{TERM_BLUE}').join(cmd)
231    sys.stderr.write(f'\t{TERM_BLUE}{cmd_str}{TERM_NORMAL}\n')
232    sys.stderr.write("\n")
233
234
235def git_clone(git_url, code_dir):
236    cmd = ['git', 'clone', git_url, code_dir]
237    retries = 1
238    while retries <= DEFAULT_RETRIES:
239        ret = run_cmd_cwd(cmd)
240        if ret == 0:
241            break
242        else:
243            print(f"\n warning: Atempt: #{retries} to clone '{git_url}' failed. Try cloining again")
244            retries += 1
245    assert not ret, f"\n error: Cloning '{git_url}' failed."
246
247
248def git_checkout(git_bash, cwd):
249    cmd = ['git', 'checkout', '-f', git_bash]
250    ret = run_cmd_cwd(cmd, cwd)
251    assert not ret, f"\n error: git checkout '{git_bash}' failed."
252
253
254def git_apply(patch_file, cwd):
255    cmd = ['git', 'apply', patch_file]
256    ret = run_cmd_cwd(cmd, cwd)
257    assert not ret, f"\n error: Failed to apply '{patch_file}'"
258
259
260def git_clean(cwd):
261    cmd = ['git', 'checkout', '--', '.']
262    run_cmd_cwd(cmd, cwd)
263
264
265def npm_install(cwd):
266    cmd = ['npm', 'install']
267    ret = run_cmd_cwd(cmd, cwd)
268    assert not ret, f"\n error: Failed to 'npm install'"
269
270
271def search_dependency(file, directory):
272    for root, dirs, files in os.walk(directory, topdown=True):
273        for f in files:
274            if f == file:
275                return os.path.join(root, f)
276    return "FILE_NOT_FOUND"
277
278
279def collect_module_dependencies(file, directory, traversedDependencies):
280    dependencies = []
281    traversedDependencies.append(file)
282    with open(file, 'r', encoding='utf-8') as f:
283        content = f.read()
284        module_import_list = re.findall(r'(import|from)(?:\s*)\(?(\'(\.\/.*)\'|"(\.\/.*)")\)?', content)
285
286        for result in list(set(module_import_list)):
287            specifier = (result[2] if len(result[2]) != 0 else result[3]).lstrip('./')
288            if os.path.basename(file) is not specifier:
289                dependency = search_dependency(specifier, directory)
290                if dependency == "FILE_NOT_FOUND":
291                    continue
292
293                if dependency not in traversedDependencies:
294                    dependencies.extend(collect_module_dependencies(dependency, directory,
295                                                                    list(set(traversedDependencies))))
296                dependencies.append(dependency)
297
298    return dependencies