1#!/usr/bin/env python3
2# -*- coding: utf-8 -*-
3# Copyright (c) 2022 Huawei Device Co., Ltd.
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16# This file is to implement the rom analyzation of standard device.
17#
18
19import argparse
20import copy
21import glob
22import json
23import os
24import re
25import sys
26import subprocess
27import typing
28import xml.dom.minidom as dom
29from typing import Dict
30from pprint import pprint
31from pkgs.basic_tool import unit_adaptive
32
33from pkgs.simple_excel_writer import SimpleExcelWriter
34
35debug = True if sys.gettrace() else False
36
37
38class HDCTool:
39    @classmethod
40    def verify_hdc(cls, verify_str: str = "OpenHarmony") -> bool:
41        """
42        验证hdc是否可用
43        True:可用
44        False:不可用
45        """
46        cp = subprocess.run(["hdc", "--help"], capture_output=True)
47        stdout = str(cp.stdout)
48        stderr = str(cp.stderr)
49        return verify_str in stdout or verify_str in stderr
50
51    @classmethod
52    def verify_device(cls, device_num: str) -> bool:
53        """
54        验证设备是否已经连接
55        True:已连接
56        False:未连接
57        """
58        cp = subprocess.run(["hdc", "list", "targets"], capture_output=True)
59        stdout = str(cp.stdout)
60        stderr = str(cp.stderr)
61        return device_num in stderr or device_num in stdout
62
63    @classmethod
64    def exec(cls, args: list, output_from: str = "stdout"):
65        cp = subprocess.run(args, capture_output=True)
66        if output_from == "stdout":
67            return cp.stdout.decode()
68        elif output_from == "stderr":
69            return cp.stderr.decode()
70        else:
71            print("error: 'output_from' must be stdout or stdin")
72
73
74def delete_values_from_dict(target_dict: typing.Dict, key_list: typing.Iterable):
75    for k in key_list:
76        if k not in target_dict.keys():
77            continue
78        del target_dict[k]
79
80
81class RamAnalyzer:
82    @classmethod
83    def analysis(cls, cfg_path: str, json_path: str, rom_result_json: str, device_num: str,
84                 output_file: str, ss: str, output_excel: bool, baseline_file: str, unit_adapt: bool):
85        """
86        process size subsystem/component so so_size
87        """
88        if not HDCTool.verify_hdc():
89            print("error: Command 'hdc' not found")
90            return
91        if not HDCTool.verify_device(device_num):
92            print("error: {} is inaccessible or not found".format(device_num))
93            return
94        with open(rom_result_json, 'r', encoding='utf-8') as f:
95            rom_result_dict: typing.Dict = json.loads(f.read())
96        # 从rom的分析结果中将需要的elf信息重组
97        so_info_dict: typing.Dict[
98            str, typing.Dict[str["component_name|subsystem_name|size"], str]] = cls.get_elf_info_from_rom_result(
99            rom_result_json)
100        process_elf_dict: typing.Dict[str, typing.List[str]] = cls.get_process_so_relationship(cfg_path,
101                                                                                               json_path)
102        process_size_dict: typing.Dict[str, int] = cls.process_hidumper_info(
103            device_num, ss)
104        result_dict: typing.Dict[str, typing.Dict[str, typing.Any]] = dict()
105        result_dict = cls.result_process4(result_dict, process_size_dict, rom_result_dict, process_elf_dict,
106                                          so_info_dict)
107        base_dir, _ = os.path.split(output_file)
108        if len(base_dir) != 0 and not os.path.isdir(base_dir):
109            os.makedirs(base_dir, exist_ok=True)
110        with os.fdopen(os.open(output_file + ".json", os.O_WRONLY | os.O_CREAT, mode=0o640), 'w', encoding='utf-8') as f:
111            json.dump(result_dict, f, indent=4)
112        refactored_result: Dict[str, Dict] = refacotr_result(result_dict)
113        if unit_adapt:
114            cls.refactored_result_unit_adaptive(refactored_result)
115        if baseline_file:
116            cls.add_baseline(refactored_result, baseline_file)
117        with os.fdopen(os.open(f"refactored_{output_file}.json", os.O_WRONLY | os.O_CREAT, mode=0o640), 'w', encoding='utf-8') as f:
118            json.dump(refactored_result, f, indent=4)
119        if output_excel:
120            cls.__save_result_as_excel(
121                refactored_result, output_file + ".xls", ss, baseline_file, unit_adapt)
122
123    __ss_dict: typing.Dict[str, int] = {
124        "Pss": 2,
125        "Vss": 3,
126        "Rss": 4,
127        "Uss": 5
128    }
129
130    @classmethod
131    def process_hidumper_info(cls, device_num: str, ss: str) -> typing.Dict[str, int]:
132        """
133        处理进程名与对应进程大小
134        """
135
136        def exec_once() -> typing.Dict[str, int]:
137            stdout = HDCTool.exec(
138                ["hdc", "-t", device_num, "shell", "hidumper", "--mem"])
139            name_size_dict = cls.__parse_hidumper_mem(stdout, device_num, ss)
140            return name_size_dict
141
142        if not HDCTool.verify_hdc():
143            print("error: Command 'hdc' not found")
144            return dict()
145        if not HDCTool.verify_device(device_num):
146            print("error: {} is inaccessible or not found".format(device_num))
147            return dict()
148
149        return exec_once()
150
151    @classmethod
152    def get_elf_info_from_rom_result(cls, rom_result_json: str) -> typing.Dict[str, typing.Dict[str, str]]:
153        """
154        利用rom_analyzer.py的分析结果,重组成
155        {file_base_name: {"subsystem_name":subsystem_name, "component_name":component_name}}
156        的形式
157        """
158        with open(rom_result_json, 'r', encoding='utf-8') as f:
159            rom_info_dict = json.load(f)
160        elf_info_dict: typing.Dict[str, typing.Dict[str, str]] = dict()
161        for subsystem_name in rom_info_dict.keys():
162            sub_val_dict: typing.Dict[str, typing.Any] = rom_info_dict.get(
163                subsystem_name)
164            delete_values_from_dict(sub_val_dict, ["size", "file_count"])
165            for component_name in sub_val_dict.keys():
166                component_val_dict: typing.Dict[str, str] = sub_val_dict.get(
167                    component_name)
168                delete_values_from_dict(component_val_dict, [
169                    "size", "file_count"])
170                for file_name, size in component_val_dict.items():
171                    file_basename: str = os.path.split(file_name)[-1]
172                    elf_info_dict[file_basename] = {
173                        "subsystem_name": subsystem_name,
174                        "component_name": component_name,
175                        "size": size
176                    }
177
178        return elf_info_dict
179
180    @classmethod
181    def get_process_so_relationship(cls, cfg_path: str, profile_path: str) -> typing.Dict[
182        str, typing.List[str]]:
183        """
184        parse the relationship between process and elf file
185        """
186        # 从merged_sa里面收集
187        process_elf_dict: typing.Dict[str, typing.List[str]] = dict()
188        cfg_list = glob.glob(cfg_path + os.sep + "*.cfg", recursive=True)
189        for cfg in cfg_list:
190            if debug:
191                print("parsing: ", cfg)
192            try:
193                cls.__parse_process_cfg(cfg, profile_path, process_elf_dict)
194            except:
195                print("parse '{}' failed".format(cfg))
196            finally:
197                ...
198        return process_elf_dict
199
200    @classmethod
201    def find_elf_size_from_rom_result(cls, service_name: str, subsystem_name: str, component_name: str,
202                                      evaluator: typing.Callable, rom_result_dict: typing.Dict[str, typing.Dict]) -> \
203            typing.Tuple[
204                bool, str, str, int]:
205        """
206        全局查找进程的相关elf文件
207        subsystem_name与component_name可明确指定,或为*以遍历整个dict
208        evaluator:评估elf文件的从phone下面开始的路径与service_name的关系,评判如何才是找到了
209        returns: 是否查找到,elf文件名,部件名,size
210        """
211        subsystem_name_list = [
212            subsystem_name] if subsystem_name != "*" else rom_result_dict.keys()
213        for sn in subsystem_name_list:
214            sub_val_dict = rom_result_dict.get(sn)
215            component_name_list = [
216                component_name] if component_name != '*' else sub_val_dict.keys()
217            for cn in component_name_list:
218                if cn == "size" or cn == "file_count":
219                    continue
220                component_val_dict: typing.Dict[str,
221                int] = sub_val_dict.get(cn)
222                for k, v in component_val_dict.items():
223                    if k == "size" or k == "file_count":
224                        continue
225                    if not evaluator(service_name, k):
226                        continue
227                    return True, os.path.split(k)[-1], sn, cn, v
228        return False, str(), str(), str(), int()
229
230    @classmethod
231    def add_baseline(self, refactored_result_dict: Dict, baseline_file: str) -> None:
232        with open(baseline_file, 'r', encoding='utf-8') as f:
233            baseline_dict = json.load(f)
234        for subsystem_name, subsystem_info in refactored_result_dict.items():
235            for component_name, component_info in subsystem_info.items():
236                if component_name == "size":
237                    continue
238                if not baseline_dict.get(subsystem_name):
239                    continue
240                if not baseline_dict[subsystem_name].get(component_name):
241                    continue
242                component_info["baseline"] = baseline_dict[subsystem_name][component_name].get(
243                    "ram")
244
245    @classmethod
246    def inside_refactored_result_unit_adaptive(cls, process_info):
247        for elf_name, elf_size in process_info["elf"].items():
248            process_info["elf"][elf_name] = unit_adaptive(elf_size)
249        return process_info
250
251    @classmethod
252    def refactored_result_unit_adaptive(cls, result_dict: Dict[str, Dict]) -> None:
253        for subsystem_name, subsystem_info in result_dict.items():
254            sub_size = unit_adaptive(subsystem_info["size"])
255            del subsystem_info["size"]
256            for component_name, component_info in subsystem_info.items():
257                com_size = unit_adaptive(component_info["size"])
258                del component_info["size"]
259                for process_name, process_info in component_info.items():
260                    pro_size = unit_adaptive(process_info["size"])
261                    del process_info["size"]
262                    process_info = cls.inside_refactored_result_unit_adaptive(process_info)
263                    process_info["size"] = pro_size
264                component_info["size"] = com_size
265            subsystem_info["size"] = sub_size
266
267    @classmethod
268    def result_process1(cls, result_dict, process_name, process_size, elf, size):
269        result_dict[process_name] = dict()
270        result_dict[process_name]["size"] = process_size
271        result_dict[process_name]["startup"] = dict()
272        result_dict[process_name]["startup"]["init"] = dict()
273        result_dict[process_name]["startup"]["init"][elf if len(
274            elf) != 0 else "UNKNOWN"] = size
275        return result_dict
276
277    @classmethod
278    def result_process2(cls, result_dict, process_name, subsystem_name, process_size, component_name, hap_name, size):
279        result_dict[process_name] = dict()
280        result_dict[process_name]["size"] = process_size
281        result_dict[process_name][subsystem_name] = dict()
282        result_dict[process_name][subsystem_name][component_name] = dict()
283        result_dict[process_name][subsystem_name][component_name][hap_name if len(
284            hap_name) != 0 else "UNKNOWN"] = size
285        return result_dict
286
287    @classmethod
288    def result_process3(cls, result_dict, process_name, process_size):
289        result_dict[process_name] = dict()
290        result_dict[process_name]["size"] = process_size
291        result_dict[process_name]["UNKNOWN"] = dict()
292        result_dict[process_name]["UNKNOWN"]["UNKNOWN"] = dict()
293        result_dict[process_name]["UNKNOWN"]["UNKNOWN"]["UNKNOWN"] = int()
294        return result_dict
295
296    @classmethod
297    def result_process4(cls, result_dict, process_size_dict, rom_result_dict, process_elf_dict, so_info_dict):
298        def get(key: typing.Any, dt: typing.Dict[str, typing.Any]):
299            for k, v in dt.items():
300                if k.startswith(key) or (len(v) > 0 and key == v[0]):
301                    # 要么uinput_inject的对应key为mmi_uinput_inject。对于此类特殊处理,即:如果service_name找不到,但是直接执行的bin等于这个名字,也认为找到
302                    return v
303
304        for process_name, process_size in process_size_dict.items():  # 从进程出发
305            if not process_name:
306                print("warning: an empty 'process_name' has been found.")
307                continue
308            # 如果部件是init,特殊处理
309            if process_name == "init":
310                _, elf, _, _, size = cls.find_elf_size_from_rom_result(process_name, "startup", "init",
311                                                                       lambda x, y: os.path.split(y)[
312                                                                                        -1].lower() == x.lower(),
313                                                                       rom_result_dict)
314                result_dict = cls.result_process1(result_dict, process_name, process_size, elf, size)
315                continue
316            # 如果是hap,特殊处理
317            if (process_name.startswith("com.") or process_name.startswith("ohos.")):
318                _, hap_name, subsystem_name, component_name, size = cls.find_elf_size_from_rom_result(process_name, "*",
319                                                                                                      "*",
320                                                                                                      lambda x, y: len(
321                                                                                                          y.split(
322                                                                                                              '/')) >= 3 and x.lower().startswith(
323                                                                                                          y.split('/')[
324                                                                                                              2].lower()),
325                                                                                                      rom_result_dict)
326                result_dict = cls.result_process2(result_dict, process_name, subsystem_name, process_size,
327                                                  component_name, hap_name, size)
328                continue
329            # 得到进程相关的elf文件list
330            so_list: list = get(process_name, process_elf_dict)
331            if so_list is None:
332                print("warning: process '{}' not found in .json or .cfg".format(
333                    process_name))
334                result_dict = cls.result_process3(result_dict, process_name, process_size)
335                continue
336            result_dict[process_name] = dict()
337            result_dict[process_name]["size"] = process_size
338            for so in so_list:
339                unit = so_info_dict.get(so)
340                if unit is None:
341                    result_dict[process_name]["UNKNOWN"] = dict()
342                    result_dict[process_name]["UNKNOWN"]["UNKNOWN"] = dict()
343                    result_dict[process_name]["UNKNOWN"]["UNKNOWN"][so] = int()
344                    print("warning: '{}' in {} not found in json from rom analysis result".format(
345                        so, process_name))
346                    continue
347                component_name = unit.get("component_name")
348                subsystem_name = unit.get("subsystem_name")
349                so_size = unit.get("size")
350                if result_dict.get(process_name).get(subsystem_name) is None:
351                    result_dict[process_name][subsystem_name] = dict()
352                if result_dict.get(process_name).get(subsystem_name).get(component_name) is None:
353                    result_dict[process_name][subsystem_name][component_name] = dict()
354                result_dict[process_name][subsystem_name][component_name][so] = so_size
355        return result_dict
356
357    @classmethod
358    def __hidumper_mem_line_process(cls, content: typing.Text) -> typing.List[typing.Text]:
359        """
360        将hidumper的拥有的数据行进行分割,得到
361        [pid, name, pss, vss, rss, uss]格式的list
362        """
363        trival_pattern = re.compile(r"kB|\(.*\)(?#去除单位kB以及小括号内的任意数据,包括小括号)")
364        content = re.sub(trival_pattern, "", content)
365        blank_pattern = re.compile(r"\s+(?#匹配一个或多个空格)")
366        return re.sub(blank_pattern, ' ', content.strip()).split()
367
368    @classmethod
369    def __parse_hidumper_mem(cls, content: typing.Text, device_num: str, ss: str = "Pss") -> typing.Dict[
370        typing.Text, int]:
371        """
372        解析:hidumper --meme的结果
373        返回{process_name: pss}形式的字典
374        '248  	samgr              1464(0 in SwapPss) kB    15064 kB     6928 kB     1072 kB\r'
375        """
376
377        def find_full_process_name(hname: str) -> str:
378            for lname in __process_name_list:
379                if lname.startswith(hname):
380                    return lname
381            return str()
382
383        def process_ps_ef(content: str) -> list:
384            line_list = content.strip().split("\n")[1:]
385            process_name_list = list()
386            for line in line_list:
387                process_name = line.split()[7]
388                if process_name.startswith('['):
389                    continue
390                process_name_list.append(process_name)
391            return process_name_list
392
393        if ss not in cls.__ss_dict.keys():
394            print("error: {} is not a valid parameter".format(ss))
395            return dict()
396        output = content.split('\n')
397        process_pss_dict = dict()
398        __process_name_list: typing.List[str] = process_ps_ef(
399            HDCTool.exec(["hdc", "-t", device_num, "shell", "ps", "-ef"]))
400        for line in output:
401            if "Total Memory Usage by Size" in line:
402                break
403            if line.isspace():
404                continue
405            processed: typing.List[typing.Text] = cls.__hidumper_mem_line_process(
406                line)
407            # 如果第一列不是数字(pid),就过
408            if not processed or not processed[0].isnumeric():
409                continue
410            name = processed[1]  # 否则的话就取名字,和对应的size
411            size = int(processed[cls.__ss_dict.get(ss)]) * \
412                   1024  # kilo byte to byte
413            full_process_name = find_full_process_name(name)
414            if not full_process_name:
415                print(
416                    f"warning: process \"{full_process_name}\" not found in the result of command \"ps -ef\"")
417                continue
418            process_pss_dict[full_process_name] = size
419        return process_pss_dict
420
421    @classmethod
422    def __parse_process_json(cls, file_path: str, result_dict: typing.Dict[str, typing.List[str]]):
423        """
424        解析json文件,结存存入 result_dict中,格式:{process_name: os_list}
425        其中,so_list中是so的base_name
426        """
427        if not (os.path.isfile(file_path) and file_path.endswith(".json")):
428            print("warning: {} not exist or not a json file".format(file_path))
429            return
430        with open(file_path, 'r', encoding='utf-8') as f:
431            j_content: typing.Dict[str, typing.Any] = json.load(f)
432        if "process" not in j_content.keys() or "systemability" not in j_content.keys():
433            print(
434                f"warning: {file_path} has no field 'process' or 'systemability'")
435            return
436        process_name: str = j_content.get("process")
437        elf_list: typing.List[str] = list()
438        for sa in j_content.get("systemability"):
439            libpath: str = sa.get("libpath")
440            if not libpath:
441                continue
442            elf_list.append(libpath)
443        result_dict[process_name] = elf_list
444
445    @classmethod
446    def __parse_process_cfg(cls, cfg_path: str, profile_path: str, result_dict: dict):
447        """
448        解析cfg,因为有的cfg会拉起xml中的进程,所以也可能会去解析xml
449        """
450        with open(cfg_path, 'r', encoding='utf-8') as f:
451            cfg_dict = json.loads(f.read())
452        services = cfg_dict.get("services")
453        if services is None:
454            print("warning: 'services' not in {}".format(cfg_path))
455            return
456        for service in services:
457            process_name = service.get("name")
458            first, *path_list = service.get("path")
459            if first.endswith("sa_main"):
460                # 由sa_main去来起进程
461                xml_base_name = os.path.split(path_list[0])[-1]
462                cls.__parse_process_json(os.path.join(
463                    profile_path, xml_base_name), result_dict)
464            else:
465                # 直接执行
466                if result_dict.get(process_name) is None:
467                    result_dict[process_name] = list()
468                result_dict.get(process_name).append(os.path.split(first)[-1])
469
470    @classmethod
471    def __inside_save_result_as_excel(cls, baseline_file, subsystem_name, component_name, component_size,
472                                      component_baseline, process_name, process_size, elf_name, elf_size):
473        if baseline_file:
474            return [subsystem_name, component_name, component_size,
475                    component_baseline, process_name, process_size, elf_name, elf_size]
476        else:
477            return [subsystem_name, component_name, component_size,
478                    process_name, process_size, elf_name, elf_size]
479
480    @classmethod
481    def __save_result_as_excel(cls, data_dict: dict, filename: str, ss: str, baseline_file: str, unit_adapt: bool):
482        """
483        保存结果到excel中
484        子系统:{
485            "size": 1234,
486            部件:{
487                "size":123,
488                "base_line":124,
489                进程:{
490                    "size":12,
491                    "elf":{
492                        "elf_file_1":elf_size,
493                        ...
494                    }
495                }
496            }
497        }
498        """
499        tmp_dict = copy.deepcopy(data_dict)
500        writer = SimpleExcelWriter("ram_info")
501        header_unit = "" if unit_adapt else ", Byte"
502        header = [
503            "subsystem_name", "component_name", f"component_size(ram{header_unit})", "process_name",
504            f"process_size({ss}{header_unit})", "elf", f"elf_size{'' if unit_adapt else '(Byte)'}"
505        ]
506        if baseline_file:
507            header = [
508                "subsystem_name", "component_name", f"component_size(ram{header_unit})", "baseline", "process_name",
509                f"process_size({ss}{header_unit})", "elf", f"elf_size{'' if unit_adapt else '(Byte)'}"
510            ]
511        writer.set_sheet_header(header)
512        subsystem_c = 0
513        subsystem_start_r = 1
514        subsystem_end_r = 0
515
516        component_c = 1
517        component_start_r = 1
518        component_end_r = 0
519        component_size_c = 2
520        baseline_c = 3
521
522        process_start_r = 1
523        process_end_r = 0
524        process_c = 4
525        process_size_c = 5
526        if not baseline_file:
527            process_c -= 1
528            process_size_c -= 1
529        for subsystem_name, subsystem_info in tmp_dict.items():
530            subsystem_size = subsystem_info.get("size")
531            if subsystem_size:
532                del subsystem_info["size"]
533            for component_name, component_info in subsystem_info.items():
534                component_size = component_info.get("size")
535                component_baseline = component_info.get("baseline")
536                if "size" in component_info.keys():
537                    del component_info["size"]
538                if "baseline" in component_info.keys():
539                    del component_info["baseline"]
540                for process_name, process_info in component_info.items():
541                    process_size = process_info.get("size")
542                    elf_info = process_info.get("elf")
543                    for elf_name, elf_size in elf_info.items():
544                        line = cls.__inside_save_result_as_excel(baseline_file, subsystem_name, component_name,
545                                                                 component_size,
546                                                                 component_baseline, process_name, process_size,
547                                                                 elf_name, elf_size)
548                        writer.append_line(line)
549                    elf_count = len(elf_info)
550                    process_end_r += elf_count
551                    component_end_r += elf_count
552                    subsystem_end_r += elf_count
553                    writer.write_merge(
554                        process_start_r, process_c, process_end_r, process_c, process_name)
555                    writer.write_merge(
556                        process_start_r, process_size_c, process_end_r, process_size_c, process_size)
557                    process_start_r = process_end_r + 1
558                writer.write_merge(component_start_r, component_c,
559                                   component_end_r, component_c, component_name)
560                writer.write_merge(component_start_r, component_size_c,
561                                   component_end_r, component_size_c, component_size)
562                if baseline_file:
563                    writer.write_merge(component_start_r, baseline_c,
564                                       component_end_r, baseline_c, component_baseline)
565                component_start_r = component_end_r + 1
566            writer.write_merge(subsystem_start_r, subsystem_c,
567                               subsystem_end_r, subsystem_c, subsystem_name)
568            subsystem_start_r = subsystem_end_r + 1
569        writer.save(filename)
570
571
572def inside_refacotr_result(component_info, refactored_ram_dict, subsystem_name, component_name, process_name,
573                           process_size):
574    for elf_name, elf_size in component_info.items():
575        if not refactored_ram_dict.get(subsystem_name):
576            refactored_ram_dict[subsystem_name] = dict()
577            refactored_ram_dict[subsystem_name]["size"] = 0
578        if not refactored_ram_dict[subsystem_name].get(component_name):
579            refactored_ram_dict[subsystem_name][component_name] = dict(
580            )
581            refactored_ram_dict[subsystem_name][component_name]["size"] = 0
582        refactored_ram_dict[subsystem_name][component_name][process_name] = dict(
583        )
584        refactored_ram_dict[subsystem_name][component_name][process_name]["size"] = process_size
585        refactored_ram_dict[subsystem_name][component_name][process_name]["elf"] = dict(
586        )
587        refactored_ram_dict[subsystem_name][component_name][process_name]["elf"][elf_name] = elf_size
588        refactored_ram_dict[subsystem_name]["size"] += process_size
589        refactored_ram_dict[subsystem_name][component_name]["size"] += process_size
590    return refactored_ram_dict
591
592
593def refacotr_result(ram_result: Dict[str, Dict]) -> Dict[str, Dict]:
594    refactored_ram_dict: Dict[str, Dict] = dict()
595    for process_name, process_info in ram_result.items():
596        process_size = process_info.get("size")
597        del process_info["size"]
598        for subsystem_name, subsystem_info in process_info.items():
599            for component_name, component_info in subsystem_info.items():
600                refactored_ram_dict = inside_refacotr_result(component_info, refactored_ram_dict, subsystem_name,
601                                                             component_name, process_name, process_size)
602    return refactored_ram_dict
603
604
605def get_args():
606    version_num = 1.0
607    parser = argparse.ArgumentParser(
608        description="analyze ram size of component"
609    )
610    parser.add_argument("-v", "-version", action="version",
611                        version=f"version {version_num}")
612    parser.add_argument("-s", "--json_path", type=str, required=True,
613                        help="path of sa json file. eg: -x ~/openharmony/out/rk3568/packages/phone/system/profile")
614    parser.add_argument("-c", "--cfg_path", type=str, required=True,
615                        help="path of cfg files. eg: -c ./cfgs/")
616    parser.add_argument("-j", "--rom_result", type=str, default="./rom_analysis_result.json",
617                        help="json file produced by rom_analyzer_v1.0.py, default: ./rom_analysis_result.json."
618                             "eg: -j ./demo/rom_analysis_result.json")
619    parser.add_argument("-n", "--device_num", type=str, required=True,
620                        help="device number to be collect hidumper info. eg: -n 7001005458323933328a01fce16d3800")
621    parser.add_argument("-b", "--baseline_file", type=str, default="",
622                        help="baseline file of rom and ram generated by rom analysis.")
623    parser.add_argument("-o", "--output_filename", default="ram_analysis_result", type=str,
624                        help="base name of output file, default: ram_analysis_result. eg: -o ram_analysis_result")
625    parser.add_argument("-u", "--unit_adaptive",
626                        action="store_true", help="unit adaptive")
627    parser.add_argument("-e", "--excel", type=bool, default=False,
628                        help="if output result as excel, default: False. eg: -e True")
629    args = parser.parse_args()
630    return args
631
632
633def abspath(path: str) -> str:
634    return os.path.abspath(os.path.expanduser(path))
635
636
637if __name__ == '__main__':
638    args = get_args()
639    cfg_path_name = abspath(args.cfg_path)
640    profile_path_name = abspath(args.json_path)
641    rom_result = args.rom_result
642    device = args.device_num
643    output_filename = args.output_filename
644    baseline = args.baseline_file
645    output_excel_path = args.excel
646    unit_adaptiv = args.unit_adaptive
647    RamAnalyzer.analysis(cfg_path_name, profile_path_name, rom_result,
648                         device_num=device, output_file=output_filename, ss="Pss", output_excel=output_excel_path,
649                         baseline_file=baseline, unit_adapt=unit_adaptiv)