Commit 7d0bd25

Authored by Vincent Maurin (vmaurin) and a co-author
Resolve API versions on connection (#1136)
* Resolve API versions on connection

  As stated in the Kafka protocol documentation
  (https://kafka.apache.org/protocol#api_versions), API versions should be
  resolved for each individual connection we open with a broker (the broker
  might be upgraded while we talk to it). This commit moves all the version
  logic into the protocol and connection layers instead of leaving it spread
  across various classes. For that it introduces a new "Request" class that
  acts as a builder for the Struct we are going to send over the wire. The
  Request class collects all possible parameters and then reacts to the best
  available version in one of two ways, chosen on a per-API basis:

  * failing, when the caller's intent was clearly to use a new feature that
    is not available on the broker yet
  * best effort, when the caller can work with an older version of the API

  From the caller's perspective, if different logic needs to run depending
  on the API version, it can look it up on Response.API_VERSION.

  This change will help aiokafka support Kafka 4.0, where some API versions
  were removed, as mentioned in #1085. It might not achieve full
  compatibility, since we could still lack some protocol definitions, but it
  will definitely help.

* Remove support for old brokers <0.11

* Improve typing in protocol package

* Leverage type annotations for Request

  Instead of using a list of classes in the Request class for the structs,
  we can define a type union:

  * it can be transformed and used at runtime, like the former CLASSES var
  * it helps detect bugs with a type checker like mypy

* Fix flaky test

* Adjust logging

---------

Co-authored-by: Vincent Maurin <vincent.maurin@checkstep.com>
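The per-connection negotiation described above can be sketched as follows. This is a hypothetical illustration of the builder idea, not aiokafka's actual implementation: the class names, API key, and version ranges are assumptions.

```python
# Hypothetical sketch of a version-resolving request builder.
# Names, API keys, and version ranges are illustrative only.
class Request:
    API_KEY = -1       # set by each concrete request subclass
    MIN_VERSION = 0    # oldest wire version this client can still emit
    MAX_VERSION = 0    # newest wire version this client knows about

    def build(self, broker_versions):
        """Resolve the best version supported by both sides.

        broker_versions maps API key -> (min, max), as reported by the
        broker's ApiVersions response for this particular connection.
        """
        broker_min, broker_max = broker_versions[self.API_KEY]
        version = min(self.MAX_VERSION, broker_max)
        if version < max(self.MIN_VERSION, broker_min):
            # "failing" strategy: no common version exists
            raise ValueError(f"API key {self.API_KEY} unsupported by broker")
        # "best effort" strategy: encode only what this version understands
        return version, self.encode(version)

    def encode(self, version):
        raise NotImplementedError


class CreateTopicsRequest(Request):
    API_KEY = 19
    MIN_VERSION = 1
    MAX_VERSION = 5

    def __init__(self, create_topic_requests, timeout, validate_only=False):
        self.create_topic_requests = create_topic_requests
        self.timeout = timeout
        self.validate_only = validate_only

    def encode(self, version):
        fields = [self.create_topic_requests, self.timeout]
        if version >= 1:  # validate_only only exists from v1 onward
            fields.append(self.validate_only)
        return fields
```

The caller constructs the request once with every parameter it knows; the negotiated version only matters at encode time, which matches the commit's goal of keeping version logic out of the client classes.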
1 parent 5ec91e7 commit 7d0bd25

54 files changed

Lines changed: 2548 additions & 1938 deletions

.github/workflows/tests.yml

Lines changed: 0 additions & 6 deletions
@@ -226,12 +226,6 @@ jobs:
           scala: "2.13"

         # Older brokers against latest python version
-        - python: "3.13"
-          kafka: "0.9.0.1"
-          scala: "2.11"
-        - python: "3.13"
-          kafka: "0.10.2.1"
-          scala: "2.11"
         - python: "3.13"
           kafka: "0.11.0.3"
           scala: "2.12"

CHANGES.rst

Lines changed: 6 additions & 0 deletions
@@ -5,6 +5,12 @@ Changelog
 0.13.0 (????-??-??)
 ===================

+Breaking changes:
+
+* Resolve API versions at connection with brokers
+  `api_version` parameter has been removed from the different clients (admin/consumer/producer)
+  (pr #1136 by @vmaurin)
+
 Improved Documentation:

 * Fix incomplete documentation for `AIOKafkaConsumer.offset_for_times``

README.rst

Lines changed: 1 addition & 1 deletion
@@ -44,7 +44,7 @@ AIOKafkaConsumer

 AIOKafkaConsumer is a high-level, asynchronous message consumer.
 It interacts with the assigned Kafka Group Coordinator node to allow multiple
-consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).
+consumers to load balance consumption of topics (requires kafka >= 0.11).

 Example of AIOKafkaConsumer usage:

aiokafka/abc.py

Lines changed: 1 addition & 1 deletion
@@ -136,6 +136,6 @@ def extensions(self):


 __all__ = [
-    "ConsumerRebalanceListener",
     "AbstractTokenProvider",
+    "ConsumerRebalanceListener",
 ]

aiokafka/admin/client.py

Lines changed: 25 additions & 133 deletions
@@ -1,7 +1,6 @@
 import asyncio
 import logging
 from collections import defaultdict
-from collections.abc import Sequence
 from ssl import SSLContext
 from typing import Any

@@ -10,15 +9,13 @@
 from aiokafka import __version__
 from aiokafka.client import AIOKafkaClient
 from aiokafka.errors import (
-    IncompatibleBrokerVersion,
     LeaderNotAvailableError,
     NotControllerError,
     NotLeaderForPartitionError,
     for_code,
 )
 from aiokafka.protocol.admin import (
     AlterConfigsRequest,
-    ApiVersionRequest_v0,
     CreatePartitionsRequest,
     CreateTopicsRequest,
     DeleteRecordsRequest,
@@ -28,7 +25,8 @@
     ListGroupsRequest,
 )
 from aiokafka.protocol.api import Request, Response
-from aiokafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
+from aiokafka.protocol.commit import OffsetFetchRequest
+from aiokafka.protocol.coordination import FindCoordinatorRequest
 from aiokafka.protocol.metadata import MetadataRequest
 from aiokafka.structs import OffsetAndMetadata, TopicPartition

@@ -79,10 +77,6 @@ class AIOKafkaAdminClient:
         ssl_context (ssl.SSLContext): Pre-configured SSLContext for wrapping
             socket connections. If provided, all other ssl_* configurations
             will be ignored. Default: None.
-        api_version (str): Specify which kafka API version to use.
-            AIOKafka supports Kafka API versions >=0.9 only.
-            If set to 'auto', will attempt to infer the broker version by
-            probing various APIs. Default: auto
     """

     def __init__(
@@ -97,7 +91,6 @@ def __init__(
         metadata_max_age_ms: int = 300000,
         security_protocol: str = "PLAINTEXT",
         ssl_context: SSLContext | None = None,
-        api_version: str = "auto",
         sasl_mechanism: str = "PLAIN",
         sasl_plain_username: str | None = None,
         sasl_plain_password: str | None = None,
@@ -116,7 +109,6 @@ def __init__(
             metadata_max_age_ms=metadata_max_age_ms,
             request_timeout_ms=request_timeout_ms,
             retry_backoff_ms=retry_backoff_ms,
-            api_version=api_version,
             ssl_context=ssl_context,
             security_protocol=security_protocol,
             connections_max_idle_ms=connections_max_idle_ms,
@@ -147,46 +139,13 @@ async def _send_request(
         node_id = self._client.get_random_node()
         return await self._client.send(node_id, request)

-    async def _get_version_info(self):
-        resp = await self._send_request(ApiVersionRequest_v0())
-        for api_key, min_version, max_version in resp.api_versions:
-            self._version_info[api_key] = (min_version, max_version)
-
     async def start(self):
         if self._started:
             return
         await self._client.bootstrap()
-        await self._get_version_info()
         log.debug("AIOKafkaAdminClient started")
         self._started = True

-    def _matching_api_version(self, operation: Sequence[type[Request]]) -> int:
-        """Find the latest version of the protocol operation
-        supported by both this library and the broker.
-
-        This resolves to the lesser of either the latest api
-        version this library supports, or the max version
-        supported by the broker.
-
-        :param operation: A list of protocol operation versions from
-            aiokafka.protocol.
-        :return: The max matching version number between client and broker.
-        """
-        api_key = operation[0].API_KEY
-        if not self._version_info or api_key not in self._version_info:
-            raise IncompatibleBrokerVersion(
-                f"Kafka broker does not support the '{operation[0].__name__}' "
-                "Kafka protocol."
-            )
-        min_version, max_version = self._version_info[api_key]
-        version = min(len(operation) - 1, max_version)
-        if version < min_version:
-            raise IncompatibleBrokerVersion(
-                f"No version of the '{operation[0].__name__}' Kafka protocol "
-                "is supported by both the client and broker."
-            )
-        return version
-
     async def _send_request_to_node(self, node_id: int, request: Request) -> Response:
         async with async_timeout.timeout(self._client._request_timeout_ms / 1000):
             while True:
@@ -198,8 +157,6 @@ async def _send_request_to_node(self, node_id: int, request: Request) -> Respons
             return await self._client.send(node_id, request)

     async def _send_to_controller(self, request: Request) -> Response:
-        # With "auto" api_version the first request is sent with minimal
-        # version, so the controller is not returned in metadata.
         if self._client.cluster.controller is None:
             await self._client.force_metadata_update()

@@ -239,32 +196,16 @@ async def create_topics(
             Not supported by all versions. Default: False
         :return: Appropriate version of CreateTopicResponse class.
         """
-        version = self._matching_api_version(CreateTopicsRequest)
         topics = [self._convert_new_topic_request(nt) for nt in new_topics]
         log.debug("Attempting to send create topic request for %r", new_topics)
         timeout_ms = timeout_ms or self._request_timeout_ms
-        if version == 0:
-            if validate_only:
-                raise IncompatibleBrokerVersion(
-                    "validate_only requires CreateTopicsRequest >= v1, "
-                    f"which is not supported by Kafka {self._client.api_version}."
-                )
-            request = CreateTopicsRequest[version](
-                create_topic_requests=topics,
-                timeout=timeout_ms,
-            )
-        elif version <= 3:
-            request = CreateTopicsRequest[version](
+        return await self._send_to_controller(
+            CreateTopicsRequest(
                 create_topic_requests=topics,
                 timeout=timeout_ms,
                 validate_only=validate_only,
             )
-        else:
-            raise NotImplementedError(
-                f"Support for CreateTopics v{version} has not yet been added "
-                "to AIOKafkaAdminClient."
-            )
-        return await self._send_to_controller(request)
+        )

     async def delete_topics(
         self,
@@ -278,9 +219,7 @@ async def delete_topics(
             before the broker returns.
         :return: Appropriate version of DeleteTopicsResponse class.
         """
-        version = self._matching_api_version(DeleteTopicsRequest)
-        req_cls = DeleteTopicsRequest[version]
-        request = req_cls(topics, timeout_ms or self._request_timeout_ms)
+        request = DeleteTopicsRequest(topics, timeout_ms or self._request_timeout_ms)
         return await self._send_to_controller(request)

     async def _get_cluster_metadata(
@@ -292,8 +231,7 @@ async def _get_cluster_metadata(
         :param topics List of topic names, None means "get all topics"
         :return MetadataResponse
         """
-        req_cls = MetadataRequest[self._matching_api_version(MetadataRequest)]
-        request = req_cls(topics=topics)
+        request = MetadataRequest(topics)
         return await self._send_request(request)

     async def list_topics(self) -> list[str]:
@@ -332,31 +270,18 @@ async def describe_configs(
         """

         futures = []
-        version = self._matching_api_version(DescribeConfigsRequest)
-        if version == 0 and include_synonyms:
-            raise IncompatibleBrokerVersion(
-                "include_synonyms requires DescribeConfigsRequest >= v1,"
-                f" which is not supported by Kafka {self._client.api_version}."
-            )
         broker_res, topic_res = self._convert_config_resources(
             config_resources,
             "describe",
         )
-        req_cls = DescribeConfigsRequest[version]
         for broker_id in broker_res:
-            if version == 0:
-                req = req_cls(resources=broker_res[broker_id])
-            else:
-                req = req_cls(
-                    resources=broker_res[broker_id],
-                    include_synonyms=include_synonyms,
-                )
+            req = DescribeConfigsRequest(
+                resources=broker_res[broker_id],
+                include_synonyms=include_synonyms,
+            )
             futures.append(self._send_request(req, broker_id))
         if topic_res:
-            if version == 0:
-                req = req_cls(topic_res)
-            else:
-                req = req_cls(topic_res, include_synonyms)
+            req = DescribeConfigsRequest(topic_res, include_synonyms)
             futures.append(self._send_request(req))
         return await asyncio.gather(*futures)

@@ -368,15 +293,15 @@ async def alter_configs(
         :return: Appropriate version of AlterConfigsResponse class.
         """
         futures = []
-        version = self._matching_api_version(AlterConfigsRequest)
         broker_resources, topic_resources = self._convert_config_resources(
             config_resources,
             "alter",
         )
-        req_cls = AlterConfigsRequest[version]
-        futures.append(self._send_request(req_cls(resources=topic_resources)))
+        futures.append(
+            self._send_request(AlterConfigsRequest(resources=topic_resources))
+        )
         for broker_id in broker_resources:
-            req = req_cls(resources=broker_resources[broker_id])
+            req = AlterConfigsRequest(resources=broker_resources[broker_id])
             futures.append(self._send_request(req, broker_id))
         return await asyncio.gather(*futures)

@@ -439,10 +364,8 @@ async def create_partitions(
             Default: False
         :return: Appropriate version of CreatePartitionsResponse class.
         """
-        version = self._matching_api_version(CreatePartitionsRequest)
-        req_class = CreatePartitionsRequest[version]
         converted_partitions = self._convert_topic_partitions(topic_partitions)
-        req = req_class(
+        req = CreatePartitionsRequest(
             topic_partitions=converted_partitions,
             timeout=timeout_ms or self._request_timeout_ms,
             validate_only=validate_only,
@@ -478,14 +401,6 @@ async def describe_consumer_groups(
         :return: A list of group descriptions. For now the group descriptions
             are the raw results from the DescribeGroupsResponse.
         """
-        version = self._matching_api_version(DescribeGroupsRequest)
-        if version < 3 and include_authorized_operations:
-            raise IncompatibleBrokerVersion(
-                "include_authorized_operations requests "
-                "DescribeGroupsRequest >= v3, which is not "
-                f"supported by Kafka {version}"
-            )
-        req_class = DescribeGroupsRequest[version]
         futures = []
         node_to_groups = defaultdict(set)
         for group_id in group_ids:
@@ -495,13 +410,10 @@ async def describe_consumer_groups(
                 node_id = group_coordinator_id
             node_to_groups[node_id].add(group_id)
         for node_id, groups in node_to_groups.items():
-            if include_authorized_operations:
-                req = req_class(
-                    groups=list(groups),
-                    include_authorized_operations=include_authorized_operations,
-                )
-            else:
-                req = req_class(groups=list(groups))
+            req = DescribeGroupsRequest(
+                groups=list(groups),
+                include_authorized_operations=include_authorized_operations,
+            )
             future = self._send_request(req, node_id)
             futures.append(future)
         results = await asyncio.gather(*futures)
@@ -541,7 +453,7 @@ async def list_consumer_groups(
         consumer_groups = set()
         for broker_id in broker_ids:
             response = await self._send_request(
-                ListGroupsRequest[self._matching_api_version(ListGroupsRequest)](),
+                ListGroupsRequest(),
                 broker_id,
             )
             if response.error_code:
@@ -559,16 +471,7 @@ async def find_coordinator(self, group_id: str, coordinator_type: int = 0) -> in

         :return int: the acting coordinator broker id
         """
-        version = self._matching_api_version(GroupCoordinatorRequest)
-        if version == 0 and coordinator_type:
-            raise IncompatibleBrokerVersion(
-                "Cannot query for transaction id on current broker version"
-            )
-        req_class = GroupCoordinatorRequest[version]
-        if version == 0:
-            request = req_class(consumer_group=group_id)
-        else:
-            request = req_class(group_id, coordinator_type)
+        request = FindCoordinatorRequest(group_id, coordinator_type)
         response = await self._send_request(request)
         if response.error_code:
             err = for_code(response.error_code)
@@ -605,13 +508,6 @@ async def list_consumer_group_offsets(
             TopicPartition. A `-1` can only happen for partitions that are
             explicitly specified.
         """
-        version = self._matching_api_version(OffsetFetchRequest)
-        if version <= 1 and partitions is None:
-            raise ValueError(
-                f"""OffsetFetchRequest_v{version} requires specifying the
-                partitions for which to fetch offsets. Omitting the
-                partitions is only supported on brokers >= 0.10.2"""
-            )
         if partitions:
             topics_partitions_dict = defaultdict(set)
             for topic, partition in partitions:
@@ -620,7 +516,7 @@ async def list_consumer_group_offsets(
                 (topic, list(partitions))
                 for topic, partitions in topics_partitions_dict.items()
             ]
-        request = OffsetFetchRequest[version](group_id, partitions)
+        request = OffsetFetchRequest(group_id, partitions)
         if group_coordinator_id is None:
             group_coordinator_id = await self.find_coordinator(group_id)
         response = await self._send_request(request, group_coordinator_id)
@@ -646,8 +542,6 @@ async def delete_records(
         :param timeout_ms: Milliseconds to wait for the deletion to complete.
         :return: Appropriate version of DeleteRecordsResponse class.
        """
-        version = self._matching_api_version(DeleteRecordsRequest)
-
         metadata = await self._get_cluster_metadata()

         self._client.cluster.update_metadata(metadata)
@@ -663,10 +557,8 @@ async def delete_records(
                 raise LeaderNotAvailableError()
             requests[leader][tp.topic].append((tp.partition, records))

-        req_cls = DeleteRecordsRequest[version]
-
         for leader, delete_request in requests.items():
-            request = req_cls(
+            request = DeleteRecordsRequest(
                 self._convert_records_to_delete(delete_request),
                 timeout_ms or self._request_timeout_ms,
             )
0 commit comments
