commit b4bbd6c250bf0e319aff4fe6e879da4237fd7139 Author: Stefan Bühler Date: Wed Apr 24 11:42:20 2019 +0200 initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2a45ed7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.pyc +.vscode diff --git a/README.md b/README.md new file mode 100644 index 0000000..600d1fd --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +Various parts copied/adapted from https://github.com/prometheus/client_python (Apache License 2.0). diff --git a/prometheus/__init__.py b/prometheus/__init__.py new file mode 100644 index 0000000..8d943c3 --- /dev/null +++ b/prometheus/__init__.py @@ -0,0 +1,3 @@ +from prometheus.registry import Registry, Path, GLOBAL_REGISTRY +from prometheus.metrics import * +from prometheus.metric_base import NOW diff --git a/prometheus/metric_base.py b/prometheus/metric_base.py new file mode 100644 index 0000000..4b572d5 --- /dev/null +++ b/prometheus/metric_base.py @@ -0,0 +1,83 @@ +from threading import Lock +import typing +import time +import abc + +from .path import Path + + +class Now(typing.NamedTuple): + def time(self) -> int: + return round(time.time() * 1000) + + +NOW = Now() + + +class MetricValue(typing.NamedTuple): + value: float + timestamp: typing.Optional[int] + + +_NAN = float('nan') + + +class MetricMutableValue: + # global lock for metrics by default + DEFAULT_LOCK: typing.ClassVar[Lock] = Lock() + + def __init__(self, value: float = _NAN, timestamp: typing.Optional[int] = None, lock: Lock = DEFAULT_LOCK): + self._lock = lock + self._value = MetricValue(value, timestamp) + + def set_no_lock(self, value: float, timestamp: typing.Optional[int]): + self._value = MetricValue(value, timestamp) + + def set(self, value: float, timestamp: typing.Union[int, None, Now] = NOW): + if isinstance(timestamp, Now): + timestamp = timestamp.time() + with self._lock: + self.set_no_lock(value, timestamp) + + def inc_no_lock(self, add: float, timestamp: typing.Optional[int]): + self._value = MetricValue(self._value.value + add, timestamp) + + def inc(self, add: float = 1, timestamp: typing.Union[int, None, Now] = NOW): + if isinstance(timestamp, Now): + timestamp = timestamp.time() + with self._lock: + self.inc_no_lock(add, timestamp) + + def get(self) -> MetricValue: + with self._lock: + return self._value + + +class MetricBase(MetricMutableValue): + metric_type: typing.ClassVar[str] + help_text: typing.Optional[str] = None + + def __init__(self, value: float = _NAN, timestamp: typing.Optional[int] = None, help: typing.Optional[str] = None): + super().__init__(value, timestamp) + self.help_text = help + + +class MetricGroupDefinition(typing.NamedTuple): + reserved_suffixes: typing.FrozenSet[str] + reserved_labels: typing.FrozenSet[str] + + +EMPTY_GROUP_DEFINITION = MetricGroupDefinition(frozenset(), frozenset()) + + +class MetricGroupBase(abc.ABC): + metric_type: typing.ClassVar[str] + group_definition: typing.ClassVar[MetricGroupDefinition] + help_text: typing.Optional[str] = None + + def __init__(self, help: typing.Optional[str] = None): + self.help_text = help + + @abc.abstractmethod + def metrics(self) -> typing.Iterator[typing.Tuple[(Path, MetricMutableValue)]]: + pass diff --git a/prometheus/metrics.py b/prometheus/metrics.py new file mode 100644 index 0000000..882f496 --- /dev/null +++ b/prometheus/metrics.py @@ -0,0 +1,57 @@ +import typing + +from .metric_base import MetricBase, MetricGroupBase, MetricGroupDefinition, MetricMutableValue, Now, NOW +from .registry import Path, GLOBAL_REGISTRY + +__all__ = [ + 'Counter', + 'Gauge', + 'Summary', +] + + +class Counter(MetricBase): + metric_type: typing.ClassVar[str] = 'counter' + + def __init__(self, value: float = 0, path: typing.Optional[Path] = None, help: typing.Optional[str] = None): + super().__init__(value=value, help=help) + if not path is None: + GLOBAL_REGISTRY.register(path, self) + + +class Gauge(MetricBase): + metric_type: typing.ClassVar[str] = 'gauge' + + def __init__(self, value: float = float('nan'), path: typing.Optional[Path] = None, help: typing.Optional[str] = None): + super().__init__(value=value, help=help) + if not path is None: + GLOBAL_REGISTRY.register(path, self) + + +class Summary(MetricGroupBase): + metric_type: typing.ClassVar[str] = 'summary' + group_definition: typing.ClassVar[MetricGroupDefinition] = MetricGroupDefinition( + frozenset(['_sum', '_count']), + frozenset(['quantile']), + ) + + def __init__(self, path: typing.Optional[Path] = None, help: typing.Optional[str] = None): + super().__init__(help=help) + self._lock = MetricMutableValue.DEFAULT_LOCK + self._sum = MetricMutableValue(0, None, self._lock) + self._count = MetricMutableValue(0, None, self._lock) + if not path is None: + GLOBAL_REGISTRY.register(path, self) + + def metrics(self) -> typing.Iterator[typing.Tuple[(Path, MetricMutableValue)]]: + return [ + (Path('_sum'), self._sum), + (Path('_count'), self._count), + ] + + def observe(self, value: float, timestamp: typing.Union[int, None, Now] = NOW): + if isinstance(timestamp, Now): + timestamp = timestamp.time() + with self._lock: + self._count.inc_no_lock(1, timestamp) + self._sum.inc_no_lock(value, timestamp) diff --git a/prometheus/path.py b/prometheus/path.py new file mode 100644 index 0000000..1745736 --- /dev/null +++ b/prometheus/path.py @@ -0,0 +1,32 @@ +import typing + + +class Labels: + def __init__(self, map: typing.Mapping[str, str]): + self._map = map + self._key = '{{{0}}}'.format(','.join( + ['{0}="{1}"'.format( + k, v.replace('\\', r'\\').replace('\n', r'\n').replace('"', r'\"')) + for k, v in sorted(map.items())])) + + @property + def key(self) -> str: + return self._key + + @property + def map(self) -> typing.Mapping[str, str]: + return self._map + + +class Path: + def __init__(self, name, labels={}): + self._name = name + self._labels = Labels(labels) + + @property + def name(self) -> str: + return self._name + + @property + def labels(self) -> Labels: + return self._labels diff --git a/prometheus/registry.py b/prometheus/registry.py new file mode 100644 index 0000000..62b10a5 --- /dev/null +++ b/prometheus/registry.py @@ -0,0 +1,240 @@ +from threading import Lock +import typing +import weakref +import collections +import time +import math + +from .metric_base import MetricValue, MetricBase, MetricGroupDefinition, MetricGroupBase, EMPTY_GROUP_DEFINITION +from .path import Path, Labels + + +class _MetricGroup: + metrics: typing.Mapping[Labels, typing.Union[MetricBase, MetricGroupBase]] + weak_metric: typing.Any + + def __init__(self, metrics: typing.Mapping[Labels, typing.Union[MetricBase, MetricGroupBase]], weak_metric): + self.metrics = metrics + self.weak_metric = weak_metric + + def get(self) -> typing.Sequence[typing.Tuple[Path, MetricValue]]: + return [(k, m.get()) for (k, m) in self.metrics.items()] + + +class _MetricCollection: + metric_type: str + help_text: typing.Optional[str] + metrics: typing.MutableMapping[Labels, typing.Union[MetricBase, _MetricGroup]] + group_definition: typing.Optional[MetricGroupDefinition] + _groups: typing.MutableSet[_MetricGroup] + + def __init__(self, metric_type: str, group_definition: typing.Optional[MetricGroupDefinition] = None): + self.metric_type = metric_type + self.help_text = None + self.metrics = weakref.WeakValueDictionary() + if group_definition is None: + group_definition = EMPTY_GROUP_DEFINITION + self.group_definition = group_definition + self._groups = set() + + def check(self, path: Path, metric_type: str, group_definition: typing.Optional[MetricGroupDefinition] = None): + if self.metric_type != metric_type: + raise RuntimeError("metric type doesn't match for {}: {} != {}".format( + path.name, self.metric_type, metric_type)) + if group_definition is None: + group_definition = EMPTY_GROUP_DEFINITION + if self.group_definition.reserved_suffixes != group_definition.reserved_suffixes: + raise RuntimeError("reserved suffixes don't match for {}: {} != {}".format( + path.name, self.group_definition.reserved_suffixes, group_definition.reserved_suffixes)) + if self.group_definition.reserved_labels != group_definition.reserved_labels: + raise RuntimeError("reserved labels don't match for {}: {} != {}".format( + path.name, self.group_definition.reserved_labels, group_definition.reserved_labels)) + for l in self.group_definition.reserved_labels: + if l in path.labels.map: + raise RuntimeError("'{} {}' uses reserved label {}".format( + path.name, path.labels.key, l)) + if path.labels.key in self.metrics: + raise RuntimeError("'{} {}' already registered".format( + path.name, path.labels.key)) + + def insert(self, path: Path, metric: typing.Union[MetricBase, MetricGroupBase]): + if isinstance(metric, MetricBase): + self.metrics[path.labels.key] = metric + else: + key = path.labels.key + metrics = dict() + for (add_path, m) in metric.metrics(): + assert ( + add_path.name == '' or add_path.name in self.group_definition.reserved_suffixes) + for l in add_path.labels.map: + assert l in self.group_definition.reserved_labels + p = Path(path.name + add_path.name, {**path.labels.map, **add_path.labels.map}) + metrics[p] = m + g = None # set below, but need for cleanup + + def cleanup(_weak_metric): + self._groups.remove(g) + del self.metrics[key] + weak_metric = weakref.ref(metric, cleanup) + g = _MetricGroup(metrics, weak_metric) + self.metrics[key] = g + if self.help_text is None: + self.help_text = metric.help_text + + +_INF = float("inf") +_MINUS_INF = float("-inf") + + +def _floatToGoString(d: float): + if d == _INF: + return '+Inf' + elif d == _MINUS_INF: + return '-Inf' + elif math.isnan(d): + return 'NaN' + else: + s = repr(d) + dot = s.find('.') + # Go switches to exponents sooner than Python. + # We only need to care about positive values for le/quantile. + if d > 0 and dot > 6: + mantissa = '{0}.{1}{2}'.format( + s[0], s[1:dot], s[dot + 1:]).rstrip('0.') + return '{0}e+0{1}'.format(mantissa, dot - 1) + return s + + +class Registry: + _metrics: typing.MutableMapping[str, typing.Union[_MetricCollection, str]] + _paths: typing.MutableMapping[typing.Union[MetricBase, MetricGroupBase], Path] + _strong_metrics: typing.MutableSet[typing.Union[MetricBase, MetricGroupBase]] + + def __init__(self): + self._lock = Lock() + self._metrics = dict() + self._paths = weakref.WeakKeyDictionary() + self._strong_metrics = set() + + # requires lock + def _unregister(self, metric: MetricBase): + # don't keep alive anymore + try: + self._strong_metrics.remove(metric) + except KeyError: + pass + + try: + path = self._paths[metric] + c = self._metrics[path.name] + del c[path.labels] + except KeyError: + pass + + def unregister(self, metric: typing.Union[MetricBase, MetricGroupBase]): + with self._lock: + if isinstance(metric, MetricGroupBase): + metrics = metric.metrics(Path("")) + for (_, m) in metrics: + self._unregister(m) + else: + self._unregister(metric) + + # requires lock + def _remove_collection(self, metric_name: str): + try: + c = self._metrics[metric_name] + except KeyError: + return + assert isinstance( + c, _MetricCollection), "cannot remove reserved metric names directly" + for suffix in c.reserved_suffixes: + assert isinstance( + self._metrics[metric_name + suffix], str), "reserved suffix missing" + del self._metrics[metric_name + suffix] + del self._metrics[metric_name] + + # requires lock + def _get_collection(self, path: Path, metric_type: str, group_definition: typing.Optional[MetricGroupDefinition] = None): + c = None + try: + c = self._metrics[path.name] + except KeyError: + pass + + if not c is None: + if not isinstance(c, _MetricCollection): + c_parent = self._metrics[c] + assert isinstance( + c_parent, _MetricCollection), "nested metric groups not allowed" + if 0 == len(c_parent.metrics): + self._remove_collection(c) + else: + raise RuntimeError( + "Metric name {} reserved for group {}".format(path.name, c)) + else: + if 0 == len(c.metrics): + self._remove_collection(path.name) + else: + c.check(path, metric_type, group_definition) + return c + + c = _MetricCollection(metric_type, group_definition) + self._metrics[path.name] = c + return c + + def register(self, path: Path, metric: typing.Union[MetricBase, MetricGroupBase], strong: bool = False): + with self._lock: + if metric in self._paths: + raise RuntimeError("metric already registered") + + if isinstance(metric, MetricGroupBase): + group_definition = metric.group_definition + else: + group_definition = None + c = self._get_collection( + path, metric.metric_type, group_definition) + + c.insert(path, metric) + self._paths[metric] = path + if strong: + # keep metric alive + self._strong_metrics.add(metric) + + def collect(self): + metrics = [] + with self._lock: + for (metric_name, c) in self._metrics.items(): + l = [(k, m.get()) for (k, m) in c.metrics.items()] + metrics.append((metric_name, c.metric_type, c.help_text, l)) + + from io import StringIO + result = StringIO() + for (metric_name, metric_type, help_text, data) in metrics: + if 0 == len(data): + continue + if not help_text is None: + result.write("# HELP {} {}\n".format( + metric_name, help_text.replace('\\', r'\\').replace('\n', r'\n'))) + result.write('# TYPE {} {}\n'.format(metric_name, metric_type)) + for (key, entry) in data: + if isinstance(entry, MetricValue): + if entry.timestamp is None: + timestamp = '' + else: + timestamp = ' {0:d}'.format(entry.timestamp) + result.write("{} {} {}{}\n".format( + metric_name, key, _floatToGoString(entry.value), timestamp)) + else: + for (subpath, subentry) in entry: + if subentry.timestamp is None: + timestamp = '' + else: + timestamp = ' {0:d}'.format(subentry.timestamp) + result.write("{} {} {}{}\n".format( + subpath.name, subpath.labels.key, _floatToGoString(subentry.value), timestamp)) + + return result.getvalue() + + +GLOBAL_REGISTRY = Registry() diff --git a/test.py b/test.py new file mode 100644 index 0000000..493520c --- /dev/null +++ b/test.py @@ -0,0 +1,13 @@ +from prometheus import Path, Counter, Gauge, Summary, GLOBAL_REGISTRY, NOW + +g1 = Gauge(value=20, path=Path("foobar_g1", {"server": "[host:xy]"}), help="foo help with bar") +g2 = Gauge(path=Path("foobar_g2", {"server": "[host:xy]"}), help="foo help with bar") +g1.inc(10) +c = Counter( + path=Path("foobar", {"server": "[host:xy]"}), help="foo help with bar") +c.set(1024.12374981723) +s = Summary(path=Path("m_sum_foo", {"tag": "sum"}), help="count on it!") +s.observe(1) +s.observe(2) +s.observe(3, NOW) +print(GLOBAL_REGISTRY.collect())