Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions aiokafka/producer/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ def __init__(
)

self._closed = False
self._started = False

# Warn if producer was not closed properly
# We don't attempt to close the Consumer, as __del__ is synchronous
Expand All @@ -340,13 +341,28 @@ def __del__(self, _warnings=warnings):
self._loop.call_exception_handler(context)

async def start(self):
"""Connect to Kafka cluster and check server version"""
"""Connect to Kafka cluster and check server version.

This method is idempotent - calling it multiple times on the same
producer instance will only start the producer once. This prevents
creating multiple sender tasks which would cause a busy loop due to
multiple senders sharing the same message accumulator.
"""
if self._started:
log.debug("Producer already started, skipping duplicate start() call")
return

assert self._loop is get_running_loop(), (
"Please create objects with the same loop as running with"
)
log.debug("Starting the Kafka producer") # trace
await self.client.bootstrap()
await self._sender.start()
self._started = True
try:
await self.client.bootstrap()
await self._sender.start()
except Exception:
self._started = False
raise
log.debug("Kafka producer started")

async def flush(self):
Expand Down