#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2024 Huawei Device Co., Ltd.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
1500600bfbSopenharmony_ci
1600600bfbSopenharmony_ciimport pytest
1700600bfbSopenharmony_ciimport re
1800600bfbSopenharmony_ciimport time
1900600bfbSopenharmony_ciimport subprocess
2000600bfbSopenharmony_cifrom utils import *
2100600bfbSopenharmony_ci
# Positions used by CheckSmapsTotalPss to index the per-row integer lists that
# ParseSmapsOutput builds (judging by the names: the PSS and SwapPss columns).
PSS_INDEX = 2
SWAP_PSS_INDEX = 8
2400600bfbSopenharmony_ci
def ParseSmapsOutput(output):
    """Parse tabular smaps text into a mapping of row name to integers.

    Lines that contain no digit (headers, separators, blank lines) are
    ignored. For every other line the row name is the text running from the
    first ``/``, ``[`` or ASCII letter to the end of the line, stripped of
    surrounding whitespace, and the value is the list of every run of digits
    found anywhere on that line, in order of appearance (digits inside the
    name are therefore included at the tail of the list).

    Args:
        output: Raw command output as a single string.

    Returns:
        dict mapping each row name to a list of ints. When a name occurs on
        more than one line, the last occurrence wins.
    """
    memory_data = {}
    for line in output.split("\n"):
        numbers = re.findall(r'\d+', line)
        if not numbers:
            continue
        name_match = re.search(r'(/|\[|[a-zA-Z])(.*$)', line)
        if name_match is None:
            # A purely numeric row has no name to file it under; skip it
            # instead of failing with AttributeError on ``None.group()``.
            continue
        memory_data[name_match.group().strip()] = [int(val) for val in numbers]
    return memory_data
3400600bfbSopenharmony_ci
@print_check_result
def CheckSmapsTotalPss(memory_data):
    """Check the "Summary" row against the sum of all other rows.

    Args:
        memory_data: Mapping of row name to list of ints, as produced by
            ParseSmapsOutput. Must contain a "Summary" entry.

    Returns:
        True when the PSS_INDEX values of every non-"Summary" row, added to
        the "Summary" row's SWAP_PSS_INDEX value, equal the "Summary" row's
        PSS_INDEX value.
    """
    summary_pss = memory_data["Summary"][PSS_INDEX]
    summary_swap_pss = memory_data["Summary"][SWAP_PSS_INDEX]
    rows_pss = 0
    for name, values in memory_data.items():
        if name == "Summary":
            continue
        rows_pss += values[PSS_INDEX]
    return rows_pss + summary_swap_pss == summary_pss
4000600bfbSopenharmony_ci
@print_check_result
def CheckMemAddrss(output):
    """Return True when ``output`` holds a whitespace-delimited alphanumeric token of 8+ characters.

    NOTE(review): presumably this is meant to detect a memory address column
    in verbose output - confirm against the hidumper output format.
    """
    return re.search(r"\s([a-zA-Z0-9]{8,})\s", output) is not None
4500600bfbSopenharmony_ci
@print_check_result
def CheckPseft(output):
    """Return True when ``output`` contains a process-list header followed by rows.

    The header must read ``UID PID TID  PPID TCNT STIME TTY TIME CMD`` (with
    the spacing encoded in the pattern) and be followed by at least one
    non-empty line terminated by a newline.
    """
    # Raw string: ``\s`` in a normal literal is an invalid escape sequence,
    # which newer Python versions warn about. The compiled pattern is unchanged.
    result = re.search(r"UID\s+PID\s+TID  PPID TCNT STIME TTY\s+TIME CMD\n([^\n]+\n){1,}", output)
    return result is not None
5000600bfbSopenharmony_ci
@print_check_result
def CheckMountInfo(output):
    """Return True when ``output`` contains a mountinfo section with at least four rows.

    A section is a ``/proc/<pid>/mountinfo`` line, one empty line, and then
    four or more non-empty lines each terminated by a newline.
    """
    # Raw string: ``\d`` in a normal literal is an invalid escape sequence,
    # which newer Python versions warn about. The compiled pattern is unchanged.
    result = re.search(r"/proc/\d+/mountinfo\n\n([^\n]+\n){4,}", output)
    return result is not None
5500600bfbSopenharmony_ci
def CheckSmaps(output):
    """Return True when ``output`` mentions a ``/proc/<pid>/maps`` path."""
    maps_path = re.compile(r"/proc/\d+/maps")
    # A match object is always truthy, so bool() yields the same answer as
    # comparing the search result against None.
    return bool(maps_path.search(output))
5900600bfbSopenharmony_ci
def CheckMemSmapsWithRoot(output):
    """Parse ``output`` with ParseSmapsOutput and run the smaps checks on it.

    Returns:
        True when every registered check accepts the parsed data, else False.
    """
    checks = (CheckSmapsTotalPss,)
    parsed = ParseSmapsOutput(output)
    for check in checks:
        if not check(parsed):
            return False
    return True
6400600bfbSopenharmony_ci
def CheckMemSmapsVWithRoot(output):
    """Run the PSS total check on the parsed output, then CheckMemAddrss on the raw text.

    CheckMemAddrss is only evaluated when the PSS check result is truthy.
    """
    parsed = ParseSmapsOutput(output)
    pss_result = CheckSmapsTotalPss(parsed)
    if not pss_result:
        return pss_result
    return CheckMemAddrss(output)
6800600bfbSopenharmony_ci
def CheckHelpOutput(output):
    """Return True when ``output`` contains the text ``usage:``."""
    usage_marker = "usage:"
    return usage_marker in output
7100600bfbSopenharmony_ci
def CheckNetCmd(output):
    """Return True when ``output`` contains every expected network command line.

    Each entry below must occur in ``output`` as a substring.
    """
    expected_cmds = (
        "iptables -L -nvx",
        "ip6tables -L -nvx",
        "iptables -t nat -L -nvx",
        "iptables -t mangle -L -nvx",
        "ip6tables -t mangle -L -nvx",
        "iptables -t raw -L -nvx",
        "ip6tables -t raw -L -nvx",
        "ip link",
        "ip -4 addr show",
        "ip -6 addr show",
        "ip rule show",
        "ip -6 rule show",
    )
    for cmd in expected_cmds:
        if cmd not in output:
            return False
    return True
7800600bfbSopenharmony_ci
class TestHidumperPermission:
    """hidumper tests whose expectations depend on whether the device is a root build.

    When ``IsRootVersion()`` is false, the debug application named by
    ``_DEBUG_APP`` is started before the tests of this class run and is
    force-stopped afterwards.
    """

    # Bundle name of the debug application used on non-root builds.
    _DEBUG_APP = "com.example.myapplication"

    @classmethod
    def setup_class(cls):
        """Start the debug application when the device is not a root build."""
        if not IsRootVersion():
            subprocess.check_call(f"hdc shell aa start -a EntryAbility -b {cls._DEBUG_APP}", shell=True)

    @classmethod
    def teardown_class(cls):
        """Force-stop the debug application when the device is not a root build."""
        if not IsRootVersion():
            subprocess.check_call(f"hdc shell aa force-stop -b {cls._DEBUG_APP}", shell=True)

    @staticmethod
    def _check_all_outputs(command, check_func, with_zip=True):
        """Apply ``check_func`` to ``command`` through each supported output path."""
        # Verify the command-line output.
        CheckCmd(command, check_func)
        # Verify the redirected command-line output.
        CheckCmdRedirect(command, check_func)
        # Verify the command-line output written to a zip file.
        if with_zip:
            CheckCmdZip(command, check_func)

    @staticmethod
    def _process_dump_check(is_root):
        """Build the checker for ``hidumper -p`` output.

        Root builds must show the ``/proc/<pid>/maps`` section; non-root
        builds must not. Both must show the process list and mountinfo.
        """
        def check(output):
            maps_ok = CheckSmaps(output) if is_root else not CheckSmaps(output)
            # A list (not a generator) so every check runs even after one
            # fails; presumably the decorated checks report their own result.
            return all([maps_ok, CheckPseft(output), CheckMountInfo(output)])
        return check

    def _run_mem_smaps(self, extra_args, root_check):
        """Check ``hidumper --mem-smaps <render_service pid><extra_args>``.

        Root builds are validated with ``root_check``; non-root builds are
        expected to print the usage text instead. The zip output is only
        checked on root builds.
        """
        pid = GetPidByProcessName("render_service")
        is_root = IsRootVersion()
        check_func = root_check if is_root else CheckHelpOutput
        command = f"hidumper --mem-smaps {pid}{extra_args}"
        self._check_all_outputs(command, check_func, with_zip=is_root)

    @pytest.mark.L0
    def test_mem_smaps(self):
        """``hidumper --mem-smaps <pid>``: PSS totals on root, usage text otherwise."""
        self._run_mem_smaps("", CheckMemSmapsWithRoot)

    @pytest.mark.L0
    def test_mem_smaps_v(self):
        """``hidumper --mem-smaps <pid> -v``: verbose variant of test_mem_smaps."""
        self._run_mem_smaps(" -v", CheckMemSmapsVWithRoot)

    @pytest.mark.L0
    def test_process_all(self):
        """``hidumper -p`` without a pid; non-root runs need the debug application."""
        is_root = IsRootVersion()
        if not is_root and GetPidByProcessName(self._DEBUG_APP) == "":
            pytest.skip("test application not found")
        self._check_all_outputs("hidumper -p", self._process_dump_check(is_root))

    @pytest.mark.L0
    def test_process_pid(self):
        """``hidumper -p <pid>``: pid 1 on root builds, the debug application otherwise."""
        is_root = IsRootVersion()
        pid = 1
        if not is_root:
            pid = GetPidByProcessName(self._DEBUG_APP)
            if pid == "":
                pytest.skip("test application not found")
        self._check_all_outputs(f"hidumper -p {pid}", self._process_dump_check(is_root))

    @pytest.mark.L0
    def test_hidumper_help(self):
        """On non-root builds the help text must list ``--mem [pid]``."""
        if IsRootVersion():
            # Skip rather than pass vacuously, like the root-only tests do.
            pytest.skip("test only in non-root mode")
        output = subprocess.check_output("hdc shell \"hidumper -h |grep mem\"", shell=True, text=True, encoding="utf-8")
        assert "--mem [pid]" in output

    @pytest.mark.L0
    def test_process_with_non_debug_pid(self):
        """On non-root builds dumping a non-debug process (pid 1) must be refused."""
        if IsRootVersion():
            # Skip rather than pass vacuously, like the root-only tests do.
            pytest.skip("test only in non-root mode")
        output = subprocess.check_output("hdc shell \"hidumper -p 1\"", shell=True, text=True, encoding="utf-8")
        assert "only support debug application" in output

    @pytest.mark.L0
    def test_output_append(self):
        """On root builds ``>>`` redirection must leave a non-empty log file on the device."""
        if not IsRootVersion():
            pytest.skip("test only in root mode")
        # Append the dump to the log file on the device.
        subprocess.check_call("hdc shell \"hidumper --mem 1 >> /data/log/hidumper.log\"", shell=True)
        output = subprocess.check_output("hdc shell ls -l /data/log/hidumper.log", shell=True).decode()
        # The fifth field of the ``ls -l`` line is read as the file size.
        assert int(output.strip().split()[4]) > 0

    @pytest.mark.L0
    def test_net(self):
        """On root builds ``hidumper --net`` must list every expected network command."""
        if not IsRootVersion():
            pytest.skip("test only in root mode")
        # Inspect the command's own output; the size of hidumper.log says
        # nothing about what ``--net`` printed.
        output = subprocess.check_output("hdc shell \"hidumper --net\"", shell=True, text=True, encoding="utf-8")
        assert CheckNetCmd(output)
19200600bfbSopenharmony_ci