1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (C) 2024 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16import argparse
17import cgi
18import ctypes
19import http
20import json
21import os
22import re
23import signal
24import sys
25import threading
26import time
27import webbrowser
28from datetime import datetime
29from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler
30from pathlib import Path
31from urllib.parse import urlparse
32
# Version string sent to clients in the "data-version" response header.
VERSION = 'v1.0.0'
# Base file name (without extension) of the native .ap-to-JSON converter library.
LIB_NAME = 'libprofDumpJson'
# Directory containing this script; static files are served from it and
# uploads are staged in its "uploads" sub-directory.
current = os.path.dirname(os.path.abspath(__file__))
# Directory the native library and its dependencies are loaded from.
libs_directory = current
# Libraries pre-loaded (as lib<name>.so) before LIB_NAME on non-Windows hosts.
dependencies = ['hmicuuc', 'hmicui18n']
# Main-loop flag; the SIGINT handler sets it to False.
keep_running = True
# The HTTP server instance; assigned by main().
httpd = None
40
41
def is_windows():
    """Return True when Python reports a Windows platform (os.name is 'nt')."""
    platform_name = os.name
    return platform_name == 'nt'
44
45
def is_linux():
    """Return True when os.name is 'posix'.

    Python reports 'posix' for every POSIX platform, so this is also True
    on macOS and the BSDs, not only on Linux.
    """
    platform_name = os.name
    return platform_name == 'posix'
48
49
def load_dependencies(load_dependencies_lib, lib_directory):
    """Pre-load the shared libraries the main native library depends on.

    Args:
        load_dependencies_lib: Iterable of bare library names; each name
            ``dep`` is loaded from ``<lib_directory>/lib<dep>.so``.
        lib_directory: Directory that contains the dependency libraries.

    Raises:
        OSError: If a dependency cannot be loaded. The loader's own error is
            attached as ``__cause__``.
    """
    for dep in load_dependencies_lib:
        dep_path = f"{lib_directory}/lib{dep}.so"
        try:
            ctypes.CDLL(dep_path)
        except OSError as e:
            # Chain explicitly so the traceback shows the loader error as the
            # direct cause rather than as an error "during handling".
            raise OSError(f"Failed to load dependency {dep_path}: {e}") from e
57
58
def load_library_os_compatible(load_lib_name, lib_directory=None, lib_dependencies=None):
    """Load a native library by base name, choosing the extension for the host OS.

    Args:
        load_lib_name: Library file name without extension.
        lib_directory: Directory holding the library; defaults to the current
            working directory.
        lib_dependencies: Optional library names handed to load_dependencies()
            before the main library is loaded. Not used on Windows.

    Returns:
        The ctypes.CDLL handle of the loaded library.

    Raises:
        OSError: If the library or one of its dependencies cannot be loaded.
            On non-Windows hosts the message names the failing path and the
            loader error is attached as ``__cause__``.
    """
    if lib_directory is None:
        lib_directory = Path.cwd()

    if is_windows():
        return ctypes.CDLL(f"{lib_directory}/{load_lib_name}.dll")

    load_lib_path = f"{lib_directory}/{load_lib_name}.so"
    if lib_dependencies:
        load_dependencies(lib_dependencies, lib_directory)
    try:
        # ctypes.CDLL either returns a handle or raises, so no None check is
        # needed on the result.
        return ctypes.CDLL(load_lib_path)
    except OSError as e:
        raise OSError(f"Failed to load {load_lib_path}: {e}") from e
81
82
# Load the native converter at import time; a load failure propagates and
# prevents the module from being imported.
lib = load_library_os_compatible(LIB_NAME, libs_directory, dependencies)
# ConvertApToJson(path_bytes, path_length) -> size_t. parse_ap_file() treats a
# result of 0 as failure and any other value as the byte size of the output.
ConvertApToJson = lib.ConvertApToJson
ConvertApToJson.argtypes = [ctypes.c_char_p, ctypes.c_size_t]
ConvertApToJson.restype = ctypes.c_size_t
# GetConvertResult(buffer, buffer_size) -> bool. Called after ConvertApToJson
# with a caller-allocated buffer that receives the converted output.
# NOTE(review): the bool presumably signals success; confirm against the
# native library.
GetConvertResult = lib.GetConvertResult
GetConvertResult.argtypes = [ctypes.c_char_p, ctypes.c_size_t]
GetConvertResult.restype = ctypes.c_bool
90
91
def get_content_type(file_extension):
    """Return the Content-Type to send for a file extension (leading dot included).

    Matching is exact and case-sensitive; any extension that is not listed
    is served as 'text/plain'.
    """
    known_types = (
        ('.js', 'application/javascript'),
        ('.wasm', 'application/wasm'),
        ('.json', 'application/json'),
        ('.html', 'text/html'),
        ('.svg', 'image/svg+xml'),
    )
    for extension, content_type in known_types:
        if file_extension == extension:
            return content_type
    return 'text/plain'
104
105
def parse_ap_file(file_path):
    """Convert an .ap file to JSON text through the native library.

    Args:
        file_path: Path of the .ap file, as str.

    Returns:
        The converted JSON as str, or None when the conversion reports no
        output or the result cannot be fetched.
    """
    encoded_path = file_path.encode('utf-8')
    written_size = ConvertApToJson(encoded_path, len(encoded_path))
    if written_size <= 0:
        return None
    buffer = ctypes.create_string_buffer(int(written_size))
    # The fetch reports a bool that used to be ignored, which meant a failed
    # fetch returned the untouched zero-filled buffer as if it were data.
    # NOTE(review): assumes False means failure; confirm against the library.
    if not GetConvertResult(buffer, written_size):
        return None
    return buffer[:written_size].decode('utf-8')
117
118
def is_subpath(parent_path, child_path):
    """Return True when child_path is parent_path itself or lies below it.

    The check is lexical: both paths are normalised by os.path.relpath and
    symbolic links are not resolved.

    Args:
        parent_path: Directory that must contain the child.
        child_path: Path to test.

    Returns:
        False when the relative path from parent to child climbs out of the
        parent, or when no relative path exists (for example different
        drives on Windows); True otherwise.
    """
    try:
        relative_path = os.path.relpath(child_path, parent_path)
    except ValueError:
        return False
    # Escaping paths are ".." itself or start with "../". Comparing with ".."
    # alone let "../x" and "../../etc/passwd" through. A plain startswith("..")
    # would be too strict: a file called "..name" is a legitimate child.
    if relative_path == os.pardir:
        return False
    return not relative_path.startswith(os.pardir + os.sep)
125
126
def open_web(url):
    """Ask the default web browser to display url; the browser's result is ignored."""
    # new=0 and autoraise=True are webbrowser.open()'s defaults, spelled out.
    webbrowser.open(url, new=0, autoraise=True)
129
130
class SafeFileHandler:
    """Runs .ap conversions one at a time by guarding them with a lock."""

    def __init__(self):
        # Held for the whole duration of each conversion.
        self.lock = threading.Lock()

    def parse_ap_file_safely(self, file_path):
        """Call parse_ap_file(file_path) while holding the lock and return its result."""
        self.lock.acquire()
        try:
            return parse_ap_file(file_path)
        finally:
            self.lock.release()


# Module-wide instance used by the upload handler.
safe_handler = SafeFileHandler()
141
142
class ThreadedHTTPServer(ThreadingHTTPServer):
    """ThreadingHTTPServer whose per-request threads are daemon threads."""

    # Daemon request threads do not keep the process alive at exit.
    daemon_threads = True
145
146
class ApRequestHandler(SimpleHTTPRequestHandler):
    """Request handler for the .ap viewer: static files on GET, conversion on POST.

    GET requests whose path starts with ``/ap`` are served from the script
    directory. POST requests whose path starts with ``/ap/upload`` accept a
    multipart form with a ``file`` field and are answered with a JSON
    envelope. Every other path is answered with 404.
    """

    # Characters other than word characters, '.', '-' and ' ' are replaced in
    # uploaded file names. Compiled once instead of on every upload.
    _UNSAFE_FILENAME_CHARS = re.compile(r'[^\w\.\- ]')

    def log_message(self, formate, *arg):
        """Discard the base class's per-request log output."""
        return

    def simple_secure_filename(self, filename):
        """Reduce a client-supplied file name to a safe single path component.

        Non-ASCII characters are dropped, every character outside the allowed
        set becomes '_', and leading dots are stripped.

        Args:
            filename: Name sent by the client; may be None or empty.

        Returns:
            The sanitised name, or "index.html" when nothing usable remains.
        """
        if not filename:
            return "index.html"
        ascii_filename = filename.encode('ascii', errors='ignore').decode('ascii')
        safe_filename = self._UNSAFE_FILENAME_CHARS.sub('_', ascii_filename).lstrip('.')
        return safe_filename or "index.html"

    def do_GET(self):
        """Route GET requests: paths starting with /ap are served, others get 404."""
        parse_result = urlparse(self.path)
        if parse_result.path.startswith('/ap'):
            self.application_handler(parse_result)
        else:
            self.send_error(http.HTTPStatus.NOT_FOUND, 'Not found')

    @staticmethod
    def _remove_upload(file_path, save_path):
        """Delete the staged upload and its directory if that is left empty."""
        if os.path.exists(file_path):
            os.remove(file_path)
        if os.path.isdir(save_path) and not os.listdir(save_path):
            os.rmdir(save_path)

    def upload_handler(self):
        """Stage the uploaded ``file`` field on disk, convert it and reply with JSON.

        Replies 400 when the request carries no usable ``file`` upload and
        500 when staging or conversion raises. Otherwise replies 200 with
        ``{"success", "code", "message", "data"}``, where ``data`` holds the
        converted JSON text or "" when the conversion produced nothing.
        The staged file is removed in every case.
        """
        content_type = self.headers['Content-Type']
        if not content_type:
            # Without a declared content type the form body cannot be parsed.
            self.send_error(http.HTTPStatus.BAD_REQUEST, 'Bad request')
            return
        form = cgi.FieldStorage(
            fp=self.rfile,
            headers=self.headers,
            environ={
                'REQUEST_METHOD': 'POST',
                'CONTENT_TYPE': content_type,
            }
        )
        # form.list is None when the body is not form data; the `in` test
        # would raise TypeError in that case.
        if form.list is None or 'file' not in form:
            self.send_error(http.HTTPStatus.BAD_REQUEST, 'Bad request')
            return
        file_item = form['file']
        if isinstance(file_item, list):
            # Several parts are named "file"; use the first one.
            file_item = file_item[0]
        if not file_item.filename or file_item.file is None:
            # A plain form value named "file" is not an upload.
            self.send_error(http.HTTPStatus.BAD_REQUEST, 'Bad request')
            return

        filename = self.simple_secure_filename(file_item.filename)
        save_path = os.path.join(current, 'uploads', datetime.now().strftime('%Y%m%d%H%M%S%f'))
        file_path = os.path.join(save_path, filename)
        try:
            os.makedirs(save_path, mode=0o755, exist_ok=True)
            fd = os.open(file_path, os.O_WRONLY | os.O_CREAT, 0o644)
            with os.fdopen(fd, 'wb') as f:
                f.write(file_item.file.read())
            ap_res = safe_handler.parse_ap_file_safely(file_path)
        except Exception as e:
            # Request boundary: report on stderr (log_message is silenced) and
            # answer with 500 instead of dropping the connection.
            print(f"Error handling upload request: {e}", file=sys.stderr)
            self.send_error(http.HTTPStatus.INTERNAL_SERVER_ERROR, 'Internal Server Error')
            return
        finally:
            # Runs on success and on failure so staged files never pile up.
            self._remove_upload(file_path, save_path)

        self.send_response(http.HTTPStatus.OK)
        self.send_header('Content-Type', 'application/json')
        self.end_headers()
        if ap_res is None:
            response = {"success": False, "code": -1, "message": "parse ap failed", "data": ""}
        else:
            response = {"success": True, "code": 0, "message": "success", "data": ap_res}
        self.wfile.write(bytes(json.dumps(response), "utf-8"))

    def do_POST(self):
        """Route POST requests: paths starting with /ap/upload are uploads, others get 404."""
        parse_result = urlparse(self.path)
        if parse_result.path.startswith('/ap/upload'):
            self.upload_handler()
        else:
            self.send_error(http.HTTPStatus.NOT_FOUND, 'Not found')

    def application_handler(self, parse_result):
        """Serve the static file addressed by the part of the path after ``/ap``.

        An empty remainder, "/" or a path that normalises to "." selects
        index.html. Paths that is_subpath() places outside the script
        directory and files that do not exist are answered with 404; other
        read failures with 500.

        Args:
            parse_result: Result of urlparse() for the request path.
        """
        file_path = parse_result.path[3:]
        file_extension = os.path.splitext(file_path)[1]
        # normpath returns OS-native separators, so strip the native one to
        # keep os.path.join from treating the result as absolute.
        leading_separator = "\\" if is_windows() else '/'
        safe_path = os.path.normpath(file_path).lstrip(leading_separator)
        full_path = os.path.join(current, safe_path)
        if file_path in ('', '/') or safe_path.strip() == ".":
            full_path = os.path.join(current, "index.html")
            file_extension = '.html'
        elif not is_subpath(current, full_path):
            self.send_error(http.HTTPStatus.NOT_FOUND, 'Not found')
            return

        # Only the read is guarded: once headers are on the wire an error
        # response can no longer be sent cleanly.
        try:
            with open(full_path, 'rb') as file:
                content = file.read()
        except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
            self.send_error(http.HTTPStatus.NOT_FOUND, 'File not found')
            return
        except (OSError, ValueError) as e:
            # ValueError covers paths open() rejects, e.g. an embedded NUL.
            # log_message is silenced, so report on stderr directly.
            print(f"Error handling GET request: {e}", file=sys.stderr)
            self.send_error(http.HTTPStatus.INTERNAL_SERVER_ERROR, 'Internal Server Error')
            return

        self.send_response(http.HTTPStatus.OK)
        self.send_header('Content-type', get_content_type(file_extension))
        self.send_header("Cross-Origin-Opener-Policy", "unsafe-none")
        self.send_header("Cross-Origin-Embedder-Policy", "require-corp")
        # NOTE(review): browsers ignore Allow-Credentials when the allowed
        # origin is the wildcard; confirm which of the two is intended.
        self.send_header("Access-Control-Allow-Origin", "*")
        self.send_header("Access-Control-Allow-Credentials", "true")
        self.send_header("Access-Control-Allow-Headers", "x-requested-with, authorization, blade-auth")
        self.send_header("Access-Control-Allow-Methods", "*")
        self.send_header("Access-Control-Max-Age", "3600")
        self.send_header("data-version", VERSION)
        self.end_headers()
        self.wfile.write(content)
244
245
def sigint_handler(sig, frame):
    """Handle SIGINT: stop the HTTP server and exit the process with status 0.

    Args:
        sig: Signal number (unused).
        frame: Interrupted stack frame (unused).
    """
    global keep_running
    print("\nServer stopped by user")
    keep_running = False
    # The handler is installed before main() creates the server, so an early
    # Ctrl+C can arrive while httpd is still None.
    if httpd is not None:
        httpd.shutdown()
    sys.exit(0)
252
253
def main(port):
    """Start the HTTP server on port in a background thread and wait until stopped.

    The server listens on all interfaces. On Windows the viewer page is also
    opened in the default browser. The call returns once keep_running has
    been set to False.

    Args:
        port: TCP port to listen on.
    """
    global httpd, keep_running
    server_address = ('', port)
    httpd = ThreadedHTTPServer(server_address, ApRequestHandler)
    print(f'Starting http server on port {port}...')
    thread = threading.Thread(target=httpd.serve_forever, daemon=True)
    thread.start()
    if is_windows():
        # Use the port parameter, not the script-level args object, so main()
        # also works when imported; the URL needs "//" after the scheme.
        open_web(f'http://127.0.0.1:{port}/ap/')
    while keep_running:
        time.sleep(1)
265
266
if __name__ == '__main__':
    # Script entry point: read the port from the command line (default 9001),
    # install the Ctrl+C handler, then run the server until interrupted.
    parser = argparse.ArgumentParser(description="Run a ap dump HTTP server.")
    parser.add_argument("-p", "--port", type=int, default=9001, help="Specify the server port.")
    args = parser.parse_args()
    # Installed before the server exists so Ctrl+C is handled from the start.
    signal.signal(signal.SIGINT, sigint_handler)
    main(args.port)
273