Skip to content
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
1da0a73
[DI-5741] Add rack awareness to aiokafka
GlebShipilov Apr 20, 2026
f1a0889
[DI-5741] Add rack awareness to aiokafka
GlebShipilov Apr 20, 2026
c7c7edc
[DI-5741] Add rack-aware fetching from closest replica (KIP-392)
GlebShipilov Apr 20, 2026
723f3f5
Merge remote-tracking branch 'origin/DI-5741' into DI-5741
GlebShipilov Apr 20, 2026
713e529
[DI-5741] Add rack awareness to aiokafka
GlebShipilov Apr 20, 2026
bdae9c2
Merge pull request #1 from exness/DI-5741
GlebShipilov Apr 20, 2026
b91e175
[DI-5741] Add rack awareness to aiokafka
GlebShipilov Apr 20, 2026
5431109
Merge pull request #2 from exness/DI-5741
GlebShipilov Apr 20, 2026
44fc867
[DI-5741] change feature file name
GlebShipilov Apr 21, 2026
8434290
[DI-5741] rewrite tests
GlebShipilov Apr 21, 2026
5a03ae4
[DI-5741] Fix formatting (ruff) and remove unused pytest import
GlebShipilov Apr 21, 2026
6da902f
[DI-5741] Add more comprehensive rack id to docs and tests
GlebShipilov Apr 21, 2026
31dc873
[DI-5741] Fix lint: sort imports, remove in-method imports, prefix un…
GlebShipilov Apr 21, 2026
c2dee2d
[DI-5741] Address review feedback (vmaurin) and fix CI lint/mypy errors
GlebShipilov Apr 21, 2026
ff8a672
[DI-5741] Log a one-shot warning when client_rack is set but the brok…
GlebShipilov Apr 21, 2026
a75ef5d
[DI-5741] fix: revert rack_id default to empty string and fix Windows…
GlebShipilov Apr 21, 2026
ffc9ef6
[DI-5741]
GlebShipilov Apr 21, 2026
d7a01d9
[DI-5741]
GlebShipilov Apr 21, 2026
f02b242
[DI-5741]
GlebShipilov Apr 21, 2026
6bc83f9
[DI-5741] refactor(consumer): drop dead branch in _update_preferred_r…
GlebShipilov Apr 22, 2026
cd2793c
[DI-5741] tighten _update_preferred_read_replica signature
GlebShipilov Apr 22, 2026
624ba88
[DI-5741] fix(fetcher): drop preferred read replica on any fetch erro…
GlebShipilov Apr 22, 2026
3484400
[DI-5741] address review feedback and stabilize flaky CI tests
GlebShipilov Apr 23, 2026
e33fa60
[DI-5741] fix format
GlebShipilov Apr 23, 2026
5e2148c
[DI-5741] fix: remove unused pytest import and explicit return None
GlebShipilov Apr 23, 2026
f6175b6
[DI-5741] docs: expand ISR abbreviation in consumer.rst
GlebShipilov Apr 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES/1159.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Add rack-aware fetching from the closest in-sync replica (KIP-392) via the new ``client_rack`` option on :class:`AIOKafkaConsumer`. When set and the brokers support ``FetchRequest v11`` (Kafka 2.4+) with a ``replica.selector.class`` configured, the consumer will fetch from a same-rack follower instead of the partition leader, reducing cross-AZ traffic and tail latency.

9 changes: 9 additions & 0 deletions aiokafka/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@ class AIOKafkaConsumer:
sasl_oauth_token_provider (~aiokafka.abc.AbstractTokenProvider):
OAuthBearer token provider instance.
Default: None
client_rack (str): A rack identifier for this client (e.g. the
availability zone the consumer runs in). When set and the broker
supports FetchRequest v11+ (Kafka 2.4+ with a configured
``replica.selector.class`` such as ``RackAwareReplicaSelector``),
the consumer will fetch from the closest in-sync replica
instead of the leader (KIP-392). Default: ``None`` (disabled).

Note:
Many configuration parameters are taken from Java Client:
Expand Down Expand Up @@ -261,6 +267,7 @@ def __init__(
sasl_kerberos_service_name="kafka",
sasl_kerberos_domain_name=None,
sasl_oauth_token_provider=None,
client_rack=None,
):
if loop is None:
loop = get_running_loop()
Expand Down Expand Up @@ -328,6 +335,7 @@ def __init__(
self._isolation_level = isolation_level
self._rebalance_timeout_ms = rebalance_timeout_ms
self._max_poll_interval_ms = max_poll_interval_ms
self._client_rack = client_rack

self._check_crcs = check_crcs
self._subscription = SubscriptionState(loop=loop)
Expand Down Expand Up @@ -387,6 +395,7 @@ async def start(self):
retry_backoff_ms=self._retry_backoff_ms,
auto_offset_reset=self._auto_offset_reset,
isolation_level=self._isolation_level,
client_rack=self._client_rack,
)

if self._group_id is not None:
Expand Down
127 changes: 124 additions & 3 deletions aiokafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,9 @@ class Fetcher:
other values will raise the exception. Default: 'latest'.
isolation_level (str): Controls how to read messages written
transactionally. See consumer description.
client_rack (str): A rack identifier for this client. This is sent to
the broker on FetchRequest v11+ to enable rack-aware fetching
from the closest replica (KIP-392). Default: ``None`` (disabled).
"""

def __init__(
Expand All @@ -393,6 +396,7 @@ def __init__(
retry_backoff_ms=100,
auto_offset_reset="latest",
isolation_level="read_uncommitted",
client_rack=None,
):
self._client = client
self._loop = client._loop
Expand All @@ -408,6 +412,12 @@ def __init__(
self._retry_backoff = retry_backoff_ms / 1000
self._subscriptions = subscriptions
self._default_reset_strategy = OffsetResetStrategy.from_str(auto_offset_reset)
self._client_rack = client_rack
# KIP-392: cache of preferred read replica per partition.
# tp -> (node_id, expires_at_monotonic)
self._preferred_read_replica: dict[TopicPartition, tuple[int, float]] = {}
self._preferred_replica_ttl = client._metadata_max_age_ms / 1000
self._rack_warning_logged = False

if isolation_level == "read_uncommitted":
self._isolation_level = READ_UNCOMMITTED
Expand Down Expand Up @@ -586,7 +596,7 @@ def _get_actions_per_node(self, assignment):
for tp in assignment.tps:
tp_state = assignment.state_value(tp)

node_id = self._client.cluster.leader_for_partition(tp)
node_id = self._select_read_replica(tp)
backoff = 0
if tp in self._records:
# We have data still not consumed by user. In this case we
Expand All @@ -605,7 +615,22 @@ def _get_actions_per_node(self, assignment):
)
invalid_metadata = True
elif not tp_state.has_valid_position:
awaiting_reset[node_id].append(tp)
# Per KIP-392, only Fetch requests may be served by a follower.
# ListOffsets must always go to the partition leader, otherwise
# the follower will reply with NOT_LEADER_FOR_PARTITION.
leader_id = self._client.cluster.leader_for_partition(tp)
if leader_id is None or leader_id == -1:
# The cached preferred replica may still be reachable, but
# without a known leader we cannot safely send ListOffsets.
# Force a metadata refresh and try again next iteration.
log.debug(
"No leader found for partition %s while resetting "
"offset. Waiting metadata update",
tp,
)
invalid_metadata = True
else:
awaiting_reset[leader_id].append(tp)
elif tp_state.paused:
resume_futures.append(tp_state.resume_fut)
else:
Expand Down Expand Up @@ -640,6 +665,7 @@ def _get_actions_per_node(self, assignment):
self._fetch_max_bytes,
self._isolation_level,
list(by_topics.items()),
rack_id=self._client_rack or "",
)
fetch_requests.append((node_id, req))

Expand All @@ -657,12 +683,75 @@ def _get_actions_per_node(self, assignment):
resume_futures,
)

def _select_read_replica(self, tp):
"""Return node id to fetch ``tp`` from.

Honors the KIP-392 preferred read replica returned by the broker if
it is still valid (not expired and the node is known to the cluster);
otherwise falls back to the partition leader.
"""
cached = self._preferred_read_replica.get(tp)
if cached is not None:
node_id, expires_at = cached
if time.monotonic() < expires_at and (
self._client.cluster.broker_metadata(node_id) is not None
):
return node_id
# Expired or node disappeared from metadata
self._preferred_read_replica.pop(tp, None)
return self._client.cluster.leader_for_partition(tp)

def _update_preferred_read_replica(self, tp, preferred_read_replica: int):
"""Cache the preferred read replica for ``tp`` (KIP-392).

Only a valid replica id (``>= 0``) updates the cache. ``-1`` (broker
telling us either "no preferred replica" or "I am still the right
choice") is a no-op:

* If we sent the request to the **leader**, the cache is already empty
for ``tp`` (otherwise the request would have been routed to the
cached follower), so there is nothing to invalidate.
* If we sent the request to the cached **follower**, ``-1`` means
"stay with me" — preserving the cache is the desired behaviour.

Real cache invalidation happens elsewhere: TTL expiry in
:meth:`_select_read_replica`, transport-failure eviction in
:meth:`_invalidate_preferred_read_replica_for_node`, and routing
errors (e.g. ``NotLeaderForPartition``) in the fetch error path.
"""
if preferred_read_replica >= 0:
self._preferred_read_replica[tp] = (
preferred_read_replica,
time.monotonic() + self._preferred_replica_ttl,
)

def _invalidate_preferred_read_replica_for_node(self, node_id, request):
    """Drop cached preferred-replica entries that point at ``node_id``.

    Invoked when a fetch sent to a follower fails at the transport
    level. Only partitions that were part of the failed ``request`` are
    considered, and only entries whose cached replica matches the
    failing node are evicted, so the next call to
    :meth:`_select_read_replica` falls back to the partition leader.
    """
    cache = self._preferred_read_replica
    for topic, partitions in request.topics:
        for partition_info in partitions:
            tp = TopicPartition(topic, partition_info[0])
            entry = cache.get(tp)
            if entry is not None and entry[0] == node_id:
                del cache[tp]

async def _proc_fetch_request(self, assignment, node_id, request):
needs_wakeup = False
try:
response = await self._client.send(node_id, request)
except Errors.KafkaError as err:
log.error("Failed fetch messages from %s: %s", node_id, err)
# If this fetch was routed to a preferred read replica (follower)
# and failed at the transport/protocol level, evict the cached
# entries so the next attempt falls back to the leader instead of
# repeatedly hitting the unreachable follower until TTL expiry.
self._invalidate_preferred_read_replica_for_node(node_id, request)
await asyncio.sleep(self._retry_backoff)
return False
except asyncio.CancelledError:
Expand All @@ -682,6 +771,20 @@ async def _proc_fetch_request(self, assignment, node_id, request):
fetch_offsets[TopicPartition(topic, partition)] = offset

now_ms = int(1000 * time.time())
if (
self._client_rack
and response.API_VERSION < 11
and not self._rack_warning_logged
):
log.warning(
"client_rack is set to %r but broker only supports "
"FetchRequest v%d (v11+ required for KIP-392 rack-aware "
"fetching). The consumer will fall back to fetching from "
"the partition leader.",
self._client_rack,
response.API_VERSION,
)
self._rack_warning_logged = True
for topic, partitions in response.topics:
for partition, error_code, highwater, *part_data in partitions:
tp = TopicPartition(topic, partition)
Expand All @@ -699,7 +802,18 @@ async def _proc_fetch_request(self, assignment, node_id, request):
continue

if error_type is Errors.NoError:
if response.API_VERSION >= 4:
if response.API_VERSION >= 11:
# part_data layout: [last_stable_offset,
# log_start_offset, aborted_transactions,
# preferred_read_replica, message_set]
lso = part_data[0]
aborted_transactions = part_data[2]
preferred_read_replica = part_data[3]
self._update_preferred_read_replica(
tp,
preferred_read_replica,
)
elif response.API_VERSION >= 4:
aborted_transactions = part_data[-2]
lso = part_data[-3]
else:
Expand Down Expand Up @@ -762,8 +876,15 @@ async def _proc_fetch_request(self, assignment, node_id, request):
Errors.NotLeaderForPartitionError,
Errors.UnknownTopicOrPartitionError,
):
# Stale routing -- drop the cached preferred replica so we
# fall back to the leader returned by the next metadata
# update.
self._preferred_read_replica.pop(tp, None)
self._client.force_metadata_update()
elif error_type is Errors.OffsetOutOfRangeError:
# The follower replica may be lagging; force re-fetch from
# the leader after reset.
self._preferred_read_replica.pop(tp, None)
if self._default_reset_strategy != OffsetResetStrategy.NONE:
tp_state.await_reset(self._default_reset_strategy)
else:
Expand Down
88 changes: 77 additions & 11 deletions aiokafka/protocol/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,15 +495,17 @@ class FetchRequest_v11(RequestStruct):


# Union of every supported FetchRequest protocol version (v1..v11).
# v11 is required for KIP-392 rack-aware fetching: it adds the
# top-level ``rack_id`` field that lets the broker pick the closest
# in-sync replica for the consumer.
FetchRequestStruct: TypeAlias = (
    FetchRequest_v1
    | FetchRequest_v2
    | FetchRequest_v3
    | FetchRequest_v4
    | FetchRequest_v5
    | FetchRequest_v6
    | FetchRequest_v7
    | FetchRequest_v8
    | FetchRequest_v9
    | FetchRequest_v10
    | FetchRequest_v11
)


Expand All @@ -517,12 +519,14 @@ def __init__(
max_bytes: int,
isolation_level: int,
topics: list[tuple[str, list[tuple[int, int, int]]]],
rack_id: str = "",
Comment thread
ods marked this conversation as resolved.
):
self._max_wait_ms = max_wait_time
self._min_bytes = min_bytes
self._max_bytes = max_bytes
self._isolation_level = isolation_level
self._topics = topics
self._rack_id = rack_id

@property
def topics(self) -> list[tuple[str, list[tuple[int, int, int]]]]:
Expand All @@ -531,7 +535,18 @@ def topics(self) -> list[tuple[str, list[tuple[int, int, int]]]]:
def build(
self, request_struct_class: type[FetchRequestStruct]
) -> FetchRequestStruct:
if request_struct_class.API_VERSION > 3:
api_version = request_struct_class.API_VERSION

# v0..v3 do not support isolation_level. v4 adds isolation_level
# but keeps the v0 per-partition layout. v5+ adds per-partition
# `log_start_offset`. v9+ also adds `current_leader_epoch`. v7+
# adds incremental fetch session fields and forgotten_topics_data.
# v11 adds top-level rack_id.
# We silently allow `rack_id` to be set on the FetchRequest builder
# so callers don't have to branch -- it is simply not transmitted on
# versions < 11.

if api_version == 4:
return request_struct_class(
-1, # replica_id
self._max_wait_ms,
Expand All @@ -541,12 +556,63 @@ def build(
self._topics,
)

if api_version >= 5:
# v5+ adds per-partition `log_start_offset`. v9+ also adds
# `current_leader_epoch`. v7+ adds incremental fetch session
# fields and forgotten_topics_data. v11 adds top-level rack_id.
include_leader_epoch = api_version >= 9
partitions_by_topic: list[tuple[str, list[tuple[int, ...]]]] = []
for topic, partitions in self._topics:
new_partitions: list[tuple[int, ...]] = []
for partition, offset, max_bytes in partitions:
if include_leader_epoch:
new_partitions.append(
(
partition,
-1, # current_leader_epoch (unknown)
Comment thread
ods marked this conversation as resolved.
offset, # fetch_offset
-1, # log_start_offset (consumer)
max_bytes,
)
)
else:
new_partitions.append(
(
partition,
offset, # fetch_offset
-1, # log_start_offset (consumer)
max_bytes,
)
)
partitions_by_topic.append((topic, new_partitions))

args: list[object] = [
-1, # replica_id
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit unsure if we should still send -1 when consuming from a follower, but it seems to be valid when I look to the java client code https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java#L317

I am just a bit confused by this sentence in the KIP

the FetchRequest schema has field for the replica id. Consumers typically use the sentinel -1, which indicates that fetching is only allowed from the leader. A lesser known sentinel is -2, which was originally intended to be used for debugging and allows fetching from followers. We propose to let the consumer use this to indicate the intent to allow fetching from a follower.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the Java client does the same. -1 means "I'm a consumer" not "only fetch from leader".

self._max_wait_ms,
self._min_bytes,
self._max_bytes,
self._isolation_level,
]
if api_version >= 7:
args.extend(
[
0, # session_id (no incremental fetch session)
-1, # session_epoch (FINAL_EPOCH)
]
)
args.append(partitions_by_topic)
if api_version >= 7:
args.append([]) # forgotten_topics_data
if api_version >= 11:
args.append(self._rack_id)
return request_struct_class(*args)

if self._isolation_level:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we raise a similar exception is someone is setting a rack_id, while the broker doesn't support v11 ?

  • yes, you see the error and you can maybe fix your consumer config / update the broker. On the other side, your consumer keeps crashing
  • no, the consumer "works", but it will never consume from a follower in the same rack

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, silent fallback is what Java does. But worth a log warning.

raise IncompatibleBrokerVersion(
"isolation_level requires FetchRequest >= v4"
)

if request_struct_class.API_VERSION == 3:
if api_version == 3:
return request_struct_class(
-1, # replica_id
self._max_wait_ms,
Expand Down
Loading
Loading