# SPDX-License-Identifier: GPL-2.0
# Copyright (c) 2020 SUSE LLC.
38c2ecf20Sopenharmony_ci
48c2ecf20Sopenharmony_ciimport collections
58c2ecf20Sopenharmony_ciimport functools
68c2ecf20Sopenharmony_ciimport json
78c2ecf20Sopenharmony_ciimport os
88c2ecf20Sopenharmony_ciimport socket
98c2ecf20Sopenharmony_ciimport subprocess
108c2ecf20Sopenharmony_ciimport unittest
118c2ecf20Sopenharmony_ci
128c2ecf20Sopenharmony_ci
# Put the in-tree bpftool build directory and /usr/local/sbin at the front of
# PATH, so that a "bpftool" found there takes precedence over any other copy.
cur_dir = os.path.dirname(os.path.realpath(__file__))
bpftool_dir = os.path.abspath(os.path.join(cur_dir, "..", "..", "..", "..",
                                           "tools", "bpf", "bpftool"))
_path_entries = [bpftool_dir, "/usr/local/sbin"]
# PATH may be unset or empty; appending it unconditionally would either raise
# KeyError or leave a trailing separator (an implicit current-directory
# entry), so the inherited value is only kept when it is non-empty.
_inherited_path = os.environ.get("PATH")
if _inherited_path:
    _path_entries.append(_inherited_path)
os.environ["PATH"] = os.pathsep.join(_path_entries)
188c2ecf20Sopenharmony_ci
198c2ecf20Sopenharmony_ci
class IfaceNotFoundError(Exception):
    """Raised when no network interface other than "lo" can be found."""
228c2ecf20Sopenharmony_ci
238c2ecf20Sopenharmony_ci
class UnprivilegedUserError(Exception):
    """Raised when the test suite is started without root privileges."""
268c2ecf20Sopenharmony_ci
278c2ecf20Sopenharmony_ci
def _bpftool(args, json=True):
    """Run ``bpftool`` with *args* and return its raw standard output.

    Args:
        args: Iterable of command-line arguments placed after the
            executable name (and after ``-j`` when it is added).
        json: When true, ``-j`` is inserted before *args*. The parameter
            shadows the ``json`` module inside this function; the name is
            kept because callers pass it by keyword.

    Returns:
        The bytes written by the process to standard output.

    Raises:
        subprocess.CalledProcessError: If bpftool exits with a non-zero
            status.
        FileNotFoundError: If no ``bpftool`` executable is found on PATH.
    """
    cmd = ["bpftool"]
    if json:
        cmd.append("-j")
    cmd.extend(args)

    return subprocess.check_output(cmd)
358c2ecf20Sopenharmony_ci
368c2ecf20Sopenharmony_ci
def bpftool(args):
    """Run ``bpftool`` without ``-j`` and return its output as text.

    Args:
        args: Iterable of command-line arguments for bpftool.

    Returns:
        Standard output of the process decoded as UTF-8.
    """
    raw_output = _bpftool(args, json=False)
    return raw_output.decode("utf-8")
398c2ecf20Sopenharmony_ci
408c2ecf20Sopenharmony_ci
def bpftool_json(args):
    """Run ``bpftool -j`` with *args* and return the parsed JSON output.

    Args:
        args: Iterable of command-line arguments for bpftool.

    Returns:
        The Python object produced by ``json.loads`` from the process's
        standard output.
    """
    raw_output = _bpftool(args)
    return json.loads(raw_output)
448c2ecf20Sopenharmony_ci
458c2ecf20Sopenharmony_ci
def get_default_iface():
    """Return the name of the first network interface that is not "lo".

    Interfaces are examined in the order reported by
    ``socket.if_nameindex()``.

    Raises:
        IfaceNotFoundError: If every reported interface is named "lo" or no
            interface is reported at all.
    """
    for _index, name in socket.if_nameindex():
        if name != "lo":
            return name
    raise IfaceNotFoundError("Could not find any network interface to probe")
518c2ecf20Sopenharmony_ci
528c2ecf20Sopenharmony_ci
def default_iface(f):
    """Decorate *f* so that it receives a network interface name.

    The interface is looked up with ``get_default_iface()`` on every call of
    the wrapper (not at decoration time) and is passed as one extra
    positional argument after the caller's own positional arguments; keyword
    arguments are forwarded unchanged. Any exception raised by the lookup
    propagates to the caller.

    Args:
        f: Callable that accepts the interface name as its last positional
            argument.

    Returns:
        A wrapper that carries the metadata of *f* (via ``functools.wraps``).
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        iface = get_default_iface()
        return f(*args, iface, **kwargs)
    return wrapper
598c2ecf20Sopenharmony_ci
608c2ecf20Sopenharmony_ci
class TestBpftool(unittest.TestCase):
    """Checks of ``bpftool feature probe`` output in JSON and macro form.

    Every test runs the ``bpftool`` executable through the module-level
    ``bpftool`` / ``bpftool_json`` helpers. The suite refuses to start
    unless it is run as root.
    """

    # Helpers the tests expect to be missing from a plain probe and to be
    # reported only when "full" is requested.
    _FULL_ONLY_HELPERS = ("bpf_probe_write_user", "bpf_trace_printk")

    # Top-level sections expected in the JSON output of a device probe.
    _DEV_PROBE_KEYS = (
        "syscall_config",
        "program_types",
        "map_types",
        "helpers",
        "misc",
    )

    # A kernel probe is expected to report the same sections plus
    # "system_config".
    _KERNEL_PROBE_KEYS = _DEV_PROBE_KEYS + ("system_config",)

    @classmethod
    def setUpClass(cls):
        """Fail the whole suite with UnprivilegedUserError when not root."""
        if os.getuid() != 0:
            raise UnprivilegedUserError(
                "This test suite needs root privileges")

    def _assert_plain_probe(self, probe, expected_keys):
        """Check the sections and helper lists of a non-"full" probe."""
        # Check if the result has all expected keys (and no others).
        self.assertCountEqual(probe.keys(), expected_keys)
        # Check if unexpected helpers are not included in helpers probes
        # result.
        for helpers in probe["helpers"].values():
            for unexpected_helper in self._FULL_ONLY_HELPERS:
                self.assertNotIn(unexpected_helper, helpers)

    @staticmethod
    def _all_helpers(probe):
        """Return the set of helpers listed under any "helpers" entry."""
        return set().union(*probe["helpers"].values())

    @default_iface
    def test_feature_dev_json(self, iface):
        """A device probe has the device sections and no full-only helper."""
        res = bpftool_json(["feature", "probe", "dev", iface])
        self._assert_plain_probe(res, self._DEV_PROBE_KEYS)

    def test_feature_kernel(self):
        """All spellings of the plain kernel probe pass the same checks."""
        invocations = (
            ["feature", "probe", "kernel"],
            ["feature", "probe"],
            ["feature"],
        )

        for args in invocations:
            # subTest makes a failure report name the offending command line.
            with self.subTest(args=args):
                self._assert_plain_probe(bpftool_json(args),
                                         self._KERNEL_PROBE_KEYS)

    def test_feature_kernel_full(self):
        """A "full" kernel probe reports the full-only helpers together."""
        invocations = (
            ["feature", "probe", "kernel", "full"],
            ["feature", "probe", "full"],
        )

        for args in invocations:
            with self.subTest(args=args):
                probe = bpftool_json(args)
                # Check if expected helpers are included at least once in any
                # helpers list for any program type. Unfortunately we cannot
                # assume that they will be included in all program types or a
                # specific subset of programs. It depends on the kernel
                # version and configuration.
                found_helpers = any(
                    all(expected_helper in helpers
                        for expected_helper in self._FULL_ONLY_HELPERS)
                    for helpers in probe["helpers"].values())

                self.assertTrue(found_helpers)

    def test_feature_kernel_full_vs_not_full(self):
        """"full" adds exactly the full-only helpers to the plain probe."""
        full_set = self._all_helpers(bpftool_json(["feature", "probe",
                                                   "full"]))
        not_full_set = self._all_helpers(bpftool_json(["feature", "probe"]))

        self.assertSetEqual(full_set - not_full_set,
                            set(self._FULL_ONLY_HELPERS))
        self.assertSetEqual(not_full_set - full_set, set())

    def test_feature_macros(self):
        """The "macros" output contains each section header and define."""
        expected_patterns = [
            r"/\*\*\* System call availability \*\*\*/",
            r"#define HAVE_BPF_SYSCALL",
            r"/\*\*\* eBPF program types \*\*\*/",
            r"#define HAVE.*PROG_TYPE",
            r"/\*\*\* eBPF map types \*\*\*/",
            r"#define HAVE.*MAP_TYPE",
            r"/\*\*\* eBPF helper functions \*\*\*/",
            r"#define HAVE.*HELPER",
            r"/\*\*\* eBPF misc features \*\*\*/",
        ]

        res = bpftool(["feature", "probe", "macros"])
        for pattern in expected_patterns:
            self.assertRegex(res, pattern)
179