#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Copyright (c) 2021-2024 Huawei Device Co., Ltd.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

Description: Implement the public interface in the 262 use case
"""

import os
import signal
import sys
import subprocess
import datetime
import time
import shutil
import platform
import re
from config import DEFAULT_TIMEOUT, DEFAULT_RETRIES

# ANSI escape sequences used to colour terminal output.
TERM_NORMAL = '\033[0m'
TERM_YELLOW = '\033[1;33m'
TERM_BLUE = '\033[1;34m'
TERM_RED = '\033[1;31m'
TERM_FUCHSIA = '\033[1;35m'

ABC_EXT = ".abc"
TEMP_ABC_EXT = ".temp.abc"
TXT_EXT = ".txt"
TEMP_TXT_EXT = ".temp.txt"

# stderr lines containing any of these fragments are not treated as errors
# by filter_arm_specific_errors().
_IGNORABLE_STDERR_FRAGMENTS = (
    "memset will be used instead",
    "This is the expected behaviour if you are running under QEMU",
    "Can't connect to server",
)


def output(retcode, msg):
    """Report the outcome of a command.

    A zero ``retcode`` prints a non-empty ``msg`` to stdout.  The codes -6
    and -4 write "Aborted (core dumped)" and -11 writes
    "Segmentation fault (core dumped)" to stderr (negative codes are how
    ``subprocess`` reports death by signal).  Any other code writes ``msg``
    to stderr, or an "Unknown Error" line when ``msg`` is empty.
    """
    if retcode == 0:
        if msg != '':
            print(str(msg))
    elif retcode in (-6, -4):
        sys.stderr.write("Aborted (core dumped)")
    elif retcode == -11:
        sys.stderr.write("Segmentation fault (core dumped)")
    elif msg != '':
        sys.stderr.write(str(msg))
    else:
        sys.stderr.write("Unknown Error: " + str(retcode))


def filter_arm_specific_errors(errs_str):
    """Check captured stderr text for lines that count as real errors.

    Empty lines and lines containing one of the known ignorable fragments
    are dropped.  If anything remains it is reported through ``output`` (as
    return code 1, joined with spaces) and ``False`` is returned; otherwise
    ``True`` is returned.
    """
    list_errs = [
        err for err in errs_str.split("\n")
        if err and not any(fragment in err
                           for fragment in _IGNORABLE_STDERR_FRAGMENTS)
    ]

    if list_errs:
        output(1, " ".join(list_errs))
        return False

    return True


def _kill_and_reap(proc):
    """Kill ``proc`` and release the resources it holds.

    Intended for children started with ``start_new_session=True``: on
    platforms that provide ``os.killpg`` the child is its own process-group
    leader, so the whole group is killed to take down any grandchildren.
    The child is then waited for (so no zombie is left behind) and its pipes
    are closed.
    """
    proc.kill()
    if hasattr(os, "killpg"):
        try:
            # The not-yet-reaped child keeps its pid (== pgid) valid here.
            os.killpg(proc.pid, signal.SIGKILL)
        except OSError:
            # The group has already disappeared; nothing left to kill.
            pass
    proc.wait()
    for pipe in (proc.stdout, proc.stderr):
        if pipe is not None:
            pipe.close()


def exec_command(cmd_args, timeout=DEFAULT_TIMEOUT, custom_cwd=None):
    """Run ``cmd_args`` with captured output and report the result.

    Args:
        cmd_args: argv list of the command (executed without a shell).
        timeout: seconds to wait before the command is killed.
        custom_cwd: working directory for the command, or ``None`` to
            inherit the current one.

    Returns:
        ``0`` when the command exits with 0 or 1 and its stderr contains no
        unfiltered error line; ``1`` when stderr contains such a line, on
        timeout, or on an unexpected exception; otherwise the command's own
        return code.  Except in the stderr case (already reported by
        ``filter_arm_specific_errors``) the outcome is passed to ``output``.
    """
    proc = subprocess.Popen(cmd_args,
                            stderr=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            close_fds=True,
                            start_new_session=True,
                            cwd=custom_cwd)
    cmd_string = " ".join(cmd_args)
    code_format = 'gbk' if platform.system() == "Windows" else 'utf-8'

    try:
        (output_res, errs) = proc.communicate(timeout=timeout)
        ret_code = proc.poll()

        errs_str = errs.decode(code_format, 'ignore')
        if not filter_arm_specific_errors(errs_str):
            return 1

        # NOTE(review): an exit status of 1 is deliberately folded into
        # success here; presumably the callers judge such runs by their
        # output -- confirm before changing.
        if ret_code and ret_code != 1:
            code = ret_code
            msg = f"Command {cmd_string}: \n"
            msg += f"error: {errs_str}"
        else:
            code = 0
            msg = str(output_res.decode(code_format, 'ignore'))

    except subprocess.TimeoutExpired:
        # communicate() does not kill the child on timeout; do it here and
        # reap it so neither the process nor its pipes are leaked.
        _kill_and_reap(proc)
        code = 1
        msg = f"Timeout:'{cmd_string}' timed out after' {str(timeout)} seconds"
    except Exception as err:
        code = 1
        msg = f"{cmd_string}: unknown error: {str(err)}"
    output(code, msg)
    return code


def print_command(cmd_args):
    """Write the space-joined command to stderr, surrounded by newlines."""
    sys.stderr.write("\n")
    sys.stderr.write(" ".join(cmd_args))
    sys.stderr.write("\n")


# for debug use, to keep aot file
def run_command(cmd_args):
    """Run ``cmd_args`` and return the ``subprocess.CompletedProcess``.

    The argv list is handed over as-is: a space-joined string without a
    shell is looked up as one single program name on POSIX and fails.
    """
    return subprocess.run(cmd_args)


def get_formated_path(path):
    """Return ``path`` with backslashes turned into '/' on Windows."""
    # In the case of Windows, it is necessary to convert '\\' to '/',
    # otherwise there will be a crash or the file cannot be found.
    # Maintain consistent interface path with DevEco Studio.
    if platform.system() == "Windows":
        return path.replace("\\", "/")
    return path


def current_time():
    """Return the local time formatted as 'MM-DD HH:MM:SS.ffffff'."""
    return datetime.datetime.now().strftime('%m-%d %H:%M:%S.%f')


class Logging():
    """Minimal stdout logger that can be muted through ``is_logging``."""

    def __init__(self):
        # When False, debug() and info() print nothing.
        self.is_logging = True

    def debug(self, info):
        """Print ``info`` in blue with a timestamp and a 'D' marker."""
        if self.is_logging:
            print(
                f'{current_time()} D:>>> {TERM_BLUE}{str(info)}{TERM_NORMAL}')

    def info(self, info):
        """Print ``info`` with a timestamp and an 'I' marker.

        Text longer than 100 characters starts on its own line.
        """
        if self.is_logging:
            if len(str(info)) > 100:
                print(f'{current_time()} I:>>> \n{str(info)} ')
            else:
                print(f'{current_time()} I:>>> {str(info)} ')


LOGGING = Logging()


class Command():
    """A shell command line, executed through ``os.popen``."""

    def __init__(self, cmd):
        self.cmd = cmd

    def run(self):
        """Run the command, log its stdout and return it as a string."""
        LOGGING.debug("command: " + self.cmd)
        # The context manager closes the pipe and waits for the child.
        with os.popen(self.cmd) as pipe:
            out = pipe.read()
        LOGGING.info(out)
        return out


def run_cmd(command):
    """Run the shell command string ``command`` and return its stdout."""
    cmd = Command(command)
    return cmd.run()


class CommandCwd():
    """An argv-style command that is run inside a given directory."""

    def __init__(self, cmds, cwd):
        self.cmds = cmds
        self.cwd = cwd

    def run(self):
        """Run the command in ``self.cwd`` and return its exit status."""
        cmd = " ".join(self.cmds)
        LOGGING.debug("command: " + cmd + " | " + "dir: " + self.cwd)
        if platform.system() == "Windows":
            proc = subprocess.Popen(self.cmds, cwd=self.cwd, shell=True)
        else:
            proc = subprocess.Popen(self.cmds, cwd=self.cwd)
        return proc.wait()


def run_cmd_cwd(commands, cwd=None):
    """Run the argv list ``commands`` in ``cwd`` and return its exit status.

    ``cwd`` defaults to the working directory at the time of the call; a
    default of ``os.getcwd()`` in the signature would be frozen at import
    time and go stale after any later ``os.chdir``.
    """
    if cwd is None:
        cwd = os.getcwd()
    cmd = CommandCwd(commands, cwd)
    return cmd.run()


def sleep(duration):
    """Log the delay and block for ``duration`` seconds."""
    LOGGING.debug("sleeping %d" % duration)
    time.sleep(duration)


def write_file(save_file, result):
    """Append ``result`` followed by a newline to ``save_file``."""
    LOGGING.debug(f"write file:{save_file}")
    # Leaving the with-block flushes and closes the file.
    with open(save_file, "a+") as file:
        file.write(result + "\n")


def remove_dir(path):
    """Delete the directory tree at ``path`` if the path exists."""
    if os.path.exists(path):
        shutil.rmtree(path)


def remove_file(file):
    """Delete ``file`` if it exists."""
    if os.path.exists(file):
        os.remove(file)


def mkdir(path):
    """Create ``path`` (including parents) unless it already exists."""
    if not os.path.exists(path):
        os.makedirs(path)


def report_command(cmd_type, cmd, env=None):
    """Pretty-print a command to stderr in blue.

    Writes ``cmd_type``, then (when ``env`` is given) one ``VAR=value \\``
    line per environment entry in sorted order, then every element of
    ``cmd`` on its own tab-indented line.
    """
    sys.stderr.write(f'{TERM_BLUE}{cmd_type}{TERM_NORMAL}\n')
    if env is not None:
        sys.stderr.write(''.join(f'{TERM_BLUE}{var}={val} \\{TERM_NORMAL}\n'
                                 for var, val in sorted(env.items())))
    cmd_str = (f'{TERM_NORMAL}\n\t{TERM_BLUE}').join(cmd)
    sys.stderr.write(f'\t{TERM_BLUE}{cmd_str}{TERM_NORMAL}\n')
    sys.stderr.write("\n")


def _require_success(ret, message):
    """Raise ``AssertionError(message)`` when ``ret`` is a failing status.

    An explicit raise keeps the exception type of the former ``assert``
    statements while still firing when Python runs with ``-O``.
    """
    if ret:
        raise AssertionError(message)


def git_clone(git_url, code_dir):
    """Clone ``git_url`` into ``code_dir``, retrying on failure.

    The clone is attempted at least once and at most ``DEFAULT_RETRIES``
    times.  Raises ``AssertionError`` when the last attempt fails.
    """
    cmd = ['git', 'clone', git_url, code_dir]
    retries = 1
    while True:
        ret = run_cmd_cwd(cmd)
        if ret == 0:
            break
        print(f"\n warning: Atempt: #{retries} to clone '{git_url}' failed. Try cloining again")
        retries += 1
        if retries > DEFAULT_RETRIES:
            break
    _require_success(ret, f"\n error: Cloning '{git_url}' failed.")


def git_checkout(git_bash, cwd):
    """Force-checkout ``git_bash`` in the repository at ``cwd``.

    Raises ``AssertionError`` when git exits with a non-zero status.
    """
    cmd = ['git', 'checkout', '-f', git_bash]
    ret = run_cmd_cwd(cmd, cwd)
    _require_success(ret, f"\n error: git checkout '{git_bash}' failed.")


def git_apply(patch_file, cwd):
    """Apply ``patch_file`` with ``git apply`` in the repository at ``cwd``.

    Raises ``AssertionError`` when git exits with a non-zero status.
    """
    cmd = ['git', 'apply', patch_file]
    ret = run_cmd_cwd(cmd, cwd)
    _require_success(ret, f"\n error: Failed to apply '{patch_file}'")


def git_clean(cwd):
    """Run ``git checkout -- .`` in ``cwd``; the exit status is ignored."""
    cmd = ['git', 'checkout', '--', '.']
    run_cmd_cwd(cmd, cwd)


def npm_install(cwd):
    """Run ``npm install`` in ``cwd``.

    Raises ``AssertionError`` when npm exits with a non-zero status.
    """
    cmd = ['npm', 'install']
    ret = run_cmd_cwd(cmd, cwd)
    _require_success(ret, "\n error: Failed to 'npm install'")


def search_dependency(file, directory):
    """Return the path of the first file named ``file`` under ``directory``.

    The tree is walked top-down; the string "FILE_NOT_FOUND" is returned
    when no file with that exact name exists.
    """
    for root, _dirs, files in os.walk(directory, topdown=True):
        if file in files:
            return os.path.join(root, file)
    return "FILE_NOT_FOUND"


# Matches `import '<spec>'`, `from "<spec>"` and `import('<spec>')` where the
# specifier starts with "./".  The specifier stops at the first closing quote
# so that several imports on one line are not merged into one match.
_RELATIVE_IMPORT_RE = re.compile(
    r'(import|from)(?:\s*)\(?(\'(\.\/[^\'\n]*)\'|"(\.\/[^"\n]*)")\)?')


def collect_module_dependencies(file, directory, traversedDependencies):
    """Collect the files that ``file`` imports, directly or transitively.

    Args:
        file: path of the source file to scan (read as UTF-8).
        directory: root under which imported file names are searched.
        traversedDependencies: paths already visited on the current import
            chain; ``file`` is appended to it (the caller's list is
            mutated), while recursive calls receive a copy.

    Returns:
        A list of dependency paths in order of appearance in the source,
        with each dependency's own dependencies listed before it.  Imports
        of the file's own name and imports that cannot be found under
        ``directory`` are skipped.  A path may occur more than once.
    """
    dependencies = []
    traversedDependencies.append(file)
    with open(file, 'r', encoding='utf-8') as f:
        content = f.read()
    # The file is closed before recursing so deep import chains do not keep
    # one descriptor open per level.

    own_name = os.path.basename(file)
    # dict.fromkeys drops duplicate matches while keeping source order, which
    # makes the result independent of string hash randomisation.
    for result in dict.fromkeys(_RELATIVE_IMPORT_RE.findall(content)):
        # Group 2 holds a single-quoted specifier, group 3 a double-quoted
        # one.  Note that lstrip removes every leading '.' and '/' character,
        # not just one "./" prefix.
        specifier = (result[2] or result[3]).lstrip('./')
        if specifier == own_name:
            # A module importing itself is not its own dependency.
            continue

        dependency = search_dependency(specifier, directory)
        if dependency == "FILE_NOT_FOUND":
            continue

        if dependency not in traversedDependencies:
            dependencies.extend(collect_module_dependencies(
                dependency, directory, list(traversedDependencies)))
        dependencies.append(dependency)

    return dependencies