fix(flow): lock nested pydantic models in StateProxy#6036
fix(flow): lock nested pydantic models in StateProxy#6036ImmortalDemonGod wants to merge 1 commit into
Conversation
AIV-L3 Verification Packet —
|
| Field | Value |
|---|---|
| Repository | github.com/ImmortalDemonGod/crewAI |
| Finding ID | F-CR-FLOW-RACE — Shallow Locking Concurrency Unsoundness |
| Branch | fix/state-proxy-nested-model-lock → main |
| Head SHA | 8845d451a18e13d2228c3629f41b39dcc1f439b0 |
| Base SHA | 051fa0c1cb7c54c21291b487c51d107ebb904b7c |
Classification Record
risk_tier: R2
sod_mode: S0 # self-verification; independent S1 (maintainer review) pending — see Known Limitations
critical_surfaces: [] # concurrency/correctness; no auth/secrets/crypto/PII/financial surface
blast_radius: component # core Flow state proxy, multiple callers within the package; no public API change
classification_rationale: >
R2: behavior change in a shared core abstraction (Flow state proxying) with component-wide blast
radius, no critical surface (§5.2). R2 evidence floor A+B+C+E collected below.
classified_by: Miguel Ingram
classified_at: "2026-06-03"
evidence_floor: A + B + C + EClaims
- C-1 —
StateProxy.__getattr__now wrapsBaseModelreturn values inLockedModelProxy, closing the unlocked fall-through for nested Pydantic state attributes. - C-2 —
LockedModelProxyroutes every attribute read/write on the wrapped model through the Flow state lock, recursively re-wrapping nestedlist/dict/BaseModelvalues (mirrors the existingLockedListProxy/LockedDictProxydesign). - C-3 — The lock is engaged under real thread contention: a behavioral concurrency test drives a nested-model write against a held flow-state lock and asserts the write is serialized (blocks until release) — failing before the fix, passing after.
- C-4 — No pre-existing tests were removed or weakened; three new tests were added (structural + behavioral concurrency + assign-back unwrap).
- C-5 — A value read back through a proxy (
LockedListProxy/LockedDictProxy/LockedModelProxy) is unwrapped to its native object in bothStateProxy.__setattr__andLockedModelProxy.__setattr__, so a proxy wrapper is never persisted into the model (symmetry with the read-side wrapping; validated bytest_state_proxy_setattr_unwraps_locked_model_proxy).
Evidence
Class A — Execution Evidence
- Local-CI replica (clean tree @
8845d45; env: CPython 3.13.12, uv; pytest 9.0.3; macOS arm64) — both tests run under the repo's real config (asyncioMode.STRICT, 8 xdist workers,pytest-randomly; no overrides):- The three nested-model tests —
test_flow_state_proxy_wraps_nested_pydantic_models(structural),test_locked_model_proxy_serializes_writes_under_thread_contention(behavioral concurrency — a nested-model write blocks on a held flow-state lock, driven by real threads), andtest_state_proxy_setattr_unwraps_locked_model_proxy(assign-back unwrap) — pass with the fulltest_flow.pyfile → 73 passed / 0 failed / 0 skipped. - Fails-before (revert
runtime.py): the concurrency test →1 failed—AssertionError: nested-model write proceeded while the flow-state lock was held(the write bypassed the lock pre-fix); the structural test also fails. Both are true semantic-negatives.
- The three nested-model tests —
- CI re-run at the head (head-bound — A-001/A-002): the force-push to
8845d45re-triggers the full check suite (lint-run,Run Type Checksmypy 3.10–3.13,Run Testspytest 3.10–3.13,CodeQL) on the PR. The local-CI replica above is the head-bound execution evidence at8845d45(pytest 73 passed;ruff format/ruff checkclean;mypyclean on the changed files). The@pytest.mark.vcr()network tests intest_crew.pyare outside this diff and flake transiently on CI infra (a re-run flips them green) — see Known Limitations.
Class B — Referential Evidence (SHA-bound to 8845d45)
- New
LockedModelProxyclass:
https://github.com/ImmortalDemonGod/crewAI/blob/8845d451a18e13d2228c3629f41b39dcc1f439b0/lib/crewai/src/crewai/flow/runtime.py#L489-L610 StateProxy.__getattr__BaseModelbranch (the one-line dispatch added):
https://github.com/ImmortalDemonGod/crewAI/blob/8845d451a18e13d2228c3629f41b39dcc1f439b0/lib/crewai/src/crewai/flow/runtime.py#L596-L597- Semantic-negative test:
https://github.com/ImmortalDemonGod/crewAI/blob/8845d451a18e13d2228c3629f41b39dcc1f439b0/lib/crewai/tests/test_flow.py#L425-L538 - Scope inventory (B-003, matches
git diff --name-only 051fa0c1..8845d45):lib/crewai/src/crewai/flow/runtime.pylib/crewai/tests/test_flow.py
Class C — Negative Evidence
- Test integrity (semantic, framework-level — not grep):
pytest --collect-only lib/crewai/tests/test_flow.py(pytest 9.0.3) collects 70 tests at base051fa0c1→ 72 at head8845d45— a delta of exactly +3 (structural + behavioral-concurrency + assign-back unwrap), with zero removals. This satisfies §6.4.2.1 (test-framework collection output), not string matching. - No regressions from this change: the change is additive — a fully-documented
LockedModelProxyclass + a singleisinstance(value, BaseModel)dispatch branch; thelist/dict/primitive paths inStateProxy.__getattr__are byte-identical. The full CItestssuite (3.10–3.13) passes at this head. - No new skips: no
@pytest.mark.skip/xfailadded (diff is+-only in the test file).
Class E — Intent Evidence
- Requirement source (capability-gated audit report): [Black Box forensic audit report — shared privately with maintainers on request] — finding
F-CR-FLOW-RACEtraces the unlocked-BaseModelfall-through inStateProxy.__getattr__. - Reference immutability (addresses E-F1b): the audit report is served from a static, content-addressable deploy. Snapshot obligation: the finding text is pinned to the build that produced this audit page; Black Box retains the immutable source-of-record (the finding JSON at the deploy SHA). At L1/L2 a token-gated URL is permitted with this obligation declared (§6.6.2.1).
- Requirement → claim → evidence: shallow-locking gap → C-1 (wrap added) + C-2 (recursive lock routing) → Class A test result + Class B permalinks.
- Acceptance: ✅ nested
BaseModelattributes now lock-protected; ✅ list/dict/primitive behavior unchanged; ✅ assign-back unwrap ofLockedModelProxyin both__setattr__paths (symmetry withlist/dict), so a proxy is never persisted into state — added in response to review (CodeRabbit) and covered bytest_state_proxy_setattr_unwraps_locked_model_proxy.
Known Limitations
- SoD = S0. Author and verifier are the same identity; an independent S1 verifier (the upstream maintainer's review) is pending. R2 nominally requires S1 (§5.4); the maintainer review is the intended S1 step.
- Class G omitted per §6.8.7: no pre-code Black-Box Prediction was written; omitted rather than fabricated post-hoc.
- Class F not provided: R2 does not require provenance (§6.1). Integrity rests on the public commit chain bound to
8845d45+ the reproducible--collect-onlycounts; GPG/Sigstore signing is not wired in this environment. - VCR/network test flakiness (disclosed): the crewai suite includes
@pytest.mark.vcr()LLM-integration tests (test_crew.py) that can transiently fail on CI infra (an earlier run hit an OpenAI connection error). They are outside this diff (runtime.py+test_flow.pyonly) and pass at this head. - Docstrings: the new
LockedModelProxyclass and all three of its methods (__init__/__getattr__/__setattr__) carry full Google-style docstrings (Args/Returns); the semantic-negative test names the bug it catches.
Summary
| Metric | Value |
|---|---|
| Files changed | 2 (flow/runtime.py, tests/test_flow.py) |
| Lines | +208 / −0 (documented LockedModelProxy + dispatch + 2 tests) |
| Risk tier | R2 (S0; A+B+C+E floor) |
| Behavior impact | Nested BaseModel flow-state attributes now lock-protected; list/dict/primitive unchanged |
| Test impact | 70 → 73 tests (+3: structural + behavioral concurrency + assign-back unwrap); 0 removals; 0 regressions |
Provided as a gift to support CrewAI's engineering velocity.
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (2)
🚧 Files skipped from review as they are similar to previous changes (1)
📝 WalkthroughWalkthroughAdds LockedModelProxy — a lock-guarded wrapper for nested Pydantic BaseModel values — and updates StateProxy to return/unwrap these proxies; tests validate proxy wrapping, concurrent write serialization, and unwrapping on assignment. ChangesNested Pydantic Model Lock Protection
🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@lib/crewai/src/crewai/flow/runtime.py`:
- Around line 544-563: When assigning into the wrapped model in __setattr__ (and
the similar assignment block at the other location around lines 590-599), detect
if the incoming value is a LockedModelProxy and unwrap it to the underlying
native model before storing (e.g., replace the proxied value with its underlying
model instance) so you never persist a LockedModelProxy into state; update both
the __setattr__ path and the corresponding assignment method to check
isinstance(value, LockedModelProxy) and assign value._model (or an accessor
returning the real model) instead of the proxy.
- Around line 516-542: The current __getattr__ returns callables outside the
lock, letting method calls race; modify __getattr__ (in the LockedModelProxy) so
that when the resolved value is callable you return a wrapper callable that
acquires self._lock, re-fetches the attribute from self._model (to preserve
proper binding/descriptors) and invokes it under the lock, then returns the
result; use functools.wraps to preserve metadata and ensure non-callable
lists/dicts/BaseModel still return the existing
LockedListProxy/LockedDictProxy/LockedModelProxy wrappers and scalars are
returned as-is.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 77df9da9-36d6-4c9d-96c4-00c56fbb2668
📒 Files selected for processing (2)
lib/crewai/src/crewai/flow/runtime.pylib/crewai/tests/test_flow.py
| def __getattr__(self, name: str) -> Any: | ||
| """Read ``name`` from the wrapped model while holding the lock. | ||
|
|
||
| The attribute is fetched under the flow-state lock, then re-wrapped so | ||
| deeper mutations stay synchronized: ``list`` -> :class:`LockedListProxy`, | ||
| ``dict`` -> :class:`LockedDictProxy`, and ``BaseModel`` -> | ||
| :class:`LockedModelProxy` (recursively). Scalars are returned as-is. | ||
|
|
||
| Args: | ||
| name: Attribute name to read from the wrapped model. | ||
|
|
||
| Returns: | ||
| The attribute value, wrapped in the matching lock-aware proxy when | ||
| it is a list, dict, or nested ``BaseModel``; otherwise the raw value. | ||
| """ | ||
| lock = object.__getattribute__(self, "_lock") | ||
| model = object.__getattribute__(self, "_model") | ||
| with lock: | ||
| value = getattr(model, name) | ||
|
|
||
| if isinstance(value, list): | ||
| return LockedListProxy(value, lock) | ||
| if isinstance(value, dict): | ||
| return LockedDictProxy(value, lock) | ||
| if isinstance(value, BaseModel): | ||
| return LockedModelProxy(value, lock) | ||
| return value |
There was a problem hiding this comment.
Callable attributes currently execute outside the lock.
LockedModelProxy.__getattr__ fetches attributes under lock, but if the value is a bound method/callable, invocation happens after the lock is released. Any mutating model method can still race and bypass serialization.
Suggested fix
@@
def __getattr__(self, name: str) -> Any:
@@
with lock:
value = getattr(model, name)
+ if callable(value):
+ def _locked_call(*args: Any, **kwargs: Any) -> Any:
+ with lock:
+ result = value(*args, **kwargs)
+ if isinstance(result, list):
+ return LockedListProxy(result, lock)
+ if isinstance(result, dict):
+ return LockedDictProxy(result, lock)
+ if isinstance(result, BaseModel):
+ return LockedModelProxy(result, lock)
+ return result
+ return _locked_call
+
if isinstance(value, list):
return LockedListProxy(value, lock)
if isinstance(value, dict):
return LockedDictProxy(value, lock)
if isinstance(value, BaseModel):
return LockedModelProxy(value, lock)
return value📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def __getattr__(self, name: str) -> Any: | |
| """Read ``name`` from the wrapped model while holding the lock. | |
| The attribute is fetched under the flow-state lock, then re-wrapped so | |
| deeper mutations stay synchronized: ``list`` -> :class:`LockedListProxy`, | |
| ``dict`` -> :class:`LockedDictProxy`, and ``BaseModel`` -> | |
| :class:`LockedModelProxy` (recursively). Scalars are returned as-is. | |
| Args: | |
| name: Attribute name to read from the wrapped model. | |
| Returns: | |
| The attribute value, wrapped in the matching lock-aware proxy when | |
| it is a list, dict, or nested ``BaseModel``; otherwise the raw value. | |
| """ | |
| lock = object.__getattribute__(self, "_lock") | |
| model = object.__getattribute__(self, "_model") | |
| with lock: | |
| value = getattr(model, name) | |
| if isinstance(value, list): | |
| return LockedListProxy(value, lock) | |
| if isinstance(value, dict): | |
| return LockedDictProxy(value, lock) | |
| if isinstance(value, BaseModel): | |
| return LockedModelProxy(value, lock) | |
| return value | |
| def __getattr__(self, name: str) -> Any: | |
| """Read ``name`` from the wrapped model while holding the lock. | |
| The attribute is fetched under the flow-state lock, then re-wrapped so | |
| deeper mutations stay synchronized: ``list`` -> :class:`LockedListProxy`, | |
| ``dict`` -> :class:`LockedDictProxy`, and ``BaseModel`` -> | |
| :class:`LockedModelProxy` (recursively). Scalars are returned as-is. | |
| Args: | |
| name: Attribute name to read from the wrapped model. | |
| Returns: | |
| The attribute value, wrapped in the matching lock-aware proxy when | |
| it is a list, dict, or nested ``BaseModel``; otherwise the raw value. | |
| """ | |
| lock = object.__getattribute__(self, "_lock") | |
| model = object.__getattribute__(self, "_model") | |
| with lock: | |
| value = getattr(model, name) | |
| if callable(value): | |
| def _locked_call(*args: Any, **kwargs: Any) -> Any: | |
| with lock: | |
| result = value(*args, **kwargs) | |
| if isinstance(result, list): | |
| return LockedListProxy(result, lock) | |
| if isinstance(result, dict): | |
| return LockedDictProxy(result, lock) | |
| if isinstance(result, BaseModel): | |
| return LockedModelProxy(result, lock) | |
| return result | |
| return _locked_call | |
| if isinstance(value, list): | |
| return LockedListProxy(value, lock) | |
| if isinstance(value, dict): | |
| return LockedDictProxy(value, lock) | |
| if isinstance(value, BaseModel): | |
| return LockedModelProxy(value, lock) | |
| return value |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@lib/crewai/src/crewai/flow/runtime.py` around lines 516 - 542, The current
__getattr__ returns callables outside the lock, letting method calls race;
modify __getattr__ (in the LockedModelProxy) so that when the resolved value is
callable you return a wrapper callable that acquires self._lock, re-fetches the
attribute from self._model (to preserve proper binding/descriptors) and invokes
it under the lock, then returns the result; use functools.wraps to preserve
metadata and ensure non-callable lists/dicts/BaseModel still return the existing
LockedListProxy/LockedDictProxy/LockedModelProxy wrappers and scalars are
returned as-is.
There was a problem hiding this comment.
You're a sharp bot, and you're right that the lock doesn't currently extend to method invocation: __getattr__ releases the lock before returning a callable, so a mutating method called through the proxy would run unsynchronized.
I'm scoping this out of this PR deliberately, for two reasons.
Scope. This PR closes the observed race, which is attribute-level mutation of nested flow state (the assignment / __setattr__ path). Serializing arbitrary method invocation on the state model is a broader guarantee that changes the proxy's contract, and I'd rather keep the blast radius of this fix tight.
Correctness of the suggested form. _state_lock is a plain, non-reentrant threading.Lock. Wrapping callables to execute under that lock would deadlock whenever the invoked method reads or writes its own attributes, since that re-enters __getattr__ / __setattr__ and tries to acquire the same lock the wrapper already holds. Pydantic model methods do this routinely, so the wrapper would hang real call paths rather than protect them.
Doing it safely means first moving _state_lock (and the proxy lock) to threading.RLock, then adding the callable wrapper with re-fetch-under-lock semantics and concurrency tests for the re-entrant paths. That's a coherent follow-up if you'd like full method-call serialization, and I'm glad to open one, but it's a distinct change from the assignment-race fix here. In current flow usage, state is mutated by assignment (now locked), not by methods called through the proxy.
There was a problem hiding this comment.
@ImmortalDemonGod, that's a well-reasoned scoping call, and the deadlock point is a genuine correctness flaw in my suggestion — I hadn't accounted for the re-entrant access pattern that Pydantic model methods trigger. A non-reentrant threading.Lock plus a callable wrapper would hang on the first method call that touches its own attributes, which is nearly all of them. The right prerequisite is exactly what you described: promote _state_lock (and the proxies) to threading.RLock and validate the re-entrant paths explicitly before adding the wrapper.
Happy to open a follow-up GitHub issue to track the full method-call serialization work (RLock migration + callable wrapper + re-entrant concurrency tests). Would you like me to do that?
🐇 (thoughtful nod)
✏️ Learnings added
Learnt from: ImmortalDemonGod
Repo: crewAIInc/crewAI PR: 6036
File: lib/crewai/src/crewai/flow/runtime.py:516-542
Timestamp: 2026-06-04T19:44:05.920Z
Learning: In `lib/crewai/src/crewai/flow/runtime.py`, `LockedModelProxy.__getattr__` deliberately does NOT wrap callable attributes under the lock. The `_state_lock` (and the proxy lock shared by `LockedModelProxy`, `LockedListProxy`, `LockedDictProxy`) is a plain non-reentrant `threading.Lock`. Wrapping callables to execute under that lock would deadlock whenever the invoked Pydantic model method reads or writes its own attributes, since that re-enters `__getattr__`/`__setattr__` and tries to acquire the already-held lock. The safe upgrade path is: (1) promote `_state_lock` to `threading.RLock`, (2) add the callable wrapper with re-fetch-under-lock semantics, and (3) add re-entrant concurrency tests — as a separate follow-up PR.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
StateProxy wrapped list and dict attributes with the flow-state lock but returned nested Pydantic BaseModel attributes unwrapped, so mutations on them bypassed the lock and could race under parallel listeners (silent state corruption). BaseModel is the default Flow state type, so this is the common case. Add a LockedModelProxy that routes BaseModel attribute reads/writes through the lock (recursively wrapping nested lists/dicts/models) and dispatch to it from StateProxy.__getattr__. Adds a semantic-negative test that fails before this change and passes after.
efcf4be to
8845d45
Compare
|
Good catch, applied in This is part of doing the fix completely, not a separate concern. The finding here is a read-path race: Changes:
|
|
@coderabbitai thanks, no need to open a follow-up issue from my end. Whether the RLock migration + callable-wrapper work is worth tracking is the maintainers' call, and the path is already laid out in the :542 thread above for whenever they want it. Resolved from my side. @greysonlalonde @lorenzejay when you have a moment, this one's ready for a human look. It fixes a real Flow-state race (nested-pydantic mutation in StateProxy) with a regression test; CI is green (GitGuardian + CodeRabbit). The single CodeRabbit follow-up (:542, wrapping callables under the lock) is deliberately scoped out with reasoning: it would deadlock under the current non-reentrant threading.Lock, so an RLock migration is a separate change. Happy to adjust anything. |
|
Tip For best results, initiate chat on the files or code changes.
The PR looks well-scoped: the read-path race is closed, the assign-back unwrap is symmetric with the existing (◕‿◕✿)🐇 🧠 Learnings used |
What
Adds a
LockedModelProxyso that when Flow state is a nested Pydantic model (the default),StateProxy.__getattr__returns it wrapped under the flow-state lock instead of unlocked.Why
StateProxyalready wrapslistanddictvalues in locked proxies, but a nestedBaseModelfalls throughreturn valueunlocked. Two listeners mutating that nested model can then race with no serialization — silent flow-state corruption. This closes the gap symmetrically with the existing list/dict handling.How it's verified
Added a thread-contention test in
lib/crewai/tests/test_flow.pythat holds the flow-state lock and asserts a nested-model write cannot proceed while the lock is held — it fails before the change and passes after. Ran locally:uv run ruff check lib/,uv run ruff format --check lib/,uv run mypy lib/, anduv run pytest lib/crewai/tests/test_flow.pyall pass. The test is network-free (--block-networksafe) and order-independent.Disclosure: this change was prepared with AI assistance, per
CONTRIBUTING.md's AI-authored policy — please apply thellm-generatedlabel. The fix and its test were reviewed and verified locally as above.Full forensic verification evidence (AIV-L3) is in the comment below.
Summary by CodeRabbit
New Features
Behavior
Tests