1+ from __future__ import annotations
2+
13import collections
24import copy
35import logging
46import threading
57import time
68from concurrent .futures import Future
7- from typing import Optional
9+ from typing import Any , Callable , Optional , Sequence , Set , TypedDict , Union
810
911from aiokafka import errors as Errors
12+ from aiokafka .client import CoordinationType
1013from aiokafka .conn import collect_hosts
14+ from aiokafka .protocol .commit import GroupCoordinatorResponse_v0 , GroupCoordinatorResponse_v1
15+ from aiokafka .protocol .metadata import MetadataResponse_v0 , MetadataResponse_v1 , MetadataResponse_v2 , MetadataResponse_v3 , MetadataResponse_v4 , MetadataResponse_v5
1116from aiokafka .structs import BrokerMetadata , PartitionMetadata , TopicPartition
1217
1318log = logging .getLogger (__name__ )
19+ MetadataResponse = Union [
20+ MetadataResponse_v0 ,
21+ MetadataResponse_v1 ,
22+ MetadataResponse_v2 ,
23+ MetadataResponse_v3 ,
24+ MetadataResponse_v4 ,
25+ MetadataResponse_v5 ,
26+ ]
27+ GroupCoordinatorResponse = Union [
28+ GroupCoordinatorResponse_v0 ,
29+ GroupCoordinatorResponse_v1
30+ ]
31+
32+
33+ class ClusterConfig (TypedDict ):
34+ retry_backoff_ms : int
35+ metadata_max_age_ms : int
36+ bootstrap_servers : str | list [str ]
1437
1538
1639class ClusterMetadata :
@@ -35,65 +58,65 @@ class ClusterMetadata:
3558 specified, will default to localhost:9092.
3659 """
3760
38- DEFAULT_CONFIG = {
61+ DEFAULT_CONFIG : ClusterConfig = {
3962 "retry_backoff_ms" : 100 ,
4063 "metadata_max_age_ms" : 300000 ,
4164 "bootstrap_servers" : [],
4265 }
4366
44- def __init__ (self , ** configs ):
45- self ._brokers = {} # node_id -> BrokerMetadata
46- self ._partitions = {} # topic -> partition -> PartitionMetadata
67+ def __init__ (self , ** configs : int | str | list [ str ] ):
68+ self ._brokers : dict [ str , BrokerMetadata ] = {} # node_id -> BrokerMetadata
69+ self ._partitions : dict [ str , dict [ int , PartitionMetadata ]] = {} # topic -> partition -> PartitionMetadata
4770 # node_id -> {TopicPartition...}
48- self ._broker_partitions = collections .defaultdict (set )
49- self ._groups = {} # group_name -> node_id
50- self ._last_refresh_ms = 0
51- self ._last_successful_refresh_ms = 0
52- self ._need_update = True
53- self ._future = None
54- self ._listeners = set ()
55- self ._lock = threading .Lock ()
56- self .need_all_topic_metadata = False
57- self .unauthorized_topics = set ()
58- self .internal_topics = set ()
59- self .controller = None
71+ self ._broker_partitions : dict [ int | str , set [ TopicPartition ]] = collections .defaultdict (set )
72+ self ._groups : dict [ str , int | str ] = {} # group_name -> node_id
73+ self ._last_refresh_ms : int = 0
74+ self ._last_successful_refresh_ms : int = 0
75+ self ._need_update : bool = True
76+ self ._future : Future [ ClusterMetadata ] | None = None
77+ self ._listeners : set [ Callable [[ ClusterMetadata ], Any ]] = set ()
78+ self ._lock : threading . Lock = threading .Lock ()
79+ self .need_all_topic_metadata : bool = False
80+ self .unauthorized_topics : set [ str ] = set ()
81+ self .internal_topics : set [ str ] = set ()
82+ self .controller : BrokerMetadata | None = None
6083
6184 self .config = copy .copy (self .DEFAULT_CONFIG )
6285 for key in self .config :
6386 if key in configs :
6487 self .config [key ] = configs [key ]
6588
6689 self ._bootstrap_brokers = self ._generate_bootstrap_brokers ()
67- self ._coordinator_brokers = {}
68- self ._coordinators = {}
69- self ._coordinator_by_key = {}
90+ self ._coordinator_brokers : dict [ str , BrokerMetadata ] = {}
91+ self ._coordinators : dict [ int | str , BrokerMetadata ] = {}
92+ self ._coordinator_by_key : dict [ tuple [ CoordinationType , str ], int | str ] = {}
7093
71- def _generate_bootstrap_brokers (self ):
94+ def _generate_bootstrap_brokers (self ) -> dict [ str , BrokerMetadata ] :
7295 # collect_hosts does not perform DNS, so we should be fine to re-use
7396 bootstrap_hosts = collect_hosts (self .config ["bootstrap_servers" ])
7497
75- brokers = {}
98+ brokers : dict [ str , BrokerMetadata ] = {}
7699 for i , (host , port , _ ) in enumerate (bootstrap_hosts ):
77100 node_id = f"bootstrap-{ i } "
78101 brokers [node_id ] = BrokerMetadata (node_id , host , port , None )
79102 return brokers
80103
81- def is_bootstrap (self , node_id ) :
104+ def is_bootstrap (self , node_id : str ) -> bool :
82105 return node_id in self ._bootstrap_brokers
83106
84- def brokers (self ):
107+ def brokers (self ) -> set [ BrokerMetadata ] :
85108 """Get all BrokerMetadata
86109
87110 Returns:
88111 set: {BrokerMetadata, ...}
89112 """
90113 return set (self ._brokers .values ()) or set (self ._bootstrap_brokers .values ())
91114
92- def broker_metadata (self , broker_id ) :
115+ def broker_metadata (self , broker_id : str ) -> BrokerMetadata | None :
93116 """Get BrokerMetadata
94117
95118 Arguments:
96- broker_id (int ): node_id for a broker to check
119+ broker_id (str ): node_id for a broker to check
97120
98121 Returns:
99122 BrokerMetadata or None if not found
@@ -117,7 +140,7 @@ def partitions_for_topic(self, topic: str) -> Optional[set[int]]:
117140 return None
118141 return set (self ._partitions [topic ].keys ())
119142
120- def available_partitions_for_topic (self , topic ) :
143+ def available_partitions_for_topic (self , topic : str ) -> Optional [ Set [ int ]] :
121144 """Return set of partitions with known leaders
122145
123146 Arguments:
@@ -135,7 +158,7 @@ def available_partitions_for_topic(self, topic):
135158 if metadata .leader != - 1
136159 }
137160
138- def leader_for_partition (self , partition ) :
161+ def leader_for_partition (self , partition : PartitionMetadata ) -> int | None :
139162 """Return node_id of leader, -1 unavailable, None if unknown."""
140163 if partition .topic not in self ._partitions :
141164 return None
@@ -144,7 +167,7 @@ def leader_for_partition(self, partition):
144167 return None
145168 return partitions [partition .partition ].leader
146169
147- def partitions_for_broker (self , broker_id ) :
170+ def partitions_for_broker (self , broker_id : int | str ) -> set [ TopicPartition ] | None :
148171 """Return TopicPartitions for which the broker is a leader.
149172
150173 Arguments:
@@ -156,7 +179,7 @@ def partitions_for_broker(self, broker_id):
156179 """
157180 return self ._broker_partitions .get (broker_id )
158181
159- def coordinator_for_group (self , group ) :
182+ def coordinator_for_group (self , group : str ) -> int | str | None :
160183 """Return node_id of group coordinator.
161184
162185 Arguments:
@@ -168,7 +191,7 @@ def coordinator_for_group(self, group):
168191 """
169192 return self ._groups .get (group )
170193
171- def request_update (self ):
194+ def request_update (self ) -> Future [ ClusterMetadata ] :
172195 """Flags metadata for update, return Future()
173196
174197 Actual update must be handled separately. This method will only
@@ -179,11 +202,11 @@ def request_update(self):
179202 """
180203 with self ._lock :
181204 self ._need_update = True
182- if not self ._future or self ._future .is_done :
205+ if not self ._future or self ._future .done () :
183206 self ._future = Future ()
184207 return self ._future
185208
186- def topics (self , exclude_internal_topics = True ):
209+ def topics (self , exclude_internal_topics : bool = True ) -> set [ str ] :
187210 """Get set of known topics.
188211
189212 Arguments:
@@ -201,7 +224,7 @@ def topics(self, exclude_internal_topics=True):
201224 else :
202225 return topics
203226
204- def failed_update (self , exception ) :
227+ def failed_update (self , exception : BaseException ) -> None :
205228 """Update cluster state given a failed MetadataRequest."""
206229 f = None
207230 with self ._lock :
@@ -212,7 +235,7 @@ def failed_update(self, exception):
212235 f .set_exception (exception )
213236 self ._last_refresh_ms = time .time () * 1000
214237
215- def update_metadata (self , metadata ) :
238+ def update_metadata (self , metadata : MetadataResponse ) -> None :
216239 """Update cluster state given a MetadataResponse.
217240
218241 Arguments:
@@ -241,8 +264,8 @@ def update_metadata(self, metadata):
241264
242265 _new_partitions = {}
243266 _new_broker_partitions = collections .defaultdict (set )
244- _new_unauthorized_topics = set ()
245- _new_internal_topics = set ()
267+ _new_unauthorized_topics : set [ str ] = set ()
268+ _new_internal_topics : set [ str ] = set ()
246269
247270 for topic_data in metadata .topics :
248271 if metadata .API_VERSION == 0 :
@@ -320,15 +343,15 @@ def update_metadata(self, metadata):
320343 # another fetch should be unnecessary.
321344 self ._need_update = False
322345
323- def add_listener (self , listener ) :
346+ def add_listener (self , listener : Callable [[ ClusterMetadata ], Any ]) -> None :
324347 """Add a callback function to be called on each metadata update"""
325348 self ._listeners .add (listener )
326349
327- def remove_listener (self , listener ) :
350+ def remove_listener (self , listener : Callable [[ ClusterMetadata ], Any ]) -> None :
328351 """Remove a previously added listener callback"""
329352 self ._listeners .remove (listener )
330353
331- def add_group_coordinator (self , group , response ) :
354+ def add_group_coordinator (self , group : str , response : GroupCoordinatorResponse ) -> str | None :
332355 """Update with metadata for a group coordinator
333356
334357 Arguments:
@@ -355,7 +378,7 @@ def add_group_coordinator(self, group, response):
355378 self ._groups [group ] = node_id
356379 return node_id
357380
358- def with_partitions (self , partitions_to_add ) :
381+ def with_partitions (self , partitions_to_add : Sequence [ PartitionMetadata ]) -> ClusterMetadata :
359382 """Returns a copy of cluster metadata with partitions added"""
360383 new_metadata = ClusterMetadata (** self .config )
361384 new_metadata ._brokers = copy .deepcopy (self ._brokers )
@@ -375,10 +398,10 @@ def with_partitions(self, partitions_to_add):
375398
376399 return new_metadata
377400
378- def coordinator_metadata (self , node_id ) :
401+ def coordinator_metadata (self , node_id : int | str ) -> BrokerMetadata | None :
379402 return self ._coordinators .get (node_id )
380403
381- def add_coordinator (self , node_id , host , port , rack = None , * , purpose ) :
404+ def add_coordinator (self , node_id : int | str , host : str , port : int , rack : str | None = None , * , purpose : tuple [ CoordinationType , str ]) -> None :
382405 """Keep track of all coordinator nodes separately and remove them if
383406 a new one was elected for the same purpose (For example group
384407 coordinator for group X).
@@ -390,7 +413,7 @@ def add_coordinator(self, node_id, host, port, rack=None, *, purpose):
390413 self ._coordinators [node_id ] = BrokerMetadata (node_id , host , port , rack )
391414 self ._coordinator_by_key [purpose ] = node_id
392415
393- def __str__ (self ):
416+ def __str__ (self ) -> str :
394417 return "ClusterMetadata(brokers: %d, topics: %d, groups: %d)" % (
395418 len (self ._brokers ),
396419 len (self ._partitions ),
0 commit comments