Source code for opentelemetry.exporter.prometheus

# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

"""
This library allows export of metrics data to `Prometheus <https://prometheus.io/>`_.

Usage
-----

The **OpenTelemetry Prometheus Exporter** allows export of `OpenTelemetry`_
metrics to `Prometheus`_.


.. _Prometheus: https://prometheus.io/
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/

.. code:: python

    from prometheus_client import start_http_server

    from opentelemetry.exporter.prometheus import PrometheusMetricReader
    from opentelemetry.metrics import get_meter_provider, set_meter_provider
    from opentelemetry.sdk.metrics import MeterProvider

    # Start Prometheus client
    start_http_server(port=8000, addr="localhost")

    # Exporter to export metrics to Prometheus
    prefix = "MyAppPrefix"
    reader = PrometheusMetricReader(prefix=prefix)

    # Meter is responsible for creating and recording metrics
    set_meter_provider(MeterProvider(metric_readers=[reader]))
    meter = get_meter_provider().get_meter("myapp", "0.1.2")

    counter = meter.create_counter(
        "requests",
        "requests",
        "number of requests",
    )

    # Labels are used to identify key-values that are associated with a specific
    # metric that you want to record. These are useful for pre-aggregation and can
    # be used to store custom dimensions pertaining to a metric
    labels = {"environment": "staging"}

    counter.add(25, labels)
    input("Press any key to exit...")

API
---
"""

from collections import deque
from collections.abc import Iterable, Sequence
from itertools import chain
from json import dumps
from logging import getLogger
from os import environ

from prometheus_client import CollectorRegistry, start_http_server
from prometheus_client.core import (
    REGISTRY,
    CounterMetricFamily,
    GaugeMetricFamily,
    HistogramMetricFamily,
    InfoMetricFamily,
)
from prometheus_client.core import Metric as PrometheusMetric

from opentelemetry.exporter.prometheus._mapping import (
    map_unit,
    sanitize_attribute,
    sanitize_full_name,
)
from opentelemetry.sdk.environment_variables import (
    OTEL_EXPORTER_PROMETHEUS_HOST,
    OTEL_EXPORTER_PROMETHEUS_PORT,
)
from opentelemetry.sdk.metrics import (
    Counter,
    ObservableCounter,
    ObservableGauge,
    ObservableUpDownCounter,
    UpDownCounter,
)
from opentelemetry.sdk.metrics import Histogram as HistogramInstrument
from opentelemetry.sdk.metrics.export import (
    AggregationTemporality,
    Gauge,
    Histogram,
    HistogramDataPoint,
    MetricReader,
    MetricsData,
    Sum,
)
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
    OtelComponentTypeValues,
)
from opentelemetry.util.types import Attributes

_logger = getLogger(__name__)

_TARGET_INFO_NAME = "target"
_TARGET_INFO_DESCRIPTION = "Target metadata"


def _convert_buckets(
    bucket_counts: Sequence[int], explicit_bounds: Sequence[float]
) -> Sequence[tuple[str, int]]:
    buckets = []
    total_count = 0
    for upper_bound, count in zip(
        chain(explicit_bounds, ["+Inf"]),
        bucket_counts,
    ):
        total_count += count
        buckets.append((f"{upper_bound}", total_count))

    return buckets


[docs] class PrometheusMetricReader(MetricReader): """Prometheus metric exporter for OpenTelemetry.""" def __init__( self, disable_target_info: bool = False, prefix: str = "", *, registry: CollectorRegistry = REGISTRY, ) -> None: super().__init__( preferred_temporality={ Counter: AggregationTemporality.CUMULATIVE, UpDownCounter: AggregationTemporality.CUMULATIVE, HistogramInstrument: AggregationTemporality.CUMULATIVE, ObservableCounter: AggregationTemporality.CUMULATIVE, ObservableUpDownCounter: AggregationTemporality.CUMULATIVE, ObservableGauge: AggregationTemporality.CUMULATIVE, }, otel_component_type=OtelComponentTypeValues.PROMETHEUS_HTTP_TEXT_METRIC_EXPORTER, ) self._collector = _CustomCollector( disable_target_info=disable_target_info, prefix=prefix ) self._registry = registry self._registry.register(self._collector) self._collector._callback = self.collect self._prefix = prefix def _receive_metrics( self, metrics_data: MetricsData, timeout_millis: float = 10_000, **kwargs, ) -> None: if metrics_data is None: return self._collector.add_metrics_data(metrics_data)
[docs] def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: self._registry.unregister(self._collector)
class _CustomCollector: """_CustomCollector represents the Prometheus Collector object See more: https://github.com/prometheus/client_python#custom-collectors """ def __init__(self, disable_target_info: bool = False, prefix: str = ""): self._callback = None self._metrics_datas: deque[MetricsData] = deque() self._disable_target_info = disable_target_info self._target_info = None self._prefix = prefix def add_metrics_data(self, metrics_data: MetricsData) -> None: """Add metrics to Prometheus data""" self._metrics_datas.append(metrics_data) def collect(self) -> Iterable[PrometheusMetric]: """Collect fetches the metrics from OpenTelemetry and delivers them as Prometheus Metrics. Collect is invoked every time a ``prometheus.Gatherer`` is run for example when the HTTP endpoint is invoked by Prometheus. """ if self._callback is not None: self._callback() metric_family_id_metric_family = {} if len(self._metrics_datas): if not self._disable_target_info: if self._target_info is None: attributes: Attributes = {} for res in self._metrics_datas[0].resource_metrics: attributes = {**attributes, **res.resource.attributes} self._target_info = self._create_info_metric( _TARGET_INFO_NAME, _TARGET_INFO_DESCRIPTION, attributes ) metric_family_id_metric_family[_TARGET_INFO_NAME] = ( self._target_info ) while self._metrics_datas: self._translate_to_prometheus( self._metrics_datas.popleft(), metric_family_id_metric_family ) if metric_family_id_metric_family: yield from metric_family_id_metric_family.values() # pylint: disable=too-many-locals,too-many-branches def _translate_to_prometheus( self, metrics_data: MetricsData, metric_family_id_metric_family: dict[str, PrometheusMetric], ): metrics = [] for resource_metrics in metrics_data.resource_metrics: for scope_metrics in resource_metrics.scope_metrics: for metric in scope_metrics.metrics: metrics.append(metric) for metric in metrics: label_values_data_points = [] values = [] metric_name = metric.name if self._prefix: metric_name = self._prefix + "_" + metric_name metric_name = sanitize_full_name(metric_name) metric_description = metric.description or "" metric_unit = map_unit(metric.unit) # First pass: collect all unique label keys across all data points all_label_keys_set = set() data_point_attributes = [] for number_data_point in metric.data.data_points: attrs = {} for key, value in number_data_point.attributes.items(): sanitized_key = sanitize_attribute(key) all_label_keys_set.add(sanitized_key) attrs[sanitized_key] = self._check_value(value) data_point_attributes.append(attrs) if isinstance(number_data_point, HistogramDataPoint): values.append( { "bucket_counts": number_data_point.bucket_counts, "explicit_bounds": ( number_data_point.explicit_bounds ), "sum": number_data_point.sum, } ) else: values.append(number_data_point.value) # Sort label keys for consistent ordering all_label_keys = sorted(all_label_keys_set) # Second pass: build label values with empty strings for missing labels for attrs in data_point_attributes: label_values = [] for key in all_label_keys: label_values.append(attrs.get(key, "")) label_values_data_points.append(label_values) # Create metric family ID without label keys per_metric_family_id = "|".join( [ metric_name, metric_description, metric_unit, ] ) is_non_monotonic_sum = ( isinstance(metric.data, Sum) and metric.data.is_monotonic is False ) is_cumulative = ( isinstance(metric.data, Sum) and metric.data.aggregation_temporality == AggregationTemporality.CUMULATIVE ) # The prometheus compatibility spec for sums says: If the aggregation temporality is cumulative and the sum is non-monotonic, it MUST be converted to a Prometheus Gauge. should_convert_sum_to_gauge = ( is_non_monotonic_sum and is_cumulative ) if ( isinstance(metric.data, Sum) and not should_convert_sum_to_gauge ): metric_family_id = "|".join( [per_metric_family_id, CounterMetricFamily.__name__] ) if metric_family_id not in metric_family_id_metric_family: metric_family_id_metric_family[metric_family_id] = ( CounterMetricFamily( name=metric_name, documentation=metric_description, labels=all_label_keys, unit=metric_unit, ) ) for label_values, value in zip( label_values_data_points, values ): metric_family_id_metric_family[ metric_family_id ].add_metric(labels=label_values, value=value) elif isinstance(metric.data, Gauge) or should_convert_sum_to_gauge: metric_family_id = "|".join( [per_metric_family_id, GaugeMetricFamily.__name__] ) if ( metric_family_id not in metric_family_id_metric_family.keys() ): metric_family_id_metric_family[metric_family_id] = ( GaugeMetricFamily( name=metric_name, documentation=metric_description, labels=all_label_keys, unit=metric_unit, ) ) for label_values, value in zip( label_values_data_points, values ): metric_family_id_metric_family[ metric_family_id ].add_metric(labels=label_values, value=value) elif isinstance(metric.data, Histogram): metric_family_id = "|".join( [per_metric_family_id, HistogramMetricFamily.__name__] ) if ( metric_family_id not in metric_family_id_metric_family.keys() ): metric_family_id_metric_family[metric_family_id] = ( HistogramMetricFamily( name=metric_name, documentation=metric_description, labels=all_label_keys, unit=metric_unit, ) ) for label_values, value in zip( label_values_data_points, values ): metric_family_id_metric_family[ metric_family_id ].add_metric( labels=label_values, buckets=_convert_buckets( value["bucket_counts"], value["explicit_bounds"] ), sum_value=value["sum"], ) else: _logger.warning( "Unsupported metric data. %s", type(metric.data) ) # pylint: disable=no-self-use def _check_value(self, value: int | float | str | Sequence) -> str: """Check the label value and return is appropriate representation""" if not isinstance(value, str): return dumps(value, default=str) return str(value) def _create_info_metric( self, name: str, description: str, attributes: dict[str, str] ) -> InfoMetricFamily: """Create an Info Metric Family with list of attributes""" # sanitize the attribute names according to Prometheus rule attributes = { sanitize_attribute(key): self._check_value(value) for key, value in attributes.items() } info = InfoMetricFamily(name, description, labels=attributes) info.add_metric(labels=list(attributes.keys()), value=attributes) return info class _AutoPrometheusMetricReader(PrometheusMetricReader): """Thin wrapper around PrometheusMetricReader used for the opentelemetry_metrics_exporter entry point. This allows users to use the prometheus exporter with opentelemetry-instrument. It handles starting the Prometheus http server on the the correct port and host. """ def __init__(self) -> None: super().__init__() # Default values are specified in # https://github.com/open-telemetry/opentelemetry-specification/blob/v1.24.0/specification/configuration/sdk-environment-variables.md#prometheus-exporter start_http_server( port=int(environ.get(OTEL_EXPORTER_PROMETHEUS_PORT, "9464")), addr=environ.get(OTEL_EXPORTER_PROMETHEUS_HOST, "localhost"), )