Skip to content

Commit 63f6458

Browse files
committed
Update to not throw on null result from future
1 parent 132f46b commit 63f6458

3 files changed

Lines changed: 473 additions & 6 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.streams.integration;
18+
19+
import org.apache.kafka.clients.consumer.ConsumerConfig;
20+
import org.apache.kafka.clients.producer.KafkaProducer;
21+
import org.apache.kafka.clients.producer.ProducerConfig;
22+
import org.apache.kafka.clients.producer.ProducerRecord;
23+
import org.apache.kafka.common.serialization.IntegerSerializer;
24+
import org.apache.kafka.common.serialization.Serdes;
25+
import org.apache.kafka.common.serialization.StringSerializer;
26+
import org.apache.kafka.common.utils.LogCaptureAppender;
27+
import org.apache.kafka.common.utils.MockTime;
28+
import org.apache.kafka.streams.KafkaStreams;
29+
import org.apache.kafka.streams.StreamsConfig;
30+
import org.apache.kafka.streams.TopologyWrapper;
31+
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
32+
import org.apache.kafka.streams.processor.StateStore;
33+
import org.apache.kafka.streams.processor.StateStoreContext;
34+
import org.apache.kafka.streams.processor.internals.TaskManager;
35+
import org.apache.kafka.streams.state.KeyValueStore;
36+
import org.apache.kafka.streams.state.StoreBuilder;
37+
import org.apache.kafka.streams.state.Stores;
38+
import org.apache.kafka.streams.state.internals.AbstractStoreBuilder;
39+
import org.apache.kafka.test.MockApiProcessorSupplier;
40+
import org.apache.kafka.test.MockKeyValueStore;
41+
import org.apache.kafka.test.TestUtils;
42+
43+
import org.junit.jupiter.api.AfterEach;
44+
import org.junit.jupiter.api.BeforeEach;
45+
import org.junit.jupiter.api.Test;
46+
import org.junit.jupiter.api.TestInfo;
47+
import org.junit.jupiter.api.Timeout;
48+
49+
import java.io.IOException;
50+
import java.time.Duration;
51+
import java.util.Properties;
52+
import java.util.concurrent.CountDownLatch;
53+
import java.util.concurrent.TimeUnit;
54+
import java.util.concurrent.atomic.AtomicBoolean;
55+
56+
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
57+
import static org.junit.jupiter.api.Assertions.assertTrue;
58+
59+
/**
60+
* Integration test that verifies the deferred future tracking fix for the race condition
61+
* between the StateUpdater thread and the StreamThread when a rebalance triggers task
62+
* removal while the StateUpdater is blocked during changelog restoration.
63+
*
64+
* <p>Without the fix, the race condition chain is:
65+
* <ol>
66+
* <li>StateUpdater thread is blocked in restoration (e.g., RocksDB write stall)</li>
67+
* <li>A rebalance occurs, StreamThread calls {@code waitForFuture()} which times out</li>
68+
* <li>The task is silently dropped — nobody tracks it</li>
69+
* <li>StateUpdater eventually processes the REMOVE, suspends the task (stores NOT closed)</li>
70+
* <li>The orphaned task holds the RocksDB file LOCK</li>
71+
* <li>On restart, {@code RocksDB.open()} fails with {@code ProcessorStateException}</li>
72+
* </ol>
73+
*
74+
* <p>With the fix ({@code pendingRemoveFutures} tracking in {@code TaskManager}):
75+
* <ol>
76+
* <li>When {@code waitForFuture()} times out, the future is stashed (not discarded)</li>
77+
* <li>On the next {@code checkStateUpdater()} call, completed futures are polled</li>
78+
* <li>The returned task is closed dirty, releasing the RocksDB LOCK</li>
79+
* <li>On restart, {@code RocksDB.open()} succeeds — no LOCK conflict</li>
80+
* </ol>
81+
*
82+
* @see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035">KIP-1035</a>
83+
*/
84+
@Timeout(120)
85+
public class StateUpdaterRestorationRaceIntegrationTest {
86+
87+
private static final int NUM_BROKERS = 1;
88+
private static final String INPUT_TOPIC = "input-topic";
89+
private static final String BLOCKING_STORE_NAME = "blocking-store";
90+
private static final String ROCKSDB_STORE_NAME = "rocksdb-store";
91+
private static final int NUM_PARTITIONS = 6;
92+
93+
private final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(NUM_BROKERS);
94+
95+
private String appId;
96+
private KafkaStreams streams1;
97+
private KafkaStreams streams2;
98+
99+
// Controls whether the restore callback should block
100+
private final AtomicBoolean blockDuringRestore = new AtomicBoolean(false);
101+
// Ensures only the first restore record triggers the block
102+
private final AtomicBoolean hasBlocked = new AtomicBoolean(false);
103+
// Signaled when restoration has started (StateUpdater is in restore)
104+
private final CountDownLatch restorationStartedLatch = new CountDownLatch(1);
105+
// Released to unblock the StateUpdater's restore callback
106+
private final CountDownLatch restoreBlockLatch = new CountDownLatch(1);
107+
108+
@BeforeEach
109+
public void before(final TestInfo testInfo) throws InterruptedException, IOException {
110+
cluster.start();
111+
cluster.createTopic(INPUT_TOPIC, NUM_PARTITIONS, 1);
112+
appId = "app-" + safeUniqueTestName(testInfo);
113+
}
114+
115+
@AfterEach
116+
public void after() {
117+
// Release the block latch in case the test failed before doing so
118+
restoreBlockLatch.countDown();
119+
if (streams1 != null) {
120+
streams1.close(Duration.ofSeconds(30));
121+
}
122+
if (streams2 != null) {
123+
streams2.close(Duration.ofSeconds(30));
124+
}
125+
cluster.stop();
126+
}
127+
128+
/**
129+
* Verifies that when {@code waitForFuture()} times out during a rebalance, the deferred
130+
* future tracking in {@code TaskManager} cleans up the leaked task and releases the RocksDB
131+
* LOCK, allowing a subsequent restart to succeed without {@code ProcessorStateException}.
132+
*
133+
* <p>Uses a two-store topology:
134+
* <ul>
135+
* <li>{@code blocking-store} (MockKeyValueStore): blocks the StateUpdater thread during restoration</li>
136+
* <li>{@code rocksdb-store} (real RocksDB): acquires the file LOCK that would cause a conflict
137+
* if the task were leaked</li>
138+
* </ul>
139+
*
140+
* <p>Test flow:
141+
* <ol>
142+
* <li>Start instance 1 — both stores initialized, RocksDB LOCK acquired, restoration blocks</li>
143+
* <li>Start instance 2 → rebalance → {@code waitForFuture()} times out → future stashed</li>
144+
* <li>Unblock restoration → StateUpdater processes REMOVE → future completes</li>
145+
* <li>Next {@code checkStateUpdater()} call → {@code processPendingRemoveFutures()} →
146+
* task closed dirty → RocksDB LOCK released</li>
147+
* <li>Close both instances, restart instance 1 with same state directory</li>
148+
* <li>Assert: instance starts successfully (no ProcessorStateException)</li>
149+
* </ol>
150+
*
151+
* <p>Without the fix, step 6 would fail with {@code ProcessorStateException: Error opening store}
152+
* because the orphaned task's RocksDB handle would still hold the file LOCK.
153+
*/
154+
@Test
155+
public void shouldCleanUpLeakedTaskAndReleaseRocksDBLockAfterWaitForFutureTimeout() throws Exception {
156+
blockDuringRestore.set(true);
157+
158+
// Pre-create changelog topics for both stores
159+
final String blockingChangelog = appId + "-" + BLOCKING_STORE_NAME + "-changelog";
160+
final String rocksdbChangelog = appId + "-" + ROCKSDB_STORE_NAME + "-changelog";
161+
cluster.createTopic(blockingChangelog, NUM_PARTITIONS, 1);
162+
cluster.createTopic(rocksdbChangelog, NUM_PARTITIONS, 1);
163+
populateChangelog(blockingChangelog, 50);
164+
populateChangelog(rocksdbChangelog, 50);
165+
166+
final String stateDir1 = TestUtils.tempDirectory().getPath();
167+
final Properties props1 = props(stateDir1);
168+
// waitForFuture timeout = maxPollIntervalMs / 2 = 7.5s
169+
props1.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 15_000);
170+
171+
try (final LogCaptureAppender taskManagerAppender = LogCaptureAppender.createAndRegister(TaskManager.class)) {
172+
streams1 = new KafkaStreams(buildTopologyWithRocksDB(), props1);
173+
streams1.start();
174+
175+
// Wait for the StateUpdater to begin restoration (and block on the latch)
176+
assertTrue(
177+
restorationStartedLatch.await(30, TimeUnit.SECONDS),
178+
"Restoration never started on instance 1"
179+
);
180+
181+
// Start instance 2 to trigger a rebalance while restoration is blocked.
182+
// StreamThread will call handleTasksInStateUpdater() → waitForFuture() → timeout
183+
// → future stashed in pendingRemoveFutures
184+
final String stateDir2 = TestUtils.tempDirectory().getPath();
185+
streams2 = new KafkaStreams(buildTopologyWithRocksDB(), props(stateDir2));
186+
streams2.start();
187+
188+
// Wait for waitForFuture timeout — proves the deferred tracking path was exercised.
189+
TestUtils.waitForCondition(
190+
() -> taskManagerAppender.getMessages().stream()
191+
.anyMatch(msg -> msg.contains("Deferring cleanup to next checkStateUpdater()")),
192+
30_000,
193+
"Expected waitForFuture() to time out and defer cleanup"
194+
);
195+
196+
// Unblock restoration so the StateUpdater can process the pending REMOVE action.
197+
// With the fix: the completed future is picked up by processPendingRemoveFutures()
198+
// on the next checkStateUpdater() call, the task is closed dirty, and the RocksDB
199+
// LOCK is released.
200+
restoreBlockLatch.countDown();
201+
202+
// Wait for processPendingRemoveFutures() to clean up the leaked task:
203+
// 1. StateUpdater finishes restoration and processes the REMOVE
204+
// 2. checkStateUpdater() calls processPendingRemoveFutures()
205+
// 3. closeTaskDirty() closes the RocksDB handle and releases the LOCK
206+
TestUtils.waitForCondition(
207+
() -> taskManagerAppender.getMessages().stream()
208+
.anyMatch(msg -> msg.contains("Processing deferred removal of task")),
209+
30_000,
210+
"Expected processPendingRemoveFutures() to clean up the leaked task"
211+
);
212+
213+
// Close both instances
214+
streams2.close(Duration.ofSeconds(10));
215+
streams2 = null;
216+
streams1.close(Duration.ofSeconds(10));
217+
streams1 = null;
218+
219+
// Restart instance 1 with the SAME state directory.
220+
// Without the fix: the orphaned task's RocksDB handle still holds the file LOCK
221+
// → RocksDB.open() fails → ProcessorStateException
222+
// With the fix: the task was closed dirty, LOCK released
223+
// → RocksDB.open() succeeds → instance starts normally
224+
blockDuringRestore.set(false);
225+
streams1 = new KafkaStreams(buildTopologyWithRocksDB(), props1);
226+
streams1.start();
227+
228+
// Assert: instance starts successfully and reaches RUNNING.
229+
// Without the fix, this would fail with ProcessorStateException from the
230+
// RocksDB LOCK conflict. With the fix, processPendingRemoveFutures() closed
231+
// the leaked task dirty, releasing the LOCK before restart.
232+
TestUtils.waitForCondition(
233+
() -> streams1.state() == KafkaStreams.State.RUNNING,
234+
30_000,
235+
"Instance should reach RUNNING state — the deferred future tracking " +
236+
"should have cleaned up the leaked task and released the RocksDB LOCK"
237+
);
238+
}
239+
}
240+
241+
/**
242+
* Builds a topology with two stores: a blocking MockKeyValueStore (to halt the StateUpdater
243+
* during restoration) and a real RocksDB store (whose file LOCK causes ProcessorStateException
244+
* when the task is leaked and later reassigned).
245+
*/
246+
private TopologyWrapper buildTopologyWithRocksDB() {
247+
// Store 1: MockKeyValueStore with blocking restore callback
248+
final StoreBuilder<KeyValueStore<Object, Object>> blockingStoreBuilder =
249+
new AbstractStoreBuilder<>(BLOCKING_STORE_NAME, Serdes.Integer(), Serdes.String(), new MockTime()) {
250+
@Override
251+
public KeyValueStore<Object, Object> build() {
252+
return new MockKeyValueStore(name, true) {
253+
@Override
254+
public void init(final StateStoreContext stateStoreContext, final StateStore root) {
255+
stateStoreContext.register(root, (key, value) -> {
256+
if (blockDuringRestore.get() && !hasBlocked.getAndSet(true)) {
257+
restorationStartedLatch.countDown();
258+
try {
259+
restoreBlockLatch.await(60, TimeUnit.SECONDS);
260+
} catch (final InterruptedException e) {
261+
Thread.currentThread().interrupt();
262+
}
263+
}
264+
});
265+
initialized = true;
266+
closed = false;
267+
}
268+
};
269+
}
270+
};
271+
272+
// Store 2: Real RocksDB store — acquires file LOCK during init
273+
final StoreBuilder<KeyValueStore<Integer, String>> rocksdbStoreBuilder =
274+
Stores.keyValueStoreBuilder(
275+
Stores.persistentKeyValueStore(ROCKSDB_STORE_NAME),
276+
Serdes.Integer(), Serdes.String());
277+
278+
final TopologyWrapper topology = new TopologyWrapper();
279+
topology.addSource("source", INPUT_TOPIC);
280+
topology.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
281+
topology.addStateStore(blockingStoreBuilder, "processor");
282+
topology.addStateStore(rocksdbStoreBuilder, "processor");
283+
return topology;
284+
}
285+
286+
private Properties props(final String stateDir) {
287+
final Properties props = new Properties();
288+
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
289+
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
290+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
291+
props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir);
292+
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
293+
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
294+
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);
295+
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
296+
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
297+
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 10_000);
298+
props.put(StreamsConfig.mainConsumerPrefix(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), 3_000);
299+
return props;
300+
}
301+
302+
/**
303+
* Produce records directly to the changelog topic so that restoration is needed
304+
* when an instance starts with an empty state directory.
305+
*/
306+
private void populateChangelog(final String changelogTopic, final int recordsPerPartition) {
307+
final Properties producerConfig = new Properties();
308+
producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
309+
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
310+
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
311+
312+
try (final KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerConfig)) {
313+
for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
314+
for (int i = 0; i < recordsPerPartition; i++) {
315+
producer.send(new ProducerRecord<>(changelogTopic, partition, i, "value-" + i));
316+
}
317+
}
318+
producer.flush();
319+
}
320+
}
321+
}

0 commit comments

Comments
 (0)