# SPDX-License-Identifier: GPL-2.0
# Copyright (c) 2020 SUSE LLC.

import collections
import functools
import json
import os
import socket
import subprocess
import unittest


# Add the source tree of bpftool and /usr/local/sbin to PATH
cur_dir = os.path.dirname(os.path.realpath(__file__))
bpftool_dir = os.path.abspath(os.path.join(cur_dir, "..", "..", "..", "..",
                                           "tools", "bpf", "bpftool"))
os.environ["PATH"] = bpftool_dir + ":/usr/local/sbin:" + os.environ["PATH"]

# Helper names that the tests below expect to be absent from a default
# probe and present in a "full" probe.
_FULL_ONLY_HELPERS = ("bpf_probe_write_user", "bpf_trace_printk")


class IfaceNotFoundError(Exception):
    """Raised when no network interface other than "lo" is available."""


class UnprivilegedUserError(Exception):
    """Raised when the test suite is started by a user other than root."""


def _bpftool(args, json=True):
    """Run bpftool with the given arguments and return its raw stdout.

    args is the list of command-line arguments placed after the binary
    name. When json is true, "-j" is inserted before them. The json
    parameter shadows the json module inside this function only; the name
    is kept so that existing keyword callers continue to work.

    Returns the bytes captured by subprocess.check_output(), which raises
    subprocess.CalledProcessError on a non-zero exit status.
    """
    _args = ["bpftool"]
    if json:
        _args.append("-j")
    _args.extend(args)

    return subprocess.check_output(_args)


def bpftool(args):
    """Run bpftool without "-j" and return its output decoded as UTF-8."""
    return _bpftool(args, json=False).decode("utf-8")


def bpftool_json(args):
    """Run bpftool with "-j" and return the parsed JSON output."""
    res = _bpftool(args)
    return json.loads(res)


def get_default_iface():
    """Return the name of the first network interface that is not "lo".

    Raises IfaceNotFoundError when socket.if_nameindex() lists no such
    interface.
    """
    for _index, name in socket.if_nameindex():
        if name != "lo":
            return name
    raise IfaceNotFoundError("Could not find any network interface to probe")


def default_iface(f):
    """Decorator passing the default interface name to the wrapped function.

    The name returned by get_default_iface() is appended after the
    caller's positional arguments; keyword arguments are forwarded as is.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        iface = get_default_iface()
        return f(*args, iface, **kwargs)
    return wrapper


def _helper_names(probe):
    """Return the set of all helper names listed under probe["helpers"]."""
    return {helper
            for helpers in probe["helpers"].values()
            for helper in helpers}


class TestBpftool(unittest.TestCase):
    """Checks on the output of the "bpftool feature" subcommand."""

    @classmethod
    def setUpClass(cls):
        """Refuse to run the suite unless the effective caller is uid 0."""
        if os.getuid() != 0:
            raise UnprivilegedUserError(
                "This test suite needs root privileges")

    def _assert_default_probe(self, res, expected_keys):
        """Check the sections and helper lists of a non-"full" probe."""
        # Check if the result has all expected keys.
        self.assertCountEqual(res.keys(), expected_keys)
        # Check if unexpected helpers are not included in helpers probes
        # result.
        for helpers in res["helpers"].values():
            for unexpected_helper in _FULL_ONLY_HELPERS:
                self.assertNotIn(unexpected_helper, helpers)

    @default_iface
    def test_feature_dev_json(self, iface):
        """A device probe has the expected sections and helper lists."""
        expected_keys = [
            "syscall_config",
            "program_types",
            "map_types",
            "helpers",
            "misc",
        ]

        res = bpftool_json(["feature", "probe", "dev", iface])
        self._assert_default_probe(res, expected_keys)

    def test_feature_kernel(self):
        """Every spelling of the kernel probe yields the same layout."""
        invocations = [
            ["feature", "probe", "kernel"],
            ["feature", "probe"],
            ["feature"],
        ]
        expected_keys = [
            "syscall_config",
            "system_config",
            "program_types",
            "map_types",
            "helpers",
            "misc",
        ]

        for args in invocations:
            # subTest reports which invocation an assertion failed for.
            with self.subTest(args=" ".join(args)):
                self._assert_default_probe(bpftool_json(args), expected_keys)

    def test_feature_kernel_full(self):
        """A "full" kernel probe reports the full-only helpers."""
        invocations = [
            ["feature", "probe", "kernel", "full"],
            ["feature", "probe", "full"],
        ]

        for args in invocations:
            with self.subTest(args=" ".join(args)):
                res = bpftool_json(args)
                # Check if expected helpers are included at least once in any
                # helpers list for any program type. Unfortunately we cannot
                # assume that they will be included in all program types or a
                # specific subset of programs. It depends on the kernel
                # version and configuration.
                found_helpers = any(
                    all(expected_helper in helpers
                        for expected_helper in _FULL_ONLY_HELPERS)
                    for helpers in res["helpers"].values())

                self.assertTrue(found_helpers)

    def test_feature_kernel_full_vs_not_full(self):
        """The "full" probe adds exactly the full-only helpers."""
        full_res = bpftool_json(["feature", "probe", "full"])
        not_full_res = bpftool_json(["feature", "probe"])
        full_set = _helper_names(full_res)
        not_full_set = _helper_names(not_full_res)

        self.assertSetEqual(full_set - not_full_set,
                            {"bpf_probe_write_user", "bpf_trace_printk"})
        self.assertSetEqual(not_full_set - full_set, set())

    def test_feature_macros(self):
        """The "macros" output contains every expected section and define."""
        expected_patterns = [
            r"/\*\*\* System call availability \*\*\*/",
            r"#define HAVE_BPF_SYSCALL",
            r"/\*\*\* eBPF program types \*\*\*/",
            r"#define HAVE.*PROG_TYPE",
            r"/\*\*\* eBPF map types \*\*\*/",
            r"#define HAVE.*MAP_TYPE",
            r"/\*\*\* eBPF helper functions \*\*\*/",
            r"#define HAVE.*HELPER",
            r"/\*\*\* eBPF misc features \*\*\*/",
        ]

        res = bpftool(["feature", "probe", "macros"])
        for pattern in expected_patterns:
            self.assertRegex(res, pattern)