diff --git a/aiokafka/producer/message_accumulator.py b/aiokafka/producer/message_accumulator.py index 578e9522..7848b960 100644 --- a/aiokafka/producer/message_accumulator.py +++ b/aiokafka/producer/message_accumulator.py @@ -4,6 +4,8 @@ import time from collections.abc import Sequence +import async_timeout + from aiokafka.errors import ( KafkaTimeoutError, LeaderNotAvailableError, @@ -260,9 +262,19 @@ def failure(self, exception): self._drain_waiter.set_exception(exception) async def wait_drain(self, timeout=None): - """Wait until all message from this batch is processed""" + """Wait until all message from this batch is processed. + + This implementation uses async_timeout directly instead of + asyncio.wait([single_future]) for better performance. The original + asyncio.wait() approach is inefficient for a single future as it + creates sets and registers/removes callbacks on each call. + """ waiter = self._drain_waiter - await asyncio.wait([waiter], timeout=timeout) + try: + async with async_timeout.timeout(timeout): + await waiter + except asyncio.TimeoutError: + pass if waiter.done(): waiter.result() # Check for exception