diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/OuterJoinListValueStoreRestorationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/OuterJoinListValueStoreRestorationTest.java index 98e49fa669b06..bdf824290bf32 100644 --- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/OuterJoinListValueStoreRestorationTest.java +++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/OuterJoinListValueStoreRestorationTest.java @@ -46,6 +46,9 @@ import java.time.Duration; import java.util.List; import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState; @@ -53,7 +56,7 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName; -import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertEquals; /** * Integration test for verifying ListValueStore deserialization behavior after state restoration @@ -66,20 +69,16 @@ public class OuterJoinListValueStoreRestorationTest { private static final int NUM_BROKERS = 1; public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS); - private static final String LEFT_TOPIC = "left-topic"; - private static final String RIGHT_TOPIC = "right-topic"; - private static final String OUTPUT_TOPIC = "output-topic"; - private String applicationId; + private String leftTopic; + private String rightTopic; + private String outputTopic; private Properties streamsConfig; private KafkaStreams streams; @BeforeAll public static void startCluster() throws Exception { CLUSTER.start(); - CLUSTER.createTopic(LEFT_TOPIC, 1, 1); - CLUSTER.createTopic(RIGHT_TOPIC, 1, 1); - CLUSTER.createTopic(OUTPUT_TOPIC, 1, 1); } @AfterAll @@ -88,17 +87,24 @@ public static void closeCluster() { } @BeforeEach - public void before(final TestInfo testInfo) { + public void before(final TestInfo testInfo) throws Exception { applicationId = "outer-join-restoration-test-" + safeUniqueTestName(testInfo); + leftTopic = applicationId + "-left"; + rightTopic = applicationId + "-right"; + outputTopic = applicationId + "-output"; + CLUSTER.createTopic(leftTopic, 1, 1); + CLUSTER.createTopic(rightTopic, 1, 1); + CLUSTER.createTopic(outputTopic, 1, 1); streamsConfig = getStreamsConfig(); } @AfterEach - public void after() { + public void after() throws Exception { if (streams != null) { streams.close(Duration.ofSeconds(30)); streams.cleanUp(); } + CLUSTER.deleteAllTopics(); } private static Stream processingGuaranteeAndStoreFormat() { @@ -125,15 +131,15 @@ private Properties getStreamsConfig() { private KafkaStreams createOuterJoinTopology() { final StreamsBuilder builder = new StreamsBuilder(); - final KStream leftStream = builder.stream(LEFT_TOPIC); - final KStream rightStream = builder.stream(RIGHT_TOPIC); + final KStream leftStream = builder.stream(leftTopic); + final KStream rightStream = builder.stream(rightTopic); leftStream.outerJoin( rightStream, (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(60)), StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String()) - ).to(OUTPUT_TOPIC); + ).to(outputTopic); return new KafkaStreams(builder.build(), streamsConfig); } @@ -157,7 +163,7 @@ public void testOuterJoinRestorationWithMultipleRecords(final String processingG long timestamp = 1000L; for (int i = 0; i < 10; i++) { final String key = "key" + i; - produceRecord(LEFT_TOPIC, key, "left-" + i, timestamp); + produceRecord(leftTopic, key, "left-" + i, timestamp); timestamp += 100; } @@ -165,12 +171,12 @@ public void testOuterJoinRestorationWithMultipleRecords(final String processingG // Wait for processing and commit to changelog // 1- Use a probe record to verify end-to-end: process + commit - produceRecord(LEFT_TOPIC, "probe", "probe-left", timestamp); - produceRecord(RIGHT_TOPIC, "probe", "probe-right", timestamp); + produceRecord(leftTopic, "probe", "probe-left", timestamp); + produceRecord(rightTopic, "probe", "probe-right", timestamp); // 2- Wait for the join result - this proves processing happened waitUntilMinKeyValueRecordsReceived( getConsumerConfig(), - OUTPUT_TOPIC, + outputTopic, 1, 30000 ); @@ -190,16 +196,36 @@ public void testOuterJoinRestorationWithMultipleRecords(final String processingG // NOW advance window to trigger emitNonJoinedOuterRecords() final long timestampBeyondWindow = 62000L; // Beyond 60-second window - produceRecord(LEFT_TOPIC, "trigger", "trigger-value", timestampBeyondWindow); + produceRecord(leftTopic, "trigger", "trigger-value", timestampBeyondWindow); final List> results = waitUntilMinKeyValueRecordsReceived( getConsumerConfig(), - OUTPUT_TOPIC, - 1, + outputTopic, + 10, 30000 ); - assertFalse(results.isEmpty(), "Should have received output records"); + final Set expectedKeys = IntStream.range(0, 10) + .mapToObj(i -> "key" + i) + .collect(Collectors.toSet()); + + final Set unmatchedKeys = results.stream() + .filter(kv -> kv.value != null && kv.value.endsWith("right=null")) + .map(kv -> kv.key) + .collect(Collectors.toSet()); + + // assert based on record shape + assertEquals(expectedKeys, unmatchedKeys, + "All 10 unmatched left records should be emitted after restoration with right=null shape"); + + final Set nonProbeKeys = results.stream() + .filter(kv -> !"probe".equals(kv.key)) + .map(kv -> kv.key) + .collect(Collectors.toSet()); + + // assert based on keys + assertEquals(expectedKeys, nonProbeKeys, + "No unexpected keys should appear on the output topic after restoration"); } private void produceRecord(final String topic, final String key, final String value, final long timestamp) {