4646import java .time .Duration ;
4747import java .util .List ;
4848import java .util .Properties ;
49+ import java .util .Set ;
50+ import java .util .stream .Collectors ;
51+ import java .util .stream .IntStream ;
4952import java .util .stream .Stream ;
5053
5154import static org .apache .kafka .streams .integration .utils .IntegrationTestUtils .purgeLocalStreamsState ;
5255import static org .apache .kafka .streams .integration .utils .IntegrationTestUtils .startApplicationAndWaitUntilRunning ;
5356import static org .apache .kafka .streams .integration .utils .IntegrationTestUtils .waitForCompletion ;
5457import static org .apache .kafka .streams .integration .utils .IntegrationTestUtils .waitUntilMinKeyValueRecordsReceived ;
5558import static org .apache .kafka .streams .utils .TestUtils .safeUniqueTestName ;
56- import static org .junit .jupiter .api .Assertions .assertFalse ;
59+ import static org .junit .jupiter .api .Assertions .assertEquals ;
5760
5861/**
5962 * Integration test for verifying ListValueStore deserialization behavior after state restoration
@@ -66,20 +69,16 @@ public class OuterJoinListValueStoreRestorationTest {
6669 private static final int NUM_BROKERS = 1 ;
6770 public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster (NUM_BROKERS );
6871
69- private static final String LEFT_TOPIC = "left-topic" ;
70- private static final String RIGHT_TOPIC = "right-topic" ;
71- private static final String OUTPUT_TOPIC = "output-topic" ;
72-
7372 private String applicationId ;
73+ private String leftTopic ;
74+ private String rightTopic ;
75+ private String outputTopic ;
7476 private Properties streamsConfig ;
7577 private KafkaStreams streams ;
7678
7779 @ BeforeAll
7880 public static void startCluster () throws Exception {
7981 CLUSTER .start ();
80- CLUSTER .createTopic (LEFT_TOPIC , 1 , 1 );
81- CLUSTER .createTopic (RIGHT_TOPIC , 1 , 1 );
82- CLUSTER .createTopic (OUTPUT_TOPIC , 1 , 1 );
8382 }
8483
8584 @ AfterAll
@@ -88,17 +87,24 @@ public static void closeCluster() {
8887 }
8988
9089 @ BeforeEach
91- public void before (final TestInfo testInfo ) {
90+ public void before (final TestInfo testInfo ) throws Exception {
9291 applicationId = "outer-join-restoration-test-" + safeUniqueTestName (testInfo );
92+ leftTopic = applicationId + "-left" ;
93+ rightTopic = applicationId + "-right" ;
94+ outputTopic = applicationId + "-output" ;
95+ CLUSTER .createTopic (leftTopic , 1 , 1 );
96+ CLUSTER .createTopic (rightTopic , 1 , 1 );
97+ CLUSTER .createTopic (outputTopic , 1 , 1 );
9398 streamsConfig = getStreamsConfig ();
9499 }
95100
96101 @ AfterEach
97- public void after () {
102+ public void after () throws Exception {
98103 if (streams != null ) {
99104 streams .close (Duration .ofSeconds (30 ));
100105 streams .cleanUp ();
101106 }
107+ CLUSTER .deleteAllTopics ();
102108 }
103109
104110 private static Stream <Arguments > processingGuaranteeAndStoreFormat () {
@@ -125,15 +131,15 @@ private Properties getStreamsConfig() {
125131 private KafkaStreams createOuterJoinTopology () {
126132 final StreamsBuilder builder = new StreamsBuilder ();
127133
128- final KStream <String , String > leftStream = builder .stream (LEFT_TOPIC );
129- final KStream <String , String > rightStream = builder .stream (RIGHT_TOPIC );
134+ final KStream <String , String > leftStream = builder .stream (leftTopic );
135+ final KStream <String , String > rightStream = builder .stream (rightTopic );
130136
131137 leftStream .outerJoin (
132138 rightStream ,
133139 (leftValue , rightValue ) -> "left=" + leftValue + ", right=" + rightValue ,
134140 JoinWindows .ofTimeDifferenceWithNoGrace (Duration .ofSeconds (60 )),
135141 StreamJoined .with (Serdes .String (), Serdes .String (), Serdes .String ())
136- ).to (OUTPUT_TOPIC );
142+ ).to (outputTopic );
137143
138144 return new KafkaStreams (builder .build (), streamsConfig );
139145 }
@@ -157,20 +163,20 @@ public void testOuterJoinRestorationWithMultipleRecords(final String processingG
157163 long timestamp = 1000L ;
158164 for (int i = 0 ; i < 10 ; i ++) {
159165 final String key = "key" + i ;
160- produceRecord (LEFT_TOPIC , key , "left-" + i , timestamp );
166+ produceRecord (leftTopic , key , "left-" + i , timestamp );
161167 timestamp += 100 ;
162168 }
163169
164170
165171 // Wait for processing and commit to changelog
166172
167173 // 1- Use a probe record to verify end-to-end: process + commit
168- produceRecord (LEFT_TOPIC , "probe" , "probe-left" , timestamp );
169- produceRecord (RIGHT_TOPIC , "probe" , "probe-right" , timestamp );
174+ produceRecord (leftTopic , "probe" , "probe-left" , timestamp );
175+ produceRecord (rightTopic , "probe" , "probe-right" , timestamp );
170176 // 2- Wait for the join result - this proves processing happened
171177 waitUntilMinKeyValueRecordsReceived (
172178 getConsumerConfig (),
173- OUTPUT_TOPIC ,
179+ outputTopic ,
174180 1 ,
175181 30000
176182 );
@@ -190,16 +196,36 @@ public void testOuterJoinRestorationWithMultipleRecords(final String processingG
190196
191197 // NOW advance window to trigger emitNonJoinedOuterRecords()
192198 final long timestampBeyondWindow = 62000L ; // Beyond 60-second window
193- produceRecord (LEFT_TOPIC , "trigger" , "trigger-value" , timestampBeyondWindow );
199+ produceRecord (leftTopic , "trigger" , "trigger-value" , timestampBeyondWindow );
194200
195201 final List <KeyValue <String , String >> results = waitUntilMinKeyValueRecordsReceived (
196202 getConsumerConfig (),
197- OUTPUT_TOPIC ,
198- 1 ,
203+ outputTopic ,
204+ 10 ,
199205 30000
200206 );
201207
202- assertFalse (results .isEmpty (), "Should have received output records" );
208+ final Set <String > expectedKeys = IntStream .range (0 , 10 )
209+ .mapToObj (i -> "key" + i )
210+ .collect (Collectors .toSet ());
211+
212+ final Set <String > unmatchedKeys = results .stream ()
213+ .filter (kv -> kv .value != null && kv .value .endsWith ("right=null" ))
214+ .map (kv -> kv .key )
215+ .collect (Collectors .toSet ());
216+
217+ // assert based on record shape
218+ assertEquals (expectedKeys , unmatchedKeys ,
219+ "All 10 unmatched left records should be emitted after restoration with right=null shape" );
220+
221+ final Set <String > nonProbeKeys = results .stream ()
222+ .filter (kv -> !"probe" .equals (kv .key ))
223+ .map (kv -> kv .key )
224+ .collect (Collectors .toSet ());
225+
226+ // assert based on keys
227+ assertEquals (expectedKeys , nonProbeKeys ,
228+ "No unexpected keys should appear on the output topic after restoration" );
203229 }
204230
205231 private void produceRecord (final String topic , final String key , final String value , final long timestamp ) {
0 commit comments