Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES/12281.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Fixed "Future exception was never retrieved" warning when a request handler
is cancelled during TCP write backpressure. ``_drain_helper`` now awaits the
drain waiter directly instead of wrapping it in :func:`asyncio.shield`
-- by :user:`joaquinhuigomez`.
2 changes: 1 addition & 1 deletion aiohttp/base_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,4 @@ async def _drain_helper(self) -> None:
if waiter is None:
waiter = self._loop.create_future()
self._drain_waiter = waiter
await asyncio.shield(waiter)
await waiter
44 changes: 44 additions & 0 deletions tests/test_base_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,50 @@ async def wait() -> None:
assert pr._drain_waiter is None


async def test_cancelled_drain_no_unhandled_future_warning() -> None:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, but I don't feel this test is validating aiohttp's behaviour. It may or may not match aiohttp's implementation at a given point in time.

As mentioned in the issue, I'd like to see a full functional test which shouldn't be too difficult given the reporter's reproducer and the example test I linked to.

"""Cancelling a task during backpressure must not leave an orphaned future.

When the handler task is cancelled while awaiting _drain_helper and
connection_lost fires with an exception afterward, the waiter should
already be done (cancelled) so set_exception is skipped. No "Future
exception was never retrieved" warning should appear.

Regression test for https://github.com/aio-libs/aiohttp/issues/12281
"""
loop = asyncio.get_event_loop()
pr = BaseProtocol(loop=loop)
tr = mock.Mock()
pr.connection_made(tr)
pr.pause_writing()

fut = loop.create_future()

async def wait() -> None:
fut.set_result(None)
await pr._drain_helper()

t = loop.create_task(wait())
await fut
t.cancel()
with suppress(asyncio.CancelledError):
await t

# After cancellation the waiter should be done (cancelled), so
# connection_lost with an exception must not call set_exception.
assert pr._drain_waiter is not None
waiter = pr._drain_waiter
assert waiter.done(), "waiter must be cancelled when task is cancelled"

# This previously left an orphaned future with an unhandled exception
# because asyncio.shield kept the original waiter alive and uncancelled.
exc = RuntimeError("connection died")
pr.connection_lost(exc)
assert pr._drain_waiter is None

# Verify the waiter is cancelled, not set with an exception.
assert waiter.cancelled() # type: ignore[unreachable]


async def test_parallel_drain_race_condition() -> None:
loop = asyncio.get_event_loop()
pr = BaseProtocol(loop=loop)
Expand Down
Loading