def _unwrap_locked_proxy(value: Any) -> Any:
    """Return the native object behind a lock-aware proxy, or ``value`` as-is.

    Args:
        value: Any object; only ``LockedModelProxy``, ``LockedListProxy`` and
            ``LockedDictProxy`` instances are unwrapped.

    Returns:
        The wrapped model / list / dict for a proxy, otherwise ``value``
        unchanged.
    """
    if isinstance(value, LockedModelProxy):
        # Slots are read via object.__getattribute__ to stay independent of
        # the proxy's own attribute hooks.
        return object.__getattribute__(value, "_model")
    if isinstance(value, LockedListProxy):
        return value._list
    if isinstance(value, LockedDictProxy):
        return value._dict
    return value


class LockedModelProxy:
    """Lock-guarded proxy for a nested Pydantic ``BaseModel`` held in flow state.

    Attribute reads, writes and deletes on the wrapped model are performed
    while holding the lock passed to the constructor. Values read through the
    proxy are re-wrapped (``list`` -> ``LockedListProxy``, ``dict`` ->
    ``LockedDictProxy``, ``BaseModel`` -> ``LockedModelProxy``) with the same
    lock, so deeper mutations go through that lock as well.

    Equality, hashing, ``repr()`` and ``str()`` are forwarded to the wrapped
    model. Python looks these special methods up on the type, so
    ``__getattr__`` alone would not forward them and the proxy would compare
    by identity.

    Notes:
        * Each individual read or write is locked, but a compound statement
          such as ``proxy.counter += 1`` is a locked read followed by a
          separate locked write; the lock is released in between, so the
          increment as a whole is not atomic.
        * The proxy does not subclass ``BaseModel``, so
          ``isinstance(proxy, BaseModel)`` is ``False``.
    """

    __slots__ = ("_lock", "_model")

    def __init__(self, model: BaseModel, lock: threading.Lock) -> None:
        """Wrap ``model``, guarding attribute access with ``lock``.

        Args:
            model: The nested model to wrap.
            lock: The lock acquired around every forwarded operation. The same
                instance is handed to every nested proxy created on read.
        """
        # Bypass our own __setattr__, which would forward to the model.
        object.__setattr__(self, "_model", model)
        object.__setattr__(self, "_lock", lock)

    def __getattr__(self, name: str) -> Any:
        """Read ``name`` from the wrapped model while holding the lock.

        Only called for names that normal lookup does not find on the proxy;
        the ``_lock`` / ``_model`` slots therefore resolve directly and never
        reach this method.

        Args:
            name: Attribute name to read from the wrapped model.

        Returns:
            The attribute value, wrapped in the matching lock-aware proxy when
            it is a list, dict, or nested ``BaseModel``; otherwise the raw
            value.

        Raises:
            AttributeError: If the wrapped model has no such attribute.
        """
        lock = object.__getattribute__(self, "_lock")
        model = object.__getattribute__(self, "_model")
        with lock:
            value = getattr(model, name)

        # Wrapping happens after the lock is released; only the read itself
        # needs to be serialized.
        if isinstance(value, list):
            return LockedListProxy(value, lock)
        if isinstance(value, dict):
            return LockedDictProxy(value, lock)
        if isinstance(value, BaseModel):
            return LockedModelProxy(value, lock)
        return value

    def __setattr__(self, name: str, value: Any) -> None:
        """Write ``name`` on the wrapped model while holding the lock.

        The private ``_model`` / ``_lock`` slots are assigned on the proxy
        itself. Any other name is set on the wrapped model under the lock. A
        lock-aware proxy passed as ``value`` is unwrapped first so that a
        proxy wrapper is never stored inside the model.

        Args:
            name: Attribute name to set.
            value: Value to assign.
        """
        if name in ("_model", "_lock"):
            object.__setattr__(self, name, value)
            return

        value = _unwrap_locked_proxy(value)
        lock = object.__getattribute__(self, "_lock")
        model = object.__getattribute__(self, "_model")
        with lock:
            setattr(model, name, value)

    def __delattr__(self, name: str) -> None:
        """Delete ``name`` from the wrapped model while holding the lock.

        The private ``_model`` / ``_lock`` slots are deleted on the proxy
        itself, mirroring how ``__setattr__`` treats them.

        Args:
            name: Attribute name to delete.

        Raises:
            AttributeError: If the attribute does not exist.
        """
        if name in ("_model", "_lock"):
            object.__delattr__(self, name)
            return

        lock = object.__getattribute__(self, "_lock")
        model = object.__getattribute__(self, "_model")
        with lock:
            delattr(model, name)

    def __eq__(self, other: object) -> bool:
        """Compare the wrapped model with ``other`` while holding the lock.

        Proxy operands are unwrapped before the lock is taken. Otherwise the
        reflected comparison could call back into a proxy that tries to
        acquire the same lock, which would deadlock if the lock is
        non-reentrant.

        Args:
            other: Object to compare against; may itself be a proxy.

        Returns:
            ``True`` when the wrapped model equals the (unwrapped) operand.
        """
        other = _unwrap_locked_proxy(other)
        lock = object.__getattribute__(self, "_lock")
        model = object.__getattribute__(self, "_model")
        with lock:
            return bool(model == other)

    def __ne__(self, other: object) -> bool:
        """Return the negation of :meth:`__eq__`."""
        return not self.__eq__(other)

    def __hash__(self) -> int:
        """Hash the wrapped model, keeping ``__hash__`` consistent with ``__eq__``.

        Raises:
            TypeError: If the wrapped model is unhashable.
        """
        lock = object.__getattribute__(self, "_lock")
        model = object.__getattribute__(self, "_model")
        with lock:
            return hash(model)

    def __repr__(self) -> str:
        """Return ``repr()`` of the wrapped model, computed under the lock."""
        lock = object.__getattribute__(self, "_lock")
        model = object.__getattribute__(self, "_model")
        with lock:
            return repr(model)

    def __str__(self) -> str:
        """Return ``str()`` of the wrapped model, computed under the lock."""
        lock = object.__getattribute__(self, "_lock")
        model = object.__getattribute__(self, "_model")
        with lock:
            return str(model)
def test_flow_state_proxy_wraps_nested_pydantic_models():
    """Nested ``BaseModel`` attributes of flow state are returned lock-wrapped.

    Reading ``self.state.profile`` inside a flow method must yield a
    ``LockedModelProxy`` (not the raw model stored on ``self._state``), and a
    mutation made through that proxy must be visible on the underlying state,
    both inside the method and after ``kickoff()`` returns.
    """

    class UserProfile(BaseModel):
        counter: int = 0

    class MyStructuredState(BaseModel):
        profile: UserProfile = UserProfile()

    class NestedModelFlow(Flow[MyStructuredState]):
        @start()
        def first_method(self):
            profile = self.state.profile
            # The proxy must wrap the nested model, not return it raw.
            assert isinstance(profile, LockedModelProxy)
            assert profile is not self._state.profile
            assert hasattr(profile, "_lock")

            # A mutation made through the proxy lands on the underlying model.
            profile.counter += 1
            assert self.state.profile.counter == 1

    flow = NestedModelFlow()
    flow.kickoff()

    # Re-check from outside the flow method so the test does not rely solely
    # on assertions raised from within kickoff().
    assert flow.state.profile.counter == 1


def test_locked_model_proxy_serializes_writes_under_thread_contention():
    """A nested-model attribute write blocks while the state lock is held.

    The main thread holds the lock shared with the ``StateProxy`` and lets a
    worker thread attempt ``profile.value = 42`` through the nested proxy.
    The write must not complete until the lock is released; if the nested
    model were returned unwrapped, the write would bypass the lock and finish
    while it is still held.
    """

    class UserProfile(BaseModel):
        value: int = 0

    class MyStructuredState(BaseModel):
        profile: UserProfile = UserProfile()

    lock = threading.Lock()
    proxy = StateProxy(MyStructuredState(), lock)

    # Resolve the nested proxy before taking the lock so the only operation
    # the worker performs under contention is the nested attribute write.
    # NOTE(review): assumes reading proxy.profile does not itself need `lock`
    # to be free at this point — it is not held yet, so this holds either way.
    profile = proxy.profile

    write_started = threading.Event()
    write_done = threading.Event()

    def writer() -> None:
        write_started.set()
        # Routes through LockedModelProxy.__setattr__ -> acquires `lock`.
        profile.value = 42
        write_done.set()

    # Daemon thread: a regression that leaves the writer stuck must not keep
    # the interpreter alive after the test fails.
    worker = threading.Thread(target=writer, daemon=True)
    with lock:  # stand in for a lock-protected state operation (snapshot/persist)
        worker.start()
        assert write_started.wait(timeout=2.0)
        # Give the writer a short window to attempt the write. While we hold
        # the lock it must stay blocked; wait() returns as soon as the event
        # is set, so a lock bypass is reported without sleeping the full time.
        assert not write_done.wait(timeout=0.1), (
            "nested-model write proceeded while the flow-state lock was held — "
            "the write bypassed the lock (pre-fix race)"
        )

    # Lock released: the serialized write now completes with the written value.
    assert write_done.wait(timeout=2.0)
    worker.join(timeout=2.0)
    assert not worker.is_alive()
    assert proxy.profile.value == 42


def test_state_proxy_setattr_unwraps_locked_model_proxy():
    """Assigning a proxied nested model back into state stores the native model.

    ``proxy.profile`` is read back as a ``LockedModelProxy``. Assigning that
    value to ``proxy.profile`` again must store the underlying model, not the
    proxy wrapper, in the wrapped state object.
    """

    class UserProfile(BaseModel):
        value: int = 0

    class MyStructuredState(BaseModel):
        profile: UserProfile = UserProfile()

    lock = threading.Lock()
    proxy = StateProxy(MyStructuredState(), lock)

    read_back = proxy.profile
    assert isinstance(read_back, LockedModelProxy)

    # Re-assign the proxied model back onto state; it must be unwrapped.
    proxy.profile = read_back

    # Inspect the raw state object directly, bypassing the proxy's read path
    # (which would wrap the value again and hide what was actually stored).
    stored = object.__getattribute__(proxy, "_proxy_state").profile
    assert isinstance(stored, UserProfile)
    assert not isinstance(stored, LockedModelProxy)