Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion src/a2a/server/request_handlers/default_request_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
TaskPushNotificationConfig,
TaskQueryParams,
TaskState,
TaskStatus,
UnsupportedOperationError,
)
from a2a.utils.errors import ServerError
Expand Down Expand Up @@ -286,7 +287,7 @@
if isinstance(latest_task, Task):
await self._push_sender.send_notification(latest_task)

async def on_message_send(

Check failure on line 290 in src/a2a/server/request_handlers/default_request_handler.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

ruff (PLR0915)

src/a2a/server/request_handlers/default_request_handler.py:290:15: PLR0915 Too many statements (51 > 50)
self,
params: MessageSendParams,
context: ServerCallContext | None = None,
Expand All @@ -295,6 +296,13 @@

Starts the agent execution for the message and waits for the final
result (Task or Message).

When ``blocking`` is ``False``, the handler returns the task
immediately without waiting for executor events and processes
everything in the background. Results are delivered via push
notifications. This avoids the latency introduced by the
``EventConsumer`` polling loop which can add seconds of delay
when the event loop is busy with other work.
"""
(
_task_manager,
Expand All @@ -311,6 +319,46 @@
if params.configuration and params.configuration.blocking is False:
blocking = False

# Non-blocking fast path: return the task immediately and process
# events entirely in the background via push notifications.
if not blocking:
task = await _task_manager.get_task()
if not task:
task = Task(
id=task_id,
context_id=params.message.context_id,
status=TaskStatus(state=TaskState.submitted),
history=[params.message],
)
Comment thread
ruimgf marked this conversation as resolved.

async def _background_consume() -> None:
try:
async for _event in result_aggregator.consume_and_emit(
consumer
):
await self._send_push_notification_if_needed(
task_id, result_aggregator
)
except Exception:
logger.exception(
'Background event consumption failed for task %s',
task_id,
)
finally:
await self._cleanup_producer(producer_task, task_id)

bg_task = asyncio.create_task(_background_consume())
bg_task.set_name(f'non_blocking_consume:{task_id}')
self._track_background_task(bg_task)

if params.configuration:
task = apply_history_length(
task, params.configuration.history_length
)

return task

# Blocking path: wait for completion or interruption.
interrupted_or_non_blocking = False
try:
# Create async callback for push notifications
Expand All @@ -325,7 +373,7 @@
bg_consume_task,
) = await result_aggregator.consume_and_break_on_interrupt(
consumer,
blocking=blocking,
blocking=True,
event_callback=push_notification_callback,
)

Expand Down
Loading