Skip to content

Commit dcd7615

Browse files
aliehsaeediimjsax
authored andcommitted
MINOR: Fix ListValueStore's restoration in headers mode (#22068)
Problem: ListValueStore logs raw serialized list bytes to its changelog, but during restoration, the headers-aware converter was incorrectly wrapping the data again, causing deserialization failures when reading restored records (EOFException/NullPointerException in outer join operations). Solution: To get the right RocksDB store, change `OuterStreamJoinStoreFactory` and create the `KeyValueBytesStoreSupplier` by hard-coding the supplier type to PLAIN. Reviewers: Matthias J. Sax <matthias@confluent.io>
1 parent df5748c commit dcd7615

2 files changed

Lines changed: 231 additions & 1 deletion

File tree

Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.integration;
18+
19+
import org.apache.kafka.clients.consumer.ConsumerConfig;
20+
import org.apache.kafka.clients.producer.ProducerConfig;
21+
import org.apache.kafka.common.serialization.Serdes;
22+
import org.apache.kafka.common.serialization.StringDeserializer;
23+
import org.apache.kafka.common.serialization.StringSerializer;
24+
import org.apache.kafka.streams.KafkaStreams;
25+
import org.apache.kafka.streams.KeyValue;
26+
import org.apache.kafka.streams.StreamsBuilder;
27+
import org.apache.kafka.streams.StreamsConfig;
28+
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
29+
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
30+
import org.apache.kafka.streams.kstream.JoinWindows;
31+
import org.apache.kafka.streams.kstream.KStream;
32+
import org.apache.kafka.streams.kstream.StreamJoined;
33+
import org.apache.kafka.test.TestUtils;
34+
35+
import org.junit.jupiter.api.AfterAll;
36+
import org.junit.jupiter.api.AfterEach;
37+
import org.junit.jupiter.api.BeforeAll;
38+
import org.junit.jupiter.api.BeforeEach;
39+
import org.junit.jupiter.api.Tag;
40+
import org.junit.jupiter.api.TestInfo;
41+
import org.junit.jupiter.api.Timeout;
42+
import org.junit.jupiter.params.ParameterizedTest;
43+
import org.junit.jupiter.params.provider.Arguments;
44+
import org.junit.jupiter.params.provider.MethodSource;
45+
46+
import java.time.Duration;
47+
import java.util.List;
48+
import java.util.Properties;
49+
import java.util.stream.Stream;
50+
51+
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState;
52+
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
53+
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForCompletion;
54+
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived;
55+
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
56+
import static org.junit.jupiter.api.Assertions.assertFalse;
57+
58+
/**
59+
* Integration test for verifying ListValueStore deserialization behavior after state restoration
60+
* in header-aware and default stores used by outer join operations.
61+
*/
62+
@Timeout(600)
63+
@Tag("integration")
64+
public class OuterJoinListValueStoreRestorationTest {
65+
66+
private static final int NUM_BROKERS = 1;
67+
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
68+
69+
private static final String LEFT_TOPIC = "left-topic";
70+
private static final String RIGHT_TOPIC = "right-topic";
71+
private static final String OUTPUT_TOPIC = "output-topic";
72+
73+
private String applicationId;
74+
private Properties streamsConfig;
75+
private KafkaStreams streams;
76+
77+
@BeforeAll
78+
public static void startCluster() throws Exception {
79+
CLUSTER.start();
80+
CLUSTER.createTopic(LEFT_TOPIC, 1, 1);
81+
CLUSTER.createTopic(RIGHT_TOPIC, 1, 1);
82+
CLUSTER.createTopic(OUTPUT_TOPIC, 1, 1);
83+
}
84+
85+
@AfterAll
86+
public static void closeCluster() {
87+
CLUSTER.stop();
88+
}
89+
90+
@BeforeEach
91+
public void before(final TestInfo testInfo) {
92+
applicationId = "outer-join-restoration-test-" + safeUniqueTestName(testInfo);
93+
streamsConfig = getStreamsConfig();
94+
}
95+
96+
@AfterEach
97+
public void after() {
98+
if (streams != null) {
99+
streams.close(Duration.ofSeconds(30));
100+
streams.cleanUp();
101+
}
102+
}
103+
104+
private static Stream<Arguments> processingGuaranteeAndStoreFormat() {
105+
return Stream.of(
106+
Arguments.of(StreamsConfig.EXACTLY_ONCE_V2, StreamsConfig.DSL_STORE_FORMAT_DEFAULT),
107+
Arguments.of(StreamsConfig.EXACTLY_ONCE_V2, StreamsConfig.DSL_STORE_FORMAT_HEADERS),
108+
Arguments.of(StreamsConfig.AT_LEAST_ONCE, StreamsConfig.DSL_STORE_FORMAT_DEFAULT),
109+
Arguments.of(StreamsConfig.AT_LEAST_ONCE, StreamsConfig.DSL_STORE_FORMAT_HEADERS)
110+
);
111+
}
112+
113+
private Properties getStreamsConfig() {
114+
final Properties props = new Properties();
115+
props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
116+
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
117+
props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
118+
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
119+
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
120+
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
121+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
122+
return props;
123+
}
124+
125+
private KafkaStreams createOuterJoinTopology() {
126+
final StreamsBuilder builder = new StreamsBuilder();
127+
128+
final KStream<String, String> leftStream = builder.stream(LEFT_TOPIC);
129+
final KStream<String, String> rightStream = builder.stream(RIGHT_TOPIC);
130+
131+
leftStream.outerJoin(
132+
rightStream,
133+
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
134+
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(60)),
135+
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String())
136+
).to(OUTPUT_TOPIC);
137+
138+
return new KafkaStreams(builder.build(), streamsConfig);
139+
}
140+
141+
@ParameterizedTest
142+
@MethodSource("processingGuaranteeAndStoreFormat")
143+
public void testOuterJoinRestorationWithMultipleRecords(final String processingGuarantee,
144+
final String storeFormat) throws Exception {
145+
// Configure processing guarantee and store format
146+
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, processingGuarantee);
147+
streamsConfig.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, storeFormat);
148+
149+
// Step 1: Initial Topology Start
150+
streams = createOuterJoinTopology();
151+
startApplicationAndWaitUntilRunning(streams);
152+
153+
// Step 2: Create Non-Joined Records
154+
155+
// Produce multiple records to left topic only (no match → non-joined records)
156+
// CRITICAL: Do NOT advance window yet! Records must stay in store before restoration.
157+
long timestamp = 1000L;
158+
for (int i = 0; i < 10; i++) {
159+
final String key = "key" + i;
160+
produceRecord(LEFT_TOPIC, key, "left-" + i, timestamp);
161+
timestamp += 100;
162+
}
163+
164+
165+
// Wait for processing and commit to changelog
166+
167+
// 1- Use a probe record to verify end-to-end: process + commit
168+
produceRecord(LEFT_TOPIC, "probe", "probe-left", timestamp);
169+
produceRecord(RIGHT_TOPIC, "probe", "probe-right", timestamp);
170+
// 2- Wait for the join result - this proves processing happened
171+
waitUntilMinKeyValueRecordsReceived(
172+
getConsumerConfig(),
173+
OUTPUT_TOPIC,
174+
1,
175+
30000
176+
);
177+
// 3- Wait for all records to be processed and committed (zero lag)
178+
// This ensures changelog commits have completed before we close
179+
waitForCompletion(streams, 2, 30000);
180+
181+
// Step 3: Force State Restoration
182+
streams.close(Duration.ofSeconds(30));
183+
purgeLocalStreamsState(streamsConfig);
184+
185+
// Step 4: Restart with Restoration
186+
streams = createOuterJoinTopology();
187+
startApplicationAndWaitUntilRunning(streams);
188+
189+
// Step 5: Trigger Window Advancement
190+
191+
// NOW advance window to trigger emitNonJoinedOuterRecords()
192+
final long timestampBeyondWindow = 62000L; // Beyond 60-second window
193+
produceRecord(LEFT_TOPIC, "trigger", "trigger-value", timestampBeyondWindow);
194+
195+
final List<KeyValue<String, String>> results = waitUntilMinKeyValueRecordsReceived(
196+
getConsumerConfig(),
197+
OUTPUT_TOPIC,
198+
1,
199+
30000
200+
);
201+
202+
assertFalse(results.isEmpty(), "Should have received output records");
203+
}
204+
205+
private void produceRecord(final String topic, final String key, final String value, final long timestamp) {
206+
final Properties producerConfig = new Properties();
207+
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
208+
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
209+
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
210+
producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
211+
212+
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
213+
topic,
214+
List.of(new KeyValue<>(key, value)),
215+
producerConfig,
216+
timestamp
217+
);
218+
}
219+
220+
private Properties getConsumerConfig() {
221+
final Properties consumerConfig = new Properties();
222+
consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
223+
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-" + applicationId);
224+
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
225+
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
226+
consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
227+
return consumerConfig;
228+
}
229+
}

streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ public StoreBuilder<?> builder() {
9696
final TimestampedKeyAndJoinSideSerde<K> timestampedKeyAndJoinSideSerde = new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
9797
final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new LeftOrRightValueSerde<>(streamJoined.valueSerde(), streamJoined.otherValueSerde());
9898

99-
final DslKeyValueParams dslKeyValueParams = new DslKeyValueParams(name, dslStoreFormat());
99+
// Once the headers-aware version of ListValueStore is implemented (planned for AK 4.4), replace the PLAIN constant with the dslStoreFormat() method.
100+
final DslKeyValueParams dslKeyValueParams = new DslKeyValueParams(name, DslStoreFormat.PLAIN);
100101
final KeyValueBytesStoreSupplier supplier;
101102

102103
if (passedInDslStoreSuppliers != null) {

0 commit comments

Comments
 (0)