Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#5093](https://github.com/open-telemetry/opentelemetry-python/pull/5093))
- `opentelemetry-sdk`: fix YAML structure injection via environment variable substitution in declarative file configuration; values containing newlines are now emitted as quoted YAML scalars per spec requirement
([#5091](https://github.com/open-telemetry/opentelemetry-python/pull/5091))
- `opentelemetry-sdk`: Fix `force_flush` on `MetricReader` and `PeriodicExportingMetricReader` to return a meaningful `bool` reflecting actual export success/failure instead of always returning `True`. Also fixes `detach(token)` not being called when export raises an exception. ([#5020](https://github.com/open-telemetry/opentelemetry-python/issues/5020))
Comment thread
MikeGoldsmith marked this conversation as resolved.
Outdated
- `opentelemetry-sdk`: Add `create_logger_provider`/`configure_logger_provider` to declarative file configuration, enabling LoggerProvider instantiation from config files without reading env vars
([#4990](https://github.com/open-telemetry/opentelemetry-python/pull/4990))
- `opentelemetry-sdk`: Add `service` resource detector support to declarative file configuration via `detection_development.detectors[].service`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from sys import stdout
from threading import Event, Lock, RLock, Thread
from time import perf_counter, time_ns
from typing import IO, Callable, Iterable, Optional
from typing import IO, Callable, Iterable

from typing_extensions import final

Expand Down Expand Up @@ -336,7 +336,7 @@ def __init__(
)

@final
def collect(self, timeout_millis: float = 10_000) -> None:
def collect(self, timeout_millis: float = 10_000) -> bool | None:
"""Collects the metrics from the internal SDK state and
invokes the `_receive_metrics` with the collection.

Expand All @@ -361,10 +361,11 @@ def collect(self, timeout_millis: float = 10_000) -> None:
self._metrics.record_collection(perf_counter() - start_time)

if metrics is not None:
self._receive_metrics(
return self._receive_metrics(
metrics,
timeout_millis=timeout_millis,
)
return None

@final
def _set_collect_callback(
Expand All @@ -386,17 +387,25 @@ def _receive_metrics(
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> None:
"""Called by `MetricReader.collect` when it receives a batch of metrics"""
) -> bool | None:
"""Called by `MetricReader.collect` when it receives a batch of metrics.

Subclasses must return ``True`` on success and ``False`` on failure.
Comment thread
MikeGoldsmith marked this conversation as resolved.
Outdated

.. note::
Existing subclasses that return ``None`` (the old implicit default)
will be treated as vacuous success by ``force_flush``, preserving
backward-compatible behaviour.
"""

def _set_meter_provider(self, meter_provider: MeterProvider) -> None:
    """Bind a self-observability metrics helper to *meter_provider*."""
    self._metrics = MetricReaderMetrics(self._otel_component_type, meter_provider)

def force_flush(self, timeout_millis: float = 10_000) -> bool:
    """Collect pending metrics and report whether delivery succeeded.

    Returns ``False`` only when ``collect`` explicitly reports failure.
    A ``None`` result (legacy subclasses, or nothing to collect) is
    treated as vacuous success for backward compatibility.
    """
    result = self.collect(timeout_millis=timeout_millis)
    # Only an explicit False from collect() counts as a flush failure.
    return result is not False

@abstractmethod
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
Expand Down Expand Up @@ -436,7 +445,7 @@ def __init__(

def get_metrics_data(
self,
) -> Optional[MetricsData]:
) -> MetricsData | None:
"""Reads and returns current metrics from the SDK"""
with self._lock:
self.collect()
Expand All @@ -449,9 +458,10 @@ def _receive_metrics(
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> None:
) -> bool:
with self._lock:
self._metrics_data = metrics_data
return True

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
pass
Expand All @@ -470,8 +480,8 @@ class PeriodicExportingMetricReader(MetricReader):
def __init__(
self,
exporter: MetricExporter,
export_interval_millis: Optional[float] = None,
export_timeout_millis: Optional[float] = None,
export_interval_millis: float | None = None,
export_timeout_millis: float | None = None,
) -> None:
# PeriodicExportingMetricReader defers to exporter for configuration
super().__init__(
def _receive_metrics(
    self,
    metrics_data: MetricsData,
    timeout_millis: float = 10_000,
    **kwargs,
) -> bool:
    """Export the collected batch; return True iff the export succeeded.

    Instrumentation is suppressed for the duration of the export so the
    exporter's own telemetry does not feed back into the pipeline. The
    suppression token is detached in ``finally`` so it is released even
    when ``export`` raises.
    """
    token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
    # pylint: disable=broad-exception-caught,invalid-name
    try:
        with self._export_lock:
            result = self._exporter.export(
                metrics_data, timeout_millis=timeout_millis
            )
        return result is MetricExportResult.SUCCESS
    except Exception:
        _logger.exception("Exception while exporting metrics")
        return False
    finally:
        # Runs on success, failure, and exception paths alike.
        detach(token)

def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
deadline_ns = time_ns() + timeout_millis * 10**6
Expand All @@ -596,6 +608,6 @@ def _shutdown():
self._exporter.shutdown(timeout=(deadline_ns - time_ns()) / 10**6)

def force_flush(self, timeout_millis: float = 10_000) -> bool:
    """Collect and export pending metrics, then flush the exporter.

    Returns ``False`` as soon as the collect-and-export step fails,
    skipping the exporter flush; otherwise propagates the exporter's
    own ``force_flush`` result.
    """
    if not super().force_flush(timeout_millis=timeout_millis):
        return False
    return self._exporter.force_flush(timeout_millis=timeout_millis)
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,54 @@ def test_metric_reader_metrics(self):
assert isinstance(name, str)
self.assertTrue(name.startswith("periodic_metric_reader/"))

mp.shutdown()
mp.shutdown()

def test_force_flush_returns_true_on_success(self):
    """force_flush reports True when the exporter accepts the batch."""
    fake_exporter = FakeMetricsExporter()
    reader = self._create_periodic_reader(metrics, fake_exporter)
    self.assertTrue(reader.force_flush(timeout_millis=5_000))
    reader.shutdown()

def test_force_flush_returns_false_on_export_failure(self):
    """force_flush reports False when every export attempt fails."""
    failing_exporter = FakeMetricsExporter()
    failing_exporter.export = Mock(return_value=MetricExportResult.FAILURE)
    reader = self._create_periodic_reader(metrics, failing_exporter)
    self.assertFalse(reader.force_flush(timeout_millis=5_000))
    reader.shutdown()

def test_force_flush_skips_exporter_flush_when_collect_fails(self):
    """When collect() reports failure, force_flush must short-circuit and
    never call the exporter's own force_flush."""
    exporter = FakeMetricsExporter()
    exporter.force_flush = Mock(return_value=True)
    # A failing export makes _receive_metrics — and therefore collect —
    # report failure once a collect callback is wired up below.
    exporter.export = Mock(return_value=MetricExportResult.FAILURE)
    pmr = PeriodicExportingMetricReader(
        exporter, export_interval_millis=math.inf
    )

    def _collect_failure(reader, timeout_millis):
        return metrics

    pmr._set_collect_callback(_collect_failure)
    result = pmr.force_flush(timeout_millis=5_000)
    self.assertFalse(result)
    exporter.force_flush.assert_not_called()
    pmr.shutdown()

def test_detach_called_on_export_failure(self):
    """detach(token) must run in the finally block even when export fails."""
    from unittest.mock import patch

    failing_exporter = FakeMetricsExporter()
    failing_exporter.export = Mock(return_value=MetricExportResult.FAILURE)
    reader = self._create_periodic_reader(metrics, failing_exporter)

    with patch(
        "opentelemetry.sdk.metrics._internal.export.detach"
    ) as mock_detach:
        reader.force_flush(timeout_millis=5_000)
    self.assertTrue(mock_detach.called)
    reader.shutdown()
Loading