Skip to content

KAFKA-20456: Bound waitForFuture() timeout to less than max.poll.interval#22092

Open
bbejeck wants to merge 3 commits intoapache:trunkfrom
bbejeck:KAFKA-20456_cut_wait_time_less_than_max_block
Open

KAFKA-20456: Bound waitForFuture() timeout to less than max.poll.interval#22092
bbejeck wants to merge 3 commits intoapache:trunkfrom
bbejeck:KAFKA-20456_cut_wait_time_less_than_max_block

Conversation

@bbejeck
Copy link
Copy Markdown
Member

@bbejeck bbejeck commented Apr 18, 2026

  • TaskManager.waitForFuture() previously used a hardcoded 5-minute
    timeout when waiting for the StateUpdater to process a REMOVE action.
    When the StateUpdater is blocked (e.g., RocksDB write stall during
    changelog restoration), the StreamThread blocks for the full timeout
    duration and cannot poll, exceeding max.poll.interval.ms and getting
    kicked from the consumer group. This triggers a rebalance cascade
    that can lead to a crash loop.
  • The timeout is now derived from the consumer's max.poll.interval.ms
    config (maxPollIntervalMs / 2), ensuring the StreamThread regains
    control and can poll before the consumer is removed from the group.
  • This is a mitigation — the underlying task leak (when waitForFuture()
    times out and the task is silently dropped) is addressed in a follow-up
    PR.

Reviewers: Uladzislau Blok
123193120+UladzislauBlok@users.noreply.github.com

@github-actions github-actions Bot added streams small Small PRs labels Apr 18, 2026
Copy link
Copy Markdown
Contributor

@UladzislauBlok UladzislauBlok left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall, but I have one question:

Don't you think max.poll.interval.ms / 2 is too strict? It can be something with percentage, e.g. max.poll.interval.ms * 0.9 with conversion back to long

final StreamsConfig config = new StreamsConfig(properties);
thread = createStreamThread(CLIENT_ID, config);

assertThat(thread.taskManager().waitForFutureTimeoutMs(), equalTo(10_000L));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: shouldn't we use assertj?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a dependency in AK right now, we use Junit+Hamcrest, plus IMHO what's there now is clear enough.

@bbejeck
Copy link
Copy Markdown
Member Author

bbejeck commented Apr 20, 2026

Don't you think max.poll.interval.ms / 2 is too strict? It can be something with percentage, e.g. max.poll.interval.ms * 0.9 with conversion back to long

I'm not sure. One thing, this PR part of another stacked one - #22094 this fix isn't meant to go in by itself, so I think giving up early is ok as the task gets cleaned up. My concern would be not giving the StreamThread enough time to do other tasks before the embedded consumer can call poll() again. What would think about .75?

@UladzislauBlok
Copy link
Copy Markdown
Contributor

UladzislauBlok commented Apr 20, 2026

Hey
90% was just an example
75% works as well for me

Copy link
Copy Markdown
Contributor

@UladzislauBlok UladzislauBlok left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants