Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions aiokafka/producer/message_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import time
from collections.abc import Sequence

import async_timeout

from aiokafka.errors import (
KafkaTimeoutError,
LeaderNotAvailableError,
Expand Down Expand Up @@ -260,9 +262,19 @@ def failure(self, exception):
self._drain_waiter.set_exception(exception)

async def wait_drain(self, timeout=None):
"""Wait until all message from this batch is processed"""
"""Wait until all message from this batch is processed.

This implementation uses async_timeout directly instead of
asyncio.wait([single_future]) for better performance. The original
asyncio.wait() approach is inefficient for a single future as it
creates sets and registers/removes callbacks on each call.
"""
waiter = self._drain_waiter
await asyncio.wait([waiter], timeout=timeout)
try:
async with async_timeout.timeout(timeout):
await waiter
except asyncio.TimeoutError:
pass
if waiter.done():
waiter.result() # Check for exception

Expand Down