From 8845d451a18e13d2228c3629f41b39dcc1f439b0 Mon Sep 17 00:00:00 2001 From: Miguel Ingram Date: Tue, 2 Jun 2026 16:30:53 -0500 Subject: [PATCH] fix(flow): lock nested pydantic models in StateProxy StateProxy wrapped list and dict attributes with the flow-state lock but returned nested Pydantic BaseModel attributes unwrapped, so mutations on them bypassed the lock and could race under parallel listeners (silent state corruption). BaseModel is the default Flow state type, so this is the common case. Add a LockedModelProxy that routes BaseModel attribute reads/writes through the lock (recursively wrapping nested lists/dicts/models) and dispatch to it from StateProxy.__getattr__. Adds a semantic-negative test that fails before this change and passes after. --- lib/crewai/src/crewai/flow/runtime.py | 91 ++++++++++++++++++++ lib/crewai/tests/test_flow.py | 117 ++++++++++++++++++++++++++ 2 files changed, 208 insertions(+) diff --git a/lib/crewai/src/crewai/flow/runtime.py b/lib/crewai/src/crewai/flow/runtime.py index 33bfbacea7..03374d40bc 100644 --- a/lib/crewai/src/crewai/flow/runtime.py +++ b/lib/crewai/src/crewai/flow/runtime.py @@ -486,6 +486,93 @@ def __ne__(self, other: object) -> bool: return not self.__eq__(other) +class LockedModelProxy: + """Lock-guarded proxy for a nested Pydantic ``BaseModel`` held in flow state. + + ``StateProxy`` wraps ``list`` and ``dict`` attributes so that mutations on + nested containers acquire the flow state lock. Pydantic ``BaseModel`` + instances were previously returned unwrapped (the ``return value`` fall + through), so ``flow.state.profile.name = "x"`` mutated the model attribute + entirely outside the lock. When parallel listeners mutate nested-model + attributes, those writes race and corrupt state silently. This proxy closes + that gap by routing every attribute read/write on the wrapped model through + the same lock, recursively wrapping nested lists, dicts, and models. 
+ """ + + __slots__ = ("_lock", "_model") + + def __init__(self, model: BaseModel, lock: threading.Lock) -> None: + """Wrap ``model``, guarding all attribute access with ``lock``. + + Args: + model: The nested Pydantic model held in the flow's state. + lock: The shared flow-state lock that serializes access to the + state tree. The same lock instance is propagated to every + nested proxy so the whole subtree is guarded by one lock. + """ + object.__setattr__(self, "_model", model) + object.__setattr__(self, "_lock", lock) + + def __getattr__(self, name: str) -> Any: + """Read ``name`` from the wrapped model while holding the lock. + + The attribute is fetched under the flow-state lock, then re-wrapped so + deeper mutations stay synchronized: ``list`` -> :class:`LockedListProxy`, + ``dict`` -> :class:`LockedDictProxy`, and ``BaseModel`` -> + :class:`LockedModelProxy` (recursively). Scalars are returned as-is. + + Args: + name: Attribute name to read from the wrapped model. + + Returns: + The attribute value, wrapped in the matching lock-aware proxy when + it is a list, dict, or nested ``BaseModel``; otherwise the raw value. + """ + lock = object.__getattribute__(self, "_lock") + model = object.__getattribute__(self, "_model") + with lock: + value = getattr(model, name) + + if isinstance(value, list): + return LockedListProxy(value, lock) + if isinstance(value, dict): + return LockedDictProxy(value, lock) + if isinstance(value, BaseModel): + return LockedModelProxy(value, lock) + return value + + def __setattr__(self, name: str, value: Any) -> None: + """Write ``name`` on the wrapped model while holding the lock. + + The private ``_model`` / ``_lock`` slots are assigned directly (they + carry no shared state and must be settable before the lock exists); + every other attribute write acquires the flow-state lock before mutating + the wrapped model, so each individual write is serialized against other holders of the lock (a read-then-write such as incrementing a counter acquires the lock once for the read and again for the write, so it is not atomic as a whole). 
A + value read back through a proxy (``LockedListProxy`` / ``LockedDictProxy`` + / ``LockedModelProxy``) is unwrapped to its native object first, so a + proxy wrapper is never persisted inside the model. + + Args: + name: Attribute name to set on the wrapped model. + value: Value to assign. + """ + if name in ("_model", "_lock"): + object.__setattr__(self, name, value) + return + + if isinstance(value, LockedListProxy): + value = value._list + elif isinstance(value, LockedDictProxy): + value = value._dict + elif isinstance(value, LockedModelProxy): + value = object.__getattribute__(value, "_model") + + lock = object.__getattribute__(self, "_lock") + model = object.__getattribute__(self, "_model") + with lock: + setattr(model, name, value) + + class StateProxy(Generic[T]): """Proxy that provides thread-safe access to flow state. @@ -506,6 +593,8 @@ def __getattr__(self, name: str) -> Any: return LockedListProxy(value, lock) if isinstance(value, dict): return LockedDictProxy(value, lock) + if isinstance(value, BaseModel): + return LockedModelProxy(value, lock) return value def __setattr__(self, name: str, value: Any) -> None: @@ -516,6 +605,8 @@ def __setattr__(self, name: str, value: Any) -> None: value = value._list elif isinstance(value, LockedDictProxy): value = value._dict + elif isinstance(value, LockedModelProxy): + value = object.__getattribute__(value, "_model") with object.__getattribute__(self, "_proxy_lock"): setattr(object.__getattribute__(self, "_proxy_state"), name, value) diff --git a/lib/crewai/tests/test_flow.py b/lib/crewai/tests/test_flow.py index e5eaade212..3481506900 100644 --- a/lib/crewai/tests/test_flow.py +++ b/lib/crewai/tests/test_flow.py @@ -2,6 +2,7 @@ import asyncio import threading +import time from datetime import datetime from typing import Optional @@ -17,6 +18,7 @@ MethodExecutionStartedEvent, ) from crewai.flow.flow import Flow, and_, listen, or_, router, start +from crewai.flow.runtime import LockedModelProxy, StateProxy def 
test_simple_sequential_flow(): @@ -420,6 +422,121 @@ def second_method(self): assert flow.state.message == "final" +def test_flow_state_proxy_wraps_nested_pydantic_models(): + """Semantic-negative test for the nested-BaseModel locking gap. + + ``StateProxy`` only wrapped ``list`` and ``dict`` attributes; nested Pydantic + models fell through unwrapped, so attribute writes on them bypassed the flow + state lock and could race under parallel listeners. With the + ``LockedModelProxy`` fix, ``flow.state.profile`` is a lock-guarded proxy + (distinct from the raw model and exposing ``_lock``), and mutations route + through the lock. Before the fix, ``profile is not self._state.profile`` and + ``hasattr(profile, "_lock")`` are both False and this test fails. + """ + + class UserProfile(BaseModel): + counter: int = 0 + + class MyStructuredState(BaseModel): + profile: UserProfile = UserProfile() + + class NestedModelFlow(Flow[MyStructuredState]): + @start() + def first_method(self): + profile = self.state.profile + # The proxy must wrap the nested model, not return it raw. + assert profile is not self._state.profile + assert hasattr(profile, "_lock") + + # Lock-guarded mutation is reflected on the underlying model. + profile.counter += 1 + assert self.state.profile.counter == 1 + + flow = NestedModelFlow() + flow.kickoff() + + +def test_locked_model_proxy_serializes_writes_under_thread_contention(): + """Behavioral concurrency test for the nested-``BaseModel`` locking fix. + + Unlike the structural test above (which asserts the model is now lock-wrapped), + this drives real thread contention to prove the lock is actually *engaged*: a + nested-model attribute write must acquire the flow-state lock, so it cannot + proceed while another holder — e.g. a state snapshot/persist — holds that lock. 
+ + Before the ``LockedModelProxy`` fix, ``state.profile`` is the raw model and its + writes bypass the lock entirely, so the write would complete *while the lock is + held* (the race this PR closes). After the fix the write blocks until the lock + is released. The assertion ``not write_done.is_set()`` therefore fails before the + fix and passes after. + """ + + class UserProfile(BaseModel): + value: int = 0 + + class MyStructuredState(BaseModel): + profile: UserProfile = UserProfile() + + lock = threading.Lock() + proxy = StateProxy(MyStructuredState(), lock) + + write_started = threading.Event() + write_done = threading.Event() + + def writer() -> None: + write_started.set() + # Reads ``profile`` via StateProxy.__getattr__, then writes via LockedModelProxy.__setattr__, which acquires `lock`. NOTE(review): if the StateProxy read also takes `lock`, the writer blocks on the read rather than the nested write, and this test would not isolate the nested-model path - confirm. + proxy.profile.value = 42 + write_done.set() + + worker = threading.Thread(target=writer) + with lock: # stand in for a lock-protected state operation (snapshot/persist) + worker.start() + assert write_started.wait(timeout=2.0) + # Give the writer time to attempt the write. With the fix it is blocked on + # the lock we hold, so it must NOT have completed yet. + time.sleep(0.1) + assert not write_done.is_set(), ( + "nested-model write proceeded while the flow-state lock was held — " + "the write bypassed the lock (pre-fix race)" + ) + + # Lock released: the serialized write now completes with the written value. + assert write_done.wait(timeout=2.0) + worker.join() + assert proxy.profile.value == 42 + + +def test_state_proxy_setattr_unwraps_locked_model_proxy(): + """Assigning a proxied nested model back into state must store the native model. + + The read-side fix returns a nested ``BaseModel`` as a ``LockedModelProxy``. If + that value is assigned back (``state.profile = state.profile``), ``__setattr__`` + must unwrap it to the underlying model first, mirroring the existing + ``LockedListProxy``/``LockedDictProxy`` handling, so a proxy wrapper is never + persisted inside state. 
Without the unwrap, the stored value is the proxy itself. + """ + + class UserProfile(BaseModel): + value: int = 0 + + class MyStructuredState(BaseModel): + profile: UserProfile = UserProfile() + + lock = threading.Lock() + proxy = StateProxy(MyStructuredState(), lock) + + read_back = proxy.profile + assert isinstance(read_back, LockedModelProxy) + + # Re-assign the proxied model back onto state; it must be unwrapped. + proxy.profile = read_back + + stored = object.__getattribute__(proxy, "_proxy_state").profile + assert isinstance(stored, UserProfile) + assert not isinstance(stored, LockedModelProxy) + + def test_router_with_multiple_conditions(): """Test a router that triggers when any of multiple steps complete (OR condition), and another router that triggers only after all specified steps complete (AND condition).