Skip to content

Commit 890ad65

Browse files
committed
Added test confirming correct math
1 parent 7c985e9 commit 890ad65

3 files changed

Lines changed: 26 additions & 4 deletions

File tree

streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -483,10 +483,9 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
483483
threadIdx
484484
);
485485

486-
final int dummyThreadIdxForConfig = 1;
487-
final long maxPollIntervalMs = new InternalConsumerConfig(
488-
config.getMainConsumerConfigs("dummyGroupId", "dummyClientId", dummyThreadIdxForConfig))
489-
.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
486+
final long maxPollIntervalMs = Integer.parseInt(
487+
config.getMainConsumerConfigs("dummy", "dummy", threadIdx)
488+
.getOrDefault(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000").toString());
490489

491490
final TaskManager taskManager = new TaskManager(
492491
time,

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
158158
this.mainConsumer = mainConsumer;
159159
}
160160

161+
// visible for testing
162+
long waitForFutureTimeoutMs() {
163+
return waitForFutureTimeoutMs;
164+
}
165+
161166
public double totalProducerBlockedTime() {
162167
return activeTaskCreator.totalProducerBlockedTime();
163168
}

streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.clients.admin.MockAdminClient;
2020
import org.apache.kafka.clients.consumer.Consumer;
21+
import org.apache.kafka.clients.consumer.ConsumerConfig;
2122
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
2223
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
2324
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -4199,6 +4200,23 @@ private void runUntilTimeoutOrException(final Runnable action) {
41994200
}
42004201
}
42014202

4203+
@Test
4204+
public void shouldSetWaitForFutureTimeoutFromMaxPollIntervalMs() {
4205+
final Properties properties = configProps(false, false);
4206+
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "20000");
4207+
final StreamsConfig config = new StreamsConfig(properties);
4208+
thread = createStreamThread(CLIENT_ID, config);
4209+
4210+
assertThat(thread.taskManager().waitForFutureTimeoutMs(), equalTo(10_000L));
4211+
}
4212+
4213+
@Test
4214+
public void shouldSetDefaultWaitForFutureTimeoutFromDefaultMaxPollIntervalMs() {
4215+
thread = createStreamThread(CLIENT_ID, false);
4216+
4217+
assertThat(thread.taskManager().waitForFutureTimeoutMs(), equalTo(150_000L));
4218+
}
4219+
42024220
private boolean runUntilTimeoutOrCondition(final Runnable action, final TestCondition testCondition) throws Exception {
42034221
final long expectedEnd = System.currentTimeMillis() + DEFAULT_MAX_WAIT_MS;
42044222
while (System.currentTimeMillis() < expectedEnd) {

0 commit comments

Comments
 (0)