initial commit
This commit is contained in:
240
prometheus/registry.py
Normal file
240
prometheus/registry.py
Normal file
@@ -0,0 +1,240 @@
|
||||
from threading import Lock
|
||||
import typing
|
||||
import weakref
|
||||
import collections
|
||||
import time
|
||||
import math
|
||||
|
||||
from .metric_base import MetricValue, MetricBase, MetricGroupDefinition, MetricGroupBase, EMPTY_GROUP_DEFINITION
|
||||
from .path import Path, Labels
|
||||
|
||||
|
||||
class _MetricGroup:
|
||||
metrics: typing.Mapping[Labels, typing.Union[MetricBase, MetricGroupBase]]
|
||||
weak_metric: typing.Any
|
||||
|
||||
def __init__(self, metrics: typing.Mapping[Labels, typing.Union[MetricBase, MetricGroupBase]], weak_metric):
|
||||
self.metrics = metrics
|
||||
self.weak_metric = weak_metric
|
||||
|
||||
def get(self) -> typing.Sequence[typing.Tuple[Path, MetricValue]]:
|
||||
return [(k, m.get()) for (k, m) in self.metrics.items()]
|
||||
|
||||
|
||||
class _MetricCollection:
|
||||
metric_type: str
|
||||
help_text: typing.Optional[str]
|
||||
metrics: typing.MutableMapping[Labels, typing.Union[MetricBase, _MetricGroup]]
|
||||
group_definition: typing.Optional[MetricGroupDefinition]
|
||||
_groups: typing.MutableSet[_MetricGroup]
|
||||
|
||||
def __init__(self, metric_type: str, group_definition: typing.Optional[MetricGroupDefinition] = None):
|
||||
self.metric_type = metric_type
|
||||
self.help_text = None
|
||||
self.metrics = weakref.WeakValueDictionary()
|
||||
if group_definition is None:
|
||||
group_definition = EMPTY_GROUP_DEFINITION
|
||||
self.group_definition = group_definition
|
||||
self._groups = set()
|
||||
|
||||
def check(self, path: Path, metric_type: str, group_definition: typing.Optional[MetricGroupDefinition] = None):
|
||||
if self.metric_type != metric_type:
|
||||
raise RuntimeError("metric type doesn't match for {}: {} != {}".format(
|
||||
path.name, self.metric_type, metric_type))
|
||||
if group_definition is None:
|
||||
group_definition = EMPTY_GROUP_DEFINITION
|
||||
if self.group_definition.reserved_suffixes != group_definition.reserved_suffixes:
|
||||
raise RuntimeError("reserved suffixes don't match for {}: {} != {}".format(
|
||||
path.name, self.group_definition.reserved_suffixes, group_definition.reserved_suffixes))
|
||||
if self.group_definition.reserved_labels != group_definition.reserved_labels:
|
||||
raise RuntimeError("reserved labels don't match for {}: {} != {}".format(
|
||||
path.name, self.group_definition.reserved_labels, group_definition.reserved_labels))
|
||||
for l in self.group_definition.reserved_labels:
|
||||
if l in path.labels.map:
|
||||
raise RuntimeError("'{} {}' uses reserved label {}".format(
|
||||
path.name, path.labels.key, l))
|
||||
if path.labels.key in self.metrics:
|
||||
raise RuntimeError("'{} {}' already registered".format(
|
||||
path.name, path.labels.key))
|
||||
|
||||
def insert(self, path: Path, metric: typing.Union[MetricBase, MetricGroupBase]):
|
||||
if isinstance(metric, MetricBase):
|
||||
self.metrics[path.labels.key] = metric
|
||||
else:
|
||||
key = path.labels.key
|
||||
metrics = dict()
|
||||
for (add_path, m) in metric.metrics():
|
||||
assert (
|
||||
add_path.name == '' or add_path.name in self.group_definition.reserved_suffixes)
|
||||
for l in add_path.labels.map:
|
||||
assert l in self.group_definition.reserved_labels
|
||||
p = Path(path.name + add_path.name, {**path.labels.map, **add_path.labels.map})
|
||||
metrics[p] = m
|
||||
g = None # set below, but need for cleanup
|
||||
|
||||
def cleanup(_weak_metric):
|
||||
self._groups.remove(g)
|
||||
del self.metrics[key]
|
||||
weak_metric = weakref.ref(metric, cleanup)
|
||||
g = _MetricGroup(metrics, weak_metric)
|
||||
self.metrics[key] = g
|
||||
if self.help_text is None:
|
||||
self.help_text = metric.help_text
|
||||
|
||||
|
||||
_INF = float("inf")
|
||||
_MINUS_INF = float("-inf")
|
||||
|
||||
|
||||
def _floatToGoString(d: float):
|
||||
if d == _INF:
|
||||
return '+Inf'
|
||||
elif d == _MINUS_INF:
|
||||
return '-Inf'
|
||||
elif math.isnan(d):
|
||||
return 'NaN'
|
||||
else:
|
||||
s = repr(d)
|
||||
dot = s.find('.')
|
||||
# Go switches to exponents sooner than Python.
|
||||
# We only need to care about positive values for le/quantile.
|
||||
if d > 0 and dot > 6:
|
||||
mantissa = '{0}.{1}{2}'.format(
|
||||
s[0], s[1:dot], s[dot + 1:]).rstrip('0.')
|
||||
return '{0}e+0{1}'.format(mantissa, dot - 1)
|
||||
return s
|
||||
|
||||
|
||||
class Registry:
|
||||
_metrics: typing.MutableMapping[str, typing.Union[_MetricCollection, str]]
|
||||
_paths: typing.MutableMapping[typing.Union[MetricBase, MetricGroupBase], Path]
|
||||
_strong_metrics: typing.MutableSet[typing.Union[MetricBase, MetricGroupBase]]
|
||||
|
||||
def __init__(self):
|
||||
self._lock = Lock()
|
||||
self._metrics = dict()
|
||||
self._paths = weakref.WeakKeyDictionary()
|
||||
self._strong_metrics = set()
|
||||
|
||||
# requires lock
|
||||
def _unregister(self, metric: MetricBase):
|
||||
# don't keep alive anymore
|
||||
try:
|
||||
self._strong_metrics.remove(metric)
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
try:
|
||||
path = self._paths[metric]
|
||||
c = self._metrics[path.name]
|
||||
del c[path.labels]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
def unregister(self, metric: typing.Union[MetricBase, MetricGroupBase]):
|
||||
with self._lock:
|
||||
if isinstance(metric, MetricGroupBase):
|
||||
metrics = metric.metrics(Path(""))
|
||||
for (_, m) in metrics:
|
||||
self._unregister(m)
|
||||
else:
|
||||
self._unregister(metric)
|
||||
|
||||
# requires lock
|
||||
def _remove_collection(self, metric_name: str):
|
||||
try:
|
||||
c = self._metrics[metric_name]
|
||||
except KeyError:
|
||||
return
|
||||
assert isinstance(
|
||||
c, _MetricCollection), "cannot remove reserved metric names directly"
|
||||
for suffix in c.reserved_suffixes:
|
||||
assert isinstance(
|
||||
self._metrics[metric_name + suffix], str), "reserved suffix missing"
|
||||
del self._metrics[metric_name + suffix]
|
||||
del self._metrics[metric_name]
|
||||
|
||||
# requires lock
|
||||
def _get_collection(self, path: Path, metric_type: str, group_definition: typing.Optional[MetricGroupDefinition] = None):
|
||||
c = None
|
||||
try:
|
||||
c = self._metrics[path.name]
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
if not c is None:
|
||||
if not isinstance(c, _MetricCollection):
|
||||
c_parent = self._metrics[c]
|
||||
assert isinstance(
|
||||
c_parent, _MetricCollection), "nested metric groups not allowed"
|
||||
if 0 == len(c_parent.metrics):
|
||||
self._remove_collection(c)
|
||||
else:
|
||||
raise RuntimeError(
|
||||
"Metric name {} reserved for group {}".format(path.name, c))
|
||||
else:
|
||||
if 0 == len(c.metrics):
|
||||
self._remove_collection(path.name)
|
||||
else:
|
||||
c.check(path, metric_type, group_definition)
|
||||
return c
|
||||
|
||||
c = _MetricCollection(metric_type, group_definition)
|
||||
self._metrics[path.name] = c
|
||||
return c
|
||||
|
||||
def register(self, path: Path, metric: typing.Union[MetricBase, MetricGroupBase], strong: bool = False):
|
||||
with self._lock:
|
||||
if metric in self._paths:
|
||||
raise RuntimeError("metric already registered")
|
||||
|
||||
if isinstance(metric, MetricGroupBase):
|
||||
group_definition = metric.group_definition
|
||||
else:
|
||||
group_definition = None
|
||||
c = self._get_collection(
|
||||
path, metric.metric_type, group_definition)
|
||||
|
||||
c.insert(path, metric)
|
||||
self._paths[metric] = path
|
||||
if strong:
|
||||
# keep metric alive
|
||||
self._strong_metrics.add(metric)
|
||||
|
||||
def collect(self):
|
||||
metrics = []
|
||||
with self._lock:
|
||||
for (metric_name, c) in self._metrics.items():
|
||||
l = [(k, m.get()) for (k, m) in c.metrics.items()]
|
||||
metrics.append((metric_name, c.metric_type, c.help_text, l))
|
||||
|
||||
from io import StringIO
|
||||
result = StringIO()
|
||||
for (metric_name, metric_type, help_text, data) in metrics:
|
||||
if 0 == len(data):
|
||||
continue
|
||||
if not help_text is None:
|
||||
result.write("# HELP {} {}\n".format(
|
||||
metric_name, help_text.replace('\\', r'\\').replace('\n', r'\n')))
|
||||
result.write('# TYPE {} {}\n'.format(metric_name, metric_type))
|
||||
for (key, entry) in data:
|
||||
if isinstance(entry, MetricValue):
|
||||
if entry.timestamp is None:
|
||||
timestamp = ''
|
||||
else:
|
||||
timestamp = ' {0:d}'.format(entry.timestamp)
|
||||
result.write("{} {} {}{}\n".format(
|
||||
metric_name, key, _floatToGoString(entry.value), timestamp))
|
||||
else:
|
||||
for (subpath, subentry) in entry:
|
||||
if subentry.timestamp is None:
|
||||
timestamp = ''
|
||||
else:
|
||||
timestamp = ' {0:d}'.format(subentry.timestamp)
|
||||
result.write("{} {} {}{}\n".format(
|
||||
subpath.name, subpath.labels.key, _floatToGoString(subentry.value), timestamp))
|
||||
|
||||
return result.getvalue()
|
||||
|
||||
|
||||
GLOBAL_REGISTRY = Registry()
|
||||
Reference in New Issue
Block a user