Skip to content

Commit b38742d

Browse files
author
Vincent Maurin
committed
Simplify flexible versions
The flexible versions is a protocol specificity for newer versions of the API. When an API is flexible, it is using more compact structures and also allow additional "dynamic" fields that could be added without the need to introduce a new API versions. This commit move the flexible versions support to the protocol layer, so it is more transparent and easy when defining Struct classes and schemas. When defining the schema, we can specify a tagged field with a tuple containing the field name and the field tag.
1 parent 7d0bd25 commit b38742d

11 files changed

Lines changed: 330 additions & 366 deletions

File tree

CHANGES.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,14 @@ Breaking changes:
1111
`api_version` parameter has been removed from the different clients (admin/consumer/producer)
1212
(pr #1136 by @vmaurin)
1313

14+
New features:
15+
16+
* Simplify flexible versions in schema.
17+
Defining an API request or response schemas that should support
18+
flexible versions (KIP-482) is now achieved by setting `FLEXIBLE_VERSION` to True.
19+
Tagged fields could be expressed with ("name", tag) instead of just a name.
20+
(pr #1139 by @vmaurin)
21+
1422
Improved Documentation:
1523

1624
* Fix incomplete documentation for `AIOKafkaConsumer.offset_for_times``

aiokafka/conn.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -428,10 +428,11 @@ def send(self, request, expect_response=True):
428428
) from err
429429

430430
log.debug(
431-
"Request to %s:%d %d: %s",
431+
"Request to %s:%d %d: %s, %s",
432432
self._host,
433433
self._port,
434434
correlation_id,
435+
header,
435436
request_struct,
436437
)
437438

@@ -565,10 +566,11 @@ def _handle_frame(self, resp):
565566
if not fut.done():
566567
response = resp_type.decode(resp)
567568
log.debug(
568-
"Response from %s:%d %d: %s",
569+
"Response from %s:%d %d: %s, %s",
569570
self._host,
570571
self._port,
571572
correlation_id,
573+
response_header,
572574
response,
573575
)
574576
fut.set_result(response)

aiokafka/protocol/abstract.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@
88
class AbstractType(Generic[T], metaclass=abc.ABCMeta):
99
@classmethod
1010
@abc.abstractmethod
11-
def encode(cls, value: T) -> bytes: ...
11+
def encode(cls, value: T, flexible: bool) -> bytes: ...
1212

1313
@classmethod
1414
@abc.abstractmethod
15-
def decode(cls, data: BytesIO) -> T: ...
15+
def decode(cls, data: BytesIO, flexible: bool) -> T: ...
1616

1717
@classmethod
1818
def repr(cls, value: T) -> str:

aiokafka/protocol/admin.py

Lines changed: 30 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,13 @@
88
Array,
99
Boolean,
1010
Bytes,
11-
CompactArray,
12-
CompactString,
1311
Float64,
1412
Int8,
1513
Int16,
1614
Int32,
1715
Int64,
1816
Schema,
1917
String,
20-
TaggedFields,
2118
)
2219

2320

@@ -1453,53 +1450,48 @@ def build(
14531450
class AlterPartitionReassignmentsResponse_v0(Response):
14541451
API_KEY = 45
14551452
API_VERSION = 0
1453+
FLEXIBLE_VERSION = True
14561454
SCHEMA = Schema(
14571455
("throttle_time_ms", Int32),
14581456
("error_code", Int16),
1459-
("error_message", CompactString("utf-8")),
1457+
("error_message", String("utf-8")),
14601458
(
14611459
"responses",
1462-
CompactArray(
1463-
("name", CompactString("utf-8")),
1460+
Array(
1461+
("name", String("utf-8")),
14641462
(
14651463
"partitions",
1466-
CompactArray(
1464+
Array(
14671465
("partition_index", Int32),
14681466
("error_code", Int16),
1469-
("error_message", CompactString("utf-8")),
1470-
("tags", TaggedFields),
1467+
("error_message", String("utf-8")),
14711468
),
14721469
),
1473-
("tags", TaggedFields),
14741470
),
14751471
),
1476-
("tags", TaggedFields),
14771472
)
14781473

14791474

14801475
class AlterPartitionReassignmentsRequest_v0(RequestStruct):
1481-
FLEXIBLE_VERSION = True
14821476
API_KEY = 45
14831477
API_VERSION = 0
1478+
FLEXIBLE_VERSION = True
14841479
RESPONSE_TYPE = AlterPartitionReassignmentsResponse_v0
14851480
SCHEMA = Schema(
14861481
("timeout_ms", Int32),
14871482
(
14881483
"topics",
1489-
CompactArray(
1490-
("name", CompactString("utf-8")),
1484+
Array(
1485+
("name", String("utf-8")),
14911486
(
14921487
"partitions",
1493-
CompactArray(
1488+
Array(
14941489
("partition_index", Int32),
1495-
("replicas", CompactArray(Int32)),
1496-
("tags", TaggedFields),
1490+
("replicas", Array(Int32)),
14971491
),
14981492
),
1499-
("tags", TaggedFields),
15001493
),
15011494
),
1502-
("tags", TaggedFields),
15031495
)
15041496

15051497

@@ -1516,44 +1508,40 @@ class AlterPartitionReassignmentsRequest(
15161508
def __init__(
15171509
self,
15181510
timeout_ms: int,
1519-
topics: list[tuple[str, tuple[int, list[int], TaggedFields], TaggedFields]],
1520-
tags: TaggedFields,
1511+
topics: list[tuple[str, tuple[int, list[int]]]],
15211512
):
15221513
self._timeout_ms = timeout_ms
15231514
self._topics = topics
1524-
self._tags = tags
15251515

15261516
def build(
15271517
self, request_struct_class: type[AlterPartitionReassignmentsRequestStruct]
15281518
) -> AlterPartitionReassignmentsRequestStruct:
1529-
return request_struct_class(self._timeout_ms, self._topics, self._tags)
1519+
return request_struct_class(self._timeout_ms, self._topics)
15301520

15311521

15321522
class ListPartitionReassignmentsResponse_v0(Response):
15331523
API_KEY = 46
15341524
API_VERSION = 0
1525+
FLEXIBLE_VERSION = True
15351526
SCHEMA = Schema(
15361527
("throttle_time_ms", Int32),
15371528
("error_code", Int16),
1538-
("error_message", CompactString("utf-8")),
1529+
("error_message", String("utf-8")),
15391530
(
15401531
"topics",
1541-
CompactArray(
1542-
("name", CompactString("utf-8")),
1532+
Array(
1533+
("name", String("utf-8")),
15431534
(
15441535
"partitions",
1545-
CompactArray(
1536+
Array(
15461537
("partition_index", Int32),
1547-
("replicas", CompactArray(Int32)),
1548-
("adding_replicas", CompactArray(Int32)),
1549-
("removing_replicas", CompactArray(Int32)),
1550-
("tags", TaggedFields),
1538+
("replicas", Array(Int32)),
1539+
("adding_replicas", Array(Int32)),
1540+
("removing_replicas", Array(Int32)),
15511541
),
15521542
),
1553-
("tags", TaggedFields),
15541543
),
15551544
),
1556-
("tags", TaggedFields),
15571545
)
15581546

15591547

@@ -1566,13 +1554,11 @@ class ListPartitionReassignmentsRequest_v0(RequestStruct):
15661554
("timeout_ms", Int32),
15671555
(
15681556
"topics",
1569-
CompactArray(
1570-
("name", CompactString("utf-8")),
1571-
("partition_index", CompactArray(Int32)),
1572-
("tags", TaggedFields),
1557+
Array(
1558+
("name", String("utf-8")),
1559+
("partition_index", Array(Int32)),
15731560
),
15741561
),
1575-
("tags", TaggedFields),
15761562
)
15771563

15781564

@@ -1589,17 +1575,15 @@ class ListPartitionReassignmentsRequest(
15891575
def __init__(
15901576
self,
15911577
timeout_ms: int,
1592-
topics: list[tuple[str, tuple[int, list[int], TaggedFields], TaggedFields]],
1593-
tags: TaggedFields,
1578+
topics: list[tuple[str, tuple[int, list[int]]]],
15941579
):
15951580
self._timeout_ms = timeout_ms
15961581
self._topics = topics
1597-
self._tags = tags
15981582

15991583
def build(
16001584
self, request_struct_class: type[ListPartitionReassignmentsRequestStruct]
16011585
) -> ListPartitionReassignmentsRequestStruct:
1602-
return request_struct_class(self._timeout_ms, self._topics, self._tags)
1586+
return request_struct_class(self._timeout_ms, self._topics)
16031587

16041588

16051589
class DeleteRecordsResponse_v0(Response):
@@ -1633,26 +1617,8 @@ class DeleteRecordsResponse_v1(Response):
16331617
class DeleteRecordsResponse_v2(Response):
16341618
API_KEY = 21
16351619
API_VERSION = 2
1636-
SCHEMA = Schema(
1637-
("throttle_time_ms", Int32),
1638-
(
1639-
"topics",
1640-
CompactArray(
1641-
("name", CompactString("utf-8")),
1642-
(
1643-
"partitions",
1644-
CompactArray(
1645-
("partition_index", Int32),
1646-
("low_watermark", Int64),
1647-
("error_code", Int16),
1648-
("tags", TaggedFields),
1649-
),
1650-
),
1651-
("tags", TaggedFields),
1652-
),
1653-
),
1654-
("tags", TaggedFields),
1655-
)
1620+
FLEXIBLE_VERSION = True
1621+
SCHEMA = DeleteRecordsResponse_v0.SCHEMA
16561622

16571623

16581624
class DeleteRecordsRequest_v0(RequestStruct):
@@ -1689,25 +1655,7 @@ class DeleteRecordsRequest_v2(RequestStruct):
16891655
API_VERSION = 2
16901656
FLEXIBLE_VERSION = True
16911657
RESPONSE_TYPE = DeleteRecordsResponse_v2
1692-
SCHEMA = Schema(
1693-
(
1694-
"topics",
1695-
CompactArray(
1696-
("name", CompactString("utf-8")),
1697-
(
1698-
"partitions",
1699-
CompactArray(
1700-
("partition_index", Int32),
1701-
("offset", Int64),
1702-
("tags", TaggedFields),
1703-
),
1704-
),
1705-
("tags", TaggedFields),
1706-
),
1707-
),
1708-
("timeout_ms", Int32),
1709-
("tags", TaggedFields),
1710-
)
1658+
SCHEMA = DeleteRecordsRequest_v0.SCHEMA
17111659

17121660

17131661
DeleteRecordsRequestStruct: TypeAlias = (
@@ -1722,43 +1670,20 @@ def __init__(
17221670
self,
17231671
topics: Iterable[tuple[str, Iterable[tuple[int, int]]]],
17241672
timeout_ms: int,
1725-
tags: dict[int, bytes] | None = None,
17261673
) -> None:
17271674
self._topics = topics
17281675
self._timeout_ms = timeout_ms
1729-
self._tags = tags
17301676

17311677
def build(
17321678
self, request_struct_class: type[DeleteRecordsRequestStruct]
17331679
) -> DeleteRecordsRequestStruct:
1734-
if request_struct_class.API_VERSION < 2:
1735-
if self._tags is not None:
1736-
raise IncompatibleBrokerVersion(
1737-
"tags requires DeleteRecordsRequest >= v2"
1738-
)
1739-
1740-
return request_struct_class(
1741-
[
1742-
(
1743-
topic,
1744-
list(partitions),
1745-
)
1746-
for (topic, partitions) in self._topics
1747-
],
1748-
self._timeout_ms,
1749-
)
17501680
return request_struct_class(
17511681
[
17521682
(
17531683
topic,
1754-
[
1755-
(partition, before_offset, {})
1756-
for partition, before_offset in partitions
1757-
],
1758-
{},
1684+
list(partitions),
17591685
)
17601686
for (topic, partitions) in self._topics
17611687
],
17621688
self._timeout_ms,
1763-
self._tags or {},
17641689
)

0 commit comments

Comments
 (0)