Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 154 additions & 114 deletions haystack/core/pipeline/async_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import asyncio
import contextlib
import contextvars
from collections.abc import AsyncIterator, Mapping
from collections.abc import AsyncGenerator, AsyncIterator, Mapping
from typing import Any, ClassVar, cast

from haystack import logging, tracing
Expand Down Expand Up @@ -204,11 +204,43 @@ async def _wait_for_tasks(
done, _pending = await asyncio.wait(running_tasks.keys(), return_when=return_when)
for finished in done:
finished_component_name = running_tasks.pop(finished)
partial_result = finished.result()
try:
partial_result = finished.result()
except Exception:
# A component failed. Cancel and drain the remaining in-flight tasks so they don't keep running in
# the background (and leak) after the run is aborted, then re-raise the original error.
await AsyncPipeline._cancel_in_flight_tasks(running_tasks, scheduled_components)
raise
Comment thread
sjrl marked this conversation as resolved.
scheduled_components.discard(finished_component_name)
if partial_result:
yield {finished_component_name: _deepcopy_with_exceptions(partial_result)}

@staticmethod
async def _cancel_in_flight_tasks(running_tasks: dict[asyncio.Task, str], scheduled_components: set[str]) -> None:
"""
Cancels all in-flight tasks and waits for the cancellations to settle.

Called when a component fails or when the run is abandoned early so that sibling tasks don't keep running in
the background after the pipeline run is aborted. Exceptions from the cancelled tasks are suppressed since we
are already unwinding.

Note: cancellation is only effective for components that run natively async. Sync components are offloaded to
a thread via `loop.run_in_executor` and a running thread cannot be interrupted: cancelling its task abandons
the await, but the thread keeps running until the component's `run` returns. Its outputs are then discarded
(the task never writes them to the pipeline state), so state stays consistent, but side effects (e.g. API
calls) still complete and the thread can outlive this cleanup.

:param running_tasks: Mapping of in-flight tasks to component names. Cleared in place.
:param scheduled_components: Set of scheduled-but-unfinished component names. Cleared in place.
"""
for task in running_tasks:
task.cancel()
# return_exceptions=True so a failing or cancelled sibling doesn't mask the original error we re-raise.
await asyncio.gather(*running_tasks.keys(), return_exceptions=True)
for component_name in running_tasks.values():
scheduled_components.discard(component_name)
running_tasks.clear()

async def _run_component_in_isolation(
self,
*,
Expand Down Expand Up @@ -346,7 +378,7 @@ async def _runner() -> Mapping[str, Any]:

async def run_async_generator( # noqa: PLR0915,C901
self, data: dict[str, Any], include_outputs_from: set[str] | None = None, concurrency_limit: int = 4
) -> AsyncIterator[dict[str, Any]]:
) -> AsyncGenerator[dict[str, Any], None]:
"""
Executes the pipeline step by step asynchronously, yielding partial outputs when any component finishes.

Expand Down Expand Up @@ -486,88 +518,69 @@ async def process_results():
# check if pipeline is blocked before execution
self.validate_pipeline(self._fill_queue(ordered_component_names, inputs, component_visits))

while True:
# We rebuild the priority queue every iteration: each iteration waits for one or more concurrent tasks
# to finish, which mutates `inputs` and can change many components' priorities at once, so we rebuild
# to give every scheduling decision an up-to-date view.
priority_queue = self._fill_queue(ordered_component_names, inputs, component_visits)
candidate = self._get_next_runnable_component(priority_queue, component_visits)

# If we can't make progress with the queue but tasks are running, we wait for one to finish and retry
# to potentially unblock the priority queue.
if (candidate is None or candidate[0] == ComponentPriority.BLOCKED) and running_tasks:
async for partial_outputs in self._wait_for_tasks(
running_tasks, scheduled_components, return_when=asyncio.FIRST_COMPLETED
):
yield partial_outputs
continue

# If there are no runnable components left and nothing is running, we can exit the loop.
if candidate is None and not running_tasks:
break

priority, component_name, component = candidate # type: ignore

# If the next component is blocked, we do a check to see if the pipeline is possibly blocked and raise
# a warning if it is.
if priority == ComponentPriority.BLOCKED and not running_tasks:
if self._is_pipeline_possibly_blocked(current_pipeline_outputs=pipeline_outputs):
# Pipeline is most likely blocked (most likely a configuration issue) so we raise a warning.
self._find_components_blocking_pipeline(
priority_queue=priority_queue, component_visits=component_visits, inputs=inputs
)
# We always exit the loop since we cannot run the next component.
break

# If the next component is already scheduled, we wait for a task to finish to make progress.
if component_name in scheduled_components:
async for partial_outputs in self._wait_for_tasks(
running_tasks, scheduled_components, return_when=asyncio.FIRST_COMPLETED
):
yield partial_outputs
continue

if priority == ComponentPriority.HIGHEST:
# A HIGHEST priority component must run alone, so we hand off to the isolation helper.
async for partial_outputs in self._run_component_in_isolation(
component_name=component_name,
inputs=inputs,
pipeline_outputs=pipeline_outputs,
component_visits=component_visits,
running_tasks=running_tasks,
scheduled_components=scheduled_components,
cached_receivers=cached_receivers,
include_outputs_from=include_outputs_from,
parent_span=parent_span,
):
yield partial_outputs
continue

if priority == ComponentPriority.READY:
# Schedule this component, then schedule as many additional READY components as concurrency allows.
self._schedule_component(
component_name=component_name,
inputs=inputs,
pipeline_outputs=pipeline_outputs,
component_visits=component_visits,
running_tasks=running_tasks,
scheduled_components=scheduled_components,
ready_sem=ready_sem,
cached_receivers=cached_receivers,
include_outputs_from=include_outputs_from,
parent_span=parent_span,
)
try:
while True:
# We rebuild the priority queue every iteration: each iteration waits for one or more concurrent
# tasks to finish, which mutates `inputs` and can change many components' priorities at once, so
# we rebuild to give every scheduling decision an up-to-date view.
priority_queue = self._fill_queue(ordered_component_names, inputs, component_visits)
candidate = self._get_next_runnable_component(priority_queue, component_visits)

# If we can't make progress with the queue but tasks are running, we wait for one to finish and
# retry to potentially unblock the priority queue.
if (candidate is None or candidate[0] == ComponentPriority.BLOCKED) and running_tasks:
async for partial_outputs in self._wait_for_tasks(
running_tasks, scheduled_components, return_when=asyncio.FIRST_COMPLETED
):
yield partial_outputs
continue

# If there are no runnable components left and nothing is running, we can exit the loop.
if candidate is None and not running_tasks:
break

priority, component_name, component = candidate # type: ignore

# If the next component is blocked, we do a check to see if the pipeline is possibly blocked and
# raise a warning if it is.
if priority == ComponentPriority.BLOCKED and not running_tasks:
if self._is_pipeline_possibly_blocked(current_pipeline_outputs=pipeline_outputs):
# Pipeline is most likely blocked (most likely a configuration issue) so we raise a warning.
self._find_components_blocking_pipeline(
priority_queue=priority_queue, component_visits=component_visits, inputs=inputs
)
# We always exit the loop since we cannot run the next component.
break

# If the next component is already scheduled, we wait for a task to finish to make progress.
if component_name in scheduled_components:
async for partial_outputs in self._wait_for_tasks(
running_tasks, scheduled_components, return_when=asyncio.FIRST_COMPLETED
):
yield partial_outputs
continue

if priority == ComponentPriority.HIGHEST:
# A HIGHEST priority component must run alone, so we hand off to the isolation helper.
async for partial_outputs in self._run_component_in_isolation(
component_name=component_name,
inputs=inputs,
pipeline_outputs=pipeline_outputs,
component_visits=component_visits,
running_tasks=running_tasks,
scheduled_components=scheduled_components,
cached_receivers=cached_receivers,
include_outputs_from=include_outputs_from,
parent_span=parent_span,
):
yield partial_outputs
continue

# Possibly schedule more READY tasks if concurrency not fully used
while len(priority_queue) > 0 and not ready_sem.locked():
peek_priority, peek_name = priority_queue.peek()
if peek_priority != ComponentPriority.READY:
# We stop scheduling: the next component is BLOCKED (can't run), HIGHEST (must run alone),
# or DEFER (waiting for more inputs - we only schedule it once it becomes READY).
break
priority_queue.pop()
if priority == ComponentPriority.READY:
# Schedule this component, then schedule as many additional READY components as concurrency
# allows.
self._schedule_component(
component_name=peek_name,
component_name=component_name,
inputs=inputs,
pipeline_outputs=pipeline_outputs,
component_visits=component_visits,
Expand All @@ -579,45 +592,72 @@ async def process_results():
parent_span=parent_span,
)

# We only schedule components with priority DEFER when no other tasks are running.
elif priority == ComponentPriority.DEFER and not running_tasks:
if len(priority_queue) > 0:
component_name, cached_topological_sort = self._tiebreak_waiting_components(
# Possibly schedule more READY tasks if concurrency not fully used
while len(priority_queue) > 0 and not ready_sem.locked():
peek_priority, peek_name = priority_queue.peek()
if peek_priority != ComponentPriority.READY:
# We stop scheduling: the next component is BLOCKED (can't run), HIGHEST (must run
# alone), or DEFER (waiting for more inputs - we only schedule it once it becomes
# READY).
break
priority_queue.pop()
self._schedule_component(
component_name=peek_name,
inputs=inputs,
pipeline_outputs=pipeline_outputs,
component_visits=component_visits,
running_tasks=running_tasks,
scheduled_components=scheduled_components,
ready_sem=ready_sem,
cached_receivers=cached_receivers,
include_outputs_from=include_outputs_from,
parent_span=parent_span,
)

# We only schedule components with priority DEFER when no other tasks are running.
elif priority == ComponentPriority.DEFER and not running_tasks:
if len(priority_queue) > 0:
component_name, cached_topological_sort = self._tiebreak_waiting_components(
component_name=component_name,
priority=priority,
priority_queue=priority_queue,
topological_sort=cached_topological_sort,
)

self._schedule_component(
component_name=component_name,
priority=priority,
priority_queue=priority_queue,
topological_sort=cached_topological_sort,
inputs=inputs,
pipeline_outputs=pipeline_outputs,
component_visits=component_visits,
running_tasks=running_tasks,
scheduled_components=scheduled_components,
ready_sem=ready_sem,
cached_receivers=cached_receivers,
include_outputs_from=include_outputs_from,
parent_span=parent_span,
)

self._schedule_component(
component_name=component_name,
inputs=inputs,
pipeline_outputs=pipeline_outputs,
component_visits=component_visits,
running_tasks=running_tasks,
scheduled_components=scheduled_components,
ready_sem=ready_sem,
cached_receivers=cached_receivers,
include_outputs_from=include_outputs_from,
parent_span=parent_span,
)
# To make progress, we wait for one task to complete before restarting the loop.
async for partial_outputs in self._wait_for_tasks(
running_tasks, scheduled_components, return_when=asyncio.FIRST_COMPLETED
):
yield partial_outputs

# To make progress, we wait for one task to complete before restarting the loop.
# Safety net: drain any leftover tasks once the scheduling loop has finished. With the current loop
# both `break` paths require `running_tasks` to be empty, so this is a no-op. We keep it so that a
# future change adding a `break` that leaves tasks in flight doesn't lose outputs.
async for partial_outputs in self._wait_for_tasks(
running_tasks, scheduled_components, return_when=asyncio.FIRST_COMPLETED
running_tasks, scheduled_components, return_when=asyncio.ALL_COMPLETED
):
yield partial_outputs

# Safety net: drain any leftover tasks once the scheduling loop has finished. With the current loop both
# `break` paths require `running_tasks` to be empty, so this is a no-op. We keep it so that a future change
# adding a `break` that leaves tasks in flight doesn't lose outputs.
async for partial_outputs in self._wait_for_tasks(
running_tasks, scheduled_components, return_when=asyncio.ALL_COMPLETED
):
yield partial_outputs

# Yield the final pipeline outputs.
yield pipeline_outputs
# Yield the final pipeline outputs.
yield pipeline_outputs
finally:
# If iteration is abandoned early (e.g. the consumer stops iterating the generator and closes it) or
# the run is cancelled, cancel any tasks still in flight so they don't leak.
# This is a no-op on normal completion and on a component error, since no tasks are left running by then
await self._cancel_in_flight_tasks(running_tasks, scheduled_components)

async def run_async(
self, data: dict[str, Any], include_outputs_from: set[str] | None = None, concurrency_limit: int = 4
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
fixes:
- |
Fixed a task leak in ``AsyncPipeline`` when running components concurrently. Previously, if one component
raised an error while sibling components were still running, those in-flight tasks were neither awaited nor
cancelled and kept running in the background until the event loop was torn down. They are now cancelled and
drained before the original error is re-raised. The same cleanup now also applies when iteration of
``run_async_generator`` is stopped early (e.g. the consumer breaks out of the loop and closes the generator)
or the run is cancelled: any tasks still in flight are cancelled instead of leaking.
Note that cancellation only interrupts components that run natively async. Sync components are offloaded to a
worker thread, which cannot be interrupted and runs to completion in the background; its outputs are discarded,
so pipeline state stays consistent, but the component's side effects still complete.
Loading
Loading