Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
1da0a73
[DI-5741] Add rack awareness to aiokafka
GlebShipilov Apr 20, 2026
f1a0889
[DI-5741] Add rack awareness to aiokafka
GlebShipilov Apr 20, 2026
c7c7edc
[DI-5741] Add rack-aware fetching from closest replica (KIP-392)
GlebShipilov Apr 20, 2026
723f3f5
Merge remote-tracking branch 'origin/DI-5741' into DI-5741
GlebShipilov Apr 20, 2026
713e529
[DI-5741] Add rack awareness to aiokafka
GlebShipilov Apr 20, 2026
bdae9c2
Merge pull request #1 from exness/DI-5741
GlebShipilov Apr 20, 2026
b91e175
[DI-5741] Add rack awareness to aiokafka
GlebShipilov Apr 20, 2026
5431109
Merge pull request #2 from exness/DI-5741
GlebShipilov Apr 20, 2026
44fc867
[DI-5741] change feature file name
GlebShipilov Apr 21, 2026
8434290
[DI-5741] rewrite tests
GlebShipilov Apr 21, 2026
5a03ae4
[DI-5741] Fix formatting (ruff) and remove unused pytest import
GlebShipilov Apr 21, 2026
6da902f
[DI-5741] Add more comprehensive rack id to docs and tests
GlebShipilov Apr 21, 2026
31dc873
[DI-5741] Fix lint: sort imports, remove in-method imports, prefix un…
GlebShipilov Apr 21, 2026
c2dee2d
[DI-5741] Address review feedback (vmaurin) and fix CI lint/mypy errors
GlebShipilov Apr 21, 2026
ff8a672
[DI-5741] Log a one-shot warning when client_rack is set but the brok…
GlebShipilov Apr 21, 2026
a75ef5d
[DI-5741] fix: revert rack_id default to empty string and fix Windows…
GlebShipilov Apr 21, 2026
ffc9ef6
[DI-5741]
GlebShipilov Apr 21, 2026
d7a01d9
[DI-5741]
GlebShipilov Apr 21, 2026
f02b242
[DI-5741]
GlebShipilov Apr 21, 2026
6bc83f9
[DI-5741] refactor(consumer): drop dead branch in _update_preferred_r…
GlebShipilov Apr 22, 2026
cd2793c
[DI-5741] tighten _update_preferred_read_replica signature
GlebShipilov Apr 22, 2026
624ba88
[DI-5741] fix(fetcher): drop preferred read replica on any fetch erro…
GlebShipilov Apr 22, 2026
3484400
[DI-5741] address review feedback and stabilize flaky CI tests
GlebShipilov Apr 23, 2026
e33fa60
[DI-5741] fix format
GlebShipilov Apr 23, 2026
5e2148c
[DI-5741] fix: remove unused pytest import and explicit return None
GlebShipilov Apr 23, 2026
f6175b6
[DI-5741] docs: expand ISR abbreviation in consumer.rst
GlebShipilov Apr 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES/1159.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Add rack-aware fetching from the closest in-sync replica (KIP-392) via the new ``client_rack`` option on :class:`AIOKafkaConsumer`. When set and the brokers support ``FetchRequest v11`` (Kafka 2.4+) with a ``replica.selector.class`` configured, the consumer will fetch from a same-rack follower instead of the partition leader, reducing cross-AZ traffic and tail latency.

9 changes: 9 additions & 0 deletions aiokafka/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@ class AIOKafkaConsumer:
sasl_oauth_token_provider (~aiokafka.abc.AbstractTokenProvider):
OAuthBearer token provider instance.
Default: None
client_rack (str): A rack identifier for this client (e.g. the
availability zone the consumer runs in). When set and the broker
supports FetchRequest v11+ (Kafka 2.4+ with a configured
``replica.selector.class`` such as ``RackAwareReplicaSelector``),
the consumer will fetch from the closest in-sync replica
instead of the leader (KIP-392). Default: ``None`` (disabled).

Note:
Many configuration parameters are taken from Java Client:
Expand Down Expand Up @@ -261,6 +267,7 @@ def __init__(
sasl_kerberos_service_name="kafka",
sasl_kerberos_domain_name=None,
sasl_oauth_token_provider=None,
client_rack=None,
):
if loop is None:
loop = get_running_loop()
Expand Down Expand Up @@ -328,6 +335,7 @@ def __init__(
self._isolation_level = isolation_level
self._rebalance_timeout_ms = rebalance_timeout_ms
self._max_poll_interval_ms = max_poll_interval_ms
self._client_rack = client_rack

self._check_crcs = check_crcs
self._subscription = SubscriptionState(loop=loop)
Expand Down Expand Up @@ -387,6 +395,7 @@ async def start(self):
retry_backoff_ms=self._retry_backoff_ms,
auto_offset_reset=self._auto_offset_reset,
isolation_level=self._isolation_level,
client_rack=self._client_rack,
)

if self._group_id is not None:
Expand Down
176 changes: 148 additions & 28 deletions aiokafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,9 @@ class Fetcher:
other values will raise the exception. Default: 'latest'.
isolation_level (str): Controls how to read messages written
transactionally. See consumer description.
client_rack (str): A rack identifier for this client. This is sent to
the broker on FetchRequest v11+ to enable rack-aware fetching
from the closest replica (KIP-392). Default: ``None`` (disabled).
"""

def __init__(
Expand All @@ -393,6 +396,7 @@ def __init__(
retry_backoff_ms=100,
auto_offset_reset="latest",
isolation_level="read_uncommitted",
client_rack=None,
):
self._client = client
self._loop = client._loop
Expand All @@ -408,6 +412,12 @@ def __init__(
self._retry_backoff = retry_backoff_ms / 1000
self._subscriptions = subscriptions
self._default_reset_strategy = OffsetResetStrategy.from_str(auto_offset_reset)
self._client_rack = client_rack
# KIP-392: cache of preferred read replica per partition.
# tp -> (node_id, expires_at_monotonic)
self._preferred_read_replica: dict[TopicPartition, tuple[int, float]] = {}
self._preferred_replica_ttl = client._metadata_max_age_ms / 1000
self._rack_warning_logged = False

if isolation_level == "read_uncommitted":
self._isolation_level = READ_UNCOMMITTED
Expand Down Expand Up @@ -586,7 +596,7 @@ def _get_actions_per_node(self, assignment):
for tp in assignment.tps:
tp_state = assignment.state_value(tp)

node_id = self._client.cluster.leader_for_partition(tp)
node_id = self._select_read_replica(tp)
backoff = 0
if tp in self._records:
# We have data still not consumed by user. In this case we
Expand All @@ -605,7 +615,22 @@ def _get_actions_per_node(self, assignment):
)
invalid_metadata = True
elif not tp_state.has_valid_position:
awaiting_reset[node_id].append(tp)
# Per KIP-392, only Fetch requests may be served by a follower.
# ListOffsets must always go to the partition leader, otherwise
# the follower will reply with NOT_LEADER_FOR_PARTITION.
leader_id = self._client.cluster.leader_for_partition(tp)
if leader_id is None or leader_id == -1:
# The cached preferred replica may still be reachable, but
# without a known leader we cannot safely send ListOffsets.
# Force a metadata refresh and try again next iteration.
log.debug(
"No leader found for partition %s while resetting "
"offset. Waiting metadata update",
tp,
)
invalid_metadata = True
else:
awaiting_reset[leader_id].append(tp)
elif tp_state.paused:
resume_futures.append(tp_state.resume_fut)
else:
Expand Down Expand Up @@ -640,6 +665,7 @@ def _get_actions_per_node(self, assignment):
self._fetch_max_bytes,
self._isolation_level,
list(by_topics.items()),
rack_id=self._client_rack or "",
)
fetch_requests.append((node_id, req))

Expand All @@ -657,12 +683,75 @@ def _get_actions_per_node(self, assignment):
resume_futures,
)

def _select_read_replica(self, tp):
"""Return node id to fetch ``tp`` from.

Honors the KIP-392 preferred read replica returned by the broker if
it is still valid (not expired and the node is known to the cluster);
otherwise falls back to the partition leader.
"""
cached = self._preferred_read_replica.get(tp)
if cached is not None:
node_id, expires_at = cached
if time.monotonic() < expires_at and (
self._client.cluster.broker_metadata(node_id) is not None
):
return node_id
# Expired or node disappeared from metadata
self._preferred_read_replica.pop(tp, None)
return self._client.cluster.leader_for_partition(tp)

def _update_preferred_read_replica(self, tp, preferred_read_replica: int):
"""Cache the preferred read replica for ``tp`` (KIP-392).

Only a valid replica id (``>= 0``) updates the cache. ``-1`` (broker
telling us either "no preferred replica" or "I am still the right
choice") is a no-op:

* If we sent the request to the **leader**, the cache is already empty
for ``tp`` (otherwise the request would have been routed to the
cached follower), so there is nothing to invalidate.
* If we sent the request to the cached **follower**, ``-1`` means
"stay with me" — preserving the cache is the desired behaviour.

Real cache invalidation happens elsewhere: TTL expiry in
:meth:`_select_read_replica`, transport-failure eviction in
:meth:`_invalidate_preferred_read_replica_for_node`, and routing
errors (e.g. ``NotLeaderForPartition``) in the fetch error path.
"""
if preferred_read_replica >= 0:
self._preferred_read_replica[tp] = (
preferred_read_replica,
time.monotonic() + self._preferred_replica_ttl,
)

def _invalidate_preferred_read_replica_for_node(self, node_id, request):
    """Evict cached preferred-replica entries that point at ``node_id``.

    Invoked when a fetch sent to a follower fails at the transport level.
    Only partitions that were part of the failed ``request`` are examined,
    and an entry is dropped only when its cached replica is the failing
    node, so the next :meth:`_select_read_replica` call routes those
    partitions back to the partition leader.
    """
    for topic, partitions in request.topics:
        for part_info in partitions:
            tp = TopicPartition(topic, part_info[0])
            entry = self._preferred_read_replica.get(tp)
            if entry is None:
                continue
            if entry[0] == node_id:
                self._preferred_read_replica.pop(tp, None)

async def _proc_fetch_request(self, assignment, node_id, request):
needs_wakeup = False
try:
response = await self._client.send(node_id, request)
except Errors.KafkaError as err:
log.error("Failed fetch messages from %s: %s", node_id, err)
# If this fetch was routed to a preferred read replica (follower)
# and failed at the transport/protocol level, evict the cached
# entries so the next attempt falls back to the leader instead of
# repeatedly hitting the unreachable follower until TTL expiry.
self._invalidate_preferred_read_replica_for_node(node_id, request)
await asyncio.sleep(self._retry_backoff)
return False
except asyncio.CancelledError:
Expand All @@ -682,6 +771,22 @@ async def _proc_fetch_request(self, assignment, node_id, request):
fetch_offsets[TopicPartition(topic, partition)] = offset

now_ms = int(1000 * time.time())
# KIP-392 is only meaningful when the client advertises a rack and the
# broker speaks FetchRequest v11+. Compute this once per response so
# the per-partition loop below can skip the preferred-replica work
# entirely when the feature is inactive.
rack_aware = bool(self._client_rack) and response.API_VERSION >= 11
if self._client_rack and not rack_aware and not self._rack_warning_logged:
log.warning(
"client_rack is set to %r but broker only supports "
"FetchRequest v%d (v11+ required for KIP-392 rack-aware "
"fetching). The consumer will fall back to fetching from "
"the partition leader.",
self._client_rack,
response.API_VERSION,
)
# Latch the warning so subsequent fetches skip the check above.
self._rack_warning_logged = True
for topic, partitions in response.topics:
for partition, error_code, highwater, *part_data in partitions:
tp = TopicPartition(topic, partition)
Expand All @@ -699,7 +804,15 @@ async def _proc_fetch_request(self, assignment, node_id, request):
continue

if error_type is Errors.NoError:
if response.API_VERSION >= 4:
if response.API_VERSION >= 11:
# part_data layout: [last_stable_offset,
# log_start_offset, aborted_transactions,
# preferred_read_replica, message_set]
lso = part_data[0]
aborted_transactions = part_data[2]
if rack_aware:
self._update_preferred_read_replica(tp, part_data[3])
elif response.API_VERSION >= 4:
aborted_transactions = part_data[-2]
lso = part_data[-3]
else:
Expand Down Expand Up @@ -758,33 +871,40 @@ async def _proc_fetch_request(self, assignment, node_id, request):
tp_state.consumed_to(tp_state.position + 1)
needs_wakeup = True

elif error_type in (
Errors.NotLeaderForPartitionError,
Errors.UnknownTopicOrPartitionError,
):
self._client.force_metadata_update()
elif error_type is Errors.OffsetOutOfRangeError:
if self._default_reset_strategy != OffsetResetStrategy.NONE:
tp_state.await_reset(self._default_reset_strategy)
else:
err = Errors.OffsetOutOfRangeError({tp: fetch_offset})
else:
# Any non-success response invalidates the cached
# preferred replica for this partition, matching the
# Java consumer. The next fetch will go to the leader
# until the broker hints another follower.
self._preferred_read_replica.pop(tp, None)
if error_type in (
Errors.NotLeaderForPartitionError,
Errors.UnknownTopicOrPartitionError,
):
self._client.force_metadata_update()
elif error_type is Errors.OffsetOutOfRangeError:
if self._default_reset_strategy != OffsetResetStrategy.NONE:
tp_state.await_reset(self._default_reset_strategy)
else:
err = Errors.OffsetOutOfRangeError({tp: fetch_offset})
self._set_error(tp, err)
needs_wakeup = True
log.info(
"Fetch offset %s is out of range for partition %s,"
" resetting offset",
fetch_offset,
tp,
)
elif error_type is Errors.TopicAuthorizationFailedError:
log.warning("Not authorized to read from topic %s.", tp.topic)
err = Errors.TopicAuthorizationFailedError(tp.topic)
self._set_error(tp, err)
needs_wakeup = True
log.info(
"Fetch offset %s is out of range for partition %s,"
" resetting offset",
fetch_offset,
tp,
)
elif error_type is Errors.TopicAuthorizationFailedError:
log.warning("Not authorized to read from topic %s.", tp.topic)
err = Errors.TopicAuthorizationFailedError(tp.topic)
self._set_error(tp, err)
needs_wakeup = True
else:
log.warning(
"Unexpected error while fetching data: %s", error_type.__name__
)
else:
log.warning(
"Unexpected error while fetching data: %s",
error_type.__name__,
)
return needs_wakeup

def _set_error(self, tp, error):
Expand Down
Loading
Loading