Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#5076](https://github.com/open-telemetry/opentelemetry-python/pull/5076))
- `opentelemetry-semantic-conventions`: use `X | Y` union annotation
([#5096](https://github.com/open-telemetry/opentelemetry-python/pull/5096))
- `opentelemetry-exporter-prometheus`: add support for Resource attributes configuration for Prometheus exporter
([#5122](https://github.com/open-telemetry/opentelemetry-python/pull/5122))


## Version 1.41.0/0.62b0 (2026-04-09)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
from json import dumps
from logging import getLogger
from os import environ
from typing import Deque, Dict, Iterable, Sequence, Tuple, Union
from typing import Callable, Deque, Dict, Iterable, Sequence, Tuple, Union

from prometheus_client import start_http_server
from prometheus_client.core import (
Expand Down Expand Up @@ -135,7 +135,10 @@ class PrometheusMetricReader(MetricReader):
"""Prometheus metric exporter for OpenTelemetry."""

def __init__(
self, disable_target_info: bool = False, prefix: str = ""
self,
disable_target_info: bool = False,
prefix: str = "",
resource_attr_filter: Callable[[str], bool] | None = None,
) -> None:
super().__init__(
preferred_temporality={
Expand All @@ -149,7 +152,9 @@ def __init__(
otel_component_type=OtelComponentTypeValues.PROMETHEUS_HTTP_TEXT_METRIC_EXPORTER,
)
self._collector = _CustomCollector(
disable_target_info=disable_target_info, prefix=prefix
disable_target_info=disable_target_info,
prefix=prefix,
resource_attr_filter=resource_attr_filter,
)
REGISTRY.register(self._collector)
self._collector._callback = self.collect
Expand All @@ -176,12 +181,18 @@ class _CustomCollector:
https://github.com/prometheus/client_python#custom-collectors
"""

def __init__(self, disable_target_info: bool = False, prefix: str = ""):
def __init__(
    self,
    disable_target_info: bool = False,
    prefix: str = "",
    resource_attr_filter: Callable[[str], bool] | None = None,
):
    """Initialize the custom Prometheus collector.

    Args:
        disable_target_info: presumably suppresses the ``target_info``
            metric when True (stored in ``self._disable_target_info``;
            actual use is outside this view — confirm downstream).
        prefix: optional string prepended (joined with ``"_"``) to every
            exported metric name.
        resource_attr_filter: optional predicate over OTel resource
            attribute keys; only attributes whose key passes the filter
            are exported as labels. When ``None``, no resource
            attributes are exported.
    """
    # Assigned by PrometheusMetricReader after REGISTRY.register();
    # points at the reader's collect() so this collector can pull
    # fresh metrics on demand.
    self._callback = None
    # FIFO of MetricsData batches awaiting translation to Prometheus.
    self._metrics_datas: Deque[MetricsData] = deque()
    self._disable_target_info = disable_target_info
    # Lazily-built target info; None until first needed.
    self._target_info = None
    self._prefix = prefix
    self._resource_attr_filter = resource_attr_filter

def add_metrics_data(self, metrics_data: MetricsData) -> None:
"""Add metrics to Prometheus data"""
Expand Down Expand Up @@ -226,161 +237,182 @@ def _translate_to_prometheus(
metrics_data: MetricsData,
metric_family_id_metric_family: Dict[str, PrometheusMetric],
):
metrics = []

for resource_metrics in metrics_data.resource_metrics:
resource_attrs = (
{
key: value
for key, value in resource_metrics.resource.attributes.items()
if self._resource_attr_filter(key)
}
if self._resource_attr_filter is not None
else {}
)

for scope_metrics in resource_metrics.scope_metrics:
for metric in scope_metrics.metrics:
metrics.append(metric)

for metric in metrics:
label_values_data_points = []
values = []

metric_name = metric.name
if self._prefix:
metric_name = self._prefix + "_" + metric_name
metric_name = sanitize_full_name(metric_name)
metric_description = metric.description or ""
metric_unit = map_unit(metric.unit)

# First pass: collect all unique label keys across all data points
all_label_keys_set = set()
data_point_attributes = []
for number_data_point in metric.data.data_points:
attrs = {}
for key, value in number_data_point.attributes.items():
sanitized_key = sanitize_attribute(key)
all_label_keys_set.add(sanitized_key)
attrs[sanitized_key] = self._check_value(value)
data_point_attributes.append(attrs)

if isinstance(number_data_point, HistogramDataPoint):
values.append(
{
"bucket_counts": number_data_point.bucket_counts,
"explicit_bounds": (
number_data_point.explicit_bounds
),
"sum": number_data_point.sum,
}
label_values_data_points = []
values = []

metric_name = metric.name
if self._prefix:
metric_name = self._prefix + "_" + metric_name
metric_name = sanitize_full_name(metric_name)
metric_description = metric.description or ""
metric_unit = map_unit(metric.unit)

# First pass: collect all unique label keys across all data points
all_label_keys_set = set()
data_point_attributes = []
for number_data_point in metric.data.data_points:
attrs = {}
for key, value in chain(
resource_attrs.items(),
number_data_point.attributes.items(),
):
sanitized_key = sanitize_attribute(key)
all_label_keys_set.add(sanitized_key)
attrs[sanitized_key] = self._check_value(value)
data_point_attributes.append(attrs)

if isinstance(number_data_point, HistogramDataPoint):
values.append(
{
"bucket_counts": number_data_point.bucket_counts,
"explicit_bounds": (
number_data_point.explicit_bounds
),
"sum": number_data_point.sum,
}
)
else:
values.append(number_data_point.value)

# Sort label keys for consistent ordering
all_label_keys = sorted(all_label_keys_set)

# Second pass: build label values with empty strings for missing labels
for attrs in data_point_attributes:
label_values = []
for key in all_label_keys:
label_values.append(attrs.get(key, ""))
label_values_data_points.append(label_values)

# Create metric family ID without label keys
per_metric_family_id = "|".join(
[
metric_name,
metric_description,
metric_unit,
]
)
else:
values.append(number_data_point.value)

# Sort label keys for consistent ordering
all_label_keys = sorted(all_label_keys_set)

# Second pass: build label values with empty strings for missing labels
for attrs in data_point_attributes:
label_values = []
for key in all_label_keys:
label_values.append(attrs.get(key, ""))
label_values_data_points.append(label_values)

# Create metric family ID without label keys
per_metric_family_id = "|".join(
[
metric_name,
metric_description,
metric_unit,
]
)

is_non_monotonic_sum = (
isinstance(metric.data, Sum)
and metric.data.is_monotonic is False
)
is_cumulative = (
isinstance(metric.data, Sum)
and metric.data.aggregation_temporality
== AggregationTemporality.CUMULATIVE
)
is_non_monotonic_sum = (
isinstance(metric.data, Sum)
and metric.data.is_monotonic is False
)
is_cumulative = (
isinstance(metric.data, Sum)
and metric.data.aggregation_temporality
== AggregationTemporality.CUMULATIVE
)

# The prometheus compatibility spec for sums says: If the aggregation temporality is cumulative and the sum is non-monotonic, it MUST be converted to a Prometheus Gauge.
should_convert_sum_to_gauge = (
is_non_monotonic_sum and is_cumulative
)
# The prometheus compatibility spec for sums says: If the aggregation temporality is cumulative and the sum is non-monotonic, it MUST be converted to a Prometheus Gauge.
should_convert_sum_to_gauge = (
is_non_monotonic_sum and is_cumulative
)

if (
isinstance(metric.data, Sum)
and not should_convert_sum_to_gauge
):
metric_family_id = "|".join(
[per_metric_family_id, CounterMetricFamily.__name__]
)
if (
isinstance(metric.data, Sum)
and not should_convert_sum_to_gauge
):
metric_family_id = "|".join(
[
per_metric_family_id,
CounterMetricFamily.__name__,
]
)

if metric_family_id not in metric_family_id_metric_family:
metric_family_id_metric_family[metric_family_id] = (
CounterMetricFamily(
name=metric_name,
documentation=metric_description,
labels=all_label_keys,
unit=metric_unit,
if (
metric_family_id
not in metric_family_id_metric_family
):
metric_family_id_metric_family[
metric_family_id
] = CounterMetricFamily(
name=metric_name,
documentation=metric_description,
labels=all_label_keys,
unit=metric_unit,
)
for label_values, value in zip(
label_values_data_points, values
):
metric_family_id_metric_family[
metric_family_id
].add_metric(labels=label_values, value=value)
elif (
isinstance(metric.data, Gauge)
or should_convert_sum_to_gauge
):
metric_family_id = "|".join(
[per_metric_family_id, GaugeMetricFamily.__name__]
)
)
for label_values, value in zip(
label_values_data_points, values
):
metric_family_id_metric_family[
metric_family_id
].add_metric(labels=label_values, value=value)
elif isinstance(metric.data, Gauge) or should_convert_sum_to_gauge:
metric_family_id = "|".join(
[per_metric_family_id, GaugeMetricFamily.__name__]
)

if (
metric_family_id
not in metric_family_id_metric_family.keys()
):
metric_family_id_metric_family[metric_family_id] = (
GaugeMetricFamily(
name=metric_name,
documentation=metric_description,
labels=all_label_keys,
unit=metric_unit,
if (
metric_family_id
not in metric_family_id_metric_family.keys()
):
metric_family_id_metric_family[
metric_family_id
] = GaugeMetricFamily(
name=metric_name,
documentation=metric_description,
labels=all_label_keys,
unit=metric_unit,
)
for label_values, value in zip(
label_values_data_points, values
):
metric_family_id_metric_family[
metric_family_id
].add_metric(labels=label_values, value=value)
elif isinstance(metric.data, Histogram):
metric_family_id = "|".join(
[
per_metric_family_id,
HistogramMetricFamily.__name__,
]
)
)
for label_values, value in zip(
label_values_data_points, values
):
metric_family_id_metric_family[
metric_family_id
].add_metric(labels=label_values, value=value)
elif isinstance(metric.data, Histogram):
metric_family_id = "|".join(
[per_metric_family_id, HistogramMetricFamily.__name__]
)

if (
metric_family_id
not in metric_family_id_metric_family.keys()
):
metric_family_id_metric_family[metric_family_id] = (
HistogramMetricFamily(
name=metric_name,
documentation=metric_description,
labels=all_label_keys,
unit=metric_unit,
if (
metric_family_id
not in metric_family_id_metric_family.keys()
):
metric_family_id_metric_family[
metric_family_id
] = HistogramMetricFamily(
name=metric_name,
documentation=metric_description,
labels=all_label_keys,
unit=metric_unit,
)
for label_values, value in zip(
label_values_data_points, values
):
metric_family_id_metric_family[
metric_family_id
].add_metric(
labels=label_values,
buckets=_convert_buckets(
value["bucket_counts"],
value["explicit_bounds"],
),
sum_value=value["sum"],
)
else:
_logger.warning(
"Unsupported metric data. %s", type(metric.data)
)
)
for label_values, value in zip(
label_values_data_points, values
):
metric_family_id_metric_family[
metric_family_id
].add_metric(
labels=label_values,
buckets=_convert_buckets(
value["bucket_counts"], value["explicit_bounds"]
),
sum_value=value["sum"],
)
else:
_logger.warning(
"Unsupported metric data. %s", type(metric.data)
)

# pylint: disable=no-self-use
def _check_value(self, value: Union[int, float, str, Sequence]) -> str:
Expand Down
Loading
Loading