Skip to content

Commit 132f46b

Browse files
committed
Address review comments
1 parent 890ad65 commit 132f46b

3 files changed

Lines changed: 4 additions & 3 deletions

File tree

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -500,7 +500,7 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
500500
stateDirectory,
501501
stateUpdater,
502502
schedulingTaskManager,
503-
maxPollIntervalMs / 2
503+
maxPollIntervalMs * 3 / 4
504504
);
505505
referenceContainer.taskManager = taskManager;
506506

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ public class TaskManager {
111111
private final StateUpdater stateUpdater;
112112
private final DefaultTaskManager schedulingTaskManager;
113113
private final long waitForFutureTimeoutMs;
114+
114115
TaskManager(final Time time,
115116
final ChangelogReader changelogReader,
116117
final ProcessId processId,

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4207,14 +4207,14 @@ public void shouldSetWaitForFutureTimeoutFromMaxPollIntervalMs() {
42074207
final StreamsConfig config = new StreamsConfig(properties);
42084208
thread = createStreamThread(CLIENT_ID, config);
42094209

4210-
assertThat(thread.taskManager().waitForFutureTimeoutMs(), equalTo(10_000L));
4210+
assertThat(thread.taskManager().waitForFutureTimeoutMs(), equalTo(15_000L));
42114211
}
42124212

42134213
@Test
42144214
public void shouldSetDefaultWaitForFutureTimeoutFromDefaultMaxPollIntervalMs() {
42154215
thread = createStreamThread(CLIENT_ID, false);
42164216

4217-
assertThat(thread.taskManager().waitForFutureTimeoutMs(), equalTo(150_000L));
4217+
assertThat(thread.taskManager().waitForFutureTimeoutMs(), equalTo(225_000L));
42184218
}
42194219

42204220
private boolean runUntilTimeoutOrCondition(final Runnable action, final TestCondition testCondition) throws Exception {

0 commit comments

Comments
 (0)