from threading import Lock import typing import weakref import collections import time import math from .metric_base import MetricValue, MetricBase, MetricGroupDefinition, MetricGroupBase, EMPTY_GROUP_DEFINITION from .path import Path, Labels class _MetricGroup: metrics: typing.Mapping[Labels, typing.Union[MetricBase, MetricGroupBase]] weak_metric: typing.Any def __init__(self, metrics: typing.Mapping[Labels, typing.Union[MetricBase, MetricGroupBase]], weak_metric): self.metrics = metrics self.weak_metric = weak_metric def get(self) -> typing.Sequence[typing.Tuple[Path, MetricValue]]: return [(k, m.get()) for (k, m) in self.metrics.items()] class _MetricCollection: metric_type: str help_text: typing.Optional[str] metrics: typing.MutableMapping[Labels, typing.Union[MetricBase, _MetricGroup]] group_definition: typing.Optional[MetricGroupDefinition] _groups: typing.MutableSet[_MetricGroup] def __init__(self, metric_type: str, group_definition: typing.Optional[MetricGroupDefinition] = None): self.metric_type = metric_type self.help_text = None self.metrics = weakref.WeakValueDictionary() if group_definition is None: group_definition = EMPTY_GROUP_DEFINITION self.group_definition = group_definition self._groups = set() def check(self, path: Path, metric_type: str, group_definition: typing.Optional[MetricGroupDefinition] = None): if self.metric_type != metric_type: raise RuntimeError("metric type doesn't match for {}: {} != {}".format( path.name, self.metric_type, metric_type)) if group_definition is None: group_definition = EMPTY_GROUP_DEFINITION if self.group_definition.reserved_suffixes != group_definition.reserved_suffixes: raise RuntimeError("reserved suffixes don't match for {}: {} != {}".format( path.name, self.group_definition.reserved_suffixes, group_definition.reserved_suffixes)) if self.group_definition.reserved_labels != group_definition.reserved_labels: raise RuntimeError("reserved labels don't match for {}: {} != {}".format( path.name, self.group_definition.reserved_labels, group_definition.reserved_labels)) for l in self.group_definition.reserved_labels: if l in path.labels.map: raise RuntimeError("'{} {}' uses reserved label {}".format( path.name, path.labels.key, l)) if path.labels.key in self.metrics: raise RuntimeError("'{} {}' already registered".format( path.name, path.labels.key)) def insert(self, path: Path, metric: typing.Union[MetricBase, MetricGroupBase]): if isinstance(metric, MetricBase): self.metrics[path.labels.key] = metric else: key = path.labels.key metrics = dict() for (add_path, m) in metric.metrics(): assert ( add_path.name == '' or add_path.name in self.group_definition.reserved_suffixes) for l in add_path.labels.map: assert l in self.group_definition.reserved_labels p = Path(path.name + add_path.name, {**path.labels.map, **add_path.labels.map}) metrics[p] = m g = None # set below, but need for cleanup def cleanup(_weak_metric): self._groups.remove(g) del self.metrics[key] weak_metric = weakref.ref(metric, cleanup) g = _MetricGroup(metrics, weak_metric) self.metrics[key] = g if self.help_text is None: self.help_text = metric.help_text _INF = float("inf") _MINUS_INF = float("-inf") def _floatToGoString(d: float): if d == _INF: return '+Inf' elif d == _MINUS_INF: return '-Inf' elif math.isnan(d): return 'NaN' else: s = repr(d) dot = s.find('.') # Go switches to exponents sooner than Python. # We only need to care about positive values for le/quantile. if d > 0 and dot > 6: mantissa = '{0}.{1}{2}'.format( s[0], s[1:dot], s[dot + 1:]).rstrip('0.') return '{0}e+0{1}'.format(mantissa, dot - 1) return s class Registry: _metrics: typing.MutableMapping[str, typing.Union[_MetricCollection, str]] _paths: typing.MutableMapping[typing.Union[MetricBase, MetricGroupBase], Path] _strong_metrics: typing.MutableSet[typing.Union[MetricBase, MetricGroupBase]] def __init__(self): self._lock = Lock() self._metrics = dict() self._paths = weakref.WeakKeyDictionary() self._strong_metrics = set() # requires lock def _unregister(self, metric: MetricBase): # don't keep alive anymore try: self._strong_metrics.remove(metric) except KeyError: pass try: path = self._paths[metric] c = self._metrics[path.name] del c[path.labels] except KeyError: pass def unregister(self, metric: typing.Union[MetricBase, MetricGroupBase]): with self._lock: if isinstance(metric, MetricGroupBase): metrics = metric.metrics(Path("")) for (_, m) in metrics: self._unregister(m) else: self._unregister(metric) # requires lock def _remove_collection(self, metric_name: str): try: c = self._metrics[metric_name] except KeyError: return assert isinstance( c, _MetricCollection), "cannot remove reserved metric names directly" for suffix in c.reserved_suffixes: assert isinstance( self._metrics[metric_name + suffix], str), "reserved suffix missing" del self._metrics[metric_name + suffix] del self._metrics[metric_name] # requires lock def _get_collection(self, path: Path, metric_type: str, group_definition: typing.Optional[MetricGroupDefinition] = None): c = None try: c = self._metrics[path.name] except KeyError: pass if not c is None: if not isinstance(c, _MetricCollection): c_parent = self._metrics[c] assert isinstance( c_parent, _MetricCollection), "nested metric groups not allowed" if 0 == len(c_parent.metrics): self._remove_collection(c) else: raise RuntimeError( "Metric name {} reserved for group {}".format(path.name, c)) else: if 0 == len(c.metrics): self._remove_collection(path.name) else: c.check(path, metric_type, group_definition) return c c = _MetricCollection(metric_type, group_definition) self._metrics[path.name] = c return c def register(self, path: Path, metric: typing.Union[MetricBase, MetricGroupBase], strong: bool = False): with self._lock: if metric in self._paths: raise RuntimeError("metric already registered") if isinstance(metric, MetricGroupBase): group_definition = metric.group_definition else: group_definition = None c = self._get_collection( path, metric.metric_type, group_definition) c.insert(path, metric) self._paths[metric] = path if strong: # keep metric alive self._strong_metrics.add(metric) def collect(self): metrics = [] with self._lock: for (metric_name, c) in self._metrics.items(): l = [(k, m.get()) for (k, m) in c.metrics.items()] metrics.append((metric_name, c.metric_type, c.help_text, l)) from io import StringIO result = StringIO() for (metric_name, metric_type, help_text, data) in metrics: if 0 == len(data): continue if not help_text is None: result.write("# HELP {} {}\n".format( metric_name, help_text.replace('\\', r'\\').replace('\n', r'\n'))) result.write('# TYPE {} {}\n'.format(metric_name, metric_type)) for (key, entry) in data: if isinstance(entry, MetricValue): if entry.timestamp is None: timestamp = '' else: timestamp = ' {0:d}'.format(entry.timestamp) result.write("{} {} {}{}\n".format( metric_name, key, _floatToGoString(entry.value), timestamp)) else: for (subpath, subentry) in entry: if subentry.timestamp is None: timestamp = '' else: timestamp = ' {0:d}'.format(subentry.timestamp) result.write("{} {} {}{}\n".format( subpath.name, subpath.labels.key, _floatToGoString(subentry.value), timestamp)) return result.getvalue() GLOBAL_REGISTRY = Registry()