from __future__ import annotations import argparse import dataclasses import ipaddress import typing import trio import capport.utils.ipneigh import capport.utils.nft_set from . import cptypes @dataclasses.dataclass(kw_only=True, slots=True) class MetricPrinter: args: CliArguments now: int | None = None _known_names: set[str] = dataclasses.field(default_factory=set, init=None) _now_str: str = dataclasses.field(init=False) _label_str: str = dataclasses.field(init=False) def __post_init__(self) -> None: if not self.now is None: self._now_str = f" {self.now}" else: self._now_str = "" labels = [] if self.args.instance: labels.append(f"instance=\"{self.args.instance}\"") labels.append(f"interface=\"{self.args.interface}\"") self._label_str = "{" + ",".join(labels) + "}" def print_metric(self, *, name: str, mtype: str, value: typing.Any, help: str | None = None): if not name in self._known_names: self._known_names.add(name) if help: print(f"# HELP {name} {help}") print(f"# TYPE {name} {mtype}") print(f"{name}{self._label_str} {value} {self._now_str}") def print_gauge(self, name: str, value: int | float, help: str | None = None): self.print_metric(name=name, mtype="gauge", value=value, help=help) async def amain(args: CliArguments): printer = MetricPrinter(args=args) ns = capport.utils.nft_set.NftSet() captive_allowed_entries: set[cptypes.MacAddress] = {entry["mac"] for entry in ns.list()} seen_allowed_entries: set[cptypes.MacAddress] = set() total_ipv4 = 0 total_ipv6 = 0 unique_clients = set() unique_ipv4 = set() unique_ipv6 = set() async with capport.utils.ipneigh.connect() as ipn: ipn.ip.strict_check = True async for (mac, addr) in ipn.dump_neighbors(args.interface): if mac in captive_allowed_entries: seen_allowed_entries.add(mac) unique_clients.add(mac) if isinstance(addr, ipaddress.IPv4Address): total_ipv4 += 1 unique_ipv4.add(mac) else: total_ipv6 += 1 unique_ipv6.add(mac) printer.print_gauge( "capport_allowed_macs", len(captive_allowed_entries), help="Number of allowed client mac addresses", ) printer.print_gauge( "capport_allowed_neigh_macs", len(seen_allowed_entries), help="Number of allowed client mac addresses seen in neighbor cache", ) printer.print_gauge( "capport_unique", len(unique_clients), help="Number of clients (mac addresses) in client network seen in neighbor cache", ) printer.print_gauge( "capport_unique_ipv4", len(unique_ipv4), help="Number of IPv4 clients (unique per mac) in client network seen in neighbor cache", ) printer.print_gauge( "capport_unique_ipv6", len(unique_ipv6), help="Number of IPv6 clients (unique per mac) in client network seen in neighbor cache", ) printer.print_gauge( "capport_total_ipv4", total_ipv4, help="Number of IPv4 addresses seen in neighbor cache", ) printer.print_gauge( "capport_total_ipv6", total_ipv6, help="Number of IPv6 addresses seen in neighbor cache", ) with open('/proc/sys/net/netfilter/nf_conntrack_count') as f: printer.print_gauge( "nf_conntrack_count", int(f.readline()), help="Conntrack count", ) with open('/proc/sys/net/netfilter/nf_conntrack_max') as f: printer.print_gauge( "nf_conntrack_max", int(f.readline()), help="Conntrack max", ) @dataclasses.dataclass class CliArguments: interface: str instance: str | None def __init__(self): parser = argparse.ArgumentParser() parser.add_argument( "--interface", required=True, help="Name of network interface clients are connected to (to look for IP neighbors)", ) parser.add_argument("--instance", help="Name of instance (to tag metrics with)") args = parser.parse_args() self.interface = args.interface self.instance = args.instance def main(): args = CliArguments() trio.run(amain, args)