175 lines
5.3 KiB
Python
175 lines
5.3 KiB
Python
|
# SPDX-License-Identifier: GPL-2.0
|
||
|
# Copyright (c) 2020 SUSE LLC.
|
||
|
|
||
|
import collections
|
||
|
import functools
|
||
|
import json
|
||
|
import os
|
||
|
import socket
|
||
|
import subprocess
|
||
|
import unittest
|
||
|
|
||
|
|
||
|
# Add the source tree of bpftool and /usr/local/sbin to PATH
|
||
|
cur_dir = os.path.dirname(os.path.realpath(__file__))
|
||
|
bpftool_dir = os.path.abspath(os.path.join(cur_dir, "..", "..", "..", "..",
|
||
|
"tools", "bpf", "bpftool"))
|
||
|
os.environ["PATH"] = bpftool_dir + ":/usr/local/sbin:" + os.environ["PATH"]
|
||
|
|
||
|
|
||
|
class IfaceNotFoundError(Exception):
|
||
|
pass
|
||
|
|
||
|
|
||
|
class UnprivilegedUserError(Exception):
|
||
|
pass
|
||
|
|
||
|
|
||
|
def _bpftool(args, json=True):
|
||
|
_args = ["bpftool"]
|
||
|
if json:
|
||
|
_args.append("-j")
|
||
|
_args.extend(args)
|
||
|
|
||
|
return subprocess.check_output(_args)
|
||
|
|
||
|
|
||
|
def bpftool(args):
|
||
|
return _bpftool(args, json=False).decode("utf-8")
|
||
|
|
||
|
|
||
|
def bpftool_json(args):
|
||
|
res = _bpftool(args)
|
||
|
return json.loads(res)
|
||
|
|
||
|
|
||
|
def get_default_iface():
|
||
|
for iface in socket.if_nameindex():
|
||
|
if iface[1] != "lo":
|
||
|
return iface[1]
|
||
|
raise IfaceNotFoundError("Could not find any network interface to probe")
|
||
|
|
||
|
|
||
|
def default_iface(f):
|
||
|
@functools.wraps(f)
|
||
|
def wrapper(*args, **kwargs):
|
||
|
iface = get_default_iface()
|
||
|
return f(*args, iface, **kwargs)
|
||
|
return wrapper
|
||
|
|
||
|
DMESG_EMITTING_HELPERS = [
|
||
|
"bpf_probe_write_user",
|
||
|
"bpf_trace_printk",
|
||
|
"bpf_trace_vprintk",
|
||
|
]
|
||
|
|
||
|
class TestBpftool(unittest.TestCase):
|
||
|
@classmethod
|
||
|
def setUpClass(cls):
|
||
|
if os.getuid() != 0:
|
||
|
raise UnprivilegedUserError(
|
||
|
"This test suite needs root privileges")
|
||
|
|
||
|
@default_iface
|
||
|
def test_feature_dev_json(self, iface):
|
||
|
unexpected_helpers = DMESG_EMITTING_HELPERS
|
||
|
expected_keys = [
|
||
|
"syscall_config",
|
||
|
"program_types",
|
||
|
"map_types",
|
||
|
"helpers",
|
||
|
"misc",
|
||
|
]
|
||
|
|
||
|
res = bpftool_json(["feature", "probe", "dev", iface])
|
||
|
# Check if the result has all expected keys.
|
||
|
self.assertCountEqual(res.keys(), expected_keys)
|
||
|
# Check if unexpected helpers are not included in helpers probes
|
||
|
# result.
|
||
|
for helpers in res["helpers"].values():
|
||
|
for unexpected_helper in unexpected_helpers:
|
||
|
self.assertNotIn(unexpected_helper, helpers)
|
||
|
|
||
|
def test_feature_kernel(self):
|
||
|
test_cases = [
|
||
|
bpftool_json(["feature", "probe", "kernel"]),
|
||
|
bpftool_json(["feature", "probe"]),
|
||
|
bpftool_json(["feature"]),
|
||
|
]
|
||
|
unexpected_helpers = DMESG_EMITTING_HELPERS
|
||
|
expected_keys = [
|
||
|
"syscall_config",
|
||
|
"system_config",
|
||
|
"program_types",
|
||
|
"map_types",
|
||
|
"helpers",
|
||
|
"misc",
|
||
|
]
|
||
|
|
||
|
for tc in test_cases:
|
||
|
# Check if the result has all expected keys.
|
||
|
self.assertCountEqual(tc.keys(), expected_keys)
|
||
|
# Check if unexpected helpers are not included in helpers probes
|
||
|
# result.
|
||
|
for helpers in tc["helpers"].values():
|
||
|
for unexpected_helper in unexpected_helpers:
|
||
|
self.assertNotIn(unexpected_helper, helpers)
|
||
|
|
||
|
def test_feature_kernel_full(self):
|
||
|
test_cases = [
|
||
|
bpftool_json(["feature", "probe", "kernel", "full"]),
|
||
|
bpftool_json(["feature", "probe", "full"]),
|
||
|
]
|
||
|
expected_helpers = DMESG_EMITTING_HELPERS
|
||
|
|
||
|
for tc in test_cases:
|
||
|
# Check if expected helpers are included at least once in any
|
||
|
# helpers list for any program type. Unfortunately we cannot assume
|
||
|
# that they will be included in all program types or a specific
|
||
|
# subset of programs. It depends on the kernel version and
|
||
|
# configuration.
|
||
|
found_helpers = False
|
||
|
|
||
|
for helpers in tc["helpers"].values():
|
||
|
if all(expected_helper in helpers
|
||
|
for expected_helper in expected_helpers):
|
||
|
found_helpers = True
|
||
|
break
|
||
|
|
||
|
self.assertTrue(found_helpers)
|
||
|
|
||
|
def test_feature_kernel_full_vs_not_full(self):
|
||
|
full_res = bpftool_json(["feature", "probe", "full"])
|
||
|
not_full_res = bpftool_json(["feature", "probe"])
|
||
|
not_full_set = set()
|
||
|
full_set = set()
|
||
|
|
||
|
for helpers in full_res["helpers"].values():
|
||
|
for helper in helpers:
|
||
|
full_set.add(helper)
|
||
|
|
||
|
for helpers in not_full_res["helpers"].values():
|
||
|
for helper in helpers:
|
||
|
not_full_set.add(helper)
|
||
|
|
||
|
self.assertCountEqual(full_set - not_full_set,
|
||
|
set(DMESG_EMITTING_HELPERS))
|
||
|
self.assertCountEqual(not_full_set - full_set, set())
|
||
|
|
||
|
def test_feature_macros(self):
|
||
|
expected_patterns = [
|
||
|
r"/\*\*\* System call availability \*\*\*/",
|
||
|
r"#define HAVE_BPF_SYSCALL",
|
||
|
r"/\*\*\* eBPF program types \*\*\*/",
|
||
|
r"#define HAVE.*PROG_TYPE",
|
||
|
r"/\*\*\* eBPF map types \*\*\*/",
|
||
|
r"#define HAVE.*MAP_TYPE",
|
||
|
r"/\*\*\* eBPF helper functions \*\*\*/",
|
||
|
r"#define HAVE.*HELPER",
|
||
|
r"/\*\*\* eBPF misc features \*\*\*/",
|
||
|
]
|
||
|
|
||
|
res = bpftool(["feature", "probe", "macros"])
|
||
|
for pattern in expected_patterns:
|
||
|
self.assertRegex(res, pattern)
|