Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 51 additions & 14 deletions haystack/core/pipeline/async_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

import asyncio
import contextvars
from collections.abc import AsyncIterator, Mapping
import threading
from collections.abc import AsyncIterator, Coroutine, Mapping
from typing import Any

from haystack import logging, tracing
Expand All @@ -25,6 +26,43 @@
logger = logging.getLogger(__name__)


def _run_coroutine_in_new_loop(coro: Coroutine[Any, Any, Any]) -> Any:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would probably use:

T = TypeVar("T")

def _run_coroutine_in_new_loop(coro: Coroutine[Any, Any, T]) -> T:

as a small type refinement.

"""
Run ``coro`` to completion on a fresh event loop in a short-lived dedicated thread.

This makes ``AsyncPipeline.run()`` usable even when it is called from a thread that already has a
running event loop (for example inside a Jupyter notebook, or when synchronous ``run()`` is invoked
from within an async application). In those situations ``asyncio.run()`` cannot be used because a loop
is already running in the calling thread, so we run the coroutine on its own loop in a separate
thread and block the caller until it finishes. The thread and loop are created per call and torn down
when the coroutine completes.

The caller's :mod:`contextvars` context (e.g. the active tracing span) is copied and used to run the
coroutine so that context-dependent behavior is preserved across the thread boundary.

:param coro: The coroutine to execute.
:returns: The result returned by the coroutine.
"""
# Copy the caller's context so the coroutine sees the same context variables (active tracing span,
# etc.). Running ``asyncio.run`` inside ``ctx.run`` makes the new loop's main task inherit this context.
ctx = contextvars.copy_context()
box: dict[str, Any] = {}

def _worker() -> None:
try:
box["result"] = ctx.run(asyncio.run, coro)
except BaseException as error: # noqa: BLE001 - captured and re-raised in the calling thread
box["error"] = error

thread = threading.Thread(target=_worker, name="haystack-async-pipeline-run", daemon=True)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why using daemon=True here? Since run() cannot return until that thread has finished, a non-daemon thread better matches the contract: the pipeline run is foreground work and should complete / raise. WDYT?

thread.start()
thread.join()

if "error" in box:
raise box["error"]
return box["result"]


class AsyncPipeline(PipelineBase):
"""
Asynchronous version of the Pipeline orchestration engine.
Expand Down Expand Up @@ -591,7 +629,11 @@ def run(
Internally, the pipeline components are executed asynchronously, but the method itself
will block until the entire pipeline execution is complete.

In case you need asynchronous methods, consider using `run_async` or `run_async_generator`.
This method can be called both from a regular synchronous context and from a thread that already
has a running event loop (for example inside a Jupyter notebook). In the latter case the pipeline
is executed on its own event loop in a separate thread to avoid conflicting with the running loop,
and the call still blocks until completion. If you are already in an async context, prefer
`run_async` or `run_async_generator` to avoid blocking the event loop.

Usage:
```python
Expand Down Expand Up @@ -690,20 +732,15 @@ def run(
Or if a Component fails or returns output in an unsupported type.
:raises PipelineMaxComponentRuns:
If a Component reaches the maximum number of times it can be run in this Pipeline.
:raises RuntimeError:
If called from within an async context. Use `run_async` instead.
"""
coro = self.run_async(data=data, include_outputs_from=include_outputs_from, concurrency_limit=concurrency_limit)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would create this coroutine only after the exact runner strategy is known.

try:
asyncio.get_running_loop()
except RuntimeError:
# No running loop: safe to use asyncio.run()
return asyncio.run(
self.run_async(
data=data, include_outputs_from=include_outputs_from, concurrency_limit=concurrency_limit
)
)
# No event loop running in this thread: safe to run directly in a fresh loop.
return asyncio.run(coro)
else:
# Running loop present: do not create the coroutine and do not call asyncio.run()
raise RuntimeError(
"Cannot call run() from within an async context. Use 'await pipeline.run_async(...)' instead."
)
# An event loop is already running in this thread (e.g. inside a Jupyter notebook, or when
# run() is called from within an async application). asyncio.run() cannot be used here, so we
# run the coroutine on its own loop in a separate thread and block until it completes.
return _run_coroutine_in_new_loop(coro)
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
enhancements:
- |
`AsyncPipeline.run()` can now be called from a thread that already has a running event loop,
such as inside a Jupyter notebook or from synchronous code invoked within an async application.
Previously this raised `RuntimeError: Cannot call run() from within an async context`. The pipeline
is now executed on its own event loop in a separate thread, and the call still blocks until
completion. The caller's `contextvars` context (for example the active tracing span) is propagated
to the pipeline execution. When already in an async context, prefer `run_async` to avoid blocking
the event loop.
48 changes: 44 additions & 4 deletions test/core/pipeline/test_async_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@
# SPDX-License-Identifier: Apache-2.0

import asyncio
import contextvars
import logging
from dataclasses import replace

import pytest

from haystack import AsyncPipeline, Document, component

# Used by test_run_propagates_contextvars_into_separate_loop to verify the caller's context
# is propagated when run() offloads execution to a separate event loop.
_ctx_probe: contextvars.ContextVar[str | None] = contextvars.ContextVar("ctx_probe", default=None)


def test_async_pipeline_reentrance(waiting_component, spying_tracer):
pp = AsyncPipeline()
Expand Down Expand Up @@ -37,14 +42,49 @@ def test_run_in_sync_context(waiting_component):
assert result == {"wait": {"waited_for": 0.001}}


def test_run_in_async_context_executes_on_separate_loop(waiting_component):
    """
    Check that the synchronous run() works when an event loop is already running in the calling thread.

    run() must work even when called from a thread that already has a running event loop
    (e.g. inside a Jupyter notebook). It should not raise and should block until completion.
    """
    pp = AsyncPipeline()
    pp.add_component("wait", waiting_component())

    async def call_run():
        # A loop is already running in this thread; the bare sync run() should still work.
        return pp.run({"wait_for": 0.001})

    result = asyncio.run(call_run())

    assert result == {"wait": {"waited_for": 0.001}}


def test_run_propagates_contextvars_into_separate_loop():
    """
    Check that the caller's contextvars reach components when run() offloads to a separate loop.

    When run() offloads to a separate loop, the caller's contextvars (e.g. an active tracing
    span) must be propagated to the components executing on that loop.
    """
    # Records what the component observed, independently of the pipeline's returned output.
    seen: dict[str, str | None] = {}

    @component
    class ContextReader:
        @component.output_types(value=str)
        def run(self) -> dict[str, str | None]:
            seen["value"] = _ctx_probe.get()
            return {"value": _ctx_probe.get()}

        @component.output_types(value=str)
        async def run_async(self) -> dict[str, str | None]:
            seen["value"] = _ctx_probe.get()
            return {"value": _ctx_probe.get()}

    pp = AsyncPipeline()
    pp.add_component("reader", ContextReader())

    async def call_run():
        # Set the variable inside the running loop's task, then call the blocking run() from there.
        _ctx_probe.set("from-caller")
        return pp.run({})

    result = asyncio.run(call_run())

    assert result == {"reader": {"value": "from-caller"}}
    assert seen["value"] == "from-caller"


def test_component_with_empty_dict_as_output_appears_in_results():
Expand Down
Loading