162306a36Sopenharmony_ci# SPDX-License-Identifier: GPL-2.0
262306a36Sopenharmony_ci# Copyright (c) 2020 SUSE LLC.
362306a36Sopenharmony_ci
462306a36Sopenharmony_ciimport collections
562306a36Sopenharmony_ciimport functools
662306a36Sopenharmony_ciimport json
762306a36Sopenharmony_ciimport os
862306a36Sopenharmony_ciimport socket
962306a36Sopenharmony_ciimport subprocess
1062306a36Sopenharmony_ciimport unittest
1162306a36Sopenharmony_ci
1262306a36Sopenharmony_ci
1362306a36Sopenharmony_ci# Add the source tree of bpftool and /usr/local/sbin to PATH
1462306a36Sopenharmony_cicur_dir = os.path.dirname(os.path.realpath(__file__))
1562306a36Sopenharmony_cibpftool_dir = os.path.abspath(os.path.join(cur_dir, "..", "..", "..", "..",
1662306a36Sopenharmony_ci                                           "tools", "bpf", "bpftool"))
1762306a36Sopenharmony_cios.environ["PATH"] = bpftool_dir + ":/usr/local/sbin:" + os.environ["PATH"]
1862306a36Sopenharmony_ci
1962306a36Sopenharmony_ci
2062306a36Sopenharmony_ciclass IfaceNotFoundError(Exception):
2162306a36Sopenharmony_ci    pass
2262306a36Sopenharmony_ci
2362306a36Sopenharmony_ci
2462306a36Sopenharmony_ciclass UnprivilegedUserError(Exception):
2562306a36Sopenharmony_ci    pass
2662306a36Sopenharmony_ci
2762306a36Sopenharmony_ci
2862306a36Sopenharmony_cidef _bpftool(args, json=True):
2962306a36Sopenharmony_ci    _args = ["bpftool"]
3062306a36Sopenharmony_ci    if json:
3162306a36Sopenharmony_ci        _args.append("-j")
3262306a36Sopenharmony_ci    _args.extend(args)
3362306a36Sopenharmony_ci
3462306a36Sopenharmony_ci    return subprocess.check_output(_args)
3562306a36Sopenharmony_ci
3662306a36Sopenharmony_ci
3762306a36Sopenharmony_cidef bpftool(args):
3862306a36Sopenharmony_ci    return _bpftool(args, json=False).decode("utf-8")
3962306a36Sopenharmony_ci
4062306a36Sopenharmony_ci
4162306a36Sopenharmony_cidef bpftool_json(args):
4262306a36Sopenharmony_ci    res = _bpftool(args)
4362306a36Sopenharmony_ci    return json.loads(res)
4462306a36Sopenharmony_ci
4562306a36Sopenharmony_ci
4662306a36Sopenharmony_cidef get_default_iface():
4762306a36Sopenharmony_ci    for iface in socket.if_nameindex():
4862306a36Sopenharmony_ci        if iface[1] != "lo":
4962306a36Sopenharmony_ci            return iface[1]
5062306a36Sopenharmony_ci    raise IfaceNotFoundError("Could not find any network interface to probe")
5162306a36Sopenharmony_ci
5262306a36Sopenharmony_ci
5362306a36Sopenharmony_cidef default_iface(f):
5462306a36Sopenharmony_ci    @functools.wraps(f)
5562306a36Sopenharmony_ci    def wrapper(*args, **kwargs):
5662306a36Sopenharmony_ci        iface = get_default_iface()
5762306a36Sopenharmony_ci        return f(*args, iface, **kwargs)
5862306a36Sopenharmony_ci    return wrapper
5962306a36Sopenharmony_ci
6062306a36Sopenharmony_ciDMESG_EMITTING_HELPERS = [
6162306a36Sopenharmony_ci        "bpf_probe_write_user",
6262306a36Sopenharmony_ci        "bpf_trace_printk",
6362306a36Sopenharmony_ci        "bpf_trace_vprintk",
6462306a36Sopenharmony_ci    ]
6562306a36Sopenharmony_ci
6662306a36Sopenharmony_ciclass TestBpftool(unittest.TestCase):
6762306a36Sopenharmony_ci    @classmethod
6862306a36Sopenharmony_ci    def setUpClass(cls):
6962306a36Sopenharmony_ci        if os.getuid() != 0:
7062306a36Sopenharmony_ci            raise UnprivilegedUserError(
7162306a36Sopenharmony_ci                "This test suite needs root privileges")
7262306a36Sopenharmony_ci
7362306a36Sopenharmony_ci    @default_iface
7462306a36Sopenharmony_ci    def test_feature_dev_json(self, iface):
7562306a36Sopenharmony_ci        unexpected_helpers = DMESG_EMITTING_HELPERS
7662306a36Sopenharmony_ci        expected_keys = [
7762306a36Sopenharmony_ci            "syscall_config",
7862306a36Sopenharmony_ci            "program_types",
7962306a36Sopenharmony_ci            "map_types",
8062306a36Sopenharmony_ci            "helpers",
8162306a36Sopenharmony_ci            "misc",
8262306a36Sopenharmony_ci        ]
8362306a36Sopenharmony_ci
8462306a36Sopenharmony_ci        res = bpftool_json(["feature", "probe", "dev", iface])
8562306a36Sopenharmony_ci        # Check if the result has all expected keys.
8662306a36Sopenharmony_ci        self.assertCountEqual(res.keys(), expected_keys)
8762306a36Sopenharmony_ci        # Check if unexpected helpers are not included in helpers probes
8862306a36Sopenharmony_ci        # result.
8962306a36Sopenharmony_ci        for helpers in res["helpers"].values():
9062306a36Sopenharmony_ci            for unexpected_helper in unexpected_helpers:
9162306a36Sopenharmony_ci                self.assertNotIn(unexpected_helper, helpers)
9262306a36Sopenharmony_ci
9362306a36Sopenharmony_ci    def test_feature_kernel(self):
9462306a36Sopenharmony_ci        test_cases = [
9562306a36Sopenharmony_ci            bpftool_json(["feature", "probe", "kernel"]),
9662306a36Sopenharmony_ci            bpftool_json(["feature", "probe"]),
9762306a36Sopenharmony_ci            bpftool_json(["feature"]),
9862306a36Sopenharmony_ci        ]
9962306a36Sopenharmony_ci        unexpected_helpers = DMESG_EMITTING_HELPERS
10062306a36Sopenharmony_ci        expected_keys = [
10162306a36Sopenharmony_ci            "syscall_config",
10262306a36Sopenharmony_ci            "system_config",
10362306a36Sopenharmony_ci            "program_types",
10462306a36Sopenharmony_ci            "map_types",
10562306a36Sopenharmony_ci            "helpers",
10662306a36Sopenharmony_ci            "misc",
10762306a36Sopenharmony_ci        ]
10862306a36Sopenharmony_ci
10962306a36Sopenharmony_ci        for tc in test_cases:
11062306a36Sopenharmony_ci            # Check if the result has all expected keys.
11162306a36Sopenharmony_ci            self.assertCountEqual(tc.keys(), expected_keys)
11262306a36Sopenharmony_ci            # Check if unexpected helpers are not included in helpers probes
11362306a36Sopenharmony_ci            # result.
11462306a36Sopenharmony_ci            for helpers in tc["helpers"].values():
11562306a36Sopenharmony_ci                for unexpected_helper in unexpected_helpers:
11662306a36Sopenharmony_ci                    self.assertNotIn(unexpected_helper, helpers)
11762306a36Sopenharmony_ci
11862306a36Sopenharmony_ci    def test_feature_kernel_full(self):
11962306a36Sopenharmony_ci        test_cases = [
12062306a36Sopenharmony_ci            bpftool_json(["feature", "probe", "kernel", "full"]),
12162306a36Sopenharmony_ci            bpftool_json(["feature", "probe", "full"]),
12262306a36Sopenharmony_ci        ]
12362306a36Sopenharmony_ci        expected_helpers = DMESG_EMITTING_HELPERS
12462306a36Sopenharmony_ci
12562306a36Sopenharmony_ci        for tc in test_cases:
12662306a36Sopenharmony_ci            # Check if expected helpers are included at least once in any
12762306a36Sopenharmony_ci            # helpers list for any program type. Unfortunately we cannot assume
12862306a36Sopenharmony_ci            # that they will be included in all program types or a specific
12962306a36Sopenharmony_ci            # subset of programs. It depends on the kernel version and
13062306a36Sopenharmony_ci            # configuration.
13162306a36Sopenharmony_ci            found_helpers = False
13262306a36Sopenharmony_ci
13362306a36Sopenharmony_ci            for helpers in tc["helpers"].values():
13462306a36Sopenharmony_ci                if all(expected_helper in helpers
13562306a36Sopenharmony_ci                       for expected_helper in expected_helpers):
13662306a36Sopenharmony_ci                    found_helpers = True
13762306a36Sopenharmony_ci                    break
13862306a36Sopenharmony_ci
13962306a36Sopenharmony_ci            self.assertTrue(found_helpers)
14062306a36Sopenharmony_ci
14162306a36Sopenharmony_ci    def test_feature_kernel_full_vs_not_full(self):
14262306a36Sopenharmony_ci        full_res = bpftool_json(["feature", "probe", "full"])
14362306a36Sopenharmony_ci        not_full_res = bpftool_json(["feature", "probe"])
14462306a36Sopenharmony_ci        not_full_set = set()
14562306a36Sopenharmony_ci        full_set = set()
14662306a36Sopenharmony_ci
14762306a36Sopenharmony_ci        for helpers in full_res["helpers"].values():
14862306a36Sopenharmony_ci            for helper in helpers:
14962306a36Sopenharmony_ci                full_set.add(helper)
15062306a36Sopenharmony_ci
15162306a36Sopenharmony_ci        for helpers in not_full_res["helpers"].values():
15262306a36Sopenharmony_ci            for helper in helpers:
15362306a36Sopenharmony_ci                not_full_set.add(helper)
15462306a36Sopenharmony_ci
15562306a36Sopenharmony_ci        self.assertCountEqual(full_set - not_full_set,
15662306a36Sopenharmony_ci                              set(DMESG_EMITTING_HELPERS))
15762306a36Sopenharmony_ci        self.assertCountEqual(not_full_set - full_set, set())
15862306a36Sopenharmony_ci
15962306a36Sopenharmony_ci    def test_feature_macros(self):
16062306a36Sopenharmony_ci        expected_patterns = [
16162306a36Sopenharmony_ci            r"/\*\*\* System call availability \*\*\*/",
16262306a36Sopenharmony_ci            r"#define HAVE_BPF_SYSCALL",
16362306a36Sopenharmony_ci            r"/\*\*\* eBPF program types \*\*\*/",
16462306a36Sopenharmony_ci            r"#define HAVE.*PROG_TYPE",
16562306a36Sopenharmony_ci            r"/\*\*\* eBPF map types \*\*\*/",
16662306a36Sopenharmony_ci            r"#define HAVE.*MAP_TYPE",
16762306a36Sopenharmony_ci            r"/\*\*\* eBPF helper functions \*\*\*/",
16862306a36Sopenharmony_ci            r"#define HAVE.*HELPER",
16962306a36Sopenharmony_ci            r"/\*\*\* eBPF misc features \*\*\*/",
17062306a36Sopenharmony_ci        ]
17162306a36Sopenharmony_ci
17262306a36Sopenharmony_ci        res = bpftool(["feature", "probe", "macros"])
17362306a36Sopenharmony_ci        for pattern in expected_patterns:
17462306a36Sopenharmony_ci            self.assertRegex(res, pattern)
175