diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 833d42aeae4ea..0c5577153cd14 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -483,6 +483,10 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, threadIdx ); + final long maxPollIntervalMs = Integer.parseInt( + config.getMainConsumerConfigs("dummy", "dummy", threadIdx) + .getOrDefault(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000").toString()); + final TaskManager taskManager = new TaskManager( time, changelogReader, @@ -495,7 +499,8 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, adminClient, stateDirectory, stateUpdater, - schedulingTaskManager + schedulingTaskManager, + maxPollIntervalMs * 3 / 4 ); referenceContainer.taskManager = taskManager; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 4eb2ad36fc8ad..e217cd0bf0b70 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -110,6 +110,8 @@ public class TaskManager { private final StandbyTaskCreator standbyTaskCreator; private final StateUpdater stateUpdater; private final DefaultTaskManager schedulingTaskManager; + private final long waitForFutureTimeoutMs; + TaskManager(final Time time, final ChangelogReader changelogReader, final ProcessId processId, @@ -121,7 +123,8 @@ public class TaskManager { final Admin adminClient, final StateDirectory stateDirectory, final StateUpdater stateUpdater, - final DefaultTaskManager schedulingTaskManager + final DefaultTaskManager schedulingTaskManager, + final long waitForFutureTimeoutMs ) { this.time = time; this.processId = processId; @@ -139,6 +142,7 @@ public class TaskManager { this.stateUpdater = stateUpdater; this.schedulingTaskManager = schedulingTaskManager; + this.waitForFutureTimeoutMs = waitForFutureTimeoutMs; this.tasks = tasks; this.taskExecutor = new TaskExecutor( this.tasks, @@ -155,6 +159,11 @@ void setMainConsumer(final Consumer mainConsumer) { this.mainConsumer = mainConsumer; } + // visible for testing + long waitForFutureTimeoutMs() { + return waitForFutureTimeoutMs; + } + public double totalProducerBlockedTime() { return activeTaskCreator.totalProducerBlockedTime(); } @@ -705,7 +714,7 @@ private StateUpdater.RemovedTaskResult waitForFuture(final TaskId taskId, final CompletableFuture future) { final StateUpdater.RemovedTaskResult removedTaskResult; try { - removedTaskResult = future.get(5, TimeUnit.MINUTES); + removedTaskResult = future.get(waitForFutureTimeoutMs, TimeUnit.MILLISECONDS); if (removedTaskResult == null) { throw new IllegalStateException("Task " + taskId + " was not found in the state updater. " + BUG_ERROR_MESSAGE); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 21e06b19cc129..8d6414072b097 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.clients.admin.MockAdminClient; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -1067,7 +1068,8 @@ public void shouldCommitAfterCommitInterval(final boolean processingThreadsEnabl null, null, null, - null + null, + 300_000L ) { @Override int commit(final Collection tasksToCommit) { @@ -1175,7 +1177,8 @@ public void shouldRecordCommitLatency(final boolean processingThreadsEnabled) { null, stateDirectory, stateUpdater, - schedulingTaskManager + schedulingTaskManager, + 300_000L ) { @Override int commit(final Collection tasksToCommit) { @@ -4197,6 +4200,23 @@ private void runUntilTimeoutOrException(final Runnable action) { } } + @Test + public void shouldSetWaitForFutureTimeoutFromMaxPollIntervalMs() { + final Properties properties = configProps(false, false); + properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "20000"); + final StreamsConfig config = new StreamsConfig(properties); + thread = createStreamThread(CLIENT_ID, config); + + assertThat(thread.taskManager().waitForFutureTimeoutMs(), equalTo(15_000L)); + } + + @Test + public void shouldSetDefaultWaitForFutureTimeoutFromDefaultMaxPollIntervalMs() { + thread = createStreamThread(CLIENT_ID, false); + + assertThat(thread.taskManager().waitForFutureTimeoutMs(), equalTo(225_000L)); + } + private boolean runUntilTimeoutOrCondition(final Runnable action, final TestCondition testCondition) throws Exception { final long expectedEnd = System.currentTimeMillis() + DEFAULT_MAX_WAIT_MS; while (System.currentTimeMillis() < expectedEnd) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index cc3cd12b91810..8170038e853df 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -228,7 +228,8 @@ private TaskManager setUpTaskManager(final ProcessingMode processingMode, adminClient, stateDirectory, stateUpdater, - processingThreadsEnabled ? schedulingTaskManager : null + processingThreadsEnabled ? schedulingTaskManager : null, + 300_000L ); taskManager.setMainConsumer(consumer); return taskManager;