Skip to content

Commit bddd85b

Browse files
Nikita-Shupletsov authored and FrankYang0529 committed
MINOR: Keep pendingTask as WakeupFuture if currentTask is completed already. (#21586)
System tests that use VerifiableConsumer are flaky because VerifiableConsumer does not shut down on request in certain situations. There can be a race condition in the commitSync method: the future that we set as the active task on the wakeupTrigger may already be completed by the time we set it, which leads to the wakeup request never being fulfilled. Added a check for whether the task received in setActiveTask was actually triggered when we complete it exceptionally. Also added logging when a shutdown is requested, to make debugging easier. Reviewers: Kirk True <ktrue@confluent.io>, Bill Bejeck <bbejeck@apache.org>. (Cherry picked from commit fdece9c.)
1 parent d5bc851 commit bddd85b

4 files changed

Lines changed: 50 additions & 2 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/consumer/internals/WakeupTrigger.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,12 @@ public <T> CompletableFuture<T> setActiveTask(final CompletableFuture<T> current
7878
if (task == null) {
7979
return new ActiveFuture(currentTask);
8080
} else if (task instanceof WakeupFuture) {
81-
currentTask.completeExceptionally(new WakeupException());
82-
return null;
81+
boolean wasTriggered = currentTask.completeExceptionally(new WakeupException());
82+
83+
// If the Future was *already* completed when we invoke completeExceptionally, the WakeupException
84+
// will be ignored. If it was already completed, we then need to return a new WakeupFuture so that the
85+
// next call to setActiveTask will throw the WakeupException.
86+
return wasTriggered ? null : new WakeupFuture();
8387
} else if (task instanceof DisabledWakeups) {
8488
return task;
8589
}

clients/src/test/java/org/apache/kafka/clients/consumer/internals/WakeupTriggerTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,39 @@ public void testExceptionTriggeredWhenTaskAsynchronouslyCancelled() {
219219
assertThrows(WakeupException.class, () -> wakeupTrigger.maybeTriggerWakeup());
220220
}
221221

222+
@Test
223+
public void testExceptionTriggeredWhenTaskAsynchronouslyCompletedBeforeSet() {
224+
final CompletableFuture<Void> task = new CompletableFuture<>();
225+
task.complete(null);
226+
wakeupTrigger.wakeup();
227+
wakeupTrigger.setActiveTask(task);
228+
assertNotNull(wakeupTrigger.getPendingTask());
229+
assertInstanceOf(WakeupTrigger.WakeupFuture.class, wakeupTrigger.getPendingTask());
230+
assertThrows(WakeupException.class, () -> wakeupTrigger.maybeTriggerWakeup());
231+
}
232+
233+
@Test
234+
public void testExceptionTriggeredWhenTaskAsynchronouslyFailedBeforeSet() {
235+
final CompletableFuture<Void> task = new CompletableFuture<>();
236+
task.completeExceptionally(new RuntimeException("Simulated error"));
237+
wakeupTrigger.wakeup();
238+
wakeupTrigger.setActiveTask(task);
239+
assertNotNull(wakeupTrigger.getPendingTask());
240+
assertInstanceOf(WakeupTrigger.WakeupFuture.class, wakeupTrigger.getPendingTask());
241+
assertThrows(WakeupException.class, () -> wakeupTrigger.maybeTriggerWakeup());
242+
}
243+
244+
@Test
245+
public void testExceptionTriggeredWhenTaskAsynchronouslyCancelledBeforeSet() {
246+
final CompletableFuture<Void> task = new CompletableFuture<>();
247+
task.cancel(true);
248+
wakeupTrigger.wakeup();
249+
wakeupTrigger.setActiveTask(task);
250+
assertNotNull(wakeupTrigger.getPendingTask());
251+
assertInstanceOf(WakeupTrigger.WakeupFuture.class, wakeupTrigger.getPendingTask());
252+
assertThrows(WakeupException.class, () -> wakeupTrigger.maybeTriggerWakeup());
253+
}
254+
222255
private void assertWakeupExceptionIsThrown(final CompletableFuture<?> future) {
223256
assertTrue(future.isCompletedExceptionally());
224257
assertInstanceOf(WakeupException.class,

tests/kafkatest/services/verifiable_consumer.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,8 @@ def _worker(self, idx, node):
344344
handler.handle_partitions_revoked(event, node, self.logger)
345345
elif name == "partitions_assigned":
346346
handler.handle_partitions_assigned(event, node, self.logger)
347+
elif name == "shutdown_requested":
348+
self.logger.debug("Shutdown has been requested")
347349
else:
348350
self.logger.debug("%s: ignoring unknown event: %s" % (str(node.account), event))
349351

tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ public void run() {
259259
public void close() {
260260
boolean interrupted = false;
261261
try {
262+
printJson(new ShutdownRequested());
262263
consumer.wakeup();
263264
while (true) {
264265
try {
@@ -295,6 +296,14 @@ public String name() {
295296
}
296297
}
297298

299+
private static class ShutdownRequested extends ConsumerEvent {
300+
301+
@Override
302+
public String name() {
303+
return "shutdown_requested";
304+
}
305+
}
306+
298307
private static class ShutdownComplete extends ConsumerEvent {
299308

300309
@Override

0 commit comments

Comments
 (0)