Skip to content

Commit 48575c2

Browse files
committed
Switch to sequential export
1 parent 1e8c645 commit 48575c2

2 files changed

Lines changed: 79 additions & 9 deletions

File tree

sdk/metrics/src/main/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReader.java

Lines changed: 37 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@
1616
import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
1717
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
1818
import io.opentelemetry.sdk.metrics.data.MetricData;
19-
import java.util.ArrayList;
2019
import java.util.Collection;
21-
import java.util.List;
20+
import java.util.Iterator;
2221
import java.util.concurrent.ScheduledExecutorService;
2322
import java.util.concurrent.ScheduledFuture;
2423
import java.util.concurrent.TimeUnit;
@@ -226,15 +225,44 @@ CompletableResultCode doRun() {
226225
flushResult.succeed();
227226
exportAvailable.set(true);
228227
} else {
229-
Collection<Collection<MetricData>> batches = null;
230228
CompletableResultCode result;
231229
if (metricsBatcher != null) {
232-
batches = metricsBatcher.batchMetrics(metricData);
233-
List<CompletableResultCode> results = new ArrayList<>(batches.size());
234-
for (Collection<MetricData> batch : batches) {
235-
results.add(exporter.export(batch));
236-
}
237-
result = CompletableResultCode.ofAll(results);
230+
Collection<Collection<MetricData>> batches = metricsBatcher.batchMetrics(metricData);
231+
CompletableResultCode sequentialResult = new CompletableResultCode();
232+
AtomicBoolean anyFailed = new AtomicBoolean(false);
233+
Iterator<Collection<MetricData>> batchIterator = batches.iterator();
234+
235+
Runnable exportNext =
236+
new Runnable() {
237+
@Override
238+
public void run() {
239+
while (batchIterator.hasNext()) {
240+
Collection<MetricData> currentBatch = batchIterator.next();
241+
CompletableResultCode currentResult = exporter.export(currentBatch);
242+
if (currentResult.isDone()) {
243+
if (!currentResult.isSuccess()) {
244+
anyFailed.set(true);
245+
}
246+
} else {
247+
currentResult.whenComplete(
248+
() -> {
249+
if (!currentResult.isSuccess()) {
250+
anyFailed.set(true);
251+
}
252+
this.run();
253+
});
254+
return;
255+
}
256+
}
257+
if (anyFailed.get()) {
258+
sequentialResult.fail();
259+
} else {
260+
sequentialResult.succeed();
261+
}
262+
}
263+
};
264+
exportNext.run();
265+
result = sequentialResult;
238266
} else {
239267
result = exporter.export(metricData);
240268
}

sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,6 +451,48 @@ void invalidConfig() {
451451
.hasMessage("executor");
452452
}
453453

454+
@Test
455+
void periodicExport_SequentialBatches() throws Exception {
456+
MetricExporter mockExporter = mock(MetricExporter.class);
457+
when(mockExporter.getAggregationTemporality(any()))
458+
.thenReturn(AggregationTemporality.CUMULATIVE);
459+
when(mockExporter.flush()).thenReturn(CompletableResultCode.ofSuccess());
460+
when(mockExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
461+
462+
CompletableResultCode batch1Result = new CompletableResultCode();
463+
CompletableResultCode batch2Result = CompletableResultCode.ofSuccess();
464+
465+
// Configure mock to return pending for 1st call, success for 2nd
466+
when(mockExporter.export(any())).thenReturn(batch1Result).thenReturn(batch2Result);
467+
468+
PeriodicMetricReader reader =
469+
PeriodicMetricReader.builder(mockExporter)
470+
.setInterval(
471+
Duration.ofSeconds(Integer.MAX_VALUE)) // Long interval to prevent auto-trigger
472+
.setMaxExportBatchSize(3)
473+
.build();
474+
// Setup metrics that will result in 2 batches (we have 6 points in
475+
// LONG_POINT_LIST)
476+
when(collectionRegistration.collectAllMetrics())
477+
.thenReturn(Collections.singletonList(METRIC_DATA));
478+
reader.register(collectionRegistration);
479+
480+
// Trigger manual flush
481+
CompletableResultCode flushResult = reader.forceFlush();
482+
// Verify that the first batch WAS exported
483+
verify(mockExporter, times(1)).export(any());
484+
// At this point, batch 1 is stuck waiting. Batch 2 should NOT be exported yet.
485+
// We verify that export was only called once in total so far.
486+
verify(mockExporter, times(1)).export(any());
487+
// Now we complete the first batch
488+
batch1Result.succeed();
489+
// Verify that the second batch IS NOW exported
490+
verify(mockExporter, times(2)).export(any());
491+
// Ensure the flush operation completes successfully
492+
assertThat(flushResult.join(5, TimeUnit.SECONDS).isSuccess()).isTrue();
493+
reader.shutdown();
494+
}
495+
454496
@Test
455497
void stringRepresentation() {
456498
when(metricExporter.toString()).thenReturn("MockMetricExporter{}");

0 commit comments

Comments
 (0)