1#!/usr/bin/env python3 2# -*- coding: utf-8 -*- 3# Copyright (c) 2023 Huawei Device Co., Ltd. 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15import logging 16import os 17import json 18import argparse 19import stat 20from collections import Counter 21 22 23def get_args(): 24 parser = argparse.ArgumentParser(add_help=True) 25 parser.add_argument( 26 "-p", 27 "--project_path", 28 default=r"", 29 type=str, 30 help="root path of project. default: ./", 31 ) 32 args = parser.parse_args() 33 return args 34 35 36def dict_to_json(output_path: str, syscaps_dict: dict): 37 """ 38 output diff product syscaps json to output path 39 :param output_path: 40 :param syscaps_dict: 41 :return: 42 """ 43 print("start generate syscap json...") 44 flags = os.O_WRONLY | os.O_CREAT 45 modes = stat.S_IWUSR | stat.S_IRUSR 46 for product_name, syscaps_list in syscaps_dict.items(): 47 counts = Counter(syscaps_list) 48 duplicates = [elem for elem, count in counts.items() if count > 1] 49 if len(duplicates) > 0: 50 print("[Warning] repeated syscap: {}".format(",".join(duplicates))) 51 syscaps_list = list(set(syscaps_list)) 52 filename = os.path.join(output_path, f'{product_name}.json') 53 with os.fdopen(os.open(filename, flags, modes), 'w') as f: 54 json.dump({'SysCaps': syscaps_list}, f, indent=4) 55 print("end...") 56 57 58def check_syscap(syscap_str: str): 59 syscap = syscap_str.split(' = ') 60 if syscap[-1].lower() == 'false': 61 return False 62 else: 63 return syscap[0] 64 65 66def bundle_syscap_list_handler(bundle_syscap_list: list, component_syscaps_list: list): 67 """ 68 check syscap 69 :param bundle_syscap_list: 70 :param component_syscaps_list: 71 :return: 72 """ 73 for component_syscap in component_syscaps_list: 74 component_syscap = check_syscap(component_syscap) 75 if component_syscap: 76 bundle_syscap_list.append(component_syscap) 77 return bundle_syscap_list 78 79 80def read_json_file(bundle_json_path: str): 81 bundle_syscap_list = list() 82 error_list = dict() 83 try: 84 with open(bundle_json_path, 'r', encoding='utf-8') as f: 85 bundle_data = json.load(f) 86 component_data = bundle_data.get("component") 87 component_syscaps_list = component_data.get("syscap") 88 if component_syscaps_list: 89 bundle_syscap_list = bundle_syscap_list_handler(bundle_syscap_list, component_syscaps_list) 90 except FileNotFoundError as e: 91 error_list[bundle_json_path] = str(e) 92 except Exception as e: 93 error_list[bundle_json_path] = str(e) 94 return bundle_syscap_list, error_list 95 96 97def get_all_components_path(components_file_path: str): 98 try: 99 with open(components_file_path, 'r', encoding='utf-8') as f: 100 path_dict = json.load(f) 101 return path_dict 102 except FileNotFoundError: 103 logging.error(r"PATH ERROR") 104 return {} 105 106 107def path_component_to_bundle(path: str) -> str: 108 bundle_json_path = os.path.join(path, 'bundle.json') 109 return bundle_json_path 110 111 112def find_false_syscap(syscap_l: list): 113 res = [] 114 for syscap in syscap_l: 115 if not check_syscap(syscap): 116 res.append(syscap.split('=')[0].strip()) 117 return res 118 119 120def handle_bundle_json_file(component_path_dict: dict, product_define_dict: dict): 121 """ 122 from product required part bundle.json to all products parts list 123 :param component_path_dict: 124 :return: all products parts list 125 """ 126 print("start collect syscap path...") 127 syscap_dict = dict() 128 errors_list = list() 129 for product_name, path_dict in component_path_dict.items(): 130 bundles_list = list() 131 for component in path_dict.keys(): 132 path = path_dict.get(component) 133 bundle_json_path = path_component_to_bundle(path) 134 bundle_syscap_list, error_list = read_json_file(bundle_json_path) 135 if product_define_dict.get(product_name).get(component): 136 false_syscap = find_false_syscap(product_define_dict.get(product_name).get(component)) 137 bundle_syscap_l = [sc for sc in bundle_syscap_list if sc not in false_syscap] 138 bundles_list.extend(bundle_syscap_l) 139 else: 140 bundles_list.extend(bundle_syscap_list) 141 errors_list.extend(error_list) 142 syscap_dict.update({product_name: bundles_list}) 143 return syscap_dict, errors_list 144 145 146def format_component_path(component_path: str): 147 sep_list = ['\\', '/'] 148 sep_list.remove(os.sep) 149 component_path = component_path.replace(sep_list[0], os.sep) 150 return component_path 151 152 153def traversal_path(parts_path_info: dict, project_path: str, product_define_dict): 154 component_path_dict = dict() 155 for product_name, component_name_list in product_define_dict.items(): 156 component_paths = dict() 157 for component_name in component_name_list.keys(): 158 component_relpath = parts_path_info.get(component_name) 159 if component_relpath: 160 component_path = os.path.join(project_path, component_relpath) 161 component_path = format_component_path(component_path) 162 component_paths[component_name] = component_path 163 else: 164 logging.error(f'can\'t find component_name : {component_name}') 165 component_path_dict.update({product_name: component_paths}) 166 return component_path_dict 167 168 169def collect_all_product_component_syscap_dict(parts_path_info: dict, project_path: str, product_define_dict): 170 """ 171 get all syscap to dict 172 :param parts_path_info: 173 :param project_path: 174 :param product_define_dict: 175 :return: 176 """ 177 if parts_path_info: 178 print("start collect component path...") 179 component_path_dict = traversal_path(parts_path_info, project_path, product_define_dict) 180 syscap_dict, errors_list = handle_bundle_json_file(component_path_dict, product_define_dict) 181 return syscap_dict, errors_list 182 else: 183 return 0, 0 184 185 186def get_subsystem_info(subsystem_config_file, source_root_dir): 187 """ 188 get subsystem name and subsystem path from oh/build/subsystem_config.json 189 :param subsystem_config_file: subsystem_config_file path 190 :param source_root_dir: oh project path 191 :return: subsystem name and subsystem path 192 """ 193 subsystem_configs = scan(subsystem_config_file, source_root_dir) 194 _all_components_path = [] 195 for _, value in subsystem_configs.get('subsystem').items(): 196 for i in value.get('build_files'): 197 _all_components_path.append(i) 198 return subsystem_configs.get('subsystem') 199 200 201def _check_path_prefix(paths): 202 allow_path_prefix = ['vendor', 'device'] 203 result = list( 204 filter(lambda x: x is False, 205 map(lambda p: p.split('/')[0] in allow_path_prefix, paths))) 206 return len(result) <= 1 207 208 209def traversal_files(subsystem_path, _files): 210 for item in os.scandir(subsystem_path): 211 if is_symlink(item.path): 212 continue 213 elif item.is_file() and item.name == 'ohos.build': 214 _files.append(item.path) 215 elif item.is_file() and item.name == 'bundle.json': 216 _files.append(item.path) 217 elif item.is_dir(): 218 traversal_files(item, _files) 219 return _files 220 221 222def get_file_type(file_path): 223 if os.path.islink(file_path): 224 return 'symlink' 225 elif os.path.isfile(file_path): 226 return 'file' 227 elif os.path.isdir(file_path): 228 return 'directory' 229 else: 230 return 'unknown' 231 232 233def is_symlink(file_path): 234 file_type = get_file_type(file_path) 235 if file_type == 'symlink': 236 link_target = os.readlink(file_path) 237 return link_target != file_type 238 return False 239 240 241def _scan_build_file(subsystem_path): 242 _files = [] 243 _bundle_files = [] 244 try: 245 _files = traversal_files(subsystem_path, _files) 246 except FileNotFoundError: 247 print(f"read file {subsystem_path} failed.") 248 return _files 249 250 251def scan(subsystem_config_file, source_root_dir): 252 subsystem_infos = _read_config(subsystem_config_file) 253 _default_subsystem = {"build": "build"} 254 subsystem_infos.update(_default_subsystem) 255 no_src_subsystem = {} 256 _build_configs = {} 257 for key, val in subsystem_infos.items(): 258 _all_build_config_files = [] 259 if not isinstance(val, list): 260 val = [val] 261 else: 262 if not _check_path_prefix(val): 263 raise Exception("subsystem '{}' path configuration is incorrect.".format(key), "2013") 264 _info = {'path': val} 265 for _path in val: 266 _subsystem_path = os.path.join(source_root_dir, _path) 267 _build_config_files = _scan_build_file(_subsystem_path) 268 _all_build_config_files.extend(_build_config_files) 269 if _all_build_config_files: 270 _info['build_files'] = _all_build_config_files 271 _build_configs[key] = _info 272 else: 273 no_src_subsystem[key] = val 274 275 scan_result = { 276 'source_path': source_root_dir, 277 'subsystem': _build_configs, 278 } 279 print('subsystem config scan completed') 280 return scan_result 281 282 283def _read_config(subsystem_config_file): 284 if not os.path.exists(subsystem_config_file): 285 raise Exception("config file '{}' doesn't exist.".format(subsystem_config_file), "2013") 286 subsystem_config = _read_json_file(subsystem_config_file) 287 if subsystem_config is None: 288 raise Exception("read file '{}' failed.".format(subsystem_config_file), "2013") 289 290 subsystem_info = {} 291 for key, val in subsystem_config.items(): 292 if 'path' not in val: 293 raise Exception("subsystem '{}' not config path.".format(key), "2013") 294 subsystem_info[key] = val.get('path') 295 return subsystem_info 296 297 298def read_build_file(ohos_build_file): 299 if not os.path.exists(ohos_build_file): 300 raise Exception("config file '{}' doesn't exist.".format(ohos_build_file), "2014") 301 subsystem_config = _read_json_file(ohos_build_file) 302 if not subsystem_config: 303 raise Exception("read file '{}' failed.".format(ohos_build_file), "2014") 304 return subsystem_config 305 306 307class BundlePartObj(object): 308 def __init__(self, bundle_config_file): 309 self._build_config_file = bundle_config_file 310 self._loading_config() 311 312 def to_ohos_build(self): 313 _component_info = self.bundle_info.get('component') 314 _subsystem_name = _component_info.get('subsystem') 315 _part_name = _component_info.get('name') 316 _bundle_build = _component_info.get('build') 317 _ohos_build_info = dict() 318 _ohos_build_info['subsystem'] = _subsystem_name 319 _part_info = {} 320 module_list = [] 321 if _component_info.get('build').__contains__('sub_component'): 322 _part_info['module_list'] = _component_info.get('build').get( 323 'sub_component') 324 elif _component_info.get('build').__contains__('modules'): 325 _part_info['module_list'] = _component_info.get( 326 'build').get('modules') 327 elif _component_info.get('build').__contains__('group_type'): 328 _module_groups = _component_info.get('build').get('group_type') 329 for _group_type, _module_list in _module_groups.items(): 330 _key = '{}:{}'.format(_subsystem_name, _part_name) 331 _part_info['module_list'] = module_list 332 if 'inner_kits' in _bundle_build: 333 _part_info['inner_kits'] = _bundle_build.get('inner_kits') 334 elif 'inner_api' in _bundle_build: 335 _part_info['inner_kits'] = _bundle_build.get('inner_api') 336 if 'features' in _component_info: 337 _part_info['feature_list'] = _component_info.get('features') 338 if 'syscap' in _component_info: 339 _part_info['system_capabilities'] = _component_info.get('syscap') 340 if 'hisysevent_config' in _component_info: 341 _part_info['hisysevent_config'] = _component_info.get( 342 'hisysevent_config') 343 _part_info['part_deps'] = _component_info.get('deps', {}) 344 _part_info['part_deps']['build_config_file'] = self._build_config_file 345 _ohos_build_info['parts'] = {_part_name: _part_info} 346 return _ohos_build_info 347 348 def _loading_config(self): 349 if not os.path.exists(self._build_config_file): 350 raise Exception("file '{}' doesn't exist.".format( 351 self._build_config_file), "2011") 352 self.bundle_info = _read_json_file(self._build_config_file) 353 if not self.bundle_info: 354 raise Exception("read file '{}' failed.".format( 355 self._build_config_file), "2011") 356 357 358class LoadBuildConfig(object): 359 """load build config file and parse configuration info.""" 360 361 def __init__(self, source_root_dir, subsystem_build_info, subsystem_name): 362 self._source_root_dir = source_root_dir 363 self._build_info = subsystem_build_info 364 self._is_load = False 365 self._parts_variants = {} 366 self._part_list = {} 367 self._part_targets_label = {} 368 self._subsystem_name = subsystem_name 369 self._parts_info_dict = {} 370 self._phony_targets = {} 371 self._parts_path_dict = {} 372 self._part_hisysevent_config = {} 373 self._parts_module_list = {} 374 self._parts_deps = {} 375 376 def parse(self): 377 """parse part info from build config file.""" 378 if self._is_load: 379 return 380 subsystem_config, parts_path_dict = self._merge_build_config() 381 parts_config = subsystem_config.get('parts') 382 self._parts_module_list.update(parts_config) 383 self._parts_path_dict = parts_path_dict 384 self._is_load = True 385 386 def parts_path_info(self): 387 """parts to path info.""" 388 self.parse() 389 return self._parts_path_dict 390 391 def parts_info_filter(self, save_part): 392 if save_part is None: 393 raise Exception 394 self._parts_info_dict = { 395 key: value for key, value in self._parts_info_dict.items() if key in save_part} 396 397 def _merge_build_config(self): 398 _build_files = self._build_info.get('build_files') 399 is_thirdparty_subsystem = False 400 if _build_files[0].startswith(self._source_root_dir + 'third_party'): 401 is_thirdparty_subsystem = True 402 subsystem_name = None 403 parts_info = {} 404 parts_path_dict = {} 405 for _build_file in _build_files: 406 if _build_file.endswith('bundle.json'): 407 bundle_part_obj = BundlePartObj(_build_file) 408 _parts_config = bundle_part_obj.to_ohos_build() 409 else: 410 _parts_config = read_build_file(_build_file) 411 _subsystem_name = _parts_config.get('subsystem') 412 if not is_thirdparty_subsystem and subsystem_name and _subsystem_name != subsystem_name: 413 raise Exception( 414 "subsystem name config incorrect in '{}'.".format( 415 _build_file), "2014") 416 subsystem_name = _subsystem_name 417 _curr_parts_info = _parts_config.get('parts') 418 for _pname in _curr_parts_info.keys(): 419 parts_path_dict[_pname] = os.path.relpath( 420 os.path.dirname(_build_file), self._source_root_dir) 421 parts_info.update(_curr_parts_info) 422 subsystem_config = dict() 423 subsystem_config['subsystem'] = subsystem_name 424 subsystem_config['parts'] = parts_info 425 return subsystem_config, parts_path_dict 426 427 428def get_parts_info(source_root_dir, subsystem_info, build_xts=False): 429 """ 430 get parts path info from subsystem info 431 :param source_root_dir: oh project path 432 :param subsystem_info: 433 :param build_xts: 434 :return: parts path info 435 """ 436 _phony_target = {} 437 _parts_path_info = {} 438 _parts_hisysevent_config = {} 439 _parts_modules_info = {} 440 _parts_deps = {} 441 for subsystem_name, build_config_info in subsystem_info.items(): 442 if not len(build_config_info.get("build_files")): 443 continue 444 build_loader = LoadBuildConfig(source_root_dir, build_config_info, subsystem_name) 445 if subsystem_name == 'xts' and build_xts is False: 446 xts_device_attest_name = ['device_attest_lite', 'device_attest'] 447 build_loader.parse() 448 build_loader.parts_info_filter(xts_device_attest_name) 449 _parts_path_info.update(build_loader.parts_path_info()) 450 return _parts_path_info 451 452 453def _read_json_file(input_file): 454 if not os.path.exists(input_file): 455 print("file '{}' doesn't exist.".format(input_file)) 456 return {} 457 try: 458 with open(input_file, 'r') as input_f: 459 data = json.load(input_f) 460 return data 461 except json.decoder.JSONDecodeError: 462 print("The file '{}' format is incorrect.".format(input_file)) 463 raise 464 except Exception: 465 print("read file '{}' failed.".format(input_file)) 466 raise 467 468 469def get_product_define_path(source_root_dir): 470 return os.path.join(source_root_dir, 'productdefine', 'common', 'inherit') 471 472 473def components_list_handler(product_file_json): 474 components_dict = dict() 475 for subsystems in product_file_json.get('subsystems'): 476 for components in subsystems.get('components'): 477 components_dict[components.get('component')] = components.get('syscap') 478 479 return components_dict 480 481 482def product_component_handler(product_file, product_file_path): 483 all_components_dict = dict() 484 components_dict = dict() 485 try: 486 with open(product_file_path, 'r', encoding='utf-8') as f: 487 product_file_json = json.load(f) 488 components_dict = components_list_handler(product_file_json) 489 except FileNotFoundError: 490 print(f"read file {product_file_path} failed.") 491 all_components_dict.update({product_file.split('.')[0]: components_dict}) 492 return all_components_dict 493 494 495def collect_all_product_component(product_file_dict: dict): 496 all_components_dict = dict() 497 for product_file, product_file_path in product_file_dict.items(): 498 product_components_dict = product_component_handler(product_file, product_file_path) 499 all_components_dict.update(product_components_dict) 500 return all_components_dict 501 502 503def get_product_define_dict(source_root_dir): 504 product_define_path = get_product_define_path(source_root_dir) 505 product_file_dict = dict() 506 for file in os.scandir(product_define_path): 507 if file.name.split('.')[-1] == 'json': 508 product_file_dict.update({file.name: os.path.join(product_define_path, file.name)}) 509 product_define_dict = collect_all_product_component(product_file_dict) 510 return product_define_dict 511 512 513def output_path_handler(project_path): 514 output_path = os.path.join(project_path, 'interface', 'sdk-js', 'api', 'device-define-common') 515 folder = os.path.exists(output_path) 516 # 多线程创建文件夹问题 517 if not folder: 518 os.makedirs(output_path, exist_ok=True) 519 return output_path 520 521 522def project_path_handler(project_path): 523 if not project_path: 524 project_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) 525 return project_path 526 527 528def main(): 529 logging.basicConfig(level=logging.INFO) 530 args = get_args() 531 project_path = args.project_path 532 project_path = project_path_handler(project_path) 533 output_path = output_path_handler(project_path) 534 subsystem_config_file = os.path.join(project_path, 'build', 'subsystem_config.json') 535 product_define_dict = get_product_define_dict(project_path) 536 _subsystem_info = get_subsystem_info(subsystem_config_file, project_path) 537 _parts_path_info = get_parts_info(project_path, _subsystem_info) 538 syscap_dict, errors_list = collect_all_product_component_syscap_dict(_parts_path_info, project_path, 539 product_define_dict) 540 if syscap_dict: 541 dict_to_json(output_path, syscap_dict) 542 543 544if __name__ == "__main__": 545 main()