From ee40fafe6212fc3e41b457e18530e6b7e5367105 Mon Sep 17 00:00:00 2001 From: Sebastian Husch Lee Date: Mon, 1 Jun 2026 09:15:10 +0200 Subject: [PATCH] Allow running asyncpipeline.run in a place with an existing loop --- haystack/core/pipeline/async_pipeline.py | 65 +++++++++++++++---- ...-run-in-running-loop-7c0e9f1a2b3d4e5f.yaml | 10 +++ test/core/pipeline/test_async_pipeline.py | 48 ++++++++++++-- 3 files changed, 105 insertions(+), 18 deletions(-) create mode 100644 releasenotes/notes/async-pipeline-run-in-running-loop-7c0e9f1a2b3d4e5f.yaml diff --git a/haystack/core/pipeline/async_pipeline.py b/haystack/core/pipeline/async_pipeline.py index 4d8b93526a..a904c45706 100644 --- a/haystack/core/pipeline/async_pipeline.py +++ b/haystack/core/pipeline/async_pipeline.py @@ -4,7 +4,8 @@ import asyncio import contextvars -from collections.abc import AsyncIterator, Mapping +import threading +from collections.abc import AsyncIterator, Coroutine, Mapping from typing import Any from haystack import logging, tracing @@ -25,6 +26,43 @@ logger = logging.getLogger(__name__) +def _run_coroutine_in_new_loop(coro: Coroutine[Any, Any, Any]) -> Any: + """ + Run ``coro`` to completion on a fresh event loop in a short-lived dedicated thread. + + This makes ``AsyncPipeline.run()`` usable even when it is called from a thread that already has a + running event loop (for example inside a Jupyter notebook, or when synchronous ``run()`` is invoked + from within an async application). In those situations ``asyncio.run()`` cannot be used because a loop + is already running in the calling thread, so we run the coroutine on its own loop in a separate + thread and block the caller until it finishes. The thread and loop are created per call and torn down + when the coroutine completes. + + The caller's :mod:`contextvars` context (e.g. the active tracing span) is copied and used to run the + coroutine so that context-dependent behavior is preserved across the thread boundary. + + :param coro: The coroutine to execute. + :returns: The result returned by the coroutine. + """ + # Copy the caller's context so the coroutine sees the same context variables (active tracing span, + # etc.). Running ``asyncio.run`` inside ``ctx.run`` makes the new loop's main task inherit this context. + ctx = contextvars.copy_context() + box: dict[str, Any] = {} + + def _worker() -> None: + try: + box["result"] = ctx.run(asyncio.run, coro) + except BaseException as error: # noqa: BLE001 - captured and re-raised in the calling thread + box["error"] = error + + thread = threading.Thread(target=_worker, name="haystack-async-pipeline-run", daemon=True) + thread.start() + thread.join() + + if "error" in box: + raise box["error"] + return box["result"] + + class AsyncPipeline(PipelineBase): """ Asynchronous version of the Pipeline orchestration engine. @@ -591,7 +629,11 @@ def run( Internally, the pipeline components are executed asynchronously, but the method itself will block until the entire pipeline execution is complete. - In case you need asynchronous methods, consider using `run_async` or `run_async_generator`. + This method can be called both from a regular synchronous context and from a thread that already + has a running event loop (for example inside a Jupyter notebook). In the latter case the pipeline + is executed on its own event loop in a separate thread to avoid conflicting with the running loop, + and the call still blocks until completion. If you are already in an async context, prefer + `run_async` or `run_async_generator` to avoid blocking the event loop. Usage: ```python @@ -690,20 +732,15 @@ def run( Or if a Component fails or returns output in an unsupported type. :raises PipelineMaxComponentRuns: If a Component reaches the maximum number of times it can be run in this Pipeline. - :raises RuntimeError: - If called from within an async context. Use `run_async` instead. """ + coro = self.run_async(data=data, include_outputs_from=include_outputs_from, concurrency_limit=concurrency_limit) try: asyncio.get_running_loop() except RuntimeError: - # No running loop: safe to use asyncio.run() - return asyncio.run( - self.run_async( - data=data, include_outputs_from=include_outputs_from, concurrency_limit=concurrency_limit - ) - ) + # No event loop running in this thread: safe to run directly in a fresh loop. + return asyncio.run(coro) else: - # Running loop present: do not create the coroutine and do not call asyncio.run() - raise RuntimeError( - "Cannot call run() from within an async context. Use 'await pipeline.run_async(...)' instead." - ) + # An event loop is already running in this thread (e.g. inside a Jupyter notebook, or when + # run() is called from within an async application). asyncio.run() cannot be used here, so we + # run the coroutine on its own loop in a separate thread and block until it completes. + return _run_coroutine_in_new_loop(coro) diff --git a/releasenotes/notes/async-pipeline-run-in-running-loop-7c0e9f1a2b3d4e5f.yaml b/releasenotes/notes/async-pipeline-run-in-running-loop-7c0e9f1a2b3d4e5f.yaml new file mode 100644 index 0000000000..8ac3f86915 --- /dev/null +++ b/releasenotes/notes/async-pipeline-run-in-running-loop-7c0e9f1a2b3d4e5f.yaml @@ -0,0 +1,10 @@ +--- +enhancements: + - | + `AsyncPipeline.run()` can now be called from a thread that already has a running event loop, + such as inside a Jupyter notebook or from synchronous code invoked within an async application. + Previously this raised `RuntimeError: Cannot call run() from within an async context`. The pipeline + is now executed on its own event loop in a separate thread, and the call still blocks until + completion. The caller's `contextvars` context (for example the active tracing span) is propagated + to the pipeline execution. When already in an async context, prefer `run_async` to avoid blocking + the event loop. diff --git a/test/core/pipeline/test_async_pipeline.py b/test/core/pipeline/test_async_pipeline.py index 3235e0741c..f92394aad3 100644 --- a/test/core/pipeline/test_async_pipeline.py +++ b/test/core/pipeline/test_async_pipeline.py @@ -3,6 +3,7 @@ # SPDX-License-Identifier: Apache-2.0 import asyncio +import contextvars import logging from dataclasses import replace @@ -10,6 +11,10 @@ from haystack import AsyncPipeline, Document, component +# Used by test_run_propagates_contextvars_into_separate_loop to verify the caller's context +# is propagated when run() offloads execution to a separate event loop. +_ctx_probe: contextvars.ContextVar[str | None] = contextvars.ContextVar("ctx_probe", default=None) + def test_async_pipeline_reentrance(waiting_component, spying_tracer): pp = AsyncPipeline() @@ -37,14 +42,49 @@ def test_run_in_sync_context(waiting_component): assert result == {"wait": {"waited_for": 0.001}} -def test_run_in_async_context_raises_runtime_error(): +def test_run_in_async_context_executes_on_separate_loop(waiting_component): + """run() must work even when called from a thread that already has a running event loop + (e.g. inside a Jupyter notebook). It should not raise and should block until completion.""" pp = AsyncPipeline() + pp.add_component("wait", waiting_component()) async def call_run(): - pp.run({}) + # A loop is already running in this thread; the bare sync run() should still work. + return pp.run({"wait_for": 0.001}) + + result = asyncio.run(call_run()) + + assert result == {"wait": {"waited_for": 0.001}} + + +def test_run_propagates_contextvars_into_separate_loop(): + """When run() offloads to a separate loop, the caller's contextvars (e.g. an active tracing + span) must be propagated to the components executing on that loop.""" + seen: dict[str, str | None] = {} + + @component + class ContextReader: + @component.output_types(value=str) + def run(self) -> dict[str, str | None]: + seen["value"] = _ctx_probe.get() + return {"value": _ctx_probe.get()} + + @component.output_types(value=str) + async def run_async(self) -> dict[str, str | None]: + seen["value"] = _ctx_probe.get() + return {"value": _ctx_probe.get()} + + pp = AsyncPipeline() + pp.add_component("reader", ContextReader()) + + async def call_run(): + _ctx_probe.set("from-caller") + return pp.run({}) + + result = asyncio.run(call_run()) - with pytest.raises(RuntimeError, match="Cannot call run\\(\\) from within an async context"): - asyncio.run(call_run()) + assert result == {"reader": {"value": "from-caller"}} + assert seen["value"] == "from-caller" def test_component_with_empty_dict_as_output_appears_in_results():