1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (c) 2023 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15import logging
16import os
17import json
18import argparse
19import stat
20from collections import Counter
21
22
def get_args():
    """
    parse the command line options of this script
    :return: argparse namespace; ``project_path`` holds the value of
             ``-p`` / ``--project_path`` and is an empty string when omitted
    """
    option_flags = ("-p", "--project_path")
    option_settings = {
        "default": r"",
        "type": str,
        "help": "root path of project. default: ./",
    }
    arg_parser = argparse.ArgumentParser(add_help=True)
    arg_parser.add_argument(*option_flags, **option_settings)
    return arg_parser.parse_args()
34
35
def dict_to_json(output_path: str, syscaps_dict: dict):
    """
    output diff product syscaps json to output path

    One ``<product_name>.json`` file is written per entry of syscaps_dict, with
    the syscap list stored under the ``SysCaps`` key. Repeated syscaps are
    reported on stdout and written only once, keeping first-seen order.
    :param output_path: existing directory that receives the json files
    :param syscaps_dict: mapping of product name to its list of syscap names
    :return: None
    """
    print("start generate syscap json...")
    # O_TRUNC matters: without it, rewriting an existing and longer file keeps
    # the old trailing bytes after the new document and the json is corrupted.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    # New files are created readable/writable by the owner only.
    modes = stat.S_IWUSR | stat.S_IRUSR
    for product_name, syscaps_list in syscaps_dict.items():
        counts = Counter(syscaps_list)
        duplicates = [elem for elem, count in counts.items() if count > 1]
        if duplicates:
            print("[Warning] repeated syscap: {}".format(",".join(duplicates)))
            # Counter is an insertion-ordered dict, so its keys are the unique
            # syscaps in first-seen order (a set would shuffle the output).
            syscaps_list = list(counts)
        filename = os.path.join(output_path, f'{product_name}.json')
        with os.fdopen(os.open(filename, flags, modes), 'w') as f:
            json.dump({'SysCaps': syscaps_list}, f, indent=4)
    print("end...")
56
57
def check_syscap(syscap_str: str):
    """
    decide whether a syscap entry is enabled
    :param syscap_str: syscap entry, either a bare name or ``name = value``;
                       whitespace around ``=`` is optional
    :return: False when the value after the last ``=`` is ``false``
             (case-insensitive), otherwise the stripped syscap name
    """
    # Split on '=' and strip each field so that "A=false", "A =false" and
    # "A = false" are all recognized, not only the exact " = " separator.
    fields = [field.strip() for field in syscap_str.split('=')]
    if fields[-1].lower() == 'false':
        return False
    return fields[0]
64
65
def bundle_syscap_list_handler(bundle_syscap_list: list, component_syscaps_list: list):
    """
    append every enabled syscap of a component to bundle_syscap_list
    :param bundle_syscap_list: list that is extended in place
    :param component_syscaps_list: syscap entries to run through check_syscap
    :return: the same bundle_syscap_list object that was passed in
    """
    checked = (check_syscap(entry) for entry in component_syscaps_list)
    # check_syscap gives a falsy result for entries that must be left out
    bundle_syscap_list.extend(name for name in checked if name)
    return bundle_syscap_list
78
79
def read_json_file(bundle_json_path: str):
    """
    read the syscap list declared in a bundle.json file
    :param bundle_json_path: path of the bundle.json file
    :return: tuple of (enabled syscap list, error dict); the error dict maps
             bundle_json_path to the message of whatever exception stopped
             the read, and is empty on success
    """
    syscaps = list()
    failures = dict()
    try:
        with open(bundle_json_path, 'r', encoding='utf-8') as bundle_file:
            bundle_data = json.load(bundle_file)
        declared = bundle_data.get("component").get("syscap")
        if declared:
            syscaps = bundle_syscap_list_handler(syscaps, declared)
    except Exception as err:
        # Missing file, malformed json and a missing "component" section are
        # all recorded the same way instead of aborting the collection.
        failures[bundle_json_path] = str(err)
    return syscaps, failures
95
96
def get_all_components_path(components_file_path: str):
    """
    load the json document stored at components_file_path
    :param components_file_path: path of the json file
    :return: parsed json content, or an empty dict (after logging an error)
             when the file does not exist
    """
    try:
        components_file = open(components_file_path, 'r', encoding='utf-8')
    except FileNotFoundError:
        logging.error(r"PATH ERROR")
        return {}
    with components_file:
        return json.load(components_file)
105
106
def path_component_to_bundle(path: str) -> str:
    """
    build the bundle.json path of a component
    :param path: component directory
    :return: ``path`` joined with ``bundle.json``
    """
    return os.path.join(path, 'bundle.json')
110
111
def find_false_syscap(syscap_l: list):
    """
    collect the names of the syscaps that check_syscap reports as disabled
    :param syscap_l: syscap entries such as ``name = false``
    :return: list with the stripped text before the first ``=`` of every
             entry for which check_syscap gives a falsy result
    """
    return [
        entry.split('=')[0].strip()
        for entry in syscap_l
        if not check_syscap(entry)
    ]
118
119
def handle_bundle_json_file(component_path_dict: dict, product_define_dict: dict):
    """
    from product required part bundle.json to all products syscap list
    :param component_path_dict: product name -> {component name: component path}
    :param product_define_dict: product name -> {component name: syscap settings};
                                syscaps a product marks as false are removed
                                from that component's bundle syscaps
    :return: tuple of (product name -> syscap list, errors list); the errors
             list is extended with the error mapping returned by
             read_json_file, which contributes its keys
    """
    print("start collect syscap path...")
    syscap_dict = dict()
    errors_list = list()
    for product_name, path_dict in component_path_dict.items():
        product_syscaps = list()
        for component, path in path_dict.items():
            declared, error_list = read_json_file(path_component_to_bundle(path))
            product_settings = product_define_dict.get(product_name).get(component)
            if product_settings:
                disabled = find_false_syscap(product_settings)
                declared = [sc for sc in declared if sc not in disabled]
            product_syscaps.extend(declared)
            errors_list.extend(error_list)
        syscap_dict[product_name] = product_syscaps
    return syscap_dict, errors_list
144
145
def format_component_path(component_path: str):
    """
    normalize the path separators of component_path to os.sep
    :param component_path: path that may mix ``\\`` and ``/``
    :return: the path with the non-native separator replaced by os.sep
    """
    foreign_sep = '/' if os.sep == '\\' else '\\'
    return component_path.replace(foreign_sep, os.sep)
151
152
def traversal_path(parts_path_info: dict, project_path: str, product_define_dict):
    """
    resolve the directory of every component required by every product
    :param parts_path_info: component name -> path relative to project_path
    :param project_path: root path of the project
    :param product_define_dict: product name -> {component name: ...}
    :return: product name -> {component name: component path}; components
             without an entry in parts_path_info are logged and left out
    """
    component_path_dict = dict()
    for product_name, component_name_list in product_define_dict.items():
        resolved = dict()
        for component_name in component_name_list.keys():
            relpath = parts_path_info.get(component_name)
            if not relpath:
                logging.error(f'can\'t find component_name : {component_name}')
                continue
            joined = os.path.join(project_path, relpath)
            resolved[component_name] = format_component_path(joined)
        component_path_dict[product_name] = resolved
    return component_path_dict
167
168
def collect_all_product_component_syscap_dict(parts_path_info: dict, project_path: str, product_define_dict):
    """
    get all syscap to dict
    :param parts_path_info: component name -> path relative to project_path
    :param project_path: root path of the project
    :param product_define_dict: product name -> {component name: syscap settings}
    :return: ``(syscap_dict, errors_list)`` from handle_bundle_json_file, or
             ``(0, 0)`` when parts_path_info is empty
    """
    if not parts_path_info:
        return 0, 0
    print("start collect component path...")
    component_path_dict = traversal_path(parts_path_info, project_path, product_define_dict)
    return handle_bundle_json_file(component_path_dict, product_define_dict)
184
185
def get_subsystem_info(subsystem_config_file, source_root_dir):
    """
    get subsystem name and subsystem path from oh/build/subsystem_config.json
    :param subsystem_config_file: subsystem_config_file path
    :param source_root_dir: oh project path
    :return: the ``subsystem`` mapping of the scan result
             (subsystem name -> {'path': ..., 'build_files': ...})
    """
    # The scan result already carries everything that is returned; the former
    # loop that copied every build file into an unused list has been removed.
    subsystem_configs = scan(subsystem_config_file, source_root_dir)
    return subsystem_configs.get('subsystem')
199
200
def _check_path_prefix(paths):
    """
    check the first path segment of every configured subsystem path
    :param paths: list of '/'-separated paths
    :return: True when at most one path starts outside ``vendor`` / ``device``
    """
    allowed_roots = ('vendor', 'device')
    outside = [p for p in paths if p.split('/')[0] not in allowed_roots]
    return len(outside) <= 1
207
208
def traversal_files(subsystem_path, _files):
    """
    recursively collect the ohos.build and bundle.json files below a directory
    :param subsystem_path: directory (path or os.DirEntry) to scan
    :param _files: list that found file paths are appended to
    :return: the same ``_files`` list
    """
    build_file_names = ('ohos.build', 'bundle.json')
    for entry in os.scandir(subsystem_path):
        # entries that is_symlink reports as links are neither collected nor followed
        if is_symlink(entry.path):
            continue
        if entry.is_file():
            if entry.name in build_file_names:
                _files.append(entry.path)
        elif entry.is_dir():
            traversal_files(entry, _files)
    return _files
220
221
def get_file_type(file_path):
    """
    classify a filesystem path
    :param file_path: path to inspect
    :return: 'symlink', 'file', 'directory' or 'unknown'; the symlink check
             comes first, so a link is never reported as file or directory
    """
    probes = (
        ('symlink', os.path.islink),
        ('file', os.path.isfile),
        ('directory', os.path.isdir),
    )
    for label, matches in probes:
        if matches(file_path):
            return label
    return 'unknown'
231
232
def is_symlink(file_path):
    """
    tell whether file_path is a symbolic link
    :param file_path: path to inspect
    :return: True for every symbolic link, False otherwise (including paths
             that do not exist)
    """
    # The previous version compared the link target with the literal string
    # 'symlink', so a link pointing at something named "symlink" was reported
    # as a regular entry. Whether a path is a link does not depend on its target.
    return os.path.islink(file_path)
239
240
def _scan_build_file(subsystem_path):
    """
    collect the build config files below subsystem_path
    :param subsystem_path: directory to scan
    :return: list of files gathered by traversal_files; when the scan hits a
             FileNotFoundError a message is printed and whatever was gathered
             up to that point is returned
    """
    found = []
    try:
        found = traversal_files(subsystem_path, found)
    except FileNotFoundError:
        print(f"read file {subsystem_path} failed.")
    return found
249
250
def scan(subsystem_config_file, source_root_dir):
    """
    scan every configured subsystem for build config files
    :param subsystem_config_file: path of the subsystem config json
    :param source_root_dir: project root the subsystem paths are relative to
    :return: dict with ``source_path`` (source_root_dir) and ``subsystem``
             (name -> {'path': [...], 'build_files': [...]}) for the
             subsystems in which build files were found
    :raises Exception: when a subsystem configured with a list of paths fails
                       _check_path_prefix
    """
    subsystem_infos = _read_config(subsystem_config_file)
    # the "build" subsystem is always part of the scan
    subsystem_infos["build"] = "build"
    no_src_subsystem = {}
    _build_configs = {}
    for name, configured in subsystem_infos.items():
        if isinstance(configured, list):
            if not _check_path_prefix(configured):
                raise Exception("subsystem '{}' path configuration is incorrect.".format(name), "2013")
            paths = configured
        else:
            paths = [configured]
        build_files = []
        for rel_path in paths:
            build_files.extend(_scan_build_file(os.path.join(source_root_dir, rel_path)))
        if build_files:
            _build_configs[name] = {'path': paths, 'build_files': build_files}
        else:
            # recorded locally only; not part of the result
            no_src_subsystem[name] = paths

    print('subsystem config scan completed')
    return {
        'source_path': source_root_dir,
        'subsystem': _build_configs,
    }
281
282
def _read_config(subsystem_config_file):
    """
    read the subsystem config file
    :param subsystem_config_file: path of the subsystem config json
    :return: dict of subsystem name -> configured ``path`` value
    :raises Exception: when the file does not exist, reading yields None, or
                       a subsystem entry has no ``path``
    """
    if not os.path.exists(subsystem_config_file):
        raise Exception("config file '{}' doesn't exist.".format(subsystem_config_file), "2013")
    subsystem_config = _read_json_file(subsystem_config_file)
    if subsystem_config is None:
        raise Exception("read file '{}' failed.".format(subsystem_config_file), "2013")

    subsystem_info = {}
    for name, entry in subsystem_config.items():
        if 'path' in entry:
            subsystem_info[name] = entry.get('path')
            continue
        raise Exception("subsystem '{}' not config path.".format(name), "2013")
    return subsystem_info
296
297
def read_build_file(ohos_build_file):
    """
    read an ohos.build config file
    :param ohos_build_file: path of the build file
    :return: parsed json content of the file
    :raises Exception: when the file does not exist or its content is empty
    """
    if not os.path.exists(ohos_build_file):
        raise Exception("config file '{}' doesn't exist.".format(ohos_build_file), "2014")
    build_config = _read_json_file(ohos_build_file)
    if build_config:
        return build_config
    raise Exception("read file '{}' failed.".format(ohos_build_file), "2014")
305
306
class BundlePartObj(object):
    """load a bundle.json file and convert it to the ohos.build layout."""

    def __init__(self, bundle_config_file):
        """
        :param bundle_config_file: path of the bundle.json file
        :raises Exception: when the file does not exist or has no content
        """
        self._build_config_file = bundle_config_file
        self._loading_config()

    def to_ohos_build(self):
        """
        convert the loaded bundle info to the ohos.build structure
        :return: ``{'subsystem': <name>, 'parts': {<part name>: <part info>}}``
                 where part info may hold module_list, inner_kits,
                 feature_list, system_capabilities, hisysevent_config and
                 always holds part_deps (with the bundle file path stored
                 under ``build_config_file``)
        """
        _component_info = self.bundle_info.get('component')
        _subsystem_name = _component_info.get('subsystem')
        _part_name = _component_info.get('name')
        # A component without a "build" section is handled like an empty one
        # instead of failing on None.
        _bundle_build = _component_info.get('build') or {}
        _part_info = {}
        if 'sub_component' in _bundle_build:
            _part_info['module_list'] = _bundle_build.get('sub_component')
        elif 'modules' in _bundle_build:
            _part_info['module_list'] = _bundle_build.get('modules')
        elif 'group_type' in _bundle_build:
            # NOTE(review): grouped modules are not flattened; such parts are
            # reported with an empty module list, exactly as before. Confirm
            # whether the group contents should be merged in here.
            _part_info['module_list'] = []
        if 'inner_kits' in _bundle_build:
            _part_info['inner_kits'] = _bundle_build.get('inner_kits')
        elif 'inner_api' in _bundle_build:
            _part_info['inner_kits'] = _bundle_build.get('inner_api')
        if 'features' in _component_info:
            _part_info['feature_list'] = _component_info.get('features')
        if 'syscap' in _component_info:
            _part_info['system_capabilities'] = _component_info.get('syscap')
        if 'hisysevent_config' in _component_info:
            _part_info['hisysevent_config'] = _component_info.get(
                'hisysevent_config')
        # Work on a copy so that adding build_config_file does not write into
        # the loaded bundle data.
        _part_deps = dict(_component_info.get('deps') or {})
        _part_deps['build_config_file'] = self._build_config_file
        _part_info['part_deps'] = _part_deps
        return {
            'subsystem': _subsystem_name,
            'parts': {_part_name: _part_info},
        }

    def _loading_config(self):
        """
        read the bundle file into ``self.bundle_info``
        :raises Exception: when the file does not exist or has no content
        """
        if not os.path.exists(self._build_config_file):
            raise Exception("file '{}' doesn't exist.".format(
                self._build_config_file), "2011")
        self.bundle_info = _read_json_file(self._build_config_file)
        if not self.bundle_info:
            raise Exception("read file '{}' failed.".format(
                self._build_config_file), "2011")
356
357
class LoadBuildConfig(object):
    """load build config file and parse configuration info."""

    def __init__(self, source_root_dir, subsystem_build_info, subsystem_name):
        """
        :param source_root_dir: project root path
        :param subsystem_build_info: dict whose ``build_files`` entry lists
                                     the build config files of the subsystem
        :param subsystem_name: name of the subsystem
        """
        self._source_root_dir = source_root_dir
        self._build_info = subsystem_build_info
        self._is_load = False
        self._parts_variants = {}
        self._part_list = {}
        self._part_targets_label = {}
        self._subsystem_name = subsystem_name
        self._parts_info_dict = {}
        self._phony_targets = {}
        self._parts_path_dict = {}
        self._part_hisysevent_config = {}
        self._parts_module_list = {}
        self._parts_deps = {}

    def parse(self):
        """parse part info from build config file (only on the first call)."""
        if self._is_load:
            return
        subsystem_config, parts_path_dict = self._merge_build_config()
        parts_config = subsystem_config.get('parts')
        self._parts_module_list.update(parts_config)
        self._parts_path_dict = parts_path_dict
        self._is_load = True

    def parts_path_info(self):
        """parts to path info."""
        self.parse()
        return self._parts_path_dict

    def parts_info_filter(self, save_part):
        """
        keep only the entries of the parts info dict whose name is in save_part
        :param save_part: collection of part names to keep
        :raises Exception: when save_part is None
        """
        # NOTE(review): nothing in this class fills _parts_info_dict, so the
        # filter has no entries to act on here - confirm the intended target.
        if save_part is None:
            raise Exception("save_part must not be None.")
        self._parts_info_dict = {
            key: value for key, value in self._parts_info_dict.items() if key in save_part}

    def _is_thirdparty_subsystem(self, build_files):
        """tell whether the first build file lies below <source root>/third_party."""
        if not build_files:
            return False
        # os.path.join supplies the separator that plain concatenation lost
        # whenever the source root was given without a trailing slash.
        third_party_root = os.path.join(self._source_root_dir, 'third_party')
        return build_files[0].startswith(third_party_root)

    def _merge_build_config(self):
        """
        merge the parts of all build files of the subsystem
        :return: tuple of (``{'subsystem': name, 'parts': parts info}``,
                 part name -> directory of its build file relative to the
                 source root)
        :raises Exception: when the build files of a subsystem outside
                           third_party name different subsystems
        """
        # a missing or empty build_files entry means there is nothing to merge
        _build_files = self._build_info.get('build_files') or []
        is_thirdparty_subsystem = self._is_thirdparty_subsystem(_build_files)
        subsystem_name = None
        parts_info = {}
        parts_path_dict = {}
        for _build_file in _build_files:
            if _build_file.endswith('bundle.json'):
                bundle_part_obj = BundlePartObj(_build_file)
                _parts_config = bundle_part_obj.to_ohos_build()
            else:
                _parts_config = read_build_file(_build_file)
            _subsystem_name = _parts_config.get('subsystem')
            if not is_thirdparty_subsystem and subsystem_name and _subsystem_name != subsystem_name:
                raise Exception(
                    "subsystem name config incorrect in '{}'.".format(
                        _build_file), "2014")
            subsystem_name = _subsystem_name
            _curr_parts_info = _parts_config.get('parts')
            _part_dir = os.path.relpath(
                os.path.dirname(_build_file), self._source_root_dir)
            for _pname in _curr_parts_info.keys():
                parts_path_dict[_pname] = _part_dir
            parts_info.update(_curr_parts_info)
        subsystem_config = dict()
        subsystem_config['subsystem'] = subsystem_name
        subsystem_config['parts'] = parts_info
        return subsystem_config, parts_path_dict
426
427
def get_parts_info(source_root_dir, subsystem_info, build_xts=False):
    """
    get parts path info from subsystem info
    :param source_root_dir: oh project path
    :param subsystem_info: subsystem name -> dict with a ``build_files`` list
    :param build_xts: when False, the ``xts`` subsystem is parsed and its
                      loader is filtered to the device attest parts
    :return: parts path info (part name -> path) merged over all subsystems
             that have build files
    """
    merged_paths = {}
    for subsystem_name, build_config_info in subsystem_info.items():
        if len(build_config_info.get("build_files")) == 0:
            continue
        loader = LoadBuildConfig(source_root_dir, build_config_info, subsystem_name)
        if build_xts is False and subsystem_name == 'xts':
            loader.parse()
            loader.parts_info_filter(['device_attest_lite', 'device_attest'])
        merged_paths.update(loader.parts_path_info())
    return merged_paths
451
452
def _read_json_file(input_file):
    """
    read a json file
    :param input_file: path of the json file
    :return: parsed content, or an empty dict when the file does not exist
    :raises json.decoder.JSONDecodeError: when the content is not valid json
    :raises Exception: any other error while reading is reported and re-raised
    """
    if not os.path.exists(input_file):
        print("file '{}' doesn't exist.".format(input_file))
        return {}
    try:
        # Decode explicitly as UTF-8, like the other json readers of this
        # file, so the result does not depend on the locale's default encoding.
        with open(input_file, 'r', encoding='utf-8') as input_f:
            return json.load(input_f)
    except json.decoder.JSONDecodeError:
        print("The file '{}' format is incorrect.".format(input_file))
        raise
    except Exception:
        print("read file '{}' failed.".format(input_file))
        raise
467
468
def get_product_define_path(source_root_dir):
    """
    build the path of the product define directory
    :param source_root_dir: oh project path
    :return: ``<source_root_dir>/productdefine/common/inherit``
    """
    relative_parts = ('productdefine', 'common', 'inherit')
    return os.path.join(source_root_dir, *relative_parts)
471
472
def components_list_handler(product_file_json):
    """
    flatten the components of all subsystems of a product define file
    :param product_file_json: parsed product json with a ``subsystems`` list,
                              each item holding a ``components`` list
    :return: dict of component name (``component``) -> its ``syscap`` value;
             a later entry with the same name replaces an earlier one
    """
    return {
        entry.get('component'): entry.get('syscap')
        for subsystem in product_file_json.get('subsystems')
        for entry in subsystem.get('components')
    }
480
481
def product_component_handler(product_file, product_file_path):
    """
    read the components of one product define file
    :param product_file: file name; the text before its first '.' is used as
                         the product name
    :param product_file_path: path of the product define json file
    :return: ``{product name: {component name: syscap value}}``; the inner
             dict is empty (and a message is printed) when the file is missing
    """
    components = dict()
    try:
        with open(product_file_path, 'r', encoding='utf-8') as product_fp:
            components = components_list_handler(json.load(product_fp))
    except FileNotFoundError:
        print(f"read file {product_file_path} failed.")
    product_name = product_file.split('.')[0]
    return {product_name: components}
493
494
def collect_all_product_component(product_file_dict: dict):
    """
    read the components of every product define file
    :param product_file_dict: product file name -> product file path
    :return: product name -> {component name: syscap value}, merged over all files
    """
    merged = dict()
    for file_name, file_path in product_file_dict.items():
        merged.update(product_component_handler(file_name, file_path))
    return merged
501
502
def get_product_define_dict(source_root_dir):
    """
    collect the components of all product define files of the project
    :param source_root_dir: oh project path
    :return: product name -> {component name: syscap value}, built from every
             entry of the product define directory whose name ends in ``json``
             after the last '.'
    """
    define_dir = get_product_define_path(source_root_dir)
    json_files = {
        entry.name: os.path.join(define_dir, entry.name)
        for entry in os.scandir(define_dir)
        if entry.name.split('.')[-1] == 'json'
    }
    return collect_all_product_component(json_files)
511
512
def output_path_handler(project_path):
    """
    make sure the output directory exists
    :param project_path: oh project path
    :return: ``<project_path>/interface/sdk-js/api/device-define-common``
    """
    output_path = os.path.join(project_path, 'interface', 'sdk-js', 'api', 'device-define-common')
    if os.path.exists(output_path):
        return output_path
    # Folder creation under multithreading: exist_ok tolerates the folder
    # being created by someone else after the check above.
    os.makedirs(output_path, exist_ok=True)
    return output_path
520
521
def project_path_handler(project_path):
    """
    resolve the project root path
    :param project_path: path given on the command line, may be empty
    :return: project_path when it is not empty, otherwise the directory four
             levels above this file
    """
    if project_path:
        return project_path
    located = os.path.abspath(__file__)
    for _ in range(4):
        located = os.path.dirname(located)
    return located
526
527
def main():
    """
    collect the syscaps of every product and write one json file per product
    into the output directory below the project path
    """
    logging.basicConfig(level=logging.INFO)
    project_path = project_path_handler(get_args().project_path)
    output_path = output_path_handler(project_path)
    subsystem_config_file = os.path.join(project_path, 'build', 'subsystem_config.json')
    product_define_dict = get_product_define_dict(project_path)
    subsystem_info = get_subsystem_info(subsystem_config_file, project_path)
    parts_path_info = get_parts_info(project_path, subsystem_info)
    # the collected error list is not used here
    syscap_dict, _ = collect_all_product_component_syscap_dict(
        parts_path_info, project_path, product_define_dict)
    if not syscap_dict:
        return
    dict_to_json(output_path, syscap_dict)
542
543
# Run the collection only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()