From 725c5c63bfd9f9c7d5bc5d866855efb797c28875 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Wed, 3 Jun 2026 14:19:50 +0200 Subject: [PATCH 1/4] cancel in-flight tasks when a component errors --- haystack/core/pipeline/async_pipeline.py | 28 ++++++++++- ...light-tasks-on-error-8b9623a305717328.yaml | 7 +++ test/core/pipeline/test_async_pipeline.py | 47 +++++++++++++++++++ 3 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 releasenotes/notes/cancel-in-flight-tasks-on-error-8b9623a305717328.yaml diff --git a/haystack/core/pipeline/async_pipeline.py b/haystack/core/pipeline/async_pipeline.py index 8606b38fb2..05b256c8b6 100644 --- a/haystack/core/pipeline/async_pipeline.py +++ b/haystack/core/pipeline/async_pipeline.py @@ -204,11 +204,37 @@ async def _wait_for_tasks( done, _pending = await asyncio.wait(running_tasks.keys(), return_when=return_when) for finished in done: finished_component_name = running_tasks.pop(finished) - partial_result = finished.result() + try: + partial_result = finished.result() + except Exception: + # A component failed. Cancel and drain the remaining in-flight tasks so they don't keep running in + # the background (and leak) after the run is aborted, then re-raise the original error. + await AsyncPipeline._cancel_in_flight_tasks(running_tasks, scheduled_components) + raise scheduled_components.discard(finished_component_name) if partial_result: yield {finished_component_name: _deepcopy_with_exceptions(partial_result)} + @staticmethod + async def _cancel_in_flight_tasks(running_tasks: dict[asyncio.Task, str], scheduled_components: set[str]) -> None: + """ + Cancels all in-flight tasks and waits for the cancellations to settle. + + Called when a component fails so that sibling tasks don't keep running in the background after the pipeline + run is aborted. Exceptions from the cancelled tasks are suppressed since we are already unwinding from the + original error. 
+ + :param running_tasks: Mapping of in-flight tasks to component names. Cleared in place. + :param scheduled_components: Set of scheduled-but-unfinished component names. Cleared in place. + """ + for task in running_tasks: + task.cancel() + # return_exceptions=True so a failing or cancelled sibling doesn't mask the original error we re-raise. + await asyncio.gather(*running_tasks.keys(), return_exceptions=True) + for component_name in running_tasks.values(): + scheduled_components.discard(component_name) + running_tasks.clear() + async def _run_component_in_isolation( self, *, diff --git a/releasenotes/notes/cancel-in-flight-tasks-on-error-8b9623a305717328.yaml b/releasenotes/notes/cancel-in-flight-tasks-on-error-8b9623a305717328.yaml new file mode 100644 index 0000000000..90aae8e398 --- /dev/null +++ b/releasenotes/notes/cancel-in-flight-tasks-on-error-8b9623a305717328.yaml @@ -0,0 +1,7 @@ +--- +fixes: + - | + Fixed a task leak in ``AsyncPipeline`` when running components concurrently. Previously, if one component + raised an error while sibling components were still running, those in-flight tasks were neither awaited nor + cancelled and kept running in the background until the event loop was torn down. They are now cancelled and + drained before the original error is re-raised. diff --git a/test/core/pipeline/test_async_pipeline.py b/test/core/pipeline/test_async_pipeline.py index 9042bac134..4d297ff397 100644 --- a/test/core/pipeline/test_async_pipeline.py +++ b/test/core/pipeline/test_async_pipeline.py @@ -10,6 +10,7 @@ from haystack import AsyncPipeline, Document, component from haystack.components.joiners import BranchJoiner +from haystack.core.errors import PipelineRuntimeError def test_async_pipeline_reentrance(waiting_component, spying_tracer): @@ -408,3 +409,49 @@ async def test_include_outputs_from_yields_even_when_consumed(self): # Even though `first`'s output is consumed by `second`, include_outputs_from forces it to be surfaced. 
assert results == [{"first": {"value": 6}}] assert state["pipeline_outputs"] == {"first": {"value": 6}} + + +class TestInFlightTaskCleanupOnError: + @pytest.mark.asyncio + async def test_sibling_tasks_cancelled_when_a_component_errors(self): + """When a component fails, the other in-flight tasks must be cancelled and not leak.""" + slow_started = asyncio.Event() + slow_cancelled = False + + @component + class Slow: + @component.output_types(value=str) + def run(self, text: str) -> dict[str, str]: + return {"value": text} + + @component.output_types(value=str) + async def run_async(self, text: str) -> dict[str, str]: + nonlocal slow_cancelled + slow_started.set() + try: + await asyncio.sleep(5) + except asyncio.CancelledError: + slow_cancelled = True + raise + return {"value": text} + + @component + class Failing: + @component.output_types(value=str) + def run(self, text: str) -> dict[str, str]: + raise RuntimeError("boom") + + @component.output_types(value=str) + async def run_async(self, text: str) -> dict[str, str]: + # Fail only once the sibling is actually running, so there is an in-flight task to clean up. 
+ await slow_started.wait() + raise RuntimeError("boom") + + pp = AsyncPipeline() + pp.add_component("slow", Slow()) + pp.add_component("failing", Failing()) + + with pytest.raises(PipelineRuntimeError): + await pp.run_async({"slow": {"text": "x"}, "failing": {"text": "y"}}, concurrency_limit=2) + + assert slow_cancelled is True From 043bdc4d75be57c9baf2a0a95411fbb38fa6d079 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Fri, 5 Jun 2026 08:40:22 +0200 Subject: [PATCH 2/4] Add drainage on cancellation of run_async_generator --- haystack/core/pipeline/async_pipeline.py | 230 +++++++++--------- ...light-tasks-on-error-8b9623a305717328.yaml | 4 +- test/core/pipeline/test_async_pipeline.py | 46 ++++ 3 files changed, 168 insertions(+), 112 deletions(-) diff --git a/haystack/core/pipeline/async_pipeline.py b/haystack/core/pipeline/async_pipeline.py index 05b256c8b6..0a01e5cfe2 100644 --- a/haystack/core/pipeline/async_pipeline.py +++ b/haystack/core/pipeline/async_pipeline.py @@ -512,88 +512,69 @@ async def process_results(): # check if pipeline is blocked before execution self.validate_pipeline(self._fill_queue(ordered_component_names, inputs, component_visits)) - while True: - # We rebuild the priority queue every iteration: each iteration waits for one or more concurrent tasks - # to finish, which mutates `inputs` and can change many components' priorities at once, so we rebuild - # to give every scheduling decision an up-to-date view. - priority_queue = self._fill_queue(ordered_component_names, inputs, component_visits) - candidate = self._get_next_runnable_component(priority_queue, component_visits) - - # If we can't make progress with the queue but tasks are running, we wait for one to finish and retry - # to potentially unblock the priority queue. 
- if (candidate is None or candidate[0] == ComponentPriority.BLOCKED) and running_tasks: - async for partial_outputs in self._wait_for_tasks( - running_tasks, scheduled_components, return_when=asyncio.FIRST_COMPLETED - ): - yield partial_outputs - continue - - # If there are no runnable components left and nothing is running, we can exit the loop. - if candidate is None and not running_tasks: - break - - priority, component_name, component = candidate # type: ignore - - # If the next component is blocked, we do a check to see if the pipeline is possibly blocked and raise - # a warning if it is. - if priority == ComponentPriority.BLOCKED and not running_tasks: - if self._is_pipeline_possibly_blocked(current_pipeline_outputs=pipeline_outputs): - # Pipeline is most likely blocked (most likely a configuration issue) so we raise a warning. - self._find_components_blocking_pipeline( - priority_queue=priority_queue, component_visits=component_visits, inputs=inputs - ) - # We always exit the loop since we cannot run the next component. - break - - # If the next component is already scheduled, we wait for a task to finish to make progress. - if component_name in scheduled_components: - async for partial_outputs in self._wait_for_tasks( - running_tasks, scheduled_components, return_when=asyncio.FIRST_COMPLETED - ): - yield partial_outputs - continue - - if priority == ComponentPriority.HIGHEST: - # A HIGHEST priority component must run alone, so we hand off to the isolation helper. 
- async for partial_outputs in self._run_component_in_isolation( - component_name=component_name, - inputs=inputs, - pipeline_outputs=pipeline_outputs, - component_visits=component_visits, - running_tasks=running_tasks, - scheduled_components=scheduled_components, - cached_receivers=cached_receivers, - include_outputs_from=include_outputs_from, - parent_span=parent_span, - ): - yield partial_outputs - continue - - if priority == ComponentPriority.READY: - # Schedule this component, then schedule as many additional READY components as concurrency allows. - self._schedule_component( - component_name=component_name, - inputs=inputs, - pipeline_outputs=pipeline_outputs, - component_visits=component_visits, - running_tasks=running_tasks, - scheduled_components=scheduled_components, - ready_sem=ready_sem, - cached_receivers=cached_receivers, - include_outputs_from=include_outputs_from, - parent_span=parent_span, - ) + try: + while True: + # We rebuild the priority queue every iteration: each iteration waits for one or more concurrent + # tasks to finish, which mutates `inputs` and can change many components' priorities at once, so + # we rebuild to give every scheduling decision an up-to-date view. + priority_queue = self._fill_queue(ordered_component_names, inputs, component_visits) + candidate = self._get_next_runnable_component(priority_queue, component_visits) + + # If we can't make progress with the queue but tasks are running, we wait for one to finish and + # retry to potentially unblock the priority queue. + if (candidate is None or candidate[0] == ComponentPriority.BLOCKED) and running_tasks: + async for partial_outputs in self._wait_for_tasks( + running_tasks, scheduled_components, return_when=asyncio.FIRST_COMPLETED + ): + yield partial_outputs + continue + + # If there are no runnable components left and nothing is running, we can exit the loop. 
+ if candidate is None and not running_tasks: + break + + priority, component_name, component = candidate # type: ignore + + # If the next component is blocked, we do a check to see if the pipeline is possibly blocked and + # raise a warning if it is. + if priority == ComponentPriority.BLOCKED and not running_tasks: + if self._is_pipeline_possibly_blocked(current_pipeline_outputs=pipeline_outputs): + # Pipeline is most likely blocked (most likely a configuration issue) so we raise a warning. + self._find_components_blocking_pipeline( + priority_queue=priority_queue, component_visits=component_visits, inputs=inputs + ) + # We always exit the loop since we cannot run the next component. + break + + # If the next component is already scheduled, we wait for a task to finish to make progress. + if component_name in scheduled_components: + async for partial_outputs in self._wait_for_tasks( + running_tasks, scheduled_components, return_when=asyncio.FIRST_COMPLETED + ): + yield partial_outputs + continue + + if priority == ComponentPriority.HIGHEST: + # A HIGHEST priority component must run alone, so we hand off to the isolation helper. + async for partial_outputs in self._run_component_in_isolation( + component_name=component_name, + inputs=inputs, + pipeline_outputs=pipeline_outputs, + component_visits=component_visits, + running_tasks=running_tasks, + scheduled_components=scheduled_components, + cached_receivers=cached_receivers, + include_outputs_from=include_outputs_from, + parent_span=parent_span, + ): + yield partial_outputs + continue - # Possibly schedule more READY tasks if concurrency not fully used - while len(priority_queue) > 0 and not ready_sem.locked(): - peek_priority, peek_name = priority_queue.peek() - if peek_priority != ComponentPriority.READY: - # We stop scheduling: the next component is BLOCKED (can't run), HIGHEST (must run alone), - # or DEFER (waiting for more inputs - we only schedule it once it becomes READY). 
- break - priority_queue.pop() + if priority == ComponentPriority.READY: + # Schedule this component, then schedule as many additional READY components as concurrency + # allows. self._schedule_component( - component_name=peek_name, + component_name=component_name, inputs=inputs, pipeline_outputs=pipeline_outputs, component_visits=component_visits, @@ -605,45 +586,72 @@ async def process_results(): parent_span=parent_span, ) - # We only schedule components with priority DEFER when no other tasks are running. - elif priority == ComponentPriority.DEFER and not running_tasks: - if len(priority_queue) > 0: - component_name, cached_topological_sort = self._tiebreak_waiting_components( + # Possibly schedule more READY tasks if concurrency not fully used + while len(priority_queue) > 0 and not ready_sem.locked(): + peek_priority, peek_name = priority_queue.peek() + if peek_priority != ComponentPriority.READY: + # We stop scheduling: the next component is BLOCKED (can't run), HIGHEST (must run + # alone), or DEFER (waiting for more inputs - we only schedule it once it becomes + # READY). + break + priority_queue.pop() + self._schedule_component( + component_name=peek_name, + inputs=inputs, + pipeline_outputs=pipeline_outputs, + component_visits=component_visits, + running_tasks=running_tasks, + scheduled_components=scheduled_components, + ready_sem=ready_sem, + cached_receivers=cached_receivers, + include_outputs_from=include_outputs_from, + parent_span=parent_span, + ) + + # We only schedule components with priority DEFER when no other tasks are running. 
+ elif priority == ComponentPriority.DEFER and not running_tasks: + if len(priority_queue) > 0: + component_name, cached_topological_sort = self._tiebreak_waiting_components( + component_name=component_name, + priority=priority, + priority_queue=priority_queue, + topological_sort=cached_topological_sort, + ) + + self._schedule_component( component_name=component_name, - priority=priority, - priority_queue=priority_queue, - topological_sort=cached_topological_sort, + inputs=inputs, + pipeline_outputs=pipeline_outputs, + component_visits=component_visits, + running_tasks=running_tasks, + scheduled_components=scheduled_components, + ready_sem=ready_sem, + cached_receivers=cached_receivers, + include_outputs_from=include_outputs_from, + parent_span=parent_span, ) - self._schedule_component( - component_name=component_name, - inputs=inputs, - pipeline_outputs=pipeline_outputs, - component_visits=component_visits, - running_tasks=running_tasks, - scheduled_components=scheduled_components, - ready_sem=ready_sem, - cached_receivers=cached_receivers, - include_outputs_from=include_outputs_from, - parent_span=parent_span, - ) + # To make progress, we wait for one task to complete before restarting the loop. + async for partial_outputs in self._wait_for_tasks( + running_tasks, scheduled_components, return_when=asyncio.FIRST_COMPLETED + ): + yield partial_outputs - # To make progress, we wait for one task to complete before restarting the loop. + # Safety net: drain any leftover tasks once the scheduling loop has finished. With the current loop + # both `break` paths require `running_tasks` to be empty, so this is a no-op. We keep it so that a + # future change adding a `break` that leaves tasks in flight doesn't lose outputs. 
+ async for partial_outputs in self._wait_for_tasks( - running_tasks, scheduled_components, return_when=asyncio.FIRST_COMPLETED + running_tasks, scheduled_components, return_when=asyncio.ALL_COMPLETED ): yield partial_outputs - # Safety net: drain any leftover tasks once the scheduling loop has finished. With the current loop both - # `break` paths require `running_tasks` to be empty, so this is a no-op. We keep it so that a future change - # adding a `break` that leaves tasks in flight doesn't lose outputs. - async for partial_outputs in self._wait_for_tasks( - running_tasks, scheduled_components, return_when=asyncio.ALL_COMPLETED - ): - yield partial_outputs - - # Yield the final pipeline outputs. - yield pipeline_outputs + # Yield the final pipeline outputs. + yield pipeline_outputs + finally: + # If iteration is abandoned early (e.g. the consumer stops iterating the generator and closes it) or + # the run is cancelled, cancel any tasks still in flight so they don't leak. + # This is a no-op on normal completion and on a component error, since no tasks are left running by then. + await self._cancel_in_flight_tasks(running_tasks, scheduled_components) async def run_async( self, data: dict[str, Any], include_outputs_from: set[str] | None = None, concurrency_limit: int = 4 diff --git a/releasenotes/notes/cancel-in-flight-tasks-on-error-8b9623a305717328.yaml b/releasenotes/notes/cancel-in-flight-tasks-on-error-8b9623a305717328.yaml index 90aae8e398..8080d579c3 100644 --- a/releasenotes/notes/cancel-in-flight-tasks-on-error-8b9623a305717328.yaml +++ b/releasenotes/notes/cancel-in-flight-tasks-on-error-8b9623a305717328.yaml @@ -4,4 +4,6 @@ fixes: Fixed a task leak in ``AsyncPipeline`` when running components concurrently. Previously, if one component raised an error while sibling components were still running, those in-flight tasks were neither awaited nor cancelled and kept running in the background until the event loop was torn down. 
They are now cancelled and - drained before the original error is re-raised. + drained before the original error is re-raised. The same cleanup now also applies when iteration of + ``run_async_generator`` is stopped early (e.g. the consumer breaks out of the loop and closes the generator) + or the run is cancelled: any tasks still in flight are cancelled instead of leaking. diff --git a/test/core/pipeline/test_async_pipeline.py b/test/core/pipeline/test_async_pipeline.py index 4d297ff397..ee7fbd9d39 100644 --- a/test/core/pipeline/test_async_pipeline.py +++ b/test/core/pipeline/test_async_pipeline.py @@ -455,3 +455,49 @@ async def run_async(self, text: str) -> dict[str, str]: await pp.run_async({"slow": {"text": "x"}, "failing": {"text": "y"}}, concurrency_limit=2) assert slow_cancelled is True + + @pytest.mark.asyncio + async def test_in_flight_tasks_cancelled_when_generator_iteration_is_abandoned(self): + """When the consumer stops iterating run_async_generator early, in-flight tasks must be cancelled.""" + slow_started = asyncio.Event() + slow_cancelled = False + + @component + class Fast: + @component.output_types(value=str) + def run(self, text: str) -> dict[str, str]: + return {"value": text} + + @component.output_types(value=str) + async def run_async(self, text: str) -> dict[str, str]: + # Yield an output only once the sibling is actually running, so it is in flight when we abandon. 
+ await slow_started.wait() + return {"value": text} + + @component + class Slow: + @component.output_types(value=str) + def run(self, text: str) -> dict[str, str]: + return {"value": text} + + @component.output_types(value=str) + async def run_async(self, text: str) -> dict[str, str]: + nonlocal slow_cancelled + slow_started.set() + try: + await asyncio.sleep(5) + except asyncio.CancelledError: + slow_cancelled = True + raise + return {"value": text} + + pp = AsyncPipeline() + pp.add_component("fast", Fast()) + pp.add_component("slow", Slow()) + + generator = pp.run_async_generator({"fast": {"text": "x"}, "slow": {"text": "y"}}, concurrency_limit=2) + async for _partial in generator: + break # abandon iteration after the first partial output + await generator.aclose() + + assert slow_cancelled is True From eb9074ba4ba7eca8256393567544f6ed7ec7039c Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Fri, 5 Jun 2026 08:43:34 +0200 Subject: [PATCH 3/4] Add note about components without run_async --- haystack/core/pipeline/async_pipeline.py | 12 +++++++++--- ...el-in-flight-tasks-on-error-8b9623a305717328.yaml | 3 +++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/haystack/core/pipeline/async_pipeline.py b/haystack/core/pipeline/async_pipeline.py index 0a01e5cfe2..b684cf4eec 100644 --- a/haystack/core/pipeline/async_pipeline.py +++ b/haystack/core/pipeline/async_pipeline.py @@ -220,9 +220,15 @@ async def _cancel_in_flight_tasks(running_tasks: dict[asyncio.Task, str], schedu """ Cancels all in-flight tasks and waits for the cancellations to settle. - Called when a component fails so that sibling tasks don't keep running in the background after the pipeline - run is aborted. Exceptions from the cancelled tasks are suppressed since we are already unwinding from the - original error. + Called when a component fails or when the run is abandoned early so that sibling tasks don't keep running in + the background after the pipeline run is aborted. 
Exceptions from the cancelled tasks are suppressed since we + are already unwinding. + + Note: cancellation is only effective for components that run natively async. Sync components are offloaded to + a thread via `loop.run_in_executor` and a running thread cannot be interrupted: cancelling its task abandons + the await, but the thread keeps running until the component's `run` returns. Its outputs are then discarded + (the task never writes them to the pipeline state), so state stays consistent, but side effects (e.g. API + calls) still complete and the thread can outlive this cleanup. :param running_tasks: Mapping of in-flight tasks to component names. Cleared in place. :param scheduled_components: Set of scheduled-but-unfinished component names. Cleared in place. diff --git a/releasenotes/notes/cancel-in-flight-tasks-on-error-8b9623a305717328.yaml b/releasenotes/notes/cancel-in-flight-tasks-on-error-8b9623a305717328.yaml index 8080d579c3..13b3e34592 100644 --- a/releasenotes/notes/cancel-in-flight-tasks-on-error-8b9623a305717328.yaml +++ b/releasenotes/notes/cancel-in-flight-tasks-on-error-8b9623a305717328.yaml @@ -7,3 +7,6 @@ fixes: drained before the original error is re-raised. The same cleanup now also applies when iteration of ``run_async_generator`` is stopped early (e.g. the consumer breaks out of the loop and closes the generator) or the run is cancelled: any tasks still in flight are cancelled instead of leaking. + Note that cancellation only interrupts components that run natively async. Sync components are offloaded to a + worker thread, which cannot be interrupted and runs to completion in the background; its outputs are discarded, + so pipeline state stays consistent, but the component's side effects still complete. 
From aa949cd83bdd0a74de8591698b9a5f1fbfced2a4 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Fri, 5 Jun 2026 09:05:20 +0200 Subject: [PATCH 4/4] fix type --- haystack/core/pipeline/async_pipeline.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/haystack/core/pipeline/async_pipeline.py b/haystack/core/pipeline/async_pipeline.py index b684cf4eec..fefd084b57 100644 --- a/haystack/core/pipeline/async_pipeline.py +++ b/haystack/core/pipeline/async_pipeline.py @@ -5,7 +5,7 @@ import asyncio import contextlib import contextvars -from collections.abc import AsyncIterator, Mapping +from collections.abc import AsyncGenerator, AsyncIterator, Mapping from typing import Any, ClassVar, cast from haystack import logging, tracing @@ -378,7 +378,7 @@ async def _runner() -> Mapping[str, Any]: async def run_async_generator( # noqa: PLR0915,C901 self, data: dict[str, Any], include_outputs_from: set[str] | None = None, concurrency_limit: int = 4 - ) -> AsyncIterator[dict[str, Any]]: + ) -> AsyncGenerator[dict[str, Any], None]: """ Executes the pipeline step by step asynchronously, yielding partial outputs when any component finishes.