
KAFKA-20456: Track timed-out waitForFuture() futures and clean up leaked tasks #22094

Open
bbejeck wants to merge 4 commits into apache:trunk from bbejeck:KAFKA-20456_handle_null_from_get_gracefully

Conversation


@bbejeck bbejeck commented Apr 19, 2026

  • When TaskManager.waitForFuture() times out, the future is now
    stashed in a pendingRemoveFutures map instead of being silently
    discarded. Previously, the discarded future left the task orphaned —
    nobody tracked it, and its RocksDB file LOCK was never released.
  • A new processPendingRemoveFutures() method, called at the start of
    checkStateUpdater(), polls completed futures and closes the returned
    task dirty, releasing the RocksDB LOCK.
  • handleTasksInStateUpdater() now skips tasks that have a pending
    future, preventing duplicate REMOVE actions from being enqueued.
  • The IllegalStateException thrown when future.get() returns null is
    replaced with a warning log — the null result is handled the
    same as a timeout (stashed for deferred cleanup).

Changes

TaskManager.java

  • Added pendingRemoveFutures map (LinkedHashMap<TaskId,
    CompletableFuture>)

  • waitForFuture(): timeout and null-result paths now stash the future
    instead of discarding it

  • New processPendingRemoveFutures(): iterates pending futures, closes
    completed tasks dirty

  • checkStateUpdater(): calls processPendingRemoveFutures() before
    addTasksToStateUpdater()

  • handleTasksInStateUpdater(): skips tasks with pending futures

    TaskManagerTest.java — 4 new tests:

  • shouldStashFutureOnWaitForFutureTimeout

  • shouldNotThrowOnNullRemovedTaskResult

  • shouldProcessPendingRemoveFuturesAndCloseTaskDirty

  • shouldSkipTaskInStateUpdaterWithPendingFuture

    StateUpdaterRestorationRaceIntegrationTest.java — new integration
    test:

  • shouldCleanUpLeakedTaskAndReleaseRocksDBLockAfterWaitForFutureTimeout:
    verifies the full round trip — timeout fires, future stashed, task
    cleaned up, restart succeeds without ProcessorStateException
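
For readers skimming the PR, the stash-on-timeout path can be sketched roughly as follows. This is a minimal, self-contained sketch: TaskId, RemovedTaskResult, and the surrounding TaskManager wiring are simplified stand-ins, not the actual Kafka Streams classes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Minimal sketch of the stash-on-timeout behavior described in this PR.
// TaskId and RemovedTaskResult are simplified stand-ins for the real types.
public class WaitForFutureSketch {
    record TaskId(int topicGroupId, int partition) {}
    record RemovedTaskResult(String taskName) {}

    static final Map<TaskId, CompletableFuture<RemovedTaskResult>> pendingRemoveFutures =
        new LinkedHashMap<>();
    static final long waitForFutureTimeoutMs = 50;

    static RemovedTaskResult waitForFuture(final TaskId taskId,
                                           final CompletableFuture<RemovedTaskResult> future) {
        try {
            final RemovedTaskResult result =
                future.get(waitForFutureTimeoutMs, TimeUnit.MILLISECONDS);
            if (result == null) {
                // Null result: previously an IllegalStateException; now deferred cleanup.
                pendingRemoveFutures.put(taskId, future);
                return null;
            }
            return result;
        } catch (final TimeoutException e) {
            // Timed out: stash instead of dropping, so the task is not orphaned
            // with its RocksDB LOCK still held.
            pendingRemoveFutures.put(taskId, future);
            return null;
        } catch (final InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        final TaskId id = new TaskId(0, 0);
        final CompletableFuture<RemovedTaskResult> never = new CompletableFuture<>();
        waitForFuture(id, never); // times out after 50 ms
        System.out.println("pending=" + pendingRemoveFutures.size()); // pending=1
    }
}
```

Both failure paths converge on the same deferred-cleanup map, which is what lets a single drain loop handle them uniformly later.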

This PR is stacked on top of the waitForFuture() timeout bound PR. To
review only the changes in this PR, diff from commit 890ad65.

Reviewers: Uladzislau Blok
123193120+UladzislauBlok@users.noreply.github.com, ChickenchickenLove
ojt90902@naver.com

Comment on lines +113 to +117
private final long waitForFutureTimeoutMs;


private final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> pendingRemoveFutures = new LinkedHashMap<>();

Contributor


Suggested change
private final long waitForFutureTimeoutMs;
private final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> pendingRemoveFutures = new LinkedHashMap<>();
private final long waitForFutureTimeoutMs;
private final Map<TaskId, CompletableFuture<StateUpdater.RemovedTaskResult>> pendingRemoveFutures = new LinkedHashMap<>();

} catch (final Exception e) {
log.warn("Exception processing deferred removal of task {}", entry.getKey(), e);
}
it.remove();

@UladzislauBlok UladzislauBlok Apr 19, 2026


In waitForFuture, if we get a null result, we put it into the map to process later:

            if (removedTaskResult == null) {
-               throw new IllegalStateException("Task " + taskId + " was not found in the state updater. "
-                   + BUG_ERROR_MESSAGE);
+               log.warn("Task {} was not found in the state updater. "
+                   + "Deferring cleanup to next checkStateUpdater() call.", taskId);
+               pendingRemoveFutures.put(taskId, future);
+               return null;
            }

Here if there is no result we just remove it:

                try {
                    final StateUpdater.RemovedTaskResult result = entry.getValue().get();
                    if (result != null) {
                        log.info("Processing deferred removal of task {}", entry.getKey());
                        closeTaskDirty(result.task(), false);
                    }
                } catch (final Exception e) {
                    log.warn("Exception processing deferred removal of task {}", entry.getKey(), e);
                }
                it.remove();

Could you please explain the difference? To me it looks like we do exactly the same thing, just in a different place.

Member Author


Yes, you're correct; it's mainly there for consistency. But with the new approach in this PR, I'm thinking we should go back to throwing if the removedTaskResult is null, as that should really never happen.


@chickenchickenlove chickenchickenlove left a comment


Thanks for the patch!
I left some minor comments; when you get a chance, please take a look.

}
}

private void processPendingRemoveFutures() {
Contributor


waitForFuture() is used not only for the close path, but also by updateInputPartitions(), addToActiveTasksToRecycle(), and addToStandbyTasksToRecycle().

With the current change, any timed-out remove future is stashed in pendingRemoveFutures, and later processPendingRemoveFutures() appears to handle all such futures uniformly via closeTaskDirty(). That seems reasonable for the leaked-task / dirty-close case, but I may be missing how the intended continuation is preserved for the update/recycle paths.

For example, if a timed-out future originated from one of the recycle paths, do we still resume that recycle flow once the remove eventually completes, or is the intent here to scope this fix strictly to the leaked-task cleanup case?

Member Author


@chickenchickenlove thanks for the comment.

I still think this is the correct approach regardless of how a task gets to pendingRemoveFutures, but some context on TaskManager.waitForFuture will help explain my reasoning.

  1. KAFKA-10199 (KAFKA-10199: Remove lost tasks in state updater with new remove #15870) — future.get() with no timeout. Blocked the StreamThread indefinitely. If the StateUpdater thread died or stalled, Streams hung forever, so a TimeoutException was never thrown.
  2. KAFKA-19831 (KAFKA-19831: Improved error handling in DefaultStateUpdater. #20767) — Added a 5-minute timeout and a TimeoutException catch that returns null. This prevented infinite blocking, but now the null flows through getNonFailedTasks() → .filter(Objects::nonNull) → the task is silently dropped. Nobody holds a reference to the future anymore. When the StateUpdater eventually processes the REMOVE, the suspended task (with stores still
    open) is orphaned — no cleanup ever happens.

The scenario outlined in point 2 surfaced in a soak application: a task timed out, and after a rebalance the task was reassigned, resulting in a ProcessorStateException crash from a LOCK conflict with the orphaned task.

So why closeTaskDirty() for all paths? By the time processPendingRemoveFutures() runs (on the next checkStateUpdater() call), the rebalance context that originally requested the update/recycle/close is gone —
activeTasksToCreate/standbyTasksToCreate were already mutated during the original handleTasksInStateUpdater() call. The task is in limbo: suspended, stores open, owned by nobody.

The closeTaskDirty() call releases the resources (critically, the RocksDB file LOCK) so the next rebalance can safely create a fresh task for that partition. With this approach the worst-case cost for the update/recycle paths is a full restore — but without this, the worst case is a crash.
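
The deferred drain loop the PR describes can be sketched as a self-contained example. This is a simplification: the String keys, counter, and isDone guard are illustrative stand-ins for the real TaskId keys and closeTaskDirty() wiring.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Sketch of the deferred-cleanup loop: completed futures are drained and
// their tasks "closed dirty"; still-pending futures stay for the next poll.
public class PendingRemoveSketch {
    record RemovedTaskResult(String taskName) {}

    static int dirtyCloses = 0; // stands in for calls to closeTaskDirty(result.task(), false)

    static void processPendingRemoveFutures(
            final Map<String, CompletableFuture<RemovedTaskResult>> pending) {
        final Iterator<Map.Entry<String, CompletableFuture<RemovedTaskResult>>> it =
            pending.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, CompletableFuture<RemovedTaskResult>> entry = it.next();
            if (!entry.getValue().isDone()) {
                continue; // still waiting on the state updater; try again next cycle
            }
            try {
                final RemovedTaskResult result = entry.getValue().get();
                if (result != null) {
                    dirtyCloses++; // release resources, including the RocksDB LOCK
                }
            } catch (final Exception e) {
                // swallow, matching the PR's log-and-continue behavior
            }
            it.remove();
        }
    }

    public static void main(String[] args) {
        final Map<String, CompletableFuture<RemovedTaskResult>> pending = new LinkedHashMap<>();
        pending.put("0_0", CompletableFuture.completedFuture(new RemovedTaskResult("t0")));
        pending.put("0_1", new CompletableFuture<>()); // not yet completed
        processPendingRemoveFutures(pending);
        System.out.println(dirtyCloses + " closed, " + pending.size() + " still pending");
        // 1 closed, 1 still pending
    }
}
```

The LinkedHashMap preserves insertion order, so tasks are cleaned up in the order their removals timed out.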

The situation will also improve: we're currently working on delivering KIP-892, and once that lands this scenario should no longer result in a full restore.

@bbejeck bbejeck changed the title KAFKA-20456: Don't throw on waitForFuture() futures and clean up leaked tasks KAFKA-20456: Track timed-out waitForFuture() futures and clean up leaked tasks Apr 20, 2026
@bbejeck bbejeck force-pushed the KAFKA-20456_handle_null_from_get_gracefully branch from 6908143 to 63f6458 on April 20, 2026 at 23:21