complete rewrite

This commit is contained in:
2025-05-09 17:12:57 +02:00
parent 65cb5d31f3
commit ef33d3e38b
8 changed files with 376 additions and 329 deletions

View File

@@ -1,13 +1,19 @@
from ._registry import ( # noqa: reexport
Registry as Registry,
Path as Path,
GLOBAL_REGISTRY as GLOBAL_REGISTRY,
)
from ._metrics import ( # noqa: reexport
Counter as Counter,
CounterFamily as CounterFamily,
Gauge as Gauge,
GaugeFamily as GaugeFamily,
Summary as Summary,
SummaryFamily as SummaryFamily,
)
from ._metric_base import ( # noqa: reexport
DynamicTimestamp as DynamicTimestamp,
MetricFamily,
MetricPoint,
MetricValueSet,
Now as Now,
NOW as NOW,
)

View File

@@ -1,83 +1,92 @@
from threading import Lock
import typing
import time
from __future__ import annotations
import abc
import dataclasses
import time
import typing
from ._path import Path
from ._path import (
Labels,
LabelsData,
)
class Now(typing.NamedTuple):
class Now:
__slots__ = ()
def time(self) -> int:
"""Time since epoch in milliseconds"""
return round(time.time() * 1000)
NOW = Now()
class MetricValue(typing.NamedTuple):
value: float
timestamp: typing.Optional[int]
DynamicTimestamp = int | Now | None
_NAN = float('nan')
def _to_timestamp(timestamp: DynamicTimestamp) -> int | None:
if isinstance(timestamp, Now):
return timestamp.time()
else:
return timestamp
class MetricMutableValue:
# global lock for metrics by default
DEFAULT_LOCK: typing.ClassVar[Lock] = Lock()
def __init__(self, value: float = _NAN, timestamp: typing.Optional[int] = None, lock: Lock = DEFAULT_LOCK):
self._lock = lock
self._value = MetricValue(value, timestamp)
def set_no_lock(self, value: float, timestamp: typing.Optional[int]):
self._value = MetricValue(value, timestamp)
def set(self, value: float, timestamp: typing.Union[int, None, Now] = NOW):
if isinstance(timestamp, Now):
timestamp = timestamp.time()
with self._lock:
self.set_no_lock(value, timestamp)
def inc_no_lock(self, add: float, timestamp: typing.Optional[int]):
self._value = MetricValue(self._value.value + add, timestamp)
def inc(self, add: float = 1, timestamp: typing.Union[int, None, Now] = NOW):
if isinstance(timestamp, Now):
timestamp = timestamp.time()
with self._lock:
self.inc_no_lock(add, timestamp)
def get(self) -> MetricValue:
with self._lock:
return self._value
@dataclasses.dataclass(slots=True, kw_only=True)
class MetricValueSet:
# list of suffixes and values; suffixes must match the allowed suffixes of the family of the point
values: list[tuple[str, float]]
timestamp: int | None
class MetricBase(MetricMutableValue):
metric_type: typing.ClassVar[str]
help_text: typing.Optional[str] = None
def __init__(self, value: float = _NAN, timestamp: typing.Optional[int] = None, help: typing.Optional[str] = None):
super().__init__(value, timestamp)
self.help_text = help
_TMetricPoint = typing.TypeVar('_TMetricPoint', bound='MetricPoint')
class MetricGroupDefinition(typing.NamedTuple):
reserved_suffixes: typing.FrozenSet[str]
reserved_labels: typing.FrozenSet[str]
class MetricPoint(abc.ABC):
# _family and _labels set by MetricFamily._init_point
# __weakref__ is needed for weakref support
__slots__ = ('_family', '_labels', '__weakref__')
_family: MetricFamily
_labels: Labels
EMPTY_GROUP_DEFINITION = MetricGroupDefinition(frozenset(), frozenset())
@property
def family(self) -> MetricFamily:
return self._family
@property
def labels(self) -> Labels:
return self._labels
class MetricGroupBase(abc.ABC):
metric_type: typing.ClassVar[str]
group_definition: typing.ClassVar[MetricGroupDefinition]
help_text: typing.Optional[str] = None
def __init__(self, help: typing.Optional[str] = None):
self.help_text = help
def __repr__(self) -> str:
return f"<Point {self._family.name}{self._labels.key}>"
@abc.abstractmethod
def metrics(self) -> typing.Iterator[typing.Tuple[(Path, MetricMutableValue)]]:
pass
def _get_value_set(self) -> MetricValueSet:
...
class MetricFamily(abc.ABC, typing.Generic[_TMetricPoint]):
TYPE: typing.ClassVar[str]
SUFFIXES: typing.ClassVar[list[str]] = ['']
RESERVED_LABELS: typing.ClassVar[list[str]] = []
def __init__(
self,
*,
name: str,
unit: str = "",
help: str = "",
) -> None:
self.name = name
self.unit = unit
self.help = help
def _init_point(self, labels: LabelsData, point: _TMetricPoint) -> _TMetricPoint:
if not isinstance(labels, Labels):
labels = Labels(labels)
for reserved_label in self.RESERVED_LABELS:
if reserved_label in labels.map:
raise KeyError(f"Point {labels.key} uses reserved label {reserved_label}")
point._family = self
point._labels = labels
return point

View File

@@ -1,56 +1,145 @@
from __future__ import annotations
import contextlib
import dataclasses
import threading
import typing
from ._metric_base import MetricBase, MetricGroupBase, MetricGroupDefinition, MetricMutableValue, Now, NOW
from ._registry import Path, GLOBAL_REGISTRY
from ._metric_base import (
_to_timestamp,
DynamicTimestamp,
MetricFamily,
MetricPoint,
MetricValueSet,
)
from ._path import LabelsData
class Counter(MetricBase):
metric_type: typing.ClassVar[str] = 'counter'
def __init__(self, value: float = 0, path: typing.Optional[Path] = None, help: typing.Optional[str] = None):
super().__init__(value=value, help=help)
if not path is None:
GLOBAL_REGISTRY.register(path, self)
@contextlib.contextmanager
def opt_lock(lock: threading.Lock | None) -> typing.Iterator[None]:
if lock:
with lock:
yield
else:
yield
class Gauge(MetricBase):
metric_type: typing.ClassVar[str] = 'gauge'
@dataclasses.dataclass(slots=True, kw_only=True)
class _NumberValue:
value: float = 0
timestamp: int | None = None
def __init__(
self,
value: float = float('nan'),
path: typing.Optional[Path] = None,
help: typing.Optional[str] = None,
):
super().__init__(value=value, help=help)
if not path is None:
GLOBAL_REGISTRY.register(path, self)
def _get_value_set(self) -> MetricValueSet:
return MetricValueSet(
values=[("", self.value)],
timestamp=self.timestamp,
)
class Summary(MetricGroupBase):
metric_type: typing.ClassVar[str] = 'summary'
group_definition: typing.ClassVar[MetricGroupDefinition] = MetricGroupDefinition(
frozenset(['_sum', '_count']),
frozenset(['quantile']),
)
class _SimpleNumber(MetricPoint):
__slots__ = ('_inc_lock', '_value')
def __init__(self, path: typing.Optional[Path] = None, help: typing.Optional[str] = None):
super().__init__(help=help)
self._lock = MetricMutableValue.DEFAULT_LOCK
self._sum = MetricMutableValue(0, None, self._lock)
self._count = MetricMutableValue(0, None, self._lock)
if not path is None:
GLOBAL_REGISTRY.register(path, self)
def __init__(self, *, value: float, inc_lock: threading.Lock | None) -> None:
super().__init__()
self._inc_lock = inc_lock
self._value = _NumberValue(value=value)
def metrics(self) -> typing.Iterator[typing.Tuple[(Path, MetricMutableValue)]]:
return [
(Path('_sum'), self._sum),
(Path('_count'), self._count),
]
def _get_value_set(self) -> MetricValueSet:
return self._value._get_value_set()
def observe(self, value: float, timestamp: typing.Union[int, None, Now] = NOW):
if isinstance(timestamp, Now):
timestamp = timestamp.time()
with self._lock:
self._count.inc_no_lock(1, timestamp)
self._sum.inc_no_lock(value, timestamp)
def set(self, value: float, *, timestamp: DynamicTimestamp = None) -> None:
self._value = _NumberValue(value=value, timestamp=_to_timestamp(timestamp))
def inc_no_lock(self, add: float = 1, *, timestamp: DynamicTimestamp = None) -> None:
self._value = _NumberValue(value=self._value.value + add, timestamp=_to_timestamp(timestamp))
def inc(self, add: float = 1, *, timestamp: DynamicTimestamp = None) -> None:
timestamp = _to_timestamp(timestamp)
if self._inc_lock:
with self._inc_lock:
self.inc_no_lock(add=add, timestamp=timestamp)
else:
self.inc_no_lock(add=add, timestamp=timestamp)
class Counter(_SimpleNumber):
__slots__ = ()
def __init__(self, *, value: int, inc_lock: threading.Lock | None) -> None:
super().__init__(value=value, inc_lock=inc_lock)
class CounterFamily(MetricFamily[Counter]):
__slots__ = ()
TYPE: typing.ClassVar[str] = 'counter'
SUFFIXES: typing.ClassVar[list[str]] = ['', '_created']
DEFAULT_INC_LOCK: typing.ClassVar[threading.Lock] = threading.Lock()
def create(self, *, value: int = 0, labels: LabelsData) -> Counter:
point = Counter(value=value, inc_lock=self.DEFAULT_INC_LOCK)
return self._init_point(labels, point)
class Gauge(_SimpleNumber):
__slots__ = ()
def __init__(self, *, value: float, inc_lock: threading.Lock | None) -> None:
super().__init__(value=value, inc_lock=inc_lock)
class GaugeFamily(MetricFamily[Gauge]):
__slots__ = ()
TYPE: typing.ClassVar[str] = 'gauge'
SUFFIXES: typing.ClassVar[list[str]] = ['']
DEFAULT_INC_LOCK: typing.ClassVar[threading.Lock] = threading.Lock()
def create(self, *, value: float = 0, labels: LabelsData) -> Gauge:
point = Gauge(value=value, inc_lock=self.DEFAULT_INC_LOCK)
return self._init_point(labels, point)
class Summary(MetricPoint):
# TODO: quantile support
__slots__ = ('_lock', '_sum', '_count', '_timestamp')
def __init__(self, *, lock: threading.Lock | None = None) -> None:
super().__init__()
self._lock = lock
self._sum: float = 0
self._count: int = 0
self._timestamp: int | None = 0
def _get_value_set(self) -> MetricValueSet:
with opt_lock(self._lock):
return MetricValueSet(
values=[
("_sum", self._sum),
("_count", self._count),
],
timestamp=self._timestamp,
)
def observe(self, value: float, timestamp: DynamicTimestamp = None) -> None:
timestamp = _to_timestamp(timestamp)
with opt_lock(self._lock):
self._sum += value
self._count += 1
self._timestamp = timestamp
class SummaryFamily(MetricFamily[Summary]):
__slots__ = ()
TYPE: typing.ClassVar[str] = 'summary'
SUFFIXES: typing.ClassVar[list[str]] = ['', '_count', '_sum', '_created']
RESERVED_LABELS: typing.ClassVar[list[str]] = ['quantile']
DEFAULT_LOCK: typing.ClassVar[threading.Lock] = threading.Lock()
def create(self, *, value: float = float('nan'), labels: LabelsData) -> Summary:
point = Summary(lock=self.DEFAULT_LOCK)
return self._init_point(labels, point)

View File

@@ -2,31 +2,48 @@ import typing
class Labels:
def __init__(self, map: typing.Mapping[str, str]):
__slots__ = ('_map', '_key')
def __init__(self, map: typing.Mapping[str, str]) -> None:
self._map = map
self._key = '{{{0}}}'.format(','.join(
['{0}="{1}"'.format(
k, v.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"'))
for k, v in sorted(map.items())]))
self._key = '{{{0}}}'.format(','.join([
'{0}="{1}"'.format(
k, v.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')
)
for k, v in sorted(map.items())
]))
@property
def key(self) -> str:
"""Metric labels as {...} string to use as suffix for the metric name in prometheus output"""
return self._key
@property
def map(self) -> typing.Mapping[str, str]:
"""Original mapping key got constructed from"""
return self._map
LabelsData = Labels | typing.Mapping[str, str]
class Path:
def __init__(self, name, labels={}):
__slots__ = ('_name', '_labels')
def __init__(
self,
name: str,
labels: typing.Mapping[str, str] = {},
) -> None:
self._name = name
self._labels = Labels(labels)
@property
def name(self) -> str:
"""Metric name"""
return self._name
@property
def labels(self) -> Labels:
"""Metric labels"""
return self._labels

View File

@@ -1,90 +1,22 @@
from threading import Lock
from __future__ import annotations
import math
import threading
import typing
import weakref
import math
from ._metric_base import MetricValue, MetricBase, MetricGroupDefinition, MetricGroupBase, EMPTY_GROUP_DEFINITION
from ._path import Path, Labels
class _MetricGroup:
metrics: typing.Mapping[Labels, typing.Union[MetricBase, MetricGroupBase]]
weak_metric: typing.Any
def __init__(self, metrics: typing.Mapping[Labels, typing.Union[MetricBase, MetricGroupBase]], weak_metric):
self.metrics = metrics
self.weak_metric = weak_metric
def get(self) -> typing.Sequence[typing.Tuple[Path, MetricValue]]:
return [(k, m.get()) for (k, m) in self.metrics.items()]
class _MetricCollection:
metric_type: str
help_text: typing.Optional[str]
metrics: typing.MutableMapping[Labels, typing.Union[MetricBase, _MetricGroup]]
group_definition: typing.Optional[MetricGroupDefinition]
_groups: typing.MutableSet[_MetricGroup]
def __init__(self, metric_type: str, group_definition: typing.Optional[MetricGroupDefinition] = None):
self.metric_type = metric_type
self.help_text = None
self.metrics = weakref.WeakValueDictionary()
if group_definition is None:
group_definition = EMPTY_GROUP_DEFINITION
self.group_definition = group_definition
self._groups = set()
def check(self, path: Path, metric_type: str, group_definition: typing.Optional[MetricGroupDefinition] = None):
if self.metric_type != metric_type:
raise RuntimeError("metric type doesn't match for {}: {} != {}".format(
path.name, self.metric_type, metric_type))
if group_definition is None:
group_definition = EMPTY_GROUP_DEFINITION
if self.group_definition.reserved_suffixes != group_definition.reserved_suffixes:
raise RuntimeError("reserved suffixes don't match for {}: {} != {}".format(
path.name, self.group_definition.reserved_suffixes, group_definition.reserved_suffixes))
if self.group_definition.reserved_labels != group_definition.reserved_labels:
raise RuntimeError("reserved labels don't match for {}: {} != {}".format(
path.name, self.group_definition.reserved_labels, group_definition.reserved_labels))
for label in self.group_definition.reserved_labels:
if label in path.labels.map:
raise RuntimeError("'{} {}' uses reserved label {}".format(
path.name, path.labels.key, label))
if path.labels.key in self.metrics:
raise RuntimeError("'{} {}' already registered".format(
path.name, path.labels.key))
def insert(self, path: Path, metric: typing.Union[MetricBase, MetricGroupBase]):
if isinstance(metric, MetricBase):
self.metrics[path.labels.key] = metric
else:
key = path.labels.key
metrics = dict()
for (add_path, m) in metric.metrics():
assert (
add_path.name == '' or add_path.name in self.group_definition.reserved_suffixes)
for label in add_path.labels.map:
assert label in self.group_definition.reserved_labels
p = Path(path.name + add_path.name, {**path.labels.map, **add_path.labels.map})
metrics[p] = m
g = None # set below, but need for cleanup
def cleanup(_weak_metric):
self._groups.remove(g)
del self.metrics[key]
weak_metric = weakref.ref(metric, cleanup)
g = _MetricGroup(metrics, weak_metric)
self.metrics[key] = g
if self.help_text is None:
self.help_text = metric.help_text
from ._metric_base import (
MetricFamily,
MetricPoint,
MetricValueSet,
)
_INF = float("inf")
_MINUS_INF = float("-inf")
def _floatToGoString(d: float):
def _floatToGoString(d: float) -> str:
if d == _INF:
return '+Inf'
elif d == _MINUS_INF:
@@ -103,141 +35,100 @@ def _floatToGoString(d: float):
return s
class _FamilyInstance:
def __init__(self, family: MetricFamily) -> None:
self._family = family
self._lock = threading.Lock()
self._points: weakref.WeakValueDictionary[str, MetricPoint] = weakref.WeakValueDictionary()
def _add_point(self, point: MetricPoint) -> None:
key = point.labels.key
if not point.family is self._family:
raise KeyError(f"Point {point.family.name}{key} has different family {point.family} than {self._family}")
with self._lock:
if key in self._points:
raise KeyError(f"Point {key} already exists in family instance {self._family.name}")
self._points[key] = point
def _get_value_sets(self) -> list[tuple[str, MetricValueSet]]:
return [
(point.labels.key, point._get_value_set())
for point in self._points.values()
]
class MetricGroup(typing.Protocol):
def metric_points(self) -> typing.Iterable[MetricPoint]:
...
class Registry:
_metrics: typing.MutableMapping[str, typing.Union[_MetricCollection, str]]
_paths: typing.MutableMapping[typing.Union[MetricBase, MetricGroupBase], Path]
_strong_metrics: typing.MutableSet[typing.Union[MetricBase, MetricGroupBase]]
def __init__(self) -> None:
self._lock = threading.Lock()
self._families: dict[str, _FamilyInstance] = {}
self._names: dict[str, _FamilyInstance] = {}
def __init__(self):
self._lock = Lock()
self._metrics = dict()
self._paths = weakref.WeakKeyDictionary()
self._strong_metrics = set()
# requires lock
def _unregister(self, metric: MetricBase):
# don't keep alive anymore
try:
self._strong_metrics.remove(metric)
except KeyError:
pass
try:
path = self._paths[metric]
c = self._metrics[path.name]
del c[path.labels]
except KeyError:
pass
def unregister(self, metric: typing.Union[MetricBase, MetricGroupBase]):
def _register_family(self, family: MetricFamily) -> _FamilyInstance:
with self._lock:
if isinstance(metric, MetricGroupBase):
metrics = metric.metrics(Path(""))
for (_, m) in metrics:
self._unregister(m)
else:
self._unregister(metric)
if family.name in self._families:
instance = self._families[family.name]
if instance._family is family:
return instance
raise KeyError(f"Metric family {family.name} already reserved")
if family.name in self._names:
raise KeyError(f"Metric family {family.name} already reserved by {self._names[family.name]._family}")
for suffix in family.SUFFIXES:
full_name = family.name + suffix
if full_name in self._families:
raise KeyError(f"Metric name {full_name} already reserved by other family")
if full_name in self._names:
raise KeyError(f"Metric name {full_name} already reserved by {self._names[full_name]._family}")
# now register
instance = _FamilyInstance(family=family)
self._families[family.name] = instance
for suffix in family.SUFFIXES:
full_name = family.name + suffix
self._names[full_name] = instance
return instance
# requires lock
def _remove_collection(self, metric_name: str):
try:
c = self._metrics[metric_name]
except KeyError:
return
assert isinstance(
c, _MetricCollection), "cannot remove reserved metric names directly"
for suffix in c.reserved_suffixes:
assert isinstance(
self._metrics[metric_name + suffix], str), "reserved suffix missing"
del self._metrics[metric_name + suffix]
del self._metrics[metric_name]
def register(self, point: MetricPoint) -> None:
instance = self._register_family(point.family)
instance._add_point(point)
# requires lock
def _get_collection(
self,
path: Path,
metric_type: str,
group_definition: typing.Optional[MetricGroupDefinition] = None,
):
c = None
try:
c = self._metrics[path.name]
except KeyError:
pass
if not c is None:
if not isinstance(c, _MetricCollection):
c_parent = self._metrics[c]
assert isinstance(
c_parent, _MetricCollection), "nested metric groups not allowed"
if 0 == len(c_parent.metrics):
self._remove_collection(c)
else:
raise RuntimeError(
"Metric name {} reserved for group {}".format(path.name, c))
else:
if 0 == len(c.metrics):
self._remove_collection(path.name)
else:
c.check(path, metric_type, group_definition)
return c
c = _MetricCollection(metric_type, group_definition)
self._metrics[path.name] = c
return c
def register(self, path: Path, metric: typing.Union[MetricBase, MetricGroupBase], strong: bool = False):
with self._lock:
if metric in self._paths:
raise RuntimeError("metric already registered")
if isinstance(metric, MetricGroupBase):
group_definition = metric.group_definition
else:
group_definition = None
c = self._get_collection(
path, metric.metric_type, group_definition)
c.insert(path, metric)
self._paths[metric] = path
if strong:
# keep metric alive
self._strong_metrics.add(metric)
def collect(self):
metrics = []
with self._lock:
for (metric_name, c) in self._metrics.items():
data = [(k, m.get()) for (k, m) in c.metrics.items()]
metrics.append((metric_name, c.metric_type, c.help_text, data))
def register_group(self, group: MetricGroup) -> None:
for point in group.metric_points():
self.register(point)
def collect(self) -> str:
from io import StringIO
result = StringIO()
for (metric_name, metric_type, help_text, data) in metrics:
if 0 == len(data):
continue
if not help_text is None:
result.write("# HELP {} {}\n".format(
metric_name, help_text.replace('\\', r'\\').replace('\n', r'\n')))
result.write('# TYPE {} {}\n'.format(metric_name, metric_type))
for (key, entry) in data:
if isinstance(entry, MetricValue):
if entry.timestamp is None:
timestamp = ''
else:
timestamp = ' {0:d}'.format(entry.timestamp)
result.write("{} {} {}{}\n".format(
metric_name, key, _floatToGoString(entry.value), timestamp))
def escape_text(text: str) -> str:
return text.replace('\\', r'\\').replace('\n', r'\n')
with self._lock:
families = list(self._families.values())
first = True
for fam_instance in families:
if first:
first = False
else:
result.write("\n")
family = fam_instance._family
if family.help:
result.write(f"# HELP {family.name} {escape_text(family.help)}\n")
if family.unit:
result.write(f"# UNIT {family.name} {family.unit}\n")
result.write(f"# TYPE {family.name} {family.TYPE}\n")
for key, value_set in fam_instance._get_value_sets():
if value_set.timestamp is None:
timestamp_s = ''
else:
for (subpath, subentry) in entry:
if subentry.timestamp is None:
timestamp = ''
else:
timestamp = ' {0:d}'.format(subentry.timestamp)
result.write("{} {} {}{}\n".format(
subpath.name, subpath.labels.key, _floatToGoString(subentry.value), timestamp))
timestamp_s = ' {0:d}'.format(value_set.timestamp)
for suffix, value in value_set.values:
value_s = _floatToGoString(value)
result.write(f"{family.name}{suffix}{key} {value_s}{timestamp_s}\n")
return result.getvalue()
GLOBAL_REGISTRY = Registry()