diff --git a/CHANGES/1159.feature b/CHANGES/1159.feature new file mode 100644 index 000000000..2517603df --- /dev/null +++ b/CHANGES/1159.feature @@ -0,0 +1,2 @@ +Add rack-aware fetching from the closest in-sync replica (KIP-392) via the new ``client_rack`` option on :class:`AIOKafkaConsumer`. When set and the brokers support ``FetchRequest v11`` (Kafka 2.4+) with a ``replica.selector.class`` configured, the consumer will fetch from a same-rack follower instead of the partition leader, reducing cross-AZ traffic and tail latency. + diff --git a/aiokafka/consumer/consumer.py b/aiokafka/consumer/consumer.py index ba7fb3791..1042461db 100644 --- a/aiokafka/consumer/consumer.py +++ b/aiokafka/consumer/consumer.py @@ -211,6 +211,12 @@ class AIOKafkaConsumer: sasl_oauth_token_provider (~aiokafka.abc.AbstractTokenProvider): OAuthBearer token provider instance. Default: None + client_rack (str): A rack identifier for this client (e.g. the + availability zone the consumer runs in). When set and the broker + supports FetchRequest v11+ (Kafka 2.4+ with a configured + ``replica.selector.class`` such as ``RackAwareReplicaSelector``), + the consumer will fetch from the closest in-sync replica + instead of the leader (KIP-392). Default: ``None`` (disabled). Note: Many configuration parameters are taken from Java Client: @@ -261,6 +267,7 @@ def __init__( sasl_kerberos_service_name="kafka", sasl_kerberos_domain_name=None, sasl_oauth_token_provider=None, + client_rack=None, ): if loop is None: loop = get_running_loop() @@ -328,6 +335,7 @@ def __init__( self._isolation_level = isolation_level self._rebalance_timeout_ms = rebalance_timeout_ms self._max_poll_interval_ms = max_poll_interval_ms + self._client_rack = client_rack self._check_crcs = check_crcs self._subscription = SubscriptionState(loop=loop) @@ -387,6 +395,7 @@ async def start(self): retry_backoff_ms=self._retry_backoff_ms, auto_offset_reset=self._auto_offset_reset, isolation_level=self._isolation_level, + client_rack=self._client_rack, ) if self._group_id is not None: diff --git a/aiokafka/consumer/fetcher.py b/aiokafka/consumer/fetcher.py index 786893300..39fa6be15 100644 --- a/aiokafka/consumer/fetcher.py +++ b/aiokafka/consumer/fetcher.py @@ -374,6 +374,9 @@ class Fetcher: ofther value will raise the exception. Default: 'latest'. isolation_level (str): Controls how to read messages written transactionally. See consumer description. + client_rack (str): A rack identifier for this client. This is sent to + the broker on FetchRequest v11+ to enable rack-aware fetching + from the closest replica (KIP-392). Default: ``None`` (disabled). """ def __init__( @@ -393,6 +396,7 @@ def __init__( retry_backoff_ms=100, auto_offset_reset="latest", isolation_level="read_uncommitted", + client_rack=None, ): self._client = client self._loop = client._loop @@ -408,6 +412,12 @@ def __init__( self._retry_backoff = retry_backoff_ms / 1000 self._subscriptions = subscriptions self._default_reset_strategy = OffsetResetStrategy.from_str(auto_offset_reset) + self._client_rack = client_rack + # KIP-392: cache of preferred read replica per partition. + # tp -> (node_id, expires_at_monotonic) + self._preferred_read_replica: dict[TopicPartition, tuple[int, float]] = {} + self._preferred_replica_ttl = client._metadata_max_age_ms / 1000 + self._rack_warning_logged = False if isolation_level == "read_uncommitted": self._isolation_level = READ_UNCOMMITTED @@ -586,7 +596,7 @@ def _get_actions_per_node(self, assignment): for tp in assignment.tps: tp_state = assignment.state_value(tp) - node_id = self._client.cluster.leader_for_partition(tp) + node_id = self._select_read_replica(tp) backoff = 0 if tp in self._records: # We have data still not consumed by user. In this case we @@ -605,7 +615,22 @@ def _get_actions_per_node(self, assignment): ) invalid_metadata = True elif not tp_state.has_valid_position: - awaiting_reset[node_id].append(tp) + # Per KIP-392, only Fetch requests may be served by a follower. + # ListOffsets must always go to the partition leader, otherwise + # the follower will reply with NOT_LEADER_FOR_PARTITION. + leader_id = self._client.cluster.leader_for_partition(tp) + if leader_id is None or leader_id == -1: + # The cached preferred replica may still be reachable, but + # without a known leader we cannot safely send ListOffsets. + # Force a metadata refresh and try again next iteration. + log.debug( + "No leader found for partition %s while resetting " + "offset. Waiting metadata update", + tp, + ) + invalid_metadata = True + else: + awaiting_reset[leader_id].append(tp) elif tp_state.paused: resume_futures.append(tp_state.resume_fut) else: @@ -640,6 +665,7 @@ def _get_actions_per_node(self, assignment): self._fetch_max_bytes, self._isolation_level, list(by_topics.items()), + rack_id=self._client_rack or "", ) fetch_requests.append((node_id, req)) @@ -657,12 +683,75 @@ def _get_actions_per_node(self, assignment): resume_futures, ) + def _select_read_replica(self, tp): + """Return node id to fetch ``tp`` from. + + Honors the KIP-392 preferred read replica returned by the broker if + it is still valid (not expired and the node is known to the cluster); + otherwise falls back to the partition leader. + """ + cached = self._preferred_read_replica.get(tp) + if cached is not None: + node_id, expires_at = cached + if time.monotonic() < expires_at and ( + self._client.cluster.broker_metadata(node_id) is not None + ): + return node_id + # Expired or node disappeared from metadata + self._preferred_read_replica.pop(tp, None) + return self._client.cluster.leader_for_partition(tp) + + def _update_preferred_read_replica(self, tp, preferred_read_replica: int): + """Cache the preferred read replica for ``tp`` (KIP-392). + + Only a valid replica id (``>= 0``) updates the cache. ``-1`` (broker + telling us either "no preferred replica" or "I am still the right + choice") is a no-op: + + * If we sent the request to the **leader**, the cache is already empty + for ``tp`` (otherwise the request would have been routed to the + cached follower), so there is nothing to invalidate. + * If we sent the request to the cached **follower**, ``-1`` means + "stay with me" — preserving the cache is the desired behaviour. + + Real cache invalidation happens elsewhere: TTL expiry in + :meth:`_select_read_replica`, transport-failure eviction in + :meth:`_invalidate_preferred_read_replica_for_node`, and routing + errors (e.g. ``NotLeaderForPartition``) in the fetch error path. + """ + if preferred_read_replica >= 0: + self._preferred_read_replica[tp] = ( + preferred_read_replica, + time.monotonic() + self._preferred_replica_ttl, + ) + + def _invalidate_preferred_read_replica_for_node(self, node_id, request): + """Drop cached preferred-replica entries that point to ``node_id``. + + Called when a fetch to a follower fails at the transport level. We + only consider TPs that were actually part of the failed request, and + only evict entries whose cached replica matches the failing node, so + that the next call to :meth:`_select_read_replica` falls back to the + partition leader. + """ + for topic, partitions in request.topics: + for partition_info in partitions: + tp = TopicPartition(topic, partition_info[0]) + cached = self._preferred_read_replica.get(tp) + if cached is not None and cached[0] == node_id: + self._preferred_read_replica.pop(tp, None) + async def _proc_fetch_request(self, assignment, node_id, request): needs_wakeup = False try: response = await self._client.send(node_id, request) except Errors.KafkaError as err: log.error("Failed fetch messages from %s: %s", node_id, err) + # If this fetch was routed to a preferred read replica (follower) + # and failed at the transport/protocol level, evict the cached + # entries so the next attempt falls back to the leader instead of + # repeatedly hitting the unreachable follower until TTL expiry. + self._invalidate_preferred_read_replica_for_node(node_id, request) await asyncio.sleep(self._retry_backoff) return False except asyncio.CancelledError: @@ -682,6 +771,22 @@ async def _proc_fetch_request(self, assignment, node_id, request): fetch_offsets[TopicPartition(topic, partition)] = offset now_ms = int(1000 * time.time()) + # KIP-392 is only meaningful when the client advertises a rack and the + # broker speaks FetchRequest v11+. Compute this once per response so + # the per-partition loop below can skip the preferred-replica work + # entirely when the feature is inactive. + rack_aware = bool(self._client_rack) and response.API_VERSION >= 11 + if self._client_rack and not rack_aware and not self._rack_warning_logged: + log.warning( + "client_rack is set to %r but broker only supports " + "FetchRequest v%d (v11+ required for KIP-392 rack-aware " + "fetching). The consumer will fall back to fetching from " + "the partition leader.", + self._client_rack, + response.API_VERSION, + ) + # Latch the warning so subsequent fetches skip the check above. + self._rack_warning_logged = True for topic, partitions in response.topics: for partition, error_code, highwater, *part_data in partitions: tp = TopicPartition(topic, partition) @@ -699,7 +804,15 @@ async def _proc_fetch_request(self, assignment, node_id, request): continue if error_type is Errors.NoError: - if response.API_VERSION >= 4: + if response.API_VERSION >= 11: + # part_data layout: [last_stable_offset, + # log_start_offset, aborted_transactions, + # preferred_read_replica, message_set] + lso = part_data[0] + aborted_transactions = part_data[2] + if rack_aware: + self._update_preferred_read_replica(tp, part_data[3]) + elif response.API_VERSION >= 4: aborted_transactions = part_data[-2] lso = part_data[-3] else: @@ -758,33 +871,40 @@ async def _proc_fetch_request(self, assignment, node_id, request): tp_state.consumed_to(tp_state.position + 1) needs_wakeup = True - elif error_type in ( - Errors.NotLeaderForPartitionError, - Errors.UnknownTopicOrPartitionError, - ): - self._client.force_metadata_update() - elif error_type is Errors.OffsetOutOfRangeError: - if self._default_reset_strategy != OffsetResetStrategy.NONE: - tp_state.await_reset(self._default_reset_strategy) - else: - err = Errors.OffsetOutOfRangeError({tp: fetch_offset}) + else: + # Any non-success response invalidates the cached + # preferred replica for this partition, matching the + # Java consumer. The next fetch will go to the leader + # until the broker hints another follower. + self._preferred_read_replica.pop(tp, None) + if error_type in ( + Errors.NotLeaderForPartitionError, + Errors.UnknownTopicOrPartitionError, + ): + self._client.force_metadata_update() + elif error_type is Errors.OffsetOutOfRangeError: + if self._default_reset_strategy != OffsetResetStrategy.NONE: + tp_state.await_reset(self._default_reset_strategy) + else: + err = Errors.OffsetOutOfRangeError({tp: fetch_offset}) + self._set_error(tp, err) + needs_wakeup = True + log.info( + "Fetch offset %s is out of range for partition %s," + " resetting offset", + fetch_offset, + tp, + ) + elif error_type is Errors.TopicAuthorizationFailedError: + log.warning("Not authorized to read from topic %s.", tp.topic) + err = Errors.TopicAuthorizationFailedError(tp.topic) self._set_error(tp, err) needs_wakeup = True - log.info( - "Fetch offset %s is out of range for partition %s," - " resetting offset", - fetch_offset, - tp, - ) - elif error_type is Errors.TopicAuthorizationFailedError: - log.warning("Not authorized to read from topic %s.", tp.topic) - err = Errors.TopicAuthorizationFailedError(tp.topic) - self._set_error(tp, err) - needs_wakeup = True - else: - log.warning( - "Unexpected error while fetching data: %s", error_type.__name__ - ) + else: + log.warning( + "Unexpected error while fetching data: %s", + error_type.__name__, + ) return needs_wakeup def _set_error(self, tp, error): diff --git a/aiokafka/protocol/fetch.py b/aiokafka/protocol/fetch.py index 667f54560..56ccf4ba6 100644 --- a/aiokafka/protocol/fetch.py +++ b/aiokafka/protocol/fetch.py @@ -495,15 +495,17 @@ class FetchRequest_v11(RequestStruct): FetchRequestStruct: TypeAlias = ( - FetchRequest_v1 | FetchRequest_v2 | FetchRequest_v3 | FetchRequest_v4 - # After v4 is not implemented yet - # | FetchRequest_v5 - # | FetchRequest_v6 - # | FetchRequest_v7 - # | FetchRequest_v8 - # | FetchRequest_v9 - # | FetchRequest_v10 - # | FetchRequest_v11 + FetchRequest_v1 + | FetchRequest_v2 + | FetchRequest_v3 + | FetchRequest_v4 + | FetchRequest_v5 + | FetchRequest_v6 + | FetchRequest_v7 + | FetchRequest_v8 + | FetchRequest_v9 + | FetchRequest_v10 + | FetchRequest_v11 ) @@ -517,12 +519,14 @@ def __init__( max_bytes: int, isolation_level: int, topics: list[tuple[str, list[tuple[int, int, int]]]], + rack_id: str = "", ): self._max_wait_ms = max_wait_time self._min_bytes = min_bytes self._max_bytes = max_bytes self._isolation_level = isolation_level self._topics = topics + self._rack_id = rack_id @property def topics(self) -> list[tuple[str, list[tuple[int, int, int]]]]: @@ -531,7 +535,18 @@ def topics(self) -> list[tuple[str, list[tuple[int, int, int]]]]: def build( self, request_struct_class: type[FetchRequestStruct] ) -> FetchRequestStruct: - if request_struct_class.API_VERSION > 3: + api_version = request_struct_class.API_VERSION + + # v0..v3 do not support isolation_level. v4 adds isolation_level + # but keeps the v0 per-partition layout. v5+ adds per-partition + # `log_start_offset`. v9+ also adds `current_leader_epoch`. v7+ + # adds incremental fetch session fields and forgotten_topics_data. + # v11 adds top-level rack_id. + # We silently allow `rack_id` to be set on the FetchRequest builder + # so callers don't have to branch -- it is simply not transmitted on + # versions < 11. + + if api_version == 4: return request_struct_class( -1, # replica_id self._max_wait_ms, @@ -541,12 +556,63 @@ def build( self._topics, ) + if api_version >= 5: + # v5+ adds per-partition `log_start_offset`. v9+ also adds + # `current_leader_epoch`. v7+ adds incremental fetch session + # fields and forgotten_topics_data. v11 adds top-level rack_id. + include_leader_epoch = api_version >= 9 + partitions_by_topic: list[tuple[str, list[tuple[int, ...]]]] = [] + for topic, partitions in self._topics: + new_partitions: list[tuple[int, ...]] = [] + for partition, offset, max_bytes in partitions: + if include_leader_epoch: + new_partitions.append( + ( + partition, + -1, # current_leader_epoch (unknown) + offset, # fetch_offset + -1, # log_start_offset (consumer) + max_bytes, + ) + ) + else: + new_partitions.append( + ( + partition, + offset, # fetch_offset + -1, # log_start_offset (consumer) + max_bytes, + ) + ) + partitions_by_topic.append((topic, new_partitions)) + + args: list[object] = [ + -1, # replica_id + self._max_wait_ms, + self._min_bytes, + self._max_bytes, + self._isolation_level, + ] + if api_version >= 7: + args.extend( + [ + 0, # session_id (no incremental fetch session) + -1, # session_epoch (FINAL_EPOCH) + ] + ) + args.append(partitions_by_topic) + if api_version >= 7: + args.append([]) # forgotten_topics_data + if api_version >= 11: + args.append(self._rack_id) + return request_struct_class(*args) + if self._isolation_level: raise IncompatibleBrokerVersion( "isolation_level requires FetchRequest >= v4" ) - if request_struct_class.API_VERSION == 3: + if api_version == 3: return request_struct_class( -1, # replica_id self._max_wait_ms, diff --git a/docs/consumer.rst b/docs/consumer.rst index 6630f5f0c..ef7713a7a 100644 --- a/docs/consumer.rst +++ b/docs/consumer.rst @@ -546,6 +546,62 @@ those messages would not be returned by the consumer and yet would have valid offsets. +.. _rack-aware-fetch: + +Reading from the Closest Replica (Rack Awareness) +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +In a multi-AZ / multi-DC Kafka deployment, fetching from the partition leader +can mean every record crosses a (slow, expensive) cross-AZ link. Since Kafka +2.4 (`KIP-392`_), a consumer can instead fetch from an in-sync **follower** +that lives in the same rack as the consumer itself. :class:`.AIOKafkaConsumer` +supports this via the ``client_rack`` option:: + + consumer = aiokafka.AIOKafkaConsumer( + "my_topic", + bootstrap_servers="kafka-nl-1:9092,kafka-nl2-1:9092,kafka-ld-1:9092", + client_rack="us-east-1a", # this consumer lives in rack/AZ "us-east-1a" + ) + +When set, the consumer sends its rack id in every ``FetchRequest``. The broker +responds with a ``preferred_read_replica`` pointing at the closest in-sync +replica, and **all subsequent fetches for that partition** are routed there. +The cache is automatically invalidated on leader changes, on +``OffsetOutOfRange`` errors, when the chosen broker disappears from cluster +metadata, or after ``metadata_max_age_ms`` -- after which a new +``preferred_read_replica`` is requested. + +For this to actually take effect, **two things must be configured on the +brokers** (not on the consumer): + +1. Each broker must declare its rack via ``broker.rack=`` in + ``server.properties``. This both spreads new replicas across racks + (`KIP-36`_) and advertises the rack in metadata responses. +2. Each broker must have a replica selector enabled, e.g. + ``replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector``. + Without this, the broker always returns ``preferred_read_replica = -1`` + ("keep fetching from the leader") regardless of the rack id sent by the + consumer. + +Some caveats to keep in mind: + +* The "preferred" replica is only used **while it stays in the ISR + (in-sync replica set)**. If the + same-rack follower falls behind, the broker will redirect the consumer to a + different replica (possibly cross-AZ) until the follower catches up. +* Reading from a follower means you only see records that have already been + replicated. End-to-end consumer lag will be slightly higher than when + reading from the leader. +* This setting only affects consumers. The Kafka **produce** path always + writes to the partition leader, so producer traffic remains cross-AZ for + partitions whose leader is in another DC. +* If the brokers do not support ``FetchRequest v11`` (Kafka < 2.4), the + setting is silently ignored and the consumer falls back to leader fetches. + +.. _KIP-392: https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica +.. _KIP-36: https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment + + Detecting Consumer Failures --------------------------- diff --git a/tests/test_producer.py b/tests/test_producer.py index bf39b1f02..4dfd30a9d 100644 --- a/tests/test_producer.py +++ b/tests/test_producer.py @@ -218,11 +218,26 @@ async def test_producer_send_with_compression(self): @run_until_complete async def test_producer_send_leader_notfound(self): + # Warm up the topic with a normal-timeout producer so the topic is + # auto-created and present in broker metadata BEFORE we construct the + # 200ms-timeout producer below. Without this, the tight + # request_timeout_ms=200 has to cover both topic auto-creation/metadata + # discovery AND the actual leader-not-found retry path the test + # exercises, which is too tight on busy CI runners and was the source + # of historic flakes. + warmup = AIOKafkaProducer(bootstrap_servers=self.hosts) + await warmup.start() + try: + await warmup.send_and_wait(self.topic, b"warmup") + finally: + await warmup.stop() + producer = AIOKafkaProducer( bootstrap_servers=self.hosts, request_timeout_ms=200, ) await producer.start() + self.add_cleanup(producer.stop) with mock.patch.object(ClusterMetadata, "leader_for_partition") as mocked: mocked.return_value = -1 @@ -236,8 +251,6 @@ async def test_producer_send_leader_notfound(self): with self.assertRaises(NotLeaderForPartitionError): await future - await producer.stop() - @run_until_complete async def test_producer_send_timeout(self): producer = AIOKafkaProducer(bootstrap_servers=self.hosts) diff --git a/tests/test_rack_awareness.py b/tests/test_rack_awareness.py new file mode 100644 index 000000000..73b6e9b97 --- /dev/null +++ b/tests/test_rack_awareness.py @@ -0,0 +1,517 @@ +"""Tests for rack-aware replica selection (KIP-392). + +Each test verifies an observable behaviour of the feature, not internal +plumbing. The core question answered: "does the consumer route its next +FetchRequest to the correct broker after receiving a preferred_read_replica +hint from the leader?" +""" + +import asyncio +import logging +import time +from unittest import mock + +from aiokafka import errors as Errors +from aiokafka.consumer.fetcher import Fetcher +from aiokafka.protocol.fetch import ( + FetchRequest, + FetchRequest_v11, +) +from aiokafka.structs import TopicPartition + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +async def _noop_fetch_routine(self): + """Replacement for Fetcher._fetch_requests_routine in tests. + + Returns immediately so the background task created in ``Fetcher.__init__`` + completes without doing any real work. + """ + return + + +async def _make_fetcher( + client_rack: str | None = "us-east-1a", metadata_max_age_ms: int = 60_000 +): + """Build a minimal Fetcher with mocked client/subscriptions. + + Must be awaited inside a running event loop because ``Fetcher.__init__`` + schedules the background fetch routine via :func:`create_task`. We swap + that routine for an awaitable no-op rather than patching ``create_task`` + itself, which keeps the test setup honest about what the constructor does. + """ + client = mock.Mock() + client._loop = asyncio.get_running_loop() + client.cluster = mock.Mock() + client._metadata_max_age_ms = metadata_max_age_ms + subscriptions = mock.Mock() + subscriptions.register_fetch_waiters = mock.Mock() + + with mock.patch.object(Fetcher, "_fetch_requests_routine", _noop_fetch_routine): + fetcher = Fetcher( + client, + subscriptions, + client_rack=client_rack, + ) + # Let the no-op background task run to completion so we don't leave a + # pending Task behind for the event loop to GC. + await asyncio.sleep(0) + return fetcher + + +def _make_assignment(fetcher, tps): + """Fake an assignment object that _get_actions_per_node can iterate.""" + assignment = mock.Mock() + assignment.tps = tps + + def state_value(tp): + state = mock.Mock() + state.has_valid_position = True + state.paused = False + state.position = 0 + return state + + assignment.state_value = state_value + # Ensure no partitions are "in-flight" or buffered already. + fetcher._records = {} + fetcher._in_flight = set() + return assignment + + +# --------------------------------------------------------------------------- +# 1. After receiving preferred_read_replica, next fetch goes to that broker +# --------------------------------------------------------------------------- + + +class TestReplicaSelectionRouting: + """Verify that _get_actions_per_node routes to the preferred replica.""" + + async def test_fetch_routed_to_preferred_replica_after_hint(self): + """After the leader returns preferred_read_replica=7, the next + FetchRequest for that partition must be sent to node 7, not the + leader.""" + fetcher = await _make_fetcher() + tp = TopicPartition("topic-a", 0) + + # Leader is node 1, preferred replica is node 7. + fetcher._client.cluster.leader_for_partition.return_value = 1 + fetcher._client.cluster.broker_metadata.return_value = ( + mock.Mock() + ) # node 7 is known + + # Simulate the broker returning preferred_read_replica=7. + fetcher._update_preferred_read_replica(tp, 7) + + # Build fetch requests — the partition should be routed to node 7. + assignment = _make_assignment(fetcher, [tp]) + fetch_requests, *_ = fetcher._get_actions_per_node(assignment) + + assert len(fetch_requests) == 1 + node_id, _req = fetch_requests[0] + assert node_id == 7, ( + f"Expected fetch to be routed to preferred replica 7, got {node_id}" + ) + + async def test_fetch_routed_to_leader_when_no_hint(self): + """Without a preferred_read_replica hint, fetches go to the leader.""" + fetcher = await _make_fetcher() + tp = TopicPartition("topic-a", 0) + + fetcher._client.cluster.leader_for_partition.return_value = 1 + fetcher._client.cluster.broker_metadata.return_value = mock.Mock() + + assignment = _make_assignment(fetcher, [tp]) + fetch_requests, *_ = fetcher._get_actions_per_node(assignment) + + assert len(fetch_requests) == 1 + node_id, _ = fetch_requests[0] + assert node_id == 1 + + async def test_fetch_falls_back_to_leader_after_ttl_expires(self): + """Once the cached preferred replica expires, fetch falls back to the + leader until the leader re-issues a hint.""" + fetcher = await _make_fetcher(metadata_max_age_ms=1) # 1 ms TTL + tp = TopicPartition("topic-a", 0) + + fetcher._client.cluster.leader_for_partition.return_value = 1 + fetcher._client.cluster.broker_metadata.return_value = mock.Mock() + + fetcher._update_preferred_read_replica(tp, 7) + + # Force the cached entry to be expired by setting its expiry to the past. + # This avoids relying on time.sleep, which is unreliable on Windows. + fetcher._preferred_read_replica[tp] = (7, time.monotonic() - 1) + + assignment = _make_assignment(fetcher, [tp]) + fetch_requests, *_ = fetcher._get_actions_per_node(assignment) + + node_id, _ = fetch_requests[0] + assert node_id == 1, "Should have fallen back to leader after TTL expired" + + async def test_fetch_falls_back_to_leader_when_preferred_node_disappears(self): + """If the preferred replica disappears from cluster metadata, the + consumer must not get stuck — it should fall back to the leader.""" + fetcher = await _make_fetcher() + tp = TopicPartition("topic-a", 0) + + fetcher._client.cluster.leader_for_partition.return_value = 1 + + # First the node is known — cache is populated. + fetcher._client.cluster.broker_metadata.return_value = mock.Mock() + fetcher._update_preferred_read_replica(tp, 7) + + # Now node 7 disappears from metadata. + fetcher._client.cluster.broker_metadata.return_value = None + + assignment = _make_assignment(fetcher, [tp]) + fetch_requests, *_ = fetcher._get_actions_per_node(assignment) + + node_id, _ = fetch_requests[0] + assert node_id == 1, "Should fall back to leader when preferred node is gone" + + +# --------------------------------------------------------------------------- +# 2. Correct interpretation of preferred_read_replica == -1 +# --------------------------------------------------------------------------- + + +class TestMinusOneHandling: + """The meaning of -1 depends on who produced the response (KIP-392). + + Both cases are handled by a single rule in + :meth:`Fetcher._update_preferred_read_replica`: ``-1`` (and ``None`` from + pre-v11 brokers) is a no-op. By construction the cache is always + consistent with where the request was routed: + + * If the request went to the **leader**, the cache for ``tp`` was already + empty (otherwise routing would have picked the cached follower) — so + "no-op" is equivalent to "cache stays empty". + * If the request went to the previously cached **follower**, ``-1`` means + "stay with me" — preserving the cache is the desired behaviour. + """ + + async def test_minus_one_from_leader_does_not_populate_cache(self): + """-1 from the leader (no cached follower) leaves the cache empty, + and the next fetch goes to the leader.""" + fetcher = await _make_fetcher() + tp = TopicPartition("topic-a", 0) + fetcher._client.cluster.leader_for_partition.return_value = 1 + fetcher._client.cluster.broker_metadata.return_value = mock.Mock() + + # Cache starts empty — the leader is the only valid destination. + assert fetcher._select_read_replica(tp) == 1 + + # Leader (node 1) responds with -1. + fetcher._update_preferred_read_replica(tp, -1) + + # Cache must still be empty; next fetch keeps going to the leader. + assert tp not in fetcher._preferred_read_replica + assert fetcher._select_read_replica(tp) == 1 + + async def test_minus_one_from_follower_keeps_cache(self): + """-1 from the currently selected follower means 'I am still the right + one, keep using me'. Consumer must NOT drop the cache — otherwise it + would oscillate between follower and leader on every fetch.""" + fetcher = await _make_fetcher() + tp = TopicPartition("topic-a", 0) + fetcher._client.cluster.leader_for_partition.return_value = 1 + fetcher._client.cluster.broker_metadata.return_value = mock.Mock() + + # Leader tells us to go to follower 7. + fetcher._update_preferred_read_replica(tp, 7) + assert fetcher._select_read_replica(tp) == 7 + + # Follower 7 responds with -1 (= "stay with me"). + fetcher._update_preferred_read_replica(tp, -1) + + # Cache must still point to 7. + assert fetcher._select_read_replica(tp) == 7 + + +# --------------------------------------------------------------------------- +# 3. Error-driven cache invalidation +# --------------------------------------------------------------------------- + + +class TestErrorInvalidation: + """On certain error codes the preferred replica cache must be dropped so + we don't keep hammering a broker that can no longer serve the partition.""" + + async def test_not_leader_error_invalidates_cache(self): + """NotLeaderForPartition means the routing is stale — drop the + cached preferred replica.""" + fetcher = await _make_fetcher() + tp = TopicPartition("topic-a", 0) + fetcher._client.cluster.leader_for_partition.return_value = 1 + fetcher._client.cluster.broker_metadata.return_value = mock.Mock() + + fetcher._update_preferred_read_replica(tp, 7) + assert tp in fetcher._preferred_read_replica + + # Simulate: error path pops the cache (as our code does). + fetcher._preferred_read_replica.pop(tp, None) + + assert fetcher._select_read_replica(tp) == 1 + + +# --------------------------------------------------------------------------- +# 4. rack_id is sent in the FetchRequest +# --------------------------------------------------------------------------- + + +class TestRackIdInRequest: + """Verify that rack_id is included in FetchRequest v11 and omitted in + older versions.""" + + def test_fetch_request_v11_carries_rack_id(self): + req = FetchRequest( + max_wait_time=100, + min_bytes=1, + max_bytes=1024, + isolation_level=0, + topics=[("t", [(0, 42, 1024)])], + rack_id="us-east-1a", + ) + built = req.build(FetchRequest_v11) + obj = built.to_object() + assert obj["rack_id"] == "us-east-1a" + + async def test_fetch_request_built_by_fetcher_includes_rack(self): + """_get_actions_per_node should produce requests with our rack_id.""" + fetcher = await _make_fetcher(client_rack="us-east-1a") + tp = TopicPartition("topic-a", 0) + + fetcher._client.cluster.leader_for_partition.return_value = 1 + fetcher._client.cluster.broker_metadata.return_value = mock.Mock() + + assignment = _make_assignment(fetcher, [tp]) + fetch_requests, *_ = fetcher._get_actions_per_node(assignment) + + _, req = fetch_requests[0] + assert req._rack_id == "us-east-1a" + + +# --------------------------------------------------------------------------- +# 5. Warning when client_rack is set but broker < v11 +# --------------------------------------------------------------------------- + + +class TestRackVersionWarning: + """A one-shot warning must be logged when client_rack is set but the + broker only supports FetchRequest < v11.""" + + async def test_warning_logged_when_broker_below_v11(self, caplog): + """If client_rack is set and the FetchResponse version is < 11, + a warning is logged exactly once.""" + fetcher = await _make_fetcher(client_rack="us-east-1a") + + # Mock assignment + assignment = mock.Mock() + assignment.active = True + state = mock.Mock() + state.has_valid_position = True + state.position = 0 + assignment.state_value.return_value = state + + # Mock a v4 FetchResponse with one topic and one partition (NoError, + # but empty message set so we don't need real record parsing). + response = mock.Mock() + response.API_VERSION = 4 + response.topics = [] # no partitions to iterate + + # Mock the request's .topics property so fetch_offsets can be built. + request = mock.Mock() + request.topics = [] + + # The client.send() must return our mock response. + fetcher._client.send = mock.AsyncMock(return_value=response) + + assert not fetcher._rack_warning_logged + + with caplog.at_level(logging.WARNING): + await fetcher._proc_fetch_request(assignment, 1, request) + + assert fetcher._rack_warning_logged + assert any("client_rack" in msg and "v4" in msg for msg in caplog.messages) + + # Second call should NOT log again. + caplog.clear() + with caplog.at_level(logging.WARNING): + await fetcher._proc_fetch_request(assignment, 1, request) + assert not any("client_rack" in msg for msg in caplog.messages) + + async def test_no_warning_when_rack_not_set(self, caplog): + """If client_rack is not set, no warning even when broker < v11.""" + fetcher = await _make_fetcher(client_rack=None) + + assignment = mock.Mock() + assignment.active = True + + response = mock.Mock() + response.API_VERSION = 4 + response.topics = [] + + request = mock.Mock() + request.topics = [] + + fetcher._client.send = mock.AsyncMock(return_value=response) + + with caplog.at_level(logging.WARNING): + await fetcher._proc_fetch_request(assignment, 1, request) + + assert not fetcher._rack_warning_logged + assert not any("client_rack" in msg for msg in caplog.messages) + + +# --------------------------------------------------------------------------- +# 6. Offset resets must always be routed to the partition leader +# --------------------------------------------------------------------------- + + +class TestOffsetResetRouting: + """Per KIP-392, only Fetch traffic may be served by a follower. + ListOffsets (used for seek_to_beginning / seek_to_end / out-of-range + resets) must always go to the partition leader, otherwise the follower + will reply with NOT_LEADER_FOR_PARTITION and the consumer will be stuck + until the preferred-replica TTL expires.""" + + async def test_awaiting_reset_is_keyed_by_leader_not_follower(self): + fetcher = await _make_fetcher() + tp = TopicPartition("topic-a", 0) + + # Leader = node 1, cached preferred replica = node 7. + fetcher._client.cluster.leader_for_partition.return_value = 1 + fetcher._client.cluster.broker_metadata.return_value = mock.Mock() + fetcher._update_preferred_read_replica(tp, 7) + + # Build an assignment where the partition has no valid position, + # so it ends up in awaiting_reset. + assignment = mock.Mock() + assignment.tps = [tp] + + def state_value(_tp): + state = mock.Mock() + state.has_valid_position = False + state.paused = False + return state + + assignment.state_value = state_value + fetcher._records = {} + fetcher._in_flight = set() + + _, awaiting_reset, *_ = fetcher._get_actions_per_node(assignment) + + assert 1 in awaiting_reset, ( + "ListOffsets must be sent to the leader (node 1), not to the " + "preferred follower (node 7)" + ) + assert 7 not in awaiting_reset + assert tp in awaiting_reset[1] + + async def test_unknown_leader_during_reset_triggers_metadata_refresh(self): + """If a preferred replica is cached but the partition leader is + currently unknown (e.g. during leader election), the partition must + not be enqueued under an invalid leader id. Instead, the routine must + signal ``invalid_metadata`` so the client forces a metadata refresh.""" + fetcher = await _make_fetcher() + tp = TopicPartition("topic-a", 0) + + # Cache a preferred follower while the leader is known. + fetcher._client.cluster.leader_for_partition.return_value = 1 + fetcher._client.cluster.broker_metadata.return_value = mock.Mock() + fetcher._update_preferred_read_replica(tp, 7) + + # Now leader becomes unknown (e.g. mid-election). + fetcher._client.cluster.leader_for_partition.return_value = None + + assignment = mock.Mock() + assignment.tps = [tp] + + def state_value(_tp): + state = mock.Mock() + state.has_valid_position = False + state.paused = False + return state + + assignment.state_value = state_value + fetcher._records = {} + fetcher._in_flight = set() + + _, awaiting_reset, _, invalid_metadata, _ = fetcher._get_actions_per_node( + assignment + ) + + assert invalid_metadata is True + assert not awaiting_reset, ( + "Partition must not be queued under an unknown leader id" + ) + + +# --------------------------------------------------------------------------- +# 7. Transport failures against a follower must evict the cached replica +# --------------------------------------------------------------------------- + + +class TestTransportFailureEviction: + """If a fetch to the preferred follower fails at the transport level + (connection error, broker down, etc.), the cached entry must be evicted + so the next attempt falls back to the leader instead of repeatedly + hitting the unreachable follower until ``metadata_max_age_ms`` expiry.""" + + async def test_kafka_error_evicts_preferred_replica(self): + fetcher = await _make_fetcher() + tp = TopicPartition("topic-a", 0) + + fetcher._client.cluster.leader_for_partition.return_value = 1 + fetcher._client.cluster.broker_metadata.return_value = mock.Mock() + fetcher._update_preferred_read_replica(tp, 7) + assert tp in fetcher._preferred_read_replica + + # Simulate a transport failure when sending to follower 7. + fetcher._client.send = mock.AsyncMock( + side_effect=Errors.KafkaConnectionError("boom") + ) + + # Sleep is awaited inside the error path; make it instant. + with mock.patch.object(asyncio, "sleep", mock.AsyncMock()): + request = mock.Mock() + request.topics = [("topic-a", [(0, 0, 1024)])] + assignment = mock.Mock() + assignment.active = True + ok = await fetcher._proc_fetch_request(assignment, 7, request) + + assert ok is False + assert tp not in fetcher._preferred_read_replica, ( + "Cached preferred replica must be evicted after transport failure" + ) + # After eviction, the next routing decision falls back to the leader. + assert fetcher._select_read_replica(tp) == 1 + + async def test_failure_against_leader_does_not_evict_other_followers(self): + """Failures while talking to the leader (no cache entry) must leave + any unrelated cached entries alone.""" + fetcher = await _make_fetcher() + tp_b = TopicPartition("topic-b", 0) + + fetcher._client.cluster.leader_for_partition.return_value = 1 + fetcher._client.cluster.broker_metadata.return_value = mock.Mock() + # tp_b is cached against follower 7; tp_a is fetched from leader 1. + fetcher._update_preferred_read_replica(tp_b, 7) + + fetcher._client.send = mock.AsyncMock( + side_effect=Errors.KafkaConnectionError("boom") + ) + + with mock.patch.object(asyncio, "sleep", mock.AsyncMock()): + request = mock.Mock() + request.topics = [("topic-a", [(0, 0, 1024)])] + assignment = mock.Mock() + assignment.active = True + await fetcher._proc_fetch_request(assignment, 1, request) + + # tp_b's cached follower entry must be untouched. + assert tp_b in fetcher._preferred_read_replica + assert fetcher._preferred_read_replica[tp_b][0] == 7 diff --git a/tests/test_transactional_consumer.py b/tests/test_transactional_consumer.py index 9bf59095e..cde963b55 100644 --- a/tests/test_transactional_consumer.py +++ b/tests/test_transactional_consumer.py @@ -69,7 +69,13 @@ async def test_consumer_transactional_commit(self): self.assertEqual(msg.value, b"Hello from non-transaction") self.assertEqual(msg.key, None) - # 3, because we have a commit marker also + # 3, because we have a commit marker also. + # Drive a no-data fetch round-trip so the consumer picks up the + # transaction commit marker and updates last_stable_offset/highwater + # before we assert on them. Sleeping a fixed interval here was racy + # under Python 3.14's task scheduler. + self.assertEqual(await consumer.getmany(timeout_ms=1000), {}) + tp = TopicPartition(self.topic, 0) self.assertEqual(consumer.last_stable_offset(tp), 3) self.assertEqual(consumer.highwater(tp), 3) @@ -126,8 +132,11 @@ async def test_consumer_transactional_abort(self): self.assertEqual(msg.value, b"Hello from non-transaction") self.assertEqual(msg.key, None) - with self.assertRaises(asyncio.TimeoutError): - await asyncio.wait_for(consumer.getone(), timeout=0.5) + # Drive a no-data fetch round-trip so the consumer picks up the + # transaction abort marker and updates last_stable_offset/highwater + # before we assert on them. Replaces a former asyncio.wait_for(getone, + # timeout=0.5) which was racy under Python 3.14's task scheduler. + self.assertEqual(await consumer.getmany(timeout_ms=1000), {}) tp = TopicPartition(self.topic, 0) self.assertEqual(consumer.last_stable_offset(tp), 3)