Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,10 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
threadIdx
);

final long maxPollIntervalMs = Integer.parseInt(
config.getMainConsumerConfigs("dummy", "dummy", threadIdx)
.getOrDefault(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000").toString());

final TaskManager taskManager = new TaskManager(
time,
changelogReader,
Expand All @@ -495,7 +499,8 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
adminClient,
stateDirectory,
stateUpdater,
schedulingTaskManager
schedulingTaskManager,
maxPollIntervalMs * 3 / 4
);
referenceContainer.taskManager = taskManager;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ public class TaskManager {
private final StandbyTaskCreator standbyTaskCreator;
private final StateUpdater stateUpdater;
private final DefaultTaskManager schedulingTaskManager;
private final long waitForFutureTimeoutMs;

TaskManager(final Time time,
Comment thread
bbejeck marked this conversation as resolved.
final ChangelogReader changelogReader,
final ProcessId processId,
Expand All @@ -121,7 +123,8 @@ public class TaskManager {
final Admin adminClient,
final StateDirectory stateDirectory,
final StateUpdater stateUpdater,
final DefaultTaskManager schedulingTaskManager
final DefaultTaskManager schedulingTaskManager,
final long waitForFutureTimeoutMs
) {
this.time = time;
this.processId = processId;
Expand All @@ -139,6 +142,7 @@ public class TaskManager {

this.stateUpdater = stateUpdater;
this.schedulingTaskManager = schedulingTaskManager;
this.waitForFutureTimeoutMs = waitForFutureTimeoutMs;
this.tasks = tasks;
this.taskExecutor = new TaskExecutor(
this.tasks,
Expand All @@ -155,6 +159,11 @@ void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
this.mainConsumer = mainConsumer;
}

// visible for testing
long waitForFutureTimeoutMs() {
return waitForFutureTimeoutMs;
}

public double totalProducerBlockedTime() {
return activeTaskCreator.totalProducerBlockedTime();
}
Expand Down Expand Up @@ -705,7 +714,7 @@ private StateUpdater.RemovedTaskResult waitForFuture(final TaskId taskId,
final CompletableFuture<StateUpdater.RemovedTaskResult> future) {
final StateUpdater.RemovedTaskResult removedTaskResult;
try {
removedTaskResult = future.get(5, TimeUnit.MINUTES);
removedTaskResult = future.get(waitForFutureTimeoutMs, TimeUnit.MILLISECONDS);
if (removedTaskResult == null) {
throw new IllegalStateException("Task " + taskId + " was not found in the state updater. "
+ BUG_ERROR_MESSAGE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
Expand Down Expand Up @@ -1067,7 +1068,8 @@ public void shouldCommitAfterCommitInterval(final boolean processingThreadsEnabl
null,
null,
null,
null
null,
300_000L
) {
@Override
int commit(final Collection<? extends Task> tasksToCommit) {
Expand Down Expand Up @@ -1175,7 +1177,8 @@ public void shouldRecordCommitLatency(final boolean processingThreadsEnabled) {
null,
stateDirectory,
stateUpdater,
schedulingTaskManager
schedulingTaskManager,
300_000L
) {
@Override
int commit(final Collection<? extends Task> tasksToCommit) {
Expand Down Expand Up @@ -4197,6 +4200,23 @@ private void runUntilTimeoutOrException(final Runnable action) {
}
}

@Test
public void shouldSetWaitForFutureTimeoutFromMaxPollIntervalMs() {
final Properties properties = configProps(false, false);
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "20000");
final StreamsConfig config = new StreamsConfig(properties);
thread = createStreamThread(CLIENT_ID, config);

assertThat(thread.taskManager().waitForFutureTimeoutMs(), equalTo(15_000L));
}

@Test
public void shouldSetDefaultWaitForFutureTimeoutFromDefaultMaxPollIntervalMs() {
thread = createStreamThread(CLIENT_ID, false);

assertThat(thread.taskManager().waitForFutureTimeoutMs(), equalTo(225_000L));
}

private boolean runUntilTimeoutOrCondition(final Runnable action, final TestCondition testCondition) throws Exception {
final long expectedEnd = System.currentTimeMillis() + DEFAULT_MAX_WAIT_MS;
while (System.currentTimeMillis() < expectedEnd) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ private TaskManager setUpTaskManager(final ProcessingMode processingMode,
adminClient,
stateDirectory,
stateUpdater,
processingThreadsEnabled ? schedulingTaskManager : null
processingThreadsEnabled ? schedulingTaskManager : null,
300_000L
);
taskManager.setMainConsumer(consumer);
return taskManager;
Expand Down
Loading