1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# Copyright (C) 2024 Huawei Device Co., Ltd. 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15 16import argparse 17import cgi 18import ctypes 19import http 20import json 21import os 22import re 23import signal 24import sys 25import threading 26import time 27import webbrowser 28from datetime import datetime 29from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler 30from pathlib import Path 31from urllib.parse import urlparse 32 33VERSION = 'v1.0.0' 34LIB_NAME = 'libprofDumpJson' 35current = os.path.dirname(os.path.abspath(__file__)) 36libs_directory = current 37dependencies = ['hmicuuc', 'hmicui18n'] 38keep_running = True 39httpd = None 40 41 42def is_windows(): 43 return os.name == 'nt' 44 45 46def is_linux(): 47 return os.name == 'posix' 48 49 50def load_dependencies(load_dependencies_lib, lib_directory): 51 for dep in load_dependencies_lib: 52 dep_path = f"{lib_directory}/lib{dep}.so" 53 try: 54 ctypes.CDLL(dep_path) 55 except OSError as e: 56 raise OSError(f"Failed to load dependency {dep_path}: {e}") 57 58 59def load_library_os_compatible(load_lib_name, lib_directory=None, lib_dependencies=None): 60 if lib_directory is None: 61 lib_directory = Path.cwd() 62 63 if is_windows(): 64 lib_extension = '.dll' 65 load_lib_path = f"{lib_directory}/{load_lib_name}{lib_extension}" 66 load_lib = ctypes.CDLL(str(load_lib_path)) 67 else: 68 lib_extension = '.so' 69 load_lib_path = f"{lib_directory}/{load_lib_name}{lib_extension}" 70 if lib_dependencies: 71 load_dependencies(lib_dependencies, lib_directory) 72 try: 73 load_lib = ctypes.CDLL(str(load_lib_path)) 74 except OSError as e: 75 raise OSError(f"Failed to load {load_lib_path}: {e}") 76 77 if load_lib is None: 78 raise ImportError(f"Could not load library {load_lib_name}.") 79 80 return load_lib 81 82 83lib = load_library_os_compatible(LIB_NAME, libs_directory, dependencies) 84ConvertApToJson = lib.ConvertApToJson 85ConvertApToJson.argtypes = [ctypes.c_char_p, ctypes.c_size_t] 86ConvertApToJson.restype = ctypes.c_size_t 87GetConvertResult = lib.GetConvertResult 88GetConvertResult.argtypes = [ctypes.c_char_p, ctypes.c_size_t] 89GetConvertResult.restype = ctypes.c_bool 90 91 92def get_content_type(file_extension): 93 if file_extension == '.js': 94 return 'application/javascript' 95 if file_extension == '.wasm': 96 return 'application/wasm' 97 if file_extension == '.json': 98 return 'application/json' 99 if file_extension == '.html': 100 return 'text/html' 101 if file_extension == '.svg': 102 return 'image/svg+xml' 103 return 'text/plain' 104 105 106def parse_ap_file(file_path): 107 encoded_path = file_path.encode('utf-8') 108 path_length = len(encoded_path) 109 written_size = ConvertApToJson(encoded_path, path_length) 110 if written_size > 0: 111 buffer = ctypes.create_string_buffer(int(written_size)) 112 GetConvertResult(buffer, written_size) 113 json_result = buffer[:written_size].decode('utf-8') 114 return json_result 115 else: 116 return None 117 118 119def is_subpath(parent_path, child_path): 120 try: 121 relative_path = os.path.relpath(child_path, parent_path) 122 return relative_path != os.pardir 123 except ValueError: 124 return False 125 126 127def open_web(url): 128 webbrowser.open(url) 129 130 131class SafeFileHandler: 132 def __init__(self): 133 self.lock = threading.Lock() 134 135 def parse_ap_file_safely(self, file_path): 136 with self.lock: 137 return parse_ap_file(file_path) 138 139 140safe_handler = SafeFileHandler() 141 142 143class ThreadedHTTPServer(ThreadingHTTPServer): 144 daemon_threads = True 145 146 147class ApRequestHandler(SimpleHTTPRequestHandler): 148 global VERSION 149 150 def log_message(self, formate, *arg): 151 return 152 153 def simple_secure_filename(self, filename): 154 ascii_filename = filename.encode('ascii', errors='ignore').decode('ascii') 155 safe_chars = re.compile(r'[^\w\.\- ]') 156 safe_filename = safe_chars.sub('_', ascii_filename) 157 safe_filename = safe_filename.lstrip('.') 158 if not safe_filename: 159 return "index.html" 160 return safe_filename 161 162 def do_GET(self): 163 parse_result = urlparse(self.path) 164 if parse_result.path.startswith('/ap'): 165 self.application_handler(parse_result) 166 else: 167 self.send_error(http.HTTPStatus.NOT_FOUND, 'Not found') 168 169 def upload_handler(self): 170 form = cgi.FieldStorage( 171 fp=self.rfile, 172 headers=self.headers, 173 environ={ 174 'REQUEST_METHOD': 'POST', 175 'CONTENT_TYPE': self.headers['Content-Type'], 176 } 177 ) 178 if 'file' in form: 179 file_item = form['file'] 180 filename = self.simple_secure_filename(file_item.filename) 181 save_path = os.path.join(current, 'uploads', datetime.now().strftime('%Y%m%d%H%M%S%f')) 182 file_path = os.path.join(save_path, filename) 183 if not os.path.exists(save_path): 184 os.makedirs(save_path, mode=0o755) 185 fd = os.open(file_path, os.O_WRONLY | os.O_CREAT, 0o644) 186 with os.fdopen(fd, 'wb') as f: 187 f.write(file_item.file.read()) 188 ap_res = safe_handler.parse_ap_file_safely(file_path) 189 os.remove(file_path) 190 dir_path = os.path.dirname(file_path) 191 if not os.listdir(dir_path): 192 os.rmdir(dir_path) 193 self.send_response(http.HTTPStatus.OK) 194 self.send_header('Content-Type', 'application/json') 195 self.end_headers() 196 if ap_res is None: 197 response = {"success": False, "code": -1, "message": "parse ap failed", "data": ""} 198 else: 199 response = {"success": True, "code": 0, "message": "success", "data": ap_res} 200 self.wfile.write(bytes(json.dumps(response), "utf-8")) 201 else: 202 self.send_error(http.HTTPStatus.BAD_REQUEST, 'Bad request') 203 204 def do_POST(self): 205 parse_result = urlparse(self.path) 206 if parse_result.path.startswith('/ap/upload'): 207 self.upload_handler() 208 else: 209 self.send_error(http.HTTPStatus.NOT_FOUND, 'Not found') 210 211 def application_handler(self, parse_result): 212 file_path = parse_result.path[3:] 213 file_extension = os.path.splitext(file_path)[1] 214 safe_path = os.path.normpath(file_path).lstrip('/') 215 if is_windows(): 216 safe_path = os.path.normpath(file_path).lstrip("\\") 217 full_path = os.path.join(current, safe_path) 218 if file_path == '' or file_path == '/' or file_path is None or safe_path.strip() == ".": 219 full_path = os.path.join(current, "index.html") 220 file_extension = '.html' 221 elif not is_subpath(current, full_path): 222 self.send_error(http.HTTPStatus.NOT_FOUND, 'Not found') 223 return 224 try: 225 with open(full_path, 'rb') as file: 226 content = file.read() 227 self.send_response(http.HTTPStatus.OK) 228 self.send_header('Content-type', get_content_type(file_extension)) 229 self.send_header("Cross-Origin-Opener-Policy", "unsafe-none") 230 self.send_header("Cross-Origin-Embedder-Policy", "require-corp") 231 self.send_header("Access-Control-Allow-Origin", "*") 232 self.send_header("Access-Control-Allow-Credentials", "true") 233 self.send_header("Access-Control-Allow-Headers", "x-requested-with, authorization, blade-auth") 234 self.send_header("Access-Control-Allow-Methods", "*") 235 self.send_header("Access-Control-Max-Age", "3600") 236 self.send_header("data-version", VERSION) 237 self.end_headers() 238 self.wfile.write(content) 239 except FileNotFoundError: 240 self.send_error(http.HTTPStatus.NOT_FOUND, 'File not found') 241 except Exception as e: 242 self.log_message("ERROR", f"Error handling GET request: {str(e)}") 243 self.send_error(http.HTTPStatus.INTERNAL_SERVER_ERROR, 'Internal Server Error') 244 245 246def sigint_handler(sig, frame): 247 print("\nServer stopped by user") 248 global keep_running 249 keep_running = False 250 httpd.shutdown() 251 sys.exit(0) 252 253 254def main(port): 255 global httpd, keep_running 256 server_address = ('', port) 257 httpd = ThreadedHTTPServer(server_address, ApRequestHandler) 258 print(f'Starting http server on port {port}...') 259 thread = threading.Thread(target=httpd.serve_forever, daemon=True) 260 thread.start() 261 if is_windows(): 262 open_web(f'http:127.0.0.1:{args.port}/ap/') 263 while keep_running: 264 time.sleep(1) 265 266 267if __name__ == '__main__': 268 parser = argparse.ArgumentParser(description="Run a ap dump HTTP server.") 269 parser.add_argument("-p", "--port", type=int, default=9001, help="Specify the server port.") 270 args = parser.parse_args() 271 signal.signal(signal.SIGINT, sigint_handler) 272 main(args.port) 273