Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 50 additions & 4 deletions src/agents/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@

T = TypeVar("T")

_STREAMING_CANCEL_TASK_DRAIN_SECONDS = 0.25


@dataclass(frozen=True)
class AgentToolInvocation:
Expand Down Expand Up @@ -677,15 +679,14 @@ def cancel(self, mode: Literal["immediate", "after_turn"] = "immediate") -> None
if mode == "immediate":
# Existing behavior - immediate shutdown
self._cleanup_tasks() # Cancel all running tasks
self.is_complete = True # Mark the run as complete to stop event streaming

while not self._input_guardrail_queue.empty():
self._input_guardrail_queue.get_nowait()

# Unblock any streamers waiting on the event queue.
self._event_queue.put_nowait(QueueCompleteSentinel())
if not self._waiting_on_event_queue:
self._drain_event_queue()
self._event_queue.put_nowait(QueueCompleteSentinel())

elif mode == "after_turn":
# Soft cancel - just set the flag
Expand Down Expand Up @@ -735,7 +736,8 @@ async def stream_events(self) -> AsyncIterator[StreamEvent]:
if isinstance(item, QueueCompleteSentinel):
# Await input guardrails if they are still running, so late
# exceptions are captured.
await self._await_task_safely(self._input_guardrails_task)
if self._cancel_mode != "immediate":
await self._await_task_safely(self._input_guardrails_task)

self._event_queue.task_done()

Expand All @@ -752,6 +754,11 @@ async def stream_events(self) -> AsyncIterator[StreamEvent]:
# Cancellation should return promptly, so avoid waiting on long-running tasks.
# Tasks have already been cancelled above.
self._cleanup_tasks()
self.is_complete = True
elif self._cancel_mode == "immediate":
await self._drain_cancelled_tasks()
self._check_errors()
self.is_complete = True
Comment on lines +758 to +761

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Mark timed-out streams complete after cancellation

When stream_events() itself is cancelled (for example an asyncio.wait_for(....__anext__()) timeout), the except asyncio.CancelledError path above calls self.cancel(), but cancelled stays true so this new immediate-cancel drain/completion branch is skipped. Since cancel() no longer sets is_complete synchronously and the final queue drain removes the sentinel it enqueued, a result whose run loop has not already posted its own sentinel can be left incomplete with an empty queue; following the documented advice to continue consuming stream_events() can then wait forever. Please also put the result into a terminal state in the cancelled path without blocking on the drain.

Useful? React with 👍 / 👎.

else:
# Ensure main execution completes before cleanup to avoid race conditions
# with session operations.
Expand All @@ -764,7 +771,7 @@ async def stream_events(self) -> AsyncIterator[StreamEvent]:
# Safely terminate all background tasks after main execution has finished.
self._cleanup_tasks()

if not cancelled:
if not cancelled and self._cancel_mode != "immediate":
await self._run_sandbox_cleanup()
Comment on lines +774 to 775

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep sandbox cleanup alive after immediate cancel

When a sandboxed streamed run has already finished its run loop but is still in ensure_sandbox_cleanup_on_completion()'s wrapper awaiting _run_sandbox_cleanup(), calling cancel(mode="immediate") cancels run_loop_task; because that wrapper is awaiting the cleanup task, the cleanup task is cancelled too. This new guard then skips _run_sandbox_cleanup() in stream_events(), so runner-owned sandbox sessions can miss shutdown(), client delete(), and dependency close (for example, a fast model enqueues multiple events and the consumer cancels after the first event while cleanup is running). Immediate cancel should return promptly, but cleanup still needs to stay scheduled or be retried/shielded.

Useful? React with 👍 / 👎.

finally:
# Allow any pending callbacks (e.g., cancellation handlers) to enqueue their
Expand Down Expand Up @@ -846,6 +853,45 @@ def _cleanup_tasks(self):
if self._output_guardrails_task and not self._output_guardrails_task.done():
self._output_guardrails_task.cancel()

def _owned_background_tasks(self) -> list[asyncio.Task[Any]]:
return [
task
for task in (
self.run_loop_task,
self._input_guardrails_task,
self._output_guardrails_task,
)
if task is not None
]

async def _drain_cancelled_tasks(self) -> None:
tasks = self._owned_background_tasks()
if not tasks:
return

for task in tasks:
if not task.done():
task.cancel()

done, pending = await asyncio.wait(
tasks,
timeout=_STREAMING_CANCEL_TASK_DRAIN_SECONDS,
)
if done:
await asyncio.gather(*done, return_exceptions=True)

for task in pending:
task.add_done_callback(self._consume_background_task_result)

@staticmethod
def _consume_background_task_result(task: asyncio.Task[Any]) -> None:
try:
task.result()
except asyncio.CancelledError:
pass
except Exception as exc:
logger.debug(f"Background streaming task failed after cancellation: {exc}")

def __str__(self) -> str:
return pretty_print_run_result_streaming(self)

Expand Down
89 changes: 88 additions & 1 deletion tests/test_cancel_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import pytest
from openai.types.responses import ResponseCompletedEvent

from agents import Agent, Runner
from agents import Agent, Runner, RunResultStreaming
from agents.run_context import RunContextWrapper
from agents.stream_events import RawResponsesStreamEvent

from .fake_model import FakeModel
Expand Down Expand Up @@ -126,14 +127,98 @@ async def test_cancel_cleans_up_resources():
async for _ in result.stream_events():
result.cancel()
break
remaining_events = [event async for event in result.stream_events()]
# After cancel, queues should be empty and is_complete True
assert remaining_events == []
assert result.is_complete, "Result should be marked complete after cancel."
assert result._event_queue.empty(), "Event queue should be empty after cancel."
assert result._input_guardrail_queue.empty(), (
"Input guardrail queue should be empty after cancel."
)


@pytest.mark.asyncio
async def test_cancel_immediate_drains_owned_tasks_before_marking_complete():
result = RunResultStreaming(
input="hi",
new_items=[],
raw_responses=[],
final_output=None,
input_guardrail_results=[],
output_guardrail_results=[],
tool_input_guardrail_results=[],
tool_output_guardrail_results=[],
context_wrapper=RunContextWrapper(context=None),
current_agent=Agent(name="A", model=FakeModel()),
current_turn=0,
max_turns=1,
_current_agent_output_schema=None,
trace=None,
)
cleanup_finished = [asyncio.Event() for _ in range(3)]

async def wait_until_cancelled(cleanup_event: asyncio.Event) -> None:
try:
await asyncio.Event().wait()
finally:
await asyncio.sleep(0)
cleanup_event.set()

tasks = [asyncio.create_task(wait_until_cancelled(event)) for event in cleanup_finished]
(
result.run_loop_task,
result._input_guardrails_task,
result._output_guardrails_task,
) = tasks

await asyncio.sleep(0)
result.cancel(mode="immediate")

assert result.is_complete is False

events = [event async for event in result.stream_events()]

assert events == []
assert result.is_complete is True
assert all(task.done() for task in tasks)
assert all(event.is_set() for event in cleanup_finished)


@pytest.mark.asyncio
async def test_stream_events_timeout_marks_result_complete_without_sentinel():
result = RunResultStreaming(
input="hi",
new_items=[],
raw_responses=[],
final_output=None,
input_guardrail_results=[],
output_guardrail_results=[],
tool_input_guardrail_results=[],
tool_output_guardrail_results=[],
context_wrapper=RunContextWrapper(context=None),
current_agent=Agent(name="A", model=FakeModel()),
current_turn=0,
max_turns=1,
_current_agent_output_schema=None,
trace=None,
)

async def wait_forever() -> None:
await asyncio.Event().wait()

result.run_loop_task = asyncio.create_task(wait_forever())
event_iter = result.stream_events().__aiter__()

with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(event_iter.__anext__(), timeout=0.01)

assert result.is_complete is True

remaining_events = [event async for event in result.stream_events()]

assert remaining_events == []


@pytest.mark.asyncio
async def test_cancel_immediate_mode_explicit():
"""Test explicit immediate mode behaves same as default."""
Expand All @@ -145,7 +230,9 @@ async def test_cancel_immediate_mode_explicit():
async for _ in result.stream_events():
result.cancel(mode="immediate")
break
remaining_events = [event async for event in result.stream_events()]

assert remaining_events == []
assert result.is_complete
assert result._event_queue.empty()
assert result._cancel_mode == "immediate"
Expand Down
Loading