Merged · Changes from 1 commit
@@ -46,14 +46,18 @@
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForActiveRestoringTask;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;

/**
* Integration test for verifying ListValueStore deserialization behavior after state restoration
@@ -66,20 +70,16 @@ public class OuterJoinListValueStoreRestorationTest {
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

private static final String LEFT_TOPIC = "left-topic";
private static final String RIGHT_TOPIC = "right-topic";
private static final String OUTPUT_TOPIC = "output-topic";

private String applicationId;
private String leftTopic;
private String rightTopic;
private String outputTopic;
private Properties streamsConfig;
private KafkaStreams streams;

@BeforeAll
public static void startCluster() throws Exception {
CLUSTER.start();
CLUSTER.createTopic(LEFT_TOPIC, 1, 1);
CLUSTER.createTopic(RIGHT_TOPIC, 1, 1);
CLUSTER.createTopic(OUTPUT_TOPIC, 1, 1);
}

@AfterAll
@@ -88,17 +88,24 @@ public static void closeCluster() {
}

@BeforeEach
public void before(final TestInfo testInfo) {
public void before(final TestInfo testInfo) throws InterruptedException {
Member:
Suggested change
public void before(final TestInfo testInfo) throws InterruptedException {
public void before(final TestInfo testInfo) throws Exception {

Member:
Nobody will catch this exception, so using the generic throws Exception is best to avoid future edits in case other exception types get thrown in the future; because nobody catches it, there is no value in declaring the exact exception type.
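A minimal sketch of the convention described above; maybeLoadFixture is a hypothetical helper, not part of this PR, included only to show that the broad signature absorbs new checked exceptions without further signature edits:

    @BeforeEach
    public void before(final TestInfo testInfo) throws Exception {
        CLUSTER.createTopic(leftTopic, 1, 1); // throws InterruptedException today
        maybeLoadFixture();                   // hypothetical helper that might throw IOException tomorrow
    }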

applicationId = "outer-join-restoration-test-" + safeUniqueTestName(testInfo);
leftTopic = applicationId + "-left";
rightTopic = applicationId + "-right";
outputTopic = applicationId + "-output";
CLUSTER.createTopic(leftTopic, 1, 1);
CLUSTER.createTopic(rightTopic, 1, 1);
CLUSTER.createTopic(outputTopic, 1, 1);
streamsConfig = getStreamsConfig();
}

@AfterEach
public void after() {
public void after() throws Exception {
if (streams != null) {
streams.close(Duration.ofSeconds(30));
streams.cleanUp();
}
CLUSTER.deleteAllTopics();
}

private static Stream<Arguments> processingGuaranteeAndStoreFormat() {
@@ -125,15 +132,15 @@ private Properties getStreamsConfig() {
private KafkaStreams createOuterJoinTopology() {
final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, String> leftStream = builder.stream(LEFT_TOPIC);
final KStream<String, String> rightStream = builder.stream(RIGHT_TOPIC);
final KStream<String, String> leftStream = builder.stream(leftTopic);
final KStream<String, String> rightStream = builder.stream(rightTopic);

leftStream.outerJoin(
rightStream,
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(60)),
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
).to(OUTPUT_TOPIC);
).to(outputTopic);

return new KafkaStreams(builder.build(), streamsConfig);
}
@@ -157,20 +164,20 @@ public void testOuterJoinRestorationWithMultipleRecords(final String processingG
long timestamp = 1000L;
for (int i = 0; i < 10; i++) {
final String key = "key" + i;
produceRecord(LEFT_TOPIC, key, "left-" + i, timestamp);
produceRecord(leftTopic, key, "left-" + i, timestamp);
timestamp += 100;
}


// Wait for processing and commit to changelog

// 1- Use a probe record to verify end-to-end: process + commit
produceRecord(LEFT_TOPIC, "probe", "probe-left", timestamp);
produceRecord(RIGHT_TOPIC, "probe", "probe-right", timestamp);
produceRecord(leftTopic, "probe", "probe-left", timestamp);
produceRecord(rightTopic, "probe", "probe-right", timestamp);
// 2- Wait for the join result - this proves processing happened
waitUntilMinKeyValueRecordsReceived(
getConsumerConfig(),
OUTPUT_TOPIC,
outputTopic,
1,
30000
);
@@ -186,20 +193,33 @@
streams = createOuterJoinTopology();
startApplicationAndWaitUntilRunning(streams);

// Wait until restoring tasks have been started
waitForActiveRestoringTask(streams, 0, 30000);
Member:
Why do we need this additional step?

Contributor Author:
After "Step 4: Restart with Restoration" I had added a little wait time, but it is deleted now.


// Step 5: Trigger Window Advancement

// NOW advance window to trigger emitNonJoinedOuterRecords()
final long timestampBeyondWindow = 62000L; // Beyond 60-second window
produceRecord(LEFT_TOPIC, "trigger", "trigger-value", timestampBeyondWindow);
produceRecord(leftTopic, "trigger", "trigger-value", timestampBeyondWindow);

final List<KeyValue<String, String>> results = waitUntilMinKeyValueRecordsReceived(
getConsumerConfig(),
OUTPUT_TOPIC,
1,
outputTopic,
10,
30000
);

assertFalse(results.isEmpty(), "Should have received output records");
final Set<String> unmatchedKeys = results.stream()
.filter(kv -> kv.value != null && kv.value.endsWith("right=null"))
Member:
Should we reverse this filter, and only drop if !"probe".equals(kv.key)?

Contributor Author:
Added one more assertion. Now we verify record shapes and key-based records.

.map(kv -> kv.key)
.collect(Collectors.toSet());
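A sketch of one reading of the reviewer's suggestion above: filter by key instead of by value suffix, dropping only the probe record. Illustrative only; it assumes the joined probe pair is the only non-unmatched result so far (the later trigger record's window has not closed yet):

    final Set<String> unmatchedKeys = results.stream()
            .filter(kv -> !"probe".equals(kv.key)) // keep everything except the joined probe pair
            .map(kv -> kv.key)
            .collect(Collectors.toSet());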

final Set<String> expectedKeys = IntStream.range(0, 10)
.mapToObj(i -> "key" + i)
.collect(Collectors.toSet());

assertEquals(expectedKeys, unmatchedKeys,
"All 10 unmatched left records should be emitted after restoration");
}

private void produceRecord(final String topic, final String key, final String value, final long timestamp) {