Skip to content

Commit e90ab8d

Browse files
committed
Add tests to verify sequential export for PeriodicMetricReader
1 parent 48575c2 commit e90ab8d

1 file changed

Lines changed: 193 additions & 2 deletions

File tree

sdk/metrics/src/test/java/io/opentelemetry/sdk/metrics/export/PeriodicMetricReaderTest.java

Lines changed: 193 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@
4242
import java.util.concurrent.TimeUnit;
4343
import java.util.concurrent.atomic.AtomicBoolean;
4444
import java.util.concurrent.atomic.AtomicInteger;
45+
import java.util.logging.Handler;
46+
import java.util.logging.Level;
47+
import java.util.logging.LogRecord;
48+
import java.util.logging.Logger;
4549
import javax.annotation.Nullable;
4650
import org.junit.jupiter.api.BeforeEach;
4751
import org.junit.jupiter.api.Test;
@@ -413,7 +417,8 @@ public CompletableResultCode shutdown() {
413417
shutdownThread.start();
414418

415419
// Give shutdown() time to reach the flushInProgress.join() wait.
416-
// Even if this executes before shutdown enters the wait, the assertions below still
420+
// Even if this executes before shutdown enters the wait, the assertions below
421+
// still
417422
// validate correctness — they just won't exercise the concurrent case.
418423
Thread.sleep(200);
419424

@@ -428,7 +433,8 @@ public CompletableResultCode shutdown() {
428433
assertThat(flushResult.isSuccess()).isTrue();
429434
// Final shutdown export also ran (in-flight + final = 2)
430435
assertThat(exportCount.get()).isEqualTo(2);
431-
// Exporter.shutdown() was not called while the in-flight export was still pending
436+
// Exporter.shutdown() was not called while the in-flight export was still
437+
// pending
432438
assertThat(shutdownCalledWhileExportPending.get()).isFalse();
433439
}
434440

@@ -493,6 +499,170 @@ void periodicExport_SequentialBatches() throws Exception {
493499
reader.shutdown();
494500
}
495501

502+
@Test
503+
void periodicExport_SequentialBatches_PartialFailure() throws Exception {
504+
MetricExporter mockExporter = mock(MetricExporter.class);
505+
when(mockExporter.getAggregationTemporality(any()))
506+
.thenReturn(AggregationTemporality.CUMULATIVE);
507+
when(mockExporter.flush()).thenReturn(CompletableResultCode.ofSuccess());
508+
when(mockExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
509+
510+
CompletableResultCode batch1Result = new CompletableResultCode();
511+
CompletableResultCode batch2Result = new CompletableResultCode();
512+
CompletableResultCode batch3Result = new CompletableResultCode();
513+
514+
when(mockExporter.export(any()))
515+
.thenReturn(batch1Result)
516+
.thenReturn(batch2Result)
517+
.thenReturn(batch3Result);
518+
519+
PeriodicMetricReader reader =
520+
PeriodicMetricReader.builder(mockExporter)
521+
.setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
522+
.setMaxExportBatchSize(2) // 6 points / 2 = 3 batches
523+
.build();
524+
525+
when(collectionRegistration.collectAllMetrics())
526+
.thenReturn(Collections.singletonList(METRIC_DATA));
527+
reader.register(collectionRegistration);
528+
529+
Logger targetLogger = Logger.getLogger(PeriodicMetricReader.class.getName());
530+
Level originalLevel = targetLogger.getLevel();
531+
targetLogger.setLevel(Level.FINE);
532+
533+
TestHandler testHandler = new TestHandler();
534+
testHandler.setLevel(Level.FINE);
535+
targetLogger.addHandler(testHandler);
536+
537+
try {
538+
CompletableResultCode flushResult = reader.forceFlush();
539+
540+
verify(mockExporter, times(1)).export(any());
541+
542+
batch1Result.succeed();
543+
verify(mockExporter, times(2)).export(any());
544+
545+
batch2Result.fail();
546+
verify(mockExporter, times(3)).export(any());
547+
548+
batch3Result.succeed();
549+
550+
// Flush result should still be success
551+
assertThat(flushResult.join(5, TimeUnit.SECONDS).isSuccess()).isTrue();
552+
553+
boolean logFound =
554+
testHandler.getLogRecords().stream()
555+
.anyMatch(
556+
record ->
557+
record.getLevel().equals(Level.FINE)
558+
&& record.getMessage().equals("Exporter failed"));
559+
assertThat(logFound).isTrue();
560+
561+
reader.shutdown();
562+
} finally {
563+
targetLogger.removeHandler(testHandler);
564+
targetLogger.setLevel(originalLevel);
565+
}
566+
}
567+
568+
@Test
569+
void periodicExport_SequentialBatches_PurelySynchronous() throws Exception {
570+
MetricExporter mockExporter = mock(MetricExporter.class);
571+
when(mockExporter.getAggregationTemporality(any()))
572+
.thenReturn(AggregationTemporality.CUMULATIVE);
573+
when(mockExporter.flush()).thenReturn(CompletableResultCode.ofSuccess());
574+
when(mockExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
575+
576+
when(mockExporter.export(any()))
577+
.thenReturn(CompletableResultCode.ofSuccess())
578+
.thenReturn(CompletableResultCode.ofSuccess())
579+
.thenReturn(CompletableResultCode.ofSuccess());
580+
581+
PeriodicMetricReader reader =
582+
PeriodicMetricReader.builder(mockExporter)
583+
.setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
584+
.setMaxExportBatchSize(2) // 6 points / 2 = 3 batches
585+
.build();
586+
587+
when(collectionRegistration.collectAllMetrics())
588+
.thenReturn(Collections.singletonList(METRIC_DATA));
589+
reader.register(collectionRegistration);
590+
591+
CompletableResultCode flushResult = reader.forceFlush();
592+
593+
// Verify that all 3 batches WERE exported immediately
594+
verify(mockExporter, times(3)).export(any());
595+
596+
assertThat(flushResult.join(5, TimeUnit.SECONDS).isSuccess()).isTrue();
597+
598+
reader.shutdown();
599+
}
600+
601+
@Test
602+
void periodicExport_SequentialBatches_PurelyAsynchronous() throws Exception {
603+
MetricExporter mockExporter = mock(MetricExporter.class);
604+
when(mockExporter.getAggregationTemporality(any()))
605+
.thenReturn(AggregationTemporality.CUMULATIVE);
606+
when(mockExporter.flush()).thenReturn(CompletableResultCode.ofSuccess());
607+
when(mockExporter.shutdown()).thenReturn(CompletableResultCode.ofSuccess());
608+
609+
CompletableResultCode batch1Result = new CompletableResultCode();
610+
CompletableResultCode batch2Result = new CompletableResultCode();
611+
CompletableResultCode batch3Result = new CompletableResultCode();
612+
613+
when(mockExporter.export(any()))
614+
.thenReturn(batch1Result)
615+
.thenReturn(batch2Result)
616+
.thenReturn(batch3Result);
617+
618+
PeriodicMetricReader reader =
619+
PeriodicMetricReader.builder(mockExporter)
620+
.setInterval(Duration.ofSeconds(Integer.MAX_VALUE))
621+
.setMaxExportBatchSize(2) // 6 points / 2 = 3 batches
622+
.build();
623+
624+
when(collectionRegistration.collectAllMetrics())
625+
.thenReturn(Collections.singletonList(METRIC_DATA));
626+
reader.register(collectionRegistration);
627+
628+
Logger targetLogger = Logger.getLogger(PeriodicMetricReader.class.getName());
629+
Level originalLevel = targetLogger.getLevel();
630+
targetLogger.setLevel(Level.FINE);
631+
632+
TestHandler testHandler = new TestHandler();
633+
testHandler.setLevel(Level.FINE);
634+
targetLogger.addHandler(testHandler);
635+
636+
try {
637+
CompletableResultCode flushResult = reader.forceFlush();
638+
639+
verify(mockExporter, times(1)).export(any());
640+
641+
batch1Result.succeed();
642+
verify(mockExporter, times(2)).export(any());
643+
644+
batch2Result.succeed();
645+
verify(mockExporter, times(3)).export(any());
646+
647+
batch3Result.succeed();
648+
649+
assertThat(flushResult.join(5, TimeUnit.SECONDS).isSuccess()).isTrue();
650+
651+
boolean logFound =
652+
testHandler.getLogRecords().stream()
653+
.anyMatch(
654+
record ->
655+
record.getLevel().equals(Level.FINE)
656+
&& record.getMessage().equals("Exporter failed"));
657+
assertThat(logFound).isFalse();
658+
659+
reader.shutdown();
660+
} finally {
661+
targetLogger.removeHandler(testHandler);
662+
targetLogger.setLevel(originalLevel);
663+
}
664+
}
665+
496666
@Test
497667
void stringRepresentation() {
498668
when(metricExporter.toString()).thenReturn("MockMetricExporter{}");
@@ -567,4 +737,25 @@ List<List<MetricData>> waitForNumberOfExports(int numberOfExports) throws Except
567737
return result;
568738
}
569739
}
740+
741+
private static class TestHandler extends Handler {
742+
private final List<LogRecord> logRecords = new ArrayList<>();
743+
744+
private TestHandler() {}
745+
746+
@Override
747+
public void publish(LogRecord record) {
748+
logRecords.add(record);
749+
}
750+
751+
@Override
752+
public void flush() {}
753+
754+
@Override
755+
public void close() {}
756+
757+
List<LogRecord> getLogRecords() {
758+
return logRecords;
759+
}
760+
}
570761
}

0 commit comments

Comments
 (0)