100600bfbSopenharmony_ci#!/usr/bin/env python3
200600bfbSopenharmony_ci# -*- coding: utf-8 -*-
300600bfbSopenharmony_ci# Copyright (C) 2024 Huawei Device Co., Ltd.
400600bfbSopenharmony_ci# Licensed under the Apache License, Version 2.0 (the "License");
500600bfbSopenharmony_ci# you may not use this file except in compliance with the License.
600600bfbSopenharmony_ci# You may obtain a copy of the License at
700600bfbSopenharmony_ci#
800600bfbSopenharmony_ci#     http://www.apache.org/licenses/LICENSE-2.0
900600bfbSopenharmony_ci#
1000600bfbSopenharmony_ci# Unless required by applicable law or agreed to in writing, software
1100600bfbSopenharmony_ci# distributed under the License is distributed on an "AS IS" BASIS,
1200600bfbSopenharmony_ci# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1300600bfbSopenharmony_ci# See the License for the specific language governing permissions and
1400600bfbSopenharmony_ci# limitations under the License.
1500600bfbSopenharmony_ci
1600600bfbSopenharmony_ciimport pytest
1700600bfbSopenharmony_ciimport re
1800600bfbSopenharmony_cifrom utils import *
1900600bfbSopenharmony_ci
# Column positions inside the integer list parsed from one hidumper memory row.
# NOTE(review): the column meanings are inferred from the constant names and
# from how the checks in this file use them — confirm against the real
# `hidumper --mem <pid>` table layout.
PSS_TOTAL_INDEX = 0  # column compared as the total Pss of a row
SWAP_PSS_INDEX = 6  # column added as swap Pss when validating the Total row
COLUMN_NUM = 10  # number of integers every checked row is expected to carry
2300600bfbSopenharmony_ci
def WaitUntillOutputAppear(command, targetStr, second):
    """Poll a shell command until its output contains ``targetStr``.

    Args:
        command: Shell command line to execute. When empty or ``None`` the
            jsheap directory listing for the current device is used, chosen
            with ``IsRootVersion()``.
        targetStr: Substring that must be present in the command output. When
            empty or ``None`` any non-empty output counts as success.
        second: Maximum time to keep polling, in seconds.

    Returns:
        ``True`` as soon as the expected output is seen, ``False`` if it has
        not appeared once ``second`` seconds have elapsed.
    """
    if not command:
        if IsRootVersion():
            command = "hdc shell \"ls -l /data/log/faultlog/temp |grep jsheap\""
        else:
            command = "hdc shell \"ls -l /data/log/reliability/resource_leak/memory_leak |grep jsheap\""

    deadline = time.monotonic() + max(second, 0)
    while True:
        try:
            output = subprocess.check_output(command, shell=True, text=True, encoding="utf-8").strip()
        except subprocess.CalledProcessError:
            # A pipeline ending in grep exits non-zero while nothing matches;
            # that means "not there yet", not a hard failure.
            output = ""
        if output and (not targetStr or targetStr in output):
            return True
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Retry at most once per second without overshooting the deadline.
        time.sleep(min(1.0, remaining))
3300600bfbSopenharmony_ci
def ParseMemoryOutput(output):
    """Parse memory dump text into ``{row label: [int, ...]}``.

    Lines without any digit are ignored. For each remaining line the label is
    the first run of words made of ASCII letters and dots, and the value list
    holds every integer found on the line, in order of appearance. Lines that
    contain digits but no such label are skipped. When a label occurs more
    than once, the last occurrence wins.

    Args:
        output: Full text to parse.

    Returns:
        Dict mapping each row label to its list of integers.
    """
    label_pattern = re.compile(r'\s*([a-zA-Z.]+(?:\s+[a-zA-Z.]+)*)\s*')
    number_pattern = re.compile(r'\d+')

    memory_data = {}
    for line in output.split("\n"):
        numbers = number_pattern.findall(line)
        if not numbers:
            continue
        label_match = label_pattern.search(line)
        if label_match is None:
            # Digits but no alphabetic label (e.g. a bare number row): there
            # is nothing to key the values on, so skip instead of crashing.
            continue
        memory_data[label_match.group(1).strip()] = [int(val) for val in numbers]
    return memory_data
4200600bfbSopenharmony_ci
@print_check_result
def CheckTotalPss(memory_data):
    """Check the Total row against the sum of the per-category rows.

    Adds up the first column of every listed category present in
    ``memory_data`` and returns whether that sum plus the Total row's
    ``SWAP_PSS_INDEX`` column equals the Total row's ``PSS_TOTAL_INDEX``
    column.
    """
    pss_categories = (
        "GL", "Graph", "guard", "native heap", "AnonPage other", "stack",
        ".so", ".ttf", "dev", "dmabuf", "FilePage other",
    )
    category_pss = 0
    for name, columns in memory_data.items():
        if name in pss_categories:
            category_pss += columns[0]
    total_row = memory_data["Total"]
    return total_row[PSS_TOTAL_INDEX] == (category_pss + total_row[SWAP_PSS_INDEX])
4700600bfbSopenharmony_ci
@print_check_result
def CheckDataLength(memory_data):
    """Assert that every expected row holds exactly ``COLUMN_NUM`` values.

    A row that is missing from ``memory_data`` is treated as an empty list and
    therefore fails its assertion. Returns ``True`` when all rows pass.
    """
    # (row label, assertion message) pairs, checked in this order.
    expected_rows = (
        ("GL", "GL memory data error"),
        ("Graph", "Graph memory data error"),
        ("guard", "Guard memory data error"),
        ("native heap", "native heap memory data error"),
        ("AnonPage other", "AnonPage other memory data error"),
        ("stack", "stack memory data error"),
        (".so", ".so memory data error"),
        ("dev", "dev memory data error"),
        ("FilePage other", "FilePage other memory data error"),
        ("Total", "Total memory data error"),
    )
    for row_label, failure_message in expected_rows:
        row_values = memory_data.get(row_label, [])
        assert len(row_values) == COLUMN_NUM, failure_message
    return True
7100600bfbSopenharmony_ci
@print_check_result
def CheckDmaGraphMem(memory_data):
    """Return whether the first Dma value equals the Graph row's Pss total column."""
    dma_value = memory_data["Dma"][0]
    graph_pss = memory_data["Graph"][PSS_TOTAL_INDEX]
    return dma_value == graph_pss
7500600bfbSopenharmony_ci
def CheckHidumperMemoryWithPidOutput(output):
    """Validate the text produced by ``hidumper --mem <pid>``.

    Parses ``output`` and runs the Dma/Graph, total-Pss and row-length checks
    in that order, stopping at the first check that reports failure.

    Returns:
        ``True`` only if every check passes.
    """
    parsed = ParseMemoryOutput(output)
    for validator in (CheckDmaGraphMem, CheckTotalPss, CheckDataLength):
        if not validator(parsed):
            return False
    return True
8000600bfbSopenharmony_ci
def CheckHidumperMemoryWithoutPidOutput(output):
    """Validate the text produced by ``hidumper --mem`` without a pid.

    Looks for the ``Graph(<n> kB)`` and ``DMA(<n> kB)`` figures in ``output``
    and compares them.

    Returns:
        ``True`` when both figures are present and equal; ``False`` when they
        differ or when either one cannot be found in the output.
    """
    graph_match = re.search(r"Graph\((\d+) kB\)", output)
    dma_match = re.search(r"DMA\((\d+) kB\)", output)
    if graph_match is None or dma_match is None:
        # Malformed or truncated output is a failed check, not a crash.
        return False
    return int(graph_match.group(1)) == int(dma_match.group(1))
8500600bfbSopenharmony_ci
class TestHidumperMemory:
    """Tests for ``hidumper --mem`` with and without a target pid."""

    @pytest.mark.L0
    def test_memory_all(self):
        """Run ``hidumper --mem`` and validate it through every output path."""
        command = "hidumper --mem"
        validator = CheckHidumperMemoryWithoutPidOutput
        # Verify, in order: the command-line output, the redirected
        # command-line output, and the output written to a zip file.
        for run_check in (CheckCmd, CheckCmdRedirect, CheckCmdZip):
            run_check(command, validator)

    @pytest.mark.L0
    def test_memory_pid(self):
        """Run ``hidumper --mem <pid>`` for render_service and validate it."""
        pid = GetPidByProcessName("render_service")
        command = "hidumper --mem {}".format(pid)
        validator = CheckHidumperMemoryWithPidOutput
        # Verify, in order: the command-line output, the redirected
        # command-line output, and the output written to a zip file.
        for run_check in (CheckCmd, CheckCmdRedirect, CheckCmdZip):
            run_check(command, validator)
10800600bfbSopenharmony_ci
10900600bfbSopenharmony_ci
class TestHidumperMemoryJsheap:
    """Tests for the ``hidumper --mem-jsheap`` options.

    On non-root builds the sample application is started before the class
    runs and force-stopped afterwards. After every test the directory that is
    later checked for jsheap files is emptied.
    """

    @classmethod
    def setup_class(cls):
        """Start the sample application on non-root builds."""
        if not IsRootVersion():
            subprocess.check_call("hdc shell aa start -a EntryAbility -b com.example.myapplication", shell=True)

    @classmethod
    def teardown_class(cls):
        """Force-stop the sample application on non-root builds."""
        if not IsRootVersion():
            subprocess.check_call("hdc shell aa force-stop -b com.example.myapplication", shell=True)

    def teardown_method(self):
        """Empty the jsheap output directory used by the current build type."""
        if not IsRootVersion():
            subprocess.check_call("hdc shell \"rm -rf /data/log/reliability/resource_leak/memory_leak/*\"", shell=True)
        else:
            subprocess.check_call("hdc shell \"rm -rf /data/log/faultlog/temp/*\"", shell=True)

    @staticmethod
    def _target_pid():
        """Return the pid of the process to dump for the current build type.

        Skips the calling test when the sample application is the target and
        no pid is found for it.
        """
        if IsOpenHarmonyVersion():
            return GetPidByProcessName("com.ohos.launcher")
        if IsRootVersion():
            return GetPidByProcessName("com.ohos.sceneboard")
        pid = GetPidByProcessName("com.example.myapplication")
        if pid == "":
            pytest.skip("test application not found")
        return pid

    @staticmethod
    def _assert_jsheap_file_created():
        """Assert that a jsheap entry shows up in the build's output directory."""
        if IsRootVersion():
            listing = "hdc shell \"ls -l /data/log/faultlog/temp |grep jsheap\""
        else:
            listing = "hdc shell \"ls -l /data/log/reliability/resource_leak/memory_leak |grep jsheap\""
        assert WaitUntillOutputAppear(listing, "jsheap", 10)

    @staticmethod
    def _assert_gc_triggered(pid):
        """Assert that the ArkCompiler log reports a triggered GC for ``pid``."""
        assert WaitUntillLogAppear("hdc shell \"hilog | grep ArkCompiler\"", f"TriggerGC tid 0 curTid {pid}", 10)

    @pytest.mark.L0
    def test_mem_jsheap(self):
        """``--mem-jsheap <pid>`` produces a jsheap file."""
        pid = self._target_pid()
        command = f"hdc shell \"hidumper --mem-jsheap {pid}\""
        # Run the dump, then verify its output file appears.
        subprocess.check_call(command, shell=True)
        self._assert_jsheap_file_created()

    @pytest.mark.L0
    def test_mem_jsheap_T(self):
        """``--mem-jsheap <pid> -T <pid>`` produces a jsheap file."""
        pid = self._target_pid()
        command = f"hdc shell \"hidumper --mem-jsheap {pid} -T {pid}\""
        # Run the dump, then verify its output file appears.
        subprocess.check_call(command, shell=True)
        self._assert_jsheap_file_created()

    @pytest.mark.L0
    def test_mem_jsheap_gc(self):
        """``--mem-jsheap <pid> --gc`` logs a triggered GC."""
        pid = self._target_pid()
        command = f"hdc shell \"hidumper --mem-jsheap {pid} --gc\""
        # Run the command, then verify the GC log line appears.
        subprocess.check_call(command, shell=True)
        self._assert_gc_triggered(pid)

    @pytest.mark.L0
    def test_mem_jsheap_T_gc(self):
        """``--mem-jsheap <pid> -T <pid> --gc`` logs a triggered GC."""
        pid = self._target_pid()
        command = f"hdc shell \"hidumper --mem-jsheap {pid} -T {pid} --gc\""
        # Run the command, then verify the GC log line appears.
        subprocess.check_call(command, shell=True)
        self._assert_gc_triggered(pid)
19600600bfbSopenharmony_ci
197