From 1a18c9bd84829be618d0dc65498e3ff19851e210 Mon Sep 17 00:00:00 2001 From: Matt Aitchison Date: Tue, 2 Jun 2026 13:01:05 -0500 Subject: [PATCH 1/9] feat(lock_store): make locking backend overridable Allow the centralised lock factory to use a pluggable backend instead of the hardcoded Redis/file selection. Backends are resolved with precedence override > CREWAI_LOCK_FACTORY env > built-in default: - set_lock_backend()/reset_lock_backend() and a scoped lock_backend() context manager for programmatic overrides - CREWAI_LOCK_FACTORY="module:callable" env import-path, resolved lazily and cached, with clear errors on malformed or non-callable specs - LockBackend Protocol documenting the contract (raw name in, context manager out; backend owns its namespacing) Default Redis/file behavior is unchanged when nothing is overridden. --- lib/crewai-core/src/crewai_core/lock_store.py | 170 +++++++++++++- lib/crewai/tests/utilities/test_lock_store.py | 208 ++++++++++++++++++ 2 files changed, 366 insertions(+), 12 deletions(-) diff --git a/lib/crewai-core/src/crewai_core/lock_store.py b/lib/crewai-core/src/crewai_core/lock_store.py index 0f09fa7f66..75396f4266 100644 --- a/lib/crewai-core/src/crewai_core/lock_store.py +++ b/lib/crewai-core/src/crewai_core/lock_store.py @@ -1,8 +1,21 @@ """Centralised lock factory. -If ``REDIS_URL`` is set and the ``redis`` package is installed, locks are -distributed via ``portalocker.RedisLock``. Otherwise, falls back to the -standard file-based ``portalocker.Lock`` in the system temp dir. +The locking backend is resolved in this order of precedence: + +1. A backend registered in-process via :func:`set_lock_backend` (or scoped + with the :func:`lock_backend` context manager). Best for tests and runtime + wiring. +2. A backend named by the ``CREWAI_LOCK_FACTORY`` environment variable, in + ``"module:callable"`` form (e.g. ``"my_pkg.locks:lock"``). The import path + is resolved lazily and cached. Best for deployment-driven selection, since + it requires no code changes and rolls back with an env unset. +3. The built-in default: if ``REDIS_URL`` is set and the ``redis`` package is + installed, locks are distributed via ``portalocker.RedisLock``; otherwise + they fall back to a file-based ``portalocker.Lock`` in the system temp dir. + +A custom backend is any callable matching :class:`LockBackend`. It receives the +raw lock ``name`` (not the ``crewai:`` channel) and owns its own +namespacing. """ from __future__ import annotations @@ -11,16 +24,19 @@ from contextlib import contextmanager from functools import lru_cache from hashlib import md5 +import importlib import logging import os import tempfile -from typing import TYPE_CHECKING, Final +from typing import TYPE_CHECKING, Final, Protocol, runtime_checkable import portalocker import portalocker.exceptions if TYPE_CHECKING: + from contextlib import AbstractContextManager + import redis @@ -28,9 +44,34 @@ _REDIS_URL: str | None = os.environ.get("REDIS_URL") +# Optional "module:callable" import path for a custom lock backend. Read once at +# import time, mirroring ``_REDIS_URL``; the env must be set before the process +# starts. +_LOCK_FACTORY_SPEC: str | None = os.environ.get("CREWAI_LOCK_FACTORY") + _DEFAULT_TIMEOUT: Final[int] = 120 +@runtime_checkable +class LockBackend(Protocol): + """A pluggable locking backend. + + A backend is any callable that, given a raw lock ``name`` and a + ``timeout``, returns a context manager that holds the lock for the + duration of the ``with`` block and releases it on exit. The ``name`` is + passed through verbatim (e.g. ``"chromadb_init"``); the backend owns its + own namespacing. + """ + + def __call__( + self, name: str, *, timeout: float + ) -> AbstractContextManager[None]: ... + + +# Active backend override; ``None`` means use the built-in default selection. +_backend: LockBackend | None = None + + def _redis_available() -> bool: """Return True if redis is installed and REDIS_URL is set.""" if not _REDIS_URL: @@ -53,16 +94,59 @@ def _redis_connection() -> redis.Redis[bytes]: return Redis.from_url(_REDIS_URL) -@contextmanager -def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]: - """Acquire a named lock, yielding while it is held. +@lru_cache(maxsize=1) +def _env_lock_factory() -> LockBackend | None: + """Resolve the ``CREWAI_LOCK_FACTORY`` import path to a callable. - Args: - name: A human-readable lock name (e.g. ``"chromadb_init"``). - Automatically namespaced to avoid collisions. - timeout: Maximum seconds to wait for the lock before raising. + Returns ``None`` when the env var is unset. Resolution is cached, so the + import happens at most once per process. + + Raises: + ValueError: if the spec is not in ``"module:callable"`` form. + ImportError / AttributeError: if the module or attribute is missing. + TypeError: if the resolved attribute is not callable. """ - channel = f"crewai:{md5(name.encode(), usedforsecurity=False).hexdigest()}" + if not _LOCK_FACTORY_SPEC: + return None + + module_path, sep, attr = _LOCK_FACTORY_SPEC.partition(":") + if not sep or not module_path or not attr: + raise ValueError( + "CREWAI_LOCK_FACTORY must be in 'module:callable' form, " + f"got {_LOCK_FACTORY_SPEC!r}" + ) + + module = importlib.import_module(module_path) + factory: LockBackend = getattr(module, attr) + if not callable(factory): + raise TypeError( + f"CREWAI_LOCK_FACTORY={_LOCK_FACTORY_SPEC!r} resolved to a " + f"non-callable {type(factory).__name__}; expected a callable " + "matching LockBackend (name, *, timeout) -> context manager." + ) + logger.debug("Using custom lock backend from %s", _LOCK_FACTORY_SPEC) + return factory + + +def _active_backend() -> LockBackend: + """Return the backend to use, honouring override > env > default.""" + if _backend is not None: + return _backend + env_factory = _env_lock_factory() + if env_factory is not None: + return env_factory + return _default_lock + + +def _namespaced_channel(name: str) -> str: + """Return the collision-resistant, namespaced channel for ``name``.""" + return f"crewai:{md5(name.encode(), usedforsecurity=False).hexdigest()}" + + +@contextmanager +def _default_lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]: + """The built-in backend: Redis when available, else a temp-dir file lock.""" + channel = _namespaced_channel(name) if _redis_available(): with portalocker.RedisLock( @@ -87,3 +171,65 @@ def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]: yield finally: pl.release() # type: ignore[no-untyped-call] + + +def set_lock_backend(backend: LockBackend | None) -> None: + """Override the locking backend used by :func:`lock`. + + Args: + backend: A callable matching the :class:`LockBackend` protocol, i.e. + ``backend(name, *, timeout) -> contextmanager``. Pass ``None`` to + restore the built-in default (Redis/file selection). + """ + global _backend + _backend = backend + + +def reset_lock_backend() -> None: + """Clear the in-process override. + + Backend resolution falls back to the ``CREWAI_LOCK_FACTORY`` env path if + set, otherwise the built-in Redis/file default. + """ + set_lock_backend(None) + + +def get_lock_backend() -> LockBackend: + """Return the currently active locking backend. + + Honours the override > ``CREWAI_LOCK_FACTORY`` env > built-in default + precedence. + """ + return _active_backend() + + +@contextmanager +def lock_backend(backend: LockBackend) -> Iterator[None]: + """Temporarily override the locking backend within a ``with`` block. + + The previous backend (custom or default) is restored on exit. + """ + global _backend + previous = _backend + _backend = backend + try: + yield + finally: + _backend = previous + + +@contextmanager +def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]: + """Acquire a named lock, yielding while it is held. + + Delegates to the active backend, resolved as override > + ``CREWAI_LOCK_FACTORY`` env > built-in Redis/file selection. + + Args: + name: A human-readable lock name (e.g. ``"chromadb_init"``). The + built-in default namespaces it to avoid collisions; custom + backends receive it verbatim. + timeout: Maximum seconds to wait for the lock before raising. + """ + with _active_backend()(name, timeout=timeout): + yield diff --git a/lib/crewai/tests/utilities/test_lock_store.py b/lib/crewai/tests/utilities/test_lock_store.py index 1baa0169a6..f1ce5999d0 100644 --- a/lib/crewai/tests/utilities/test_lock_store.py +++ b/lib/crewai/tests/utilities/test_lock_store.py @@ -6,7 +6,9 @@ from __future__ import annotations +from contextlib import contextmanager import sys +import types from unittest import mock import pytest @@ -20,6 +22,17 @@ def no_redis_url(monkeypatch): monkeypatch.setattr(lock_store, "_REDIS_URL", None) +@pytest.fixture(autouse=True) +def reset_backend(monkeypatch): + """Ensure backend overrides never leak across tests.""" + monkeypatch.setattr(lock_store, "_LOCK_FACTORY_SPEC", None) + lock_store._env_lock_factory.cache_clear() + lock_store.reset_lock_backend() + yield + lock_store.reset_lock_backend() + lock_store._env_lock_factory.cache_clear() + + # _redis_available @@ -64,3 +77,198 @@ def test_uses_redis_lock_when_redis_available(monkeypatch): kwargs = mock_redis_lock.call_args.kwargs assert kwargs["channel"].startswith("crewai:") assert kwargs["connection"] is fake_conn + + +# backend override + + +def test_override_backend_is_used(): + calls = [] + + @contextmanager + def fake_backend(name, *, timeout): + calls.append((name, timeout)) + yield + + lock_store.set_lock_backend(fake_backend) + + # The default file/redis path must not be touched when overridden. + with mock.patch("portalocker.Lock") as mock_lock: + with lock("override_test", timeout=5): + pass + + mock_lock.assert_not_called() + assert calls == [("override_test", 5)] + + +def test_reset_restores_default_backend(): + @contextmanager + def fake_backend(name, *, timeout): + yield + + lock_store.set_lock_backend(fake_backend) + lock_store.reset_lock_backend() + + with mock.patch("portalocker.Lock") as mock_lock: + with lock("after_reset"): + pass + + mock_lock.assert_called_once() + + +def test_get_lock_backend_reflects_override(): + assert lock_store.get_lock_backend() is lock_store._default_lock + + @contextmanager + def fake_backend(name, *, timeout): + yield + + lock_store.set_lock_backend(fake_backend) + assert lock_store.get_lock_backend() is fake_backend + + +def test_lock_backend_context_manager_is_scoped(): + seen = [] + + @contextmanager + def fake_backend(name, *, timeout): + seen.append(name) + yield + + with lock_store.lock_backend(fake_backend): + with lock("scoped"): + pass + + assert seen == ["scoped"] + # Override is restored after the block. + assert lock_store.get_lock_backend() is lock_store._default_lock + + +def test_lock_backend_context_manager_restores_previous_override(): + @contextmanager + def outer_backend(name, *, timeout): + yield + + @contextmanager + def inner_backend(name, *, timeout): + yield + + lock_store.set_lock_backend(outer_backend) + with lock_store.lock_backend(inner_backend): + assert lock_store.get_lock_backend() is inner_backend + assert lock_store.get_lock_backend() is outer_backend + + +# CREWAI_LOCK_FACTORY env import-path + + +def _install_env_factory(monkeypatch, factory, modname="fakelocks", attr="lock"): + """Point CREWAI_LOCK_FACTORY at ``factory`` via a registered fake module.""" + module = types.ModuleType(modname) + setattr(module, attr, factory) + monkeypatch.setitem(sys.modules, modname, module) + monkeypatch.setattr(lock_store, "_LOCK_FACTORY_SPEC", f"{modname}:{attr}") + lock_store._env_lock_factory.cache_clear() + + +def test_env_factory_used_when_spec_set(monkeypatch): + calls = [] + + @contextmanager + def fake_backend(name, *, timeout): + calls.append((name, timeout)) + yield + + _install_env_factory(monkeypatch, fake_backend) + + with mock.patch("portalocker.Lock") as mock_lock: + with lock("env_test", timeout=7): + pass + + mock_lock.assert_not_called() + assert calls == [("env_test", 7)] + assert lock_store.get_lock_backend() is fake_backend + + +def test_programmatic_override_takes_precedence_over_env(monkeypatch): + @contextmanager + def env_backend(name, *, timeout): + raise AssertionError("env backend should not be used") + yield # pragma: no cover + + used = [] + + @contextmanager + def code_backend(name, *, timeout): + used.append(name) + yield + + _install_env_factory(monkeypatch, env_backend) + lock_store.set_lock_backend(code_backend) + + with lock("precedence_test"): + pass + + assert used == ["precedence_test"] + assert lock_store.get_lock_backend() is code_backend + + +def test_env_factory_is_cached(monkeypatch): + @contextmanager + def fake_backend(name, *, timeout): + yield + + _install_env_factory(monkeypatch, fake_backend) + + with lock("a"): + pass + + # Remove the module: a cached factory must keep working without re-importing. + monkeypatch.delitem(sys.modules, "fakelocks") + with lock("b"): + pass + + assert lock_store.get_lock_backend() is fake_backend + + +def test_invalid_spec_raises(monkeypatch): + monkeypatch.setattr(lock_store, "_LOCK_FACTORY_SPEC", "no_colon_here") + lock_store._env_lock_factory.cache_clear() + + with pytest.raises(ValueError, match="module:callable"): + with lock("bad_spec"): + pass + + +def test_non_callable_factory_raises_with_context(monkeypatch): + # Resolve the spec to a non-callable attribute. + _install_env_factory(monkeypatch, "not a callable", attr="lock") + + with pytest.raises(TypeError, match="CREWAI_LOCK_FACTORY"): + with lock("bad_factory"): + pass + + +def test_env_factory_used_after_reset(monkeypatch): + """Clearing the in-process override falls back to the env factory.""" + seen = [] + + @contextmanager + def env_backend(name, *, timeout): + seen.append(name) + yield + + @contextmanager + def code_backend(name, *, timeout): + raise AssertionError("override should have been cleared") + yield # pragma: no cover + + _install_env_factory(monkeypatch, env_backend) + lock_store.set_lock_backend(code_backend) + lock_store.reset_lock_backend() + + with lock("after_reset_env"): + pass + + assert seen == ["after_reset_env"] + assert lock_store.get_lock_backend() is env_backend From 623f35e4bb5d5eeb80eaba6379b2c7fe2f57dce0 Mon Sep 17 00:00:00 2001 From: Matt Aitchison Date: Tue, 2 Jun 2026 13:26:30 -0500 Subject: [PATCH 2/9] refactor(lock_store): use explicit body for LockBackend protocol method Replace the no-op `...` body with `raise NotImplementedError` to satisfy the CodeQL ineffectual-statement check while keeping the Protocol structural-typing only. --- lib/crewai-core/src/crewai_core/lock_store.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/crewai-core/src/crewai_core/lock_store.py b/lib/crewai-core/src/crewai_core/lock_store.py index 75396f4266..ac40ac11f0 100644 --- a/lib/crewai-core/src/crewai_core/lock_store.py +++ b/lib/crewai-core/src/crewai_core/lock_store.py @@ -65,7 +65,8 @@ class LockBackend(Protocol): def __call__( self, name: str, *, timeout: float - ) -> AbstractContextManager[None]: ... + ) -> AbstractContextManager[None]: + raise NotImplementedError # Active backend override; ``None`` means use the built-in default selection. From ccaacb3702778ebd8c9cf97d14d36a4725b06ad9 Mon Sep 17 00:00:00 2001 From: Matt Aitchison Date: Tue, 2 Jun 2026 14:40:43 -0500 Subject: [PATCH 3/9] refactor(lock_store): drop scoped lock_backend context manager Keep the backend overridable via set_lock_backend/reset_lock_backend and the CREWAI_LOCK_FACTORY env path, but remove the scoped lock_backend() context manager. It was speculative surface and the only thread-unsafe piece (racy save/restore of the module global); nothing depends on it. --- lib/crewai-core/src/crewai_core/lock_store.py | 20 ++---------- lib/crewai/tests/utilities/test_lock_store.py | 32 ------------------- 2 files changed, 2 insertions(+), 50 deletions(-) diff --git a/lib/crewai-core/src/crewai_core/lock_store.py b/lib/crewai-core/src/crewai_core/lock_store.py index ac40ac11f0..a777f1b6b1 100644 --- a/lib/crewai-core/src/crewai_core/lock_store.py +++ b/lib/crewai-core/src/crewai_core/lock_store.py @@ -2,9 +2,8 @@ The locking backend is resolved in this order of precedence: -1. A backend registered in-process via :func:`set_lock_backend` (or scoped - with the :func:`lock_backend` context manager). Best for tests and runtime - wiring. +1. A backend registered in-process via :func:`set_lock_backend`. Best for + tests and runtime wiring. 2. A backend named by the ``CREWAI_LOCK_FACTORY`` environment variable, in ``"module:callable"`` form (e.g. ``"my_pkg.locks:lock"``). The import path is resolved lazily and cached. Best for deployment-driven selection, since @@ -204,21 +203,6 @@ def get_lock_backend() -> LockBackend: return _active_backend() -@contextmanager -def lock_backend(backend: LockBackend) -> Iterator[None]: - """Temporarily override the locking backend within a ``with`` block. - - The previous backend (custom or default) is restored on exit. - """ - global _backend - previous = _backend - _backend = backend - try: - yield - finally: - _backend = previous - - @contextmanager def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]: """Acquire a named lock, yielding while it is held. diff --git a/lib/crewai/tests/utilities/test_lock_store.py b/lib/crewai/tests/utilities/test_lock_store.py index f1ce5999d0..b0576ee180 100644 --- a/lib/crewai/tests/utilities/test_lock_store.py +++ b/lib/crewai/tests/utilities/test_lock_store.py @@ -127,38 +127,6 @@ def fake_backend(name, *, timeout): assert lock_store.get_lock_backend() is fake_backend -def test_lock_backend_context_manager_is_scoped(): - seen = [] - - @contextmanager - def fake_backend(name, *, timeout): - seen.append(name) - yield - - with lock_store.lock_backend(fake_backend): - with lock("scoped"): - pass - - assert seen == ["scoped"] - # Override is restored after the block. - assert lock_store.get_lock_backend() is lock_store._default_lock - - -def test_lock_backend_context_manager_restores_previous_override(): - @contextmanager - def outer_backend(name, *, timeout): - yield - - @contextmanager - def inner_backend(name, *, timeout): - yield - - lock_store.set_lock_backend(outer_backend) - with lock_store.lock_backend(inner_backend): - assert lock_store.get_lock_backend() is inner_backend - assert lock_store.get_lock_backend() is outer_backend - - # CREWAI_LOCK_FACTORY env import-path From 9ecc09777aeed8ceca3116c16b3841f2f7591fad Mon Sep 17 00:00:00 2001 From: Matt Aitchison Date: Tue, 2 Jun 2026 14:42:32 -0500 Subject: [PATCH 4/9] refactor(lock_store): drop reset_lock_backend alias reset_lock_backend() was just set_lock_backend(None); callers use that directly. Clearing the override is documented on set_lock_backend. --- lib/crewai-core/src/crewai_core/lock_store.py | 12 ++---------- lib/crewai/tests/utilities/test_lock_store.py | 8 ++++---- 2 files changed, 6 insertions(+), 14 deletions(-) diff --git a/lib/crewai-core/src/crewai_core/lock_store.py b/lib/crewai-core/src/crewai_core/lock_store.py index a777f1b6b1..297738529b 100644 --- a/lib/crewai-core/src/crewai_core/lock_store.py +++ b/lib/crewai-core/src/crewai_core/lock_store.py @@ -179,21 +179,13 @@ def set_lock_backend(backend: LockBackend | None) -> None: Args: backend: A callable matching the :class:`LockBackend` protocol, i.e. ``backend(name, *, timeout) -> contextmanager``. Pass ``None`` to - restore the built-in default (Redis/file selection). + clear the override, falling back to the ``CREWAI_LOCK_FACTORY`` + env path if set, otherwise the built-in Redis/file default. """ global _backend _backend = backend -def reset_lock_backend() -> None: - """Clear the in-process override. - - Backend resolution falls back to the ``CREWAI_LOCK_FACTORY`` env path if - set, otherwise the built-in Redis/file default. - """ - set_lock_backend(None) - - def get_lock_backend() -> LockBackend: """Return the currently active locking backend. diff --git a/lib/crewai/tests/utilities/test_lock_store.py b/lib/crewai/tests/utilities/test_lock_store.py index b0576ee180..3890afa779 100644 --- a/lib/crewai/tests/utilities/test_lock_store.py +++ b/lib/crewai/tests/utilities/test_lock_store.py @@ -27,9 +27,9 @@ def reset_backend(monkeypatch): """Ensure backend overrides never leak across tests.""" monkeypatch.setattr(lock_store, "_LOCK_FACTORY_SPEC", None) lock_store._env_lock_factory.cache_clear() - lock_store.reset_lock_backend() + lock_store.set_lock_backend(None) yield - lock_store.reset_lock_backend() + lock_store.set_lock_backend(None) lock_store._env_lock_factory.cache_clear() @@ -107,7 +107,7 @@ def fake_backend(name, *, timeout): yield lock_store.set_lock_backend(fake_backend) - lock_store.reset_lock_backend() + lock_store.set_lock_backend(None) with mock.patch("portalocker.Lock") as mock_lock: with lock("after_reset"): @@ -233,7 +233,7 @@ def code_backend(name, *, timeout): _install_env_factory(monkeypatch, env_backend) lock_store.set_lock_backend(code_backend) - lock_store.reset_lock_backend() + lock_store.set_lock_backend(None) with lock("after_reset_env"): pass From 07f177ca0e47eb6d149dcadbb3e13837f412a699 Mon Sep 17 00:00:00 2001 From: Matt Aitchison Date: Wed, 3 Jun 2026 12:51:10 -0500 Subject: [PATCH 5/9] style(lock_store): apply ruff format --- lib/crewai-core/src/crewai_core/lock_store.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/crewai-core/src/crewai_core/lock_store.py b/lib/crewai-core/src/crewai_core/lock_store.py index 297738529b..afb8aba09e 100644 --- a/lib/crewai-core/src/crewai_core/lock_store.py +++ b/lib/crewai-core/src/crewai_core/lock_store.py @@ -62,9 +62,7 @@ class LockBackend(Protocol): own namespacing. """ - def __call__( - self, name: str, *, timeout: float - ) -> AbstractContextManager[None]: + def __call__(self, name: str, *, timeout: float) -> AbstractContextManager[None]: raise NotImplementedError From 986dfff3647a5e4ffabea9ab921d79b5b8a271ea Mon Sep 17 00:00:00 2001 From: Matt Aitchison Date: Thu, 4 Jun 2026 11:56:51 -0500 Subject: [PATCH 6/9] refactor(lock_store): simplify overridable backend to a single setter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reduce the override surface to just set_lock_backend(): lock() uses the custom backend when one is set, otherwise the unchanged Redis/file default. Drop the CREWAI_LOCK_FACTORY env import-path, the runtime_checkable Protocol, the precedence resolver, and the getter — a custom backend is now any callable(name, *, timeout) -> context manager, registered in process. --- lib/crewai-core/src/crewai_core/lock_store.py | 160 ++++-------------- lib/crewai/tests/utilities/test_lock_store.py | 151 ++--------------- 2 files changed, 43 insertions(+), 268 deletions(-) diff --git a/lib/crewai-core/src/crewai_core/lock_store.py b/lib/crewai-core/src/crewai_core/lock_store.py index afb8aba09e..ab238e0d38 100644 --- a/lib/crewai-core/src/crewai_core/lock_store.py +++ b/lib/crewai-core/src/crewai_core/lock_store.py @@ -1,41 +1,30 @@ """Centralised lock factory. -The locking backend is resolved in this order of precedence: - -1. A backend registered in-process via :func:`set_lock_backend`. Best for - tests and runtime wiring. -2. A backend named by the ``CREWAI_LOCK_FACTORY`` environment variable, in - ``"module:callable"`` form (e.g. ``"my_pkg.locks:lock"``). The import path - is resolved lazily and cached. Best for deployment-driven selection, since - it requires no code changes and rolls back with an env unset. -3. The built-in default: if ``REDIS_URL`` is set and the ``redis`` package is - installed, locks are distributed via ``portalocker.RedisLock``; otherwise - they fall back to a file-based ``portalocker.Lock`` in the system temp dir. - -A custom backend is any callable matching :class:`LockBackend`. It receives the -raw lock ``name`` (not the ``crewai:`` channel) and owns its own -namespacing. +By default, if ``REDIS_URL`` is set and the ``redis`` package is installed, +locks are distributed via ``portalocker.RedisLock``. Otherwise, falls back to +the standard file-based ``portalocker.Lock`` in the system temp dir. + +The backend can be replaced via :func:`set_lock_backend` to plug in a custom +locking strategy (e.g. a different distributed lock service, or an in-process +lock for tests). """ from __future__ import annotations -from collections.abc import Iterator -from contextlib import contextmanager +from collections.abc import Callable, Iterator +from contextlib import AbstractContextManager, contextmanager from functools import lru_cache from hashlib import md5 -import importlib import logging import os import tempfile -from typing import TYPE_CHECKING, Final, Protocol, runtime_checkable +from typing import TYPE_CHECKING, Final import portalocker import portalocker.exceptions if TYPE_CHECKING: - from contextlib import AbstractContextManager - import redis @@ -43,31 +32,23 @@ _REDIS_URL: str | None = os.environ.get("REDIS_URL") -# Optional "module:callable" import path for a custom lock backend. Read once at -# import time, mirroring ``_REDIS_URL``; the env must be set before the process -# starts. -_LOCK_FACTORY_SPEC: str | None = os.environ.get("CREWAI_LOCK_FACTORY") - _DEFAULT_TIMEOUT: Final[int] = 120 +# A backend is called as ``backend(name, timeout=...)`` and returns a context +# manager that holds the lock while the ``with`` block runs. +LockBackend = Callable[..., AbstractContextManager[None]] -@runtime_checkable -class LockBackend(Protocol): - """A pluggable locking backend. - - A backend is any callable that, given a raw lock ``name`` and a - ``timeout``, returns a context manager that holds the lock for the - duration of the ``with`` block and releases it on exit. The ``name`` is - passed through verbatim (e.g. ``"chromadb_init"``); the backend owns its - own namespacing. - """ +# ``None`` means use the built-in Redis/file selection. +_backend: LockBackend | None = None - def __call__(self, name: str, *, timeout: float) -> AbstractContextManager[None]: - raise NotImplementedError +def set_lock_backend(backend: LockBackend | None) -> None: + """Replace the locking backend used by :func:`lock`. -# Active backend override; ``None`` means use the built-in default selection. -_backend: LockBackend | None = None + Pass ``None`` to restore the built-in Redis/file default. + """ + global _backend + _backend = backend def _redis_available() -> bool: @@ -92,59 +73,21 @@ def _redis_connection() -> redis.Redis[bytes]: return Redis.from_url(_REDIS_URL) -@lru_cache(maxsize=1) -def _env_lock_factory() -> LockBackend | None: - """Resolve the ``CREWAI_LOCK_FACTORY`` import path to a callable. - - Returns ``None`` when the env var is unset. Resolution is cached, so the - import happens at most once per process. +@contextmanager +def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]: + """Acquire a named lock, yielding while it is held. - Raises: - ValueError: if the spec is not in ``"module:callable"`` form. - ImportError / AttributeError: if the module or attribute is missing. - TypeError: if the resolved attribute is not callable. + Args: + name: A human-readable lock name (e.g. ``"chromadb_init"``). + Automatically namespaced to avoid collisions. + timeout: Maximum seconds to wait for the lock before raising. """ - if not _LOCK_FACTORY_SPEC: - return None - - module_path, sep, attr = _LOCK_FACTORY_SPEC.partition(":") - if not sep or not module_path or not attr: - raise ValueError( - "CREWAI_LOCK_FACTORY must be in 'module:callable' form, " - f"got {_LOCK_FACTORY_SPEC!r}" - ) - - module = importlib.import_module(module_path) - factory: LockBackend = getattr(module, attr) - if not callable(factory): - raise TypeError( - f"CREWAI_LOCK_FACTORY={_LOCK_FACTORY_SPEC!r} resolved to a " - f"non-callable {type(factory).__name__}; expected a callable " - "matching LockBackend (name, *, timeout) -> context manager." - ) - logger.debug("Using custom lock backend from %s", _LOCK_FACTORY_SPEC) - return factory - - -def _active_backend() -> LockBackend: - """Return the backend to use, honouring override > env > default.""" if _backend is not None: - return _backend - env_factory = _env_lock_factory() - if env_factory is not None: - return env_factory - return _default_lock - - -def _namespaced_channel(name: str) -> str: - """Return the collision-resistant, namespaced channel for ``name``.""" - return f"crewai:{md5(name.encode(), usedforsecurity=False).hexdigest()}" - + with _backend(name, timeout=timeout): + yield + return -@contextmanager -def _default_lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]: - """The built-in backend: Redis when available, else a temp-dir file lock.""" - channel = _namespaced_channel(name) + channel = f"crewai:{md5(name.encode(), usedforsecurity=False).hexdigest()}" if _redis_available(): with portalocker.RedisLock( @@ -169,42 +112,3 @@ def _default_lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[N yield finally: pl.release() # type: ignore[no-untyped-call] - - -def set_lock_backend(backend: LockBackend | None) -> None: - """Override the locking backend used by :func:`lock`. - - Args: - backend: A callable matching the :class:`LockBackend` protocol, i.e. - ``backend(name, *, timeout) -> contextmanager``. Pass ``None`` to - clear the override, falling back to the ``CREWAI_LOCK_FACTORY`` - env path if set, otherwise the built-in Redis/file default. - """ - global _backend - _backend = backend - - -def get_lock_backend() -> LockBackend: - """Return the currently active locking backend. - - Honours the override > ``CREWAI_LOCK_FACTORY`` env > built-in default - precedence. - """ - return _active_backend() - - -@contextmanager -def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]: - """Acquire a named lock, yielding while it is held. - - Delegates to the active backend, resolved as override > - ``CREWAI_LOCK_FACTORY`` env > built-in Redis/file selection. - - Args: - name: A human-readable lock name (e.g. ``"chromadb_init"``). The - built-in default namespaces it to avoid collisions; custom - backends receive it verbatim. - timeout: Maximum seconds to wait for the lock before raising. - """ - with _active_backend()(name, timeout=timeout): - yield diff --git a/lib/crewai/tests/utilities/test_lock_store.py b/lib/crewai/tests/utilities/test_lock_store.py index 3890afa779..baad049d8a 100644 --- a/lib/crewai/tests/utilities/test_lock_store.py +++ b/lib/crewai/tests/utilities/test_lock_store.py @@ -1,14 +1,14 @@ """Tests for lock_store. -We verify our own logic: the _redis_available guard and which portalocker -backend is selected. We trust portalocker to handle actual locking mechanics. +We verify our own logic: the _redis_available guard, which portalocker +backend is selected, and that a custom backend can be plugged in. We trust +portalocker to handle actual locking mechanics. """ from __future__ import annotations from contextlib import contextmanager import sys -import types from unittest import mock import pytest @@ -23,14 +23,11 @@ def no_redis_url(monkeypatch): @pytest.fixture(autouse=True) -def reset_backend(monkeypatch): - """Ensure backend overrides never leak across tests.""" - monkeypatch.setattr(lock_store, "_LOCK_FACTORY_SPEC", None) - lock_store._env_lock_factory.cache_clear() +def reset_backend(): + """Ensure a custom backend never leaks across tests.""" lock_store.set_lock_backend(None) yield lock_store.set_lock_backend(None) - lock_store._env_lock_factory.cache_clear() # _redis_available @@ -79,10 +76,10 @@ def test_uses_redis_lock_when_redis_available(monkeypatch): assert kwargs["connection"] is fake_conn -# backend override +# custom backend -def test_override_backend_is_used(): +def test_custom_backend_is_used(): calls = [] @contextmanager @@ -94,14 +91,14 @@ def fake_backend(name, *, timeout): # The default file/redis path must not be touched when overridden. with mock.patch("portalocker.Lock") as mock_lock: - with lock("override_test", timeout=5): + with lock("custom_test", timeout=5): pass mock_lock.assert_not_called() - assert calls == [("override_test", 5)] + assert calls == [("custom_test", 5)] -def test_reset_restores_default_backend(): +def test_clearing_backend_restores_default(): @contextmanager def fake_backend(name, *, timeout): yield @@ -110,133 +107,7 @@ def fake_backend(name, *, timeout): lock_store.set_lock_backend(None) with mock.patch("portalocker.Lock") as mock_lock: - with lock("after_reset"): + with lock("after_clear"): pass mock_lock.assert_called_once() - - -def test_get_lock_backend_reflects_override(): - assert lock_store.get_lock_backend() is lock_store._default_lock - - @contextmanager - def fake_backend(name, *, timeout): - yield - - lock_store.set_lock_backend(fake_backend) - assert lock_store.get_lock_backend() is fake_backend - - -# CREWAI_LOCK_FACTORY env import-path - - -def _install_env_factory(monkeypatch, factory, modname="fakelocks", attr="lock"): - """Point CREWAI_LOCK_FACTORY at ``factory`` via a registered fake module.""" - module = types.ModuleType(modname) - setattr(module, attr, factory) - monkeypatch.setitem(sys.modules, modname, module) - monkeypatch.setattr(lock_store, "_LOCK_FACTORY_SPEC", f"{modname}:{attr}") - lock_store._env_lock_factory.cache_clear() - - -def test_env_factory_used_when_spec_set(monkeypatch): - calls = [] - - @contextmanager - def fake_backend(name, *, timeout): - calls.append((name, timeout)) - yield - - _install_env_factory(monkeypatch, fake_backend) - - with mock.patch("portalocker.Lock") as mock_lock: - with lock("env_test", timeout=7): - pass - - mock_lock.assert_not_called() - assert calls == [("env_test", 7)] - assert lock_store.get_lock_backend() is fake_backend - - -def test_programmatic_override_takes_precedence_over_env(monkeypatch): - @contextmanager - def env_backend(name, *, timeout): - raise AssertionError("env backend should not be used") - yield # pragma: no cover - - used = [] - - @contextmanager - def code_backend(name, *, timeout): - used.append(name) - yield - - _install_env_factory(monkeypatch, env_backend) - lock_store.set_lock_backend(code_backend) - - with lock("precedence_test"): - pass - - assert used == ["precedence_test"] - assert lock_store.get_lock_backend() is code_backend - - -def test_env_factory_is_cached(monkeypatch): - @contextmanager - def fake_backend(name, *, timeout): - yield - - _install_env_factory(monkeypatch, fake_backend) - - with lock("a"): - pass - - # Remove the module: a cached factory must keep working without re-importing. - monkeypatch.delitem(sys.modules, "fakelocks") - with lock("b"): - pass - - assert lock_store.get_lock_backend() is fake_backend - - -def test_invalid_spec_raises(monkeypatch): - monkeypatch.setattr(lock_store, "_LOCK_FACTORY_SPEC", "no_colon_here") - lock_store._env_lock_factory.cache_clear() - - with pytest.raises(ValueError, match="module:callable"): - with lock("bad_spec"): - pass - - -def test_non_callable_factory_raises_with_context(monkeypatch): - # Resolve the spec to a non-callable attribute. - _install_env_factory(monkeypatch, "not a callable", attr="lock") - - with pytest.raises(TypeError, match="CREWAI_LOCK_FACTORY"): - with lock("bad_factory"): - pass - - -def test_env_factory_used_after_reset(monkeypatch): - """Clearing the in-process override falls back to the env factory.""" - seen = [] - - @contextmanager - def env_backend(name, *, timeout): - seen.append(name) - yield - - @contextmanager - def code_backend(name, *, timeout): - raise AssertionError("override should have been cleared") - yield # pragma: no cover - - _install_env_factory(monkeypatch, env_backend) - lock_store.set_lock_backend(code_backend) - lock_store.set_lock_backend(None) - - with lock("after_reset_env"): - pass - - assert seen == ["after_reset_env"] - assert lock_store.get_lock_backend() is env_backend From f399c7402f3b9bfbfd3863baf53fc27774b8cb5e Mon Sep 17 00:00:00 2001 From: Matt Aitchison Date: Thu, 4 Jun 2026 12:33:16 -0500 Subject: [PATCH 7/9] fix(lock_store): snapshot backend to avoid check-then-call race Read the module-global backend once into a local before the None check and the call, so a concurrent set_lock_backend(None) cannot make lock() invoke None. --- lib/crewai-core/src/crewai_core/lock_store.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/crewai-core/src/crewai_core/lock_store.py b/lib/crewai-core/src/crewai_core/lock_store.py index ab238e0d38..b80fbbeaf7 100644 --- a/lib/crewai-core/src/crewai_core/lock_store.py +++ b/lib/crewai-core/src/crewai_core/lock_store.py @@ -82,8 +82,11 @@ def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]: Automatically namespaced to avoid collisions. timeout: Maximum seconds to wait for the lock before raising. """ - if _backend is not None: - with _backend(name, timeout=timeout): + # Snapshot the global once: a concurrent set_lock_backend() must not turn + # the check-then-call into calling ``None``. + backend = _backend + if backend is not None: + with backend(name, timeout=timeout): yield return From a48fb20b946ee561cc93d7363045ba8c89332f07 Mon Sep 17 00:00:00 2001 From: Matt Aitchison Date: Thu, 4 Jun 2026 12:42:13 -0500 Subject: [PATCH 8/9] docs(lock_store): clarify name handling for custom backends The default namespaces the lock name; custom backends receive it verbatim. Correct the lock() docstring which implied namespacing always happens. --- lib/crewai-core/src/crewai_core/lock_store.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/crewai-core/src/crewai_core/lock_store.py b/lib/crewai-core/src/crewai_core/lock_store.py index b80fbbeaf7..b34566519c 100644 --- a/lib/crewai-core/src/crewai_core/lock_store.py +++ b/lib/crewai-core/src/crewai_core/lock_store.py @@ -78,8 +78,9 @@ def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]: """Acquire a named lock, yielding while it is held. Args: - name: A human-readable lock name (e.g. ``"chromadb_init"``). - Automatically namespaced to avoid collisions. + name: A human-readable lock name (e.g. ``"chromadb_init"``). The + built-in default namespaces it to avoid collisions; a custom + backend receives it verbatim. timeout: Maximum seconds to wait for the lock before raising. """ # Snapshot the global once: a concurrent set_lock_backend() must not turn From b4c097d047d58626f0db146212fa7a927292205d Mon Sep 17 00:00:00 2001 From: Matt Aitchison Date: Thu, 4 Jun 2026 13:04:28 -0500 Subject: [PATCH 9/9] docs(lock_store): note set_lock_backend is for one-time startup setup --- lib/crewai-core/src/crewai_core/lock_store.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/crewai-core/src/crewai_core/lock_store.py b/lib/crewai-core/src/crewai_core/lock_store.py index b34566519c..be1d08faaf 100644 --- a/lib/crewai-core/src/crewai_core/lock_store.py +++ b/lib/crewai-core/src/crewai_core/lock_store.py @@ -43,9 +43,12 @@ def set_lock_backend(backend: LockBackend | None) -> None: - """Replace the locking backend used by :func:`lock`. + """Replace the process-wide locking backend used by :func:`lock`. - Pass ``None`` to restore the built-in Redis/file default. + Intended for one-time setup at startup. Pass ``None`` to restore the + built-in Redis/file default. In-flight :func:`lock` calls keep the backend + they started with, but swapping backends while other threads acquire locks + is otherwise unsynchronised. """ global _backend _backend = backend