diff --git a/aiokafka/producer/producer.py b/aiokafka/producer/producer.py index 9d6266e1..0f6b7a73 100644 --- a/aiokafka/producer/producer.py +++ b/aiokafka/producer/producer.py @@ -321,6 +321,7 @@ def __init__( ) self._closed = False + self._started = False # Warn if producer was not closed properly # We don't attempt to close the Consumer, as __del__ is synchronous @@ -340,13 +341,28 @@ def __del__(self, _warnings=warnings): self._loop.call_exception_handler(context) async def start(self): - """Connect to Kafka cluster and check server version""" + """Connect to Kafka cluster and check server version. + + This method is idempotent - calling it multiple times on the same + producer instance will only start the producer once. This prevents + creating multiple sender tasks which would cause a busy loop due to + multiple senders sharing the same message accumulator. + """ + if self._started: + log.debug("Producer already started, skipping duplicate start() call") + return + assert self._loop is get_running_loop(), ( "Please create objects with the same loop as running with" ) log.debug("Starting the Kafka producer") # trace - await self.client.bootstrap() - await self._sender.start() + self._started = True + try: + await self.client.bootstrap() + await self._sender.start() + except Exception: + self._started = False + raise log.debug("Kafka producer started") async def flush(self):