Conversation
Adds a `client_rack` option to AIOKafkaConsumer. When set and the broker supports FetchRequest v11+ (Kafka 2.4+) with a configured `replica.selector.class` such as RackAwareReplicaSelector, the consumer fetches from the closest in-sync replica instead of the leader, reducing cross-AZ traffic and tail latency.

Implementation:

* Enable FetchRequest v5..v11 in the protocol layer with a per-version builder that fills the new fields (log_start_offset, current_leader_epoch, session_id/epoch, forgotten_topics_data, rack_id) appropriately.
* Maintain a per-partition preferred-read-replica cache in Fetcher, populated from FetchResponse v11 and invalidated on TTL expiry, on -1 from the broker, when the node leaves cluster metadata, or on NotLeaderForPartition / UnknownTopicOrPartition / OffsetOutOfRange.
* Route fetches through the cache via a new _select_read_replica helper instead of always using the partition leader.
* Document in docs/consumer.rst the required broker-side configuration (broker.rack and replica.selector.class).
* 12 broker-less unit tests covering both protocol and runtime cache behavior.

Upstream PR: aio-libs#1158
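The cache described above can be sketched roughly as follows. This is an illustrative model, not aiokafka's actual implementation; the class name and method signatures are hypothetical, but the invalidation rules (TTL expiry, -1 from the broker) mirror the description:

```python
import time


class PreferredReplicaCache:
    """Per-partition preferred-read-replica cache (illustrative sketch).

    Maps a TopicPartition to (node_id, expiry deadline). Fetch routing
    consults it and falls back to the partition leader when there is no
    live entry.
    """

    def __init__(self, ttl_ms: int):
        self._ttl = ttl_ms / 1000
        self._cache = {}  # tp -> (node_id, expire_at)

    def update(self, tp, preferred_read_replica: int) -> None:
        # -1 from the broker means "no preferred replica": invalidate.
        if preferred_read_replica == -1:
            self._cache.pop(tp, None)
        else:
            self._cache[tp] = (
                preferred_read_replica,
                time.monotonic() + self._ttl,
            )

    def select(self, tp, leader: int) -> int:
        entry = self._cache.get(tp)
        if entry is None:
            return leader
        node_id, expire_at = entry
        if time.monotonic() >= expire_at:
            # TTL expired: evict and fall back to the leader.
            del self._cache[tp]
            return leader
        return node_id
```

Routing then simply calls `select(tp, leader)` instead of using the leader unconditionally.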
[DI-5741] Add rack awareness to aiokafka
@vmaurin, could you please take a look too?
vmaurin
left a comment
- Implementation of KIP-392 sounds correct (added a few comments)
- The v11 fetch request/response does not use flexible versions yet, so we should be fine here
- I am not so sure about the implications of KIP-320 (v9) and incremental fetch support (v7), but it sounds like we pass default values there, so maybe we should just make sure tests work well with brokers using these versions of the protocol (probably already in the list)
    partitions_by_topic.append((topic, new_parts))

    args: list = [
        -1,  # replica_id
I am a bit unsure if we should still send -1 when consuming from a follower, but it seems to be valid when I look to the java client code https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L317
I am just a bit confused by this sentence in the KIP:

> the FetchRequest schema has field for the replica id. Consumers typically use the sentinel -1, which indicates that fetching is only allowed from the leader. A lesser known sentinel is -2, which was originally intended to be used for debugging and allows fetching from followers. We propose to let the consumer use this to indicate the intent to allow fetching from a follower.
Yes, the Java client does the same: -1 means "I'm a consumer", not "only fetch from the leader".
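The sentinel discussion above can be made concrete with a small sketch of the v11 argument layout. The helper name and defaults are illustrative (not aiokafka's actual builder); the point is that `replica_id` stays at the consumer sentinel -1, and the `rack_id` field, not the -2 sentinel, is what signals willingness to fetch from a follower:

```python
def build_fetch_v11_args(rack_id: str, session_id: int = 0,
                         session_epoch: int = -1) -> dict:
    """Illustrative field layout of a FetchRequest v11 from a consumer."""
    return {
        "replica_id": -1,          # "I am a consumer", not "leader only"
        "max_wait_ms": 500,
        "min_bytes": 1,
        "max_bytes": 52428800,
        "isolation_level": 0,
        "session_id": session_id,          # incremental fetch (v7+)
        "session_epoch": session_epoch,
        "topics": [],
        "forgotten_topics_data": [],
        "rack_id": rack_id,        # KIP-392: the broker's selector uses this
    }
```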
    args.append(self._rack_id)
    return request_struct_class(*args)

    if self._isolation_level:
Should we raise a similar exception if someone sets a rack_id while the broker doesn't support v11?
- yes: you see the error and can perhaps fix your consumer config / update the broker. On the other hand, your consumer keeps crashing
- no: the consumer "works", but it will never consume from a follower in the same rack
No; silent fallback is what the Java client does. But it's worth a log warning.
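The agreed behavior (silent fallback plus a one-shot warning) could be sketched like this. The class and method names are hypothetical, for illustration only; the latch ensures the warning fires at most once per consumer:

```python
import logging

log = logging.getLogger(__name__)


class RackFallbackWarner:
    """One-shot warning latch for the v<11 silent-fallback case (sketch)."""

    def __init__(self, client_rack):
        self._client_rack = client_rack
        self._warned = False

    def check(self, fetch_api_version: int) -> bool:
        """Return True if a warning was emitted on this call."""
        if self._client_rack and fetch_api_version < 11 and not self._warned:
            self._warned = True
            log.warning(
                "client_rack=%r is set but the broker only supports "
                "FetchRequest v%d (< 11); falling back to fetching "
                "from the partition leader",
                self._client_rack, fetch_api_version,
            )
            return True
        return False
```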
Review changes:

- Default `client_rack` to `None` instead of `""` in Consumer, Fetcher, and FetchRequest so the wire encoding sends null (-1) rather than an empty string (0) when rack awareness is disabled, matching the Java client.
- Remove unused `rack_id` property from FetchRequest.
- Remove `metadata_max_age_ms` param from Fetcher; read the TTL for the preferred-replica cache directly from `client._metadata_max_age_ms`. This also removes `self._metadata_max_age_ms` from AIOKafkaConsumer.
- Rename `node_id` to `preferred_read_replica` in `_update_preferred_read_replica` for readability.
- Make `responder_node_id` a required keyword argument (no default); remove the conservative `None` fallback branch and its test, since the caller always knows who responded.
- Rename local `parts`/`new_parts` to `partitions`/`new_partitions` in FetchRequest.build().

CI fixes:

- Add explicit type annotations (`list[tuple[int, ...]]`, `list[object]`) in FetchRequest.build() to satisfy mypy.
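The "null (-1) rather than an empty string (0)" distinction comes from the Kafka wire format: STRING is an int16 length prefix followed by UTF-8 bytes, and NULLABLE_STRING additionally encodes null as length -1. A minimal encoder shows why `None` and `""` differ on the wire:

```python
import struct


def encode_nullable_string(value):
    """Kafka NULLABLE_STRING: int16 length prefix, -1 for null."""
    if value is None:
        return struct.pack(">h", -1)
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data
```

So a disabled rack sends two bytes either way, but `0xFFFF` (null) versus `0x0000` (empty string).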
…er only supports FetchRequest < v11 (rack-aware fetching silently falls back to the partition leader).
Codecov Report ❌ Patch coverage is

Additional details and impacted files

@@ Coverage Diff @@
## master #1159 +/- ##
==========================================
+ Coverage 94.87% 94.95% +0.08%
==========================================
Files 88 89 +1
Lines 15743 16041 +298
Branches 1380 1397 +17
==========================================
+ Hits 14936 15232 +296
Misses 558 558
- Partials 249 251 +2
… test flakiness
- Revert rack_id parameter from `str | None = None` back to `str = ""`,
matching the Kafka protocol definition (STRING, not NULLABLE_STRING).
- Remove `or ""` fallback in FetchRequest.build_request() since rack_id
is now always a string.
- Add `or ""` at the call site in fetcher.py to convert None client_rack
to empty string.
- Replace time.sleep-based TTL expiry test with direct cache manipulation
to fix flakiness on Windows (coarse timer resolution).
- Add tests for rack version warning logging:
- test_warning_logged_when_broker_below_v11: verifies warning is logged
once when client_rack is set but broker doesn't support FetchRequest v11.
- test_no_warning_when_rack_not_set: verifies no warning when client_rack
is None.
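The flake-free TTL test pattern mentioned above (direct cache manipulation instead of sleeping past the TTL, which is unreliable with coarse Windows timer resolution) can be sketched as follows. The helper and cache shape are illustrative, not aiokafka's actual internals:

```python
import time


def select_read_replica(cache, tp, leader):
    """Return the cached preferred replica, or the leader on miss/expiry."""
    entry = cache.get(tp)
    if entry is not None:
        node_id, expire_at = entry
        if time.monotonic() < expire_at:
            return node_id
        del cache[tp]  # expired: evict and fall back to the leader
    return leader


# Live entry: the follower is chosen.
cache = {("t", 0): (5, time.monotonic() + 60.0)}
assert select_read_replica(cache, ("t", 0), leader=1) == 5

# Force expiry without sleeping: move the deadline into the past.
cache[("t", 0)] = (5, time.monotonic() - 1.0)
assert select_read_replica(cache, ("t", 0), leader=1) == 1
assert ("t", 0) not in cache
```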
- Evict cached preferred replica on fetch transport failure so the next attempt falls back to the leader instead of waiting up to metadata_max_age_ms (P1).
- Route ListOffsets / offset-reset traffic to the partition leader, not the preferred follower; keep rack-aware routing scoped to Fetch as KIP-392 requires (P2).
- Treat an unknown leader during reset routing as stale metadata: skip the partition and signal invalid_metadata so the client forces a refresh, instead of enqueuing under None/-1 (P3).

Add 4 new tests covering all three paths.
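The three paths above can be sketched as small helpers (hypothetical names, illustrative only): Fetch may use the cached follower, ListOffsets always targets the leader, and a transport failure evicts the cache entry immediately:

```python
def select_read_replica(cache, tp, leader):
    # P1 context: Fetch traffic may use the cached preferred replica.
    return cache.get(tp, leader)


def select_reset_node(leader):
    # P2/P3: ListOffsets / offset reset always targets the leader; an
    # unknown leader means stale metadata, so return None and let the
    # caller skip the partition and force a metadata refresh.
    if leader is None or leader == -1:
        return None
    return leader


def on_fetch_transport_error(cache, tp):
    # P1: evict so the next attempt falls back to the leader right away
    # instead of waiting up to metadata_max_age_ms for the TTL to expire.
    cache.pop(tp, None)
```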
    if preferred_read_replica is None or preferred_read_replica == -1:
        leader = self._client.cluster.leader_for_partition(tp)
        # Only drop the cache if the leader itself told us there is no
        # preferred replica. A `-1` from the previously chosen follower
        # means "I am still the right choice" — keep it.
        if responder_node_id == leader:
            self._preferred_read_replica.pop(tp, None)
        return
Maybe this whole block is not needed:
- either we just sent the fetch request to the leader, meaning the cache was empty or pointing to the leader, and the leader is just telling us "keep going"
- or we just sent the request to a replica, which returns -1 to say "don't change the cache"
So maybe the whole function could just update the cache when
if preferred_read_replica is not None and preferred_read_replica >= 0:
self._preferred_read_replica[tp] = (
preferred_read_replica,
time.monotonic() + self._preferred_replica_ttl,
)
Cache invalidation happens on error, when the timer expires, or when a broker tells us to look elsewhere.
Yes, you are right. The two cases you outlined are exhaustive (the routing layer reads the same cache, so responder == leader always implies the cache is already empty or already pointing at the leader), and TTL + transport-failure eviction + the routing-error path already cover real invalidation.
I'll collapse the function to the update-only branch and update test_minus_one_from_leader_clears_cache to assert "no entry is created" instead of "entry is dropped". Pushing in the next commit.
@GlebShipilov Please also add this justification in a comment
…ead_replica

The leader-vs-follower invalidation branch is unreachable: when the fetch was routed to the leader, the cache for that partition was already empty by construction. Collapse to a single guarded write and rely on TTL / transport-failure / routing-error paths for real invalidation. Per @vmaurin's review.
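The collapsed helper could look roughly like this (a sketch with hypothetical surrounding structure, not the actual aiokafka code), with the review-requested justification as an in-code comment:

```python
import time


class Fetcher:
    """Minimal stand-in holding only the preferred-replica state."""

    def __init__(self, preferred_replica_ttl: float):
        self._preferred_read_replica = {}  # tp -> (node_id, expire_at)
        self._preferred_replica_ttl = preferred_replica_ttl

    def _update_preferred_read_replica(self, tp, preferred_read_replica: int):
        # A fetch response comes either from the leader (in which case the
        # cache for tp is empty or already points at the leader, since
        # routing reads this same cache) or from a cached follower whose -1
        # means "don't change the cache". Either way there is nothing to
        # invalidate here; eviction happens on error, on TTL expiry, or on
        # transport failure.
        if preferred_read_replica >= 0:
            self._preferred_read_replica[tp] = (
                preferred_read_replica,
                time.monotonic() + self._preferred_replica_ttl,
            )
```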
I can see there are some failing tests, but all 5 failing CI jobs are unrelated to the rack-awareness changes in this PR.
What do you think?
Annotate `preferred_read_replica` as `int` and drop the redundant `is not None` guard. The value always comes from FetchResponse v11+ parsing, which yields an int (`-1` when absent), so `None` is no longer reachable.
…r, trim per-fetch overhead

Align KIP-392 preferred-replica handling with the Java consumer and cut work done on every fetch response when the feature is inactive.

* Invalidate cached preferred replica on any non-NoError code. Previously only NotLeaderForPartition / UnknownTopicOrPartition and OffsetOutOfRange evicted the cache, so any other partition-level error kept routing fetches to a follower the broker had implicitly rejected until the TTL expired. The per-error branches are now folded into a single `else:` that pops `_preferred_read_replica[tp]` first and only then dispatches the error-specific handling (metadata refresh, offset reset, auth warning, fallback log). Matches the Java consumer's behaviour.
* Compute `rack_aware` once per response. KIP-392 only applies when `client_rack` is set AND the broker speaks FetchRequest v11+. Previously every partition in every response paid for a `part_data[3]` lookup and an `_update_preferred_read_replica` call, even though a non-rack-aware setup is guaranteed to receive -1 and never update the cache. The per-partition update is now gated by a single `rack_aware` boolean computed once per response.
* Make the v<11 fallback warning truly one-shot. The old `client_rack and API_VERSION < 11 and not _rack_warning_logged` triple-check ran on every fetch response for the lifetime of the consumer, even though the warning itself was emitted only once. The check now reuses `rack_aware`, short-circuits on `bool(self._client_rack)` for the common (unset) case, and a comment makes the latch explicit.
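The response-handling shape described above can be sketched as a free function (hypothetical names and callback structure, for illustration): `rack_aware` is computed once, and any partition-level error evicts the cached replica before the error-specific handling runs:

```python
def handle_fetch_response(cache, client_rack, api_version, partitions,
                          on_update, on_error):
    """partitions: iterable of (tp, error_code, preferred_read_replica)."""
    # Computed once per response, not per partition.
    rack_aware = bool(client_rack) and api_version >= 11
    for tp, error_code, preferred in partitions:
        if error_code == 0:  # NoError
            if rack_aware:
                on_update(tp, preferred)
        else:
            # Any error: stop fetching from a replica the broker has
            # implicitly rejected, then dispatch error-specific handling.
            cache.pop(tp, None)
            on_error(tp, error_code)
```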
tests/test_rack_awareness.py (review feedback):
- Remove unused `from __future__ import annotations`
- Remove `test_fetch_request_v9_does_not_carry_rack_id` (no requirement)
- Drop redundant `@pytest.mark.asyncio` (project uses auto mode)
- Remove unused `import pytest`
- Fix `_make_fetcher` annotation: `str` → `str | None`
- Use `mock.patch.object(asyncio, "sleep", ...)` instead of string path
- Replace `mock.patch("...create_task")` with async no-op routine
tests/test_transactional_consumer.py (Py3.14 flake fix):
Replace racy `asyncio.sleep(1)` + bare offset assertions with
`await consumer.getmany(timeout_ms=1000)` — a deterministic fetch
round-trip that guarantees the transaction control marker has been
processed and last_stable_offset/highwater are updated before asserting.
Fixes consistent `AssertionError: 0 != 3` on all Py3.14 CI jobs.
tests/test_producer.py (leader-not-found flake fix):
Warm up the topic with a separate default-timeout producer before
constructing the 200ms producer. Separates topic auto-creation from the
leader-not-found retry path the test exercises. Use `add_cleanup` to
prevent unclosed producer resource warnings on failure.
Spell out "in-sync replica set" on first use of the ISR acronym
Thanks, @GlebShipilov — really high-quality work! It looks ready to merge to me. @vmaurin, your opinion?
Yes, it looks good, thanks @GlebShipilov! I haven't had a chance to test it, but I feel it could be merged to the main branch. I think the most important thing before a release would be to check that it doesn't break for people not using the rack awareness feature.
We already did this for our setup. And I'm going to release a beta version first, so that more people can do this.
Add support for KIP-392 (fetch from closest replica) so consumers can read from a same-rack follower instead of the partition leader, eliminating cross-AZ / cross-region traffic when the cluster is configured with a rack-aware replica.selector.class.
Verified against a 3-rack test Kafka cluster: 100% of records were served from same-rack brokers, with the expected mix of leader and follower fetches per partition.
Checklist