From 8a8900ee34695ee637af218d9f7d1436557ad508 Mon Sep 17 00:00:00 2001 From: Jason Novinger Date: Wed, 17 Jun 2026 16:40:10 +0200 Subject: [PATCH 01/11] Fixes #21326: Defer global search cache updates to a background job Previously the global search cache (CachedValue) was updated synchronously inside the post_save/post_delete signal handlers, adding latency to every write and, for bulk operations, one synchronous re-index per object. The signal handlers now buffer dirty objects and defer the work to a background job that runs after the transaction commits: * netbox/netbox/search/deferred.py coalesces dirty objects per (database alias, transaction). The buffer lives inside a transaction.on_commit callback's closure, so a single flush is scheduled per alias per transaction (collapsing repeated operations; a deletion supersedes a pending create/update). Because Django clears run_on_commit on both commit and rollback, no buffered state can survive a rolled-back transaction. In autocommit (no open transaction), the indexing runs inline immediately. * On flush, when an RQ worker is available the work is dispatched as a SearchCacheJob (netbox/netbox/search/tasks.py); otherwise it runs inline, so installs without a running worker behave as before. * The database alias is captured from the signal's `using` kwarg and replayed on the deferred read and write via .using(alias). This keeps cache entries routed to the originating schema (e.g. a branch schema under netbox-branching) even though the worker has no active routing context, without core depending on the plugin. Only primitives (the alias string and {object_type_id: [pk]} maps) cross the job boundary. * SearchBackend.cache()/remove() gain a `using` parameter, and a shared _remove_by_id() performs the raw delete by content type and object IDs. The worker re-fetches live instances on `using`, removes stale entries, and re-indexes within a single transaction so an object is never left with no cache rows. A missing schema (e.g. a branch merged/deprovisioned between enqueue and execution) is skipped; other database errors propagate so the job fails visibly. remove() now resolves the object type via the indexer's (concrete) model rather than ContentType.objects.get_for_model(instance), matching the content type that cache() writes under so the two cannot diverge for a proxy/MTI searchable model. No registered search index is currently a proxy model, so this is not an observable change today. --- netbox/netbox/search/backends.py | 71 ++++++++++++++----- netbox/netbox/search/deferred.py | 117 +++++++++++++++++++++++++++++++ netbox/netbox/search/tasks.py | 101 ++++++++++++++++++++++++++ 3 files changed, 273 insertions(+), 16 deletions(-) create mode 100644 netbox/netbox/search/deferred.py create mode 100644 netbox/netbox/search/tasks.py diff --git a/netbox/netbox/search/backends.py b/netbox/netbox/search/backends.py index 661134c9512..6aede1b36b5 100644 --- a/netbox/netbox/search/backends.py +++ b/netbox/netbox/search/backends.py @@ -3,7 +3,6 @@ import netaddr from django.conf import settings -from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ImproperlyConfigured from django.db import ProgrammingError from django.db.models import F, Q, Window, prefetch_related_objects @@ -63,22 +62,41 @@ def search(self, value, user=None, object_types=None, lookup=DEFAULT_LOOKUP_TYPE """ raise NotImplementedError - def caching_handler(self, sender, instance, created, **kwargs): + def caching_handler(self, sender, instance, created, using=None, **kwargs): """ Receiver for the post_save signal, responsible for caching object creation/changes. """ + from netbox.search.deferred import OP_CACHE, mark_dirty + + # Skip non-cacheable objects without scheduling any deferred work. + try: + indexer = get_indexer(instance) + except KeyError: + return + try: - self.cache(instance, remove_existing=not created) + object_type = ObjectType.objects.get_for_model(indexer.model) except ProgrammingError as e: # The schema may be incomplete during migrations; skip caching. logger.warning(f"Skipping search cache update due to schema error: {e}") - pass + return + + mark_dirty(object_type.pk, instance.pk, OP_CACHE, using=using) - def removal_handler(self, sender, instance, **kwargs): + def removal_handler(self, sender, instance, using=None, **kwargs): """ Receiver for the post_delete signal, responsible for caching object deletion. """ - self.remove(instance) + from netbox.search.deferred import OP_REMOVE, mark_dirty + + # Skip non-cacheable objects without scheduling any deferred work. + try: + indexer = get_indexer(instance) + except KeyError: + return + + object_type = ObjectType.objects.get_for_model(indexer.model) + mark_dirty(object_type.pk, instance.pk, OP_REMOVE, using=using) def cache(self, instances, indexer=None, remove_existing=True): """ @@ -196,13 +214,21 @@ def search(self, value, user=None, object_types=None, lookup=DEFAULT_LOOKUP_TYPE return ret - def cache(self, instances, indexer=None, remove_existing=True): + def cache(self, instances, indexer=None, remove_existing=True, using=None): custom_fields = None # Convert a single instance to an iterable if not hasattr(instances, '__iter__'): instances = [instances] + # Determine the queryset manager used to write cache entries. When a + # database alias is provided (e.g. by a deferred task replaying the alias + # the originating write used), entries are written to that connection; + # otherwise the configured router decides. `using` is expected to be a + # concrete alias or falsy (None) per the caller's contract; a falsy value + # defers to the router, which is the correct behavior either way. + manager = CachedValue.objects.using(using) if using else CachedValue.objects + buffer = [] counter = 0 for instance in instances: @@ -225,7 +251,7 @@ def cache(self, instances, indexer=None, remove_existing=True): # Wipe out any previously cached values for the object if remove_existing: - self.remove(instance) + self.remove(instance, using=using) # Generate cache data object_type = ObjectType.objects.get_for_model(indexer.model) @@ -243,27 +269,40 @@ def cache(self, instances, indexer=None, remove_existing=True): # Check whether the buffer needs to be flushed if len(buffer) >= 2000: - counter += len(CachedValue.objects.bulk_create(buffer)) + counter += len(manager.bulk_create(buffer)) buffer = [] # Final buffer flush if buffer: - counter += len(CachedValue.objects.bulk_create(buffer)) + counter += len(manager.bulk_create(buffer)) return counter - def remove(self, instance): + def _remove_by_id(self, object_type_id, object_ids, using=None): + """ + Delete cached values for the given content type and object IDs using a + single raw DELETE. Shared by remove() and the deferred search task. + """ + if not object_ids: + return None + + qs = CachedValue.objects.filter(object_type_id=object_type_id, object_id__in=object_ids) + + # Call _raw_delete() on the queryset to avoid first loading instances into memory + return qs._raw_delete(using=using or qs.db) + + def remove(self, instance, using=None): # Avoid attempting to query for non-cacheable objects try: - get_indexer(instance) + indexer = get_indexer(instance) except KeyError: return None - ct = ContentType.objects.get_for_model(instance) - qs = CachedValue.objects.filter(object_type=ct, object_id=instance.pk) + # Use the indexer's (concrete) model to resolve the object type, matching + # the content type that cache() writes entries under. + object_type = ObjectType.objects.get_for_model(indexer.model) - # Call _raw_delete() on the queryset to avoid first loading instances into memory - return qs._raw_delete(using=qs.db) + return self._remove_by_id(object_type.pk, [instance.pk], using=using) def clear(self, object_types=None): qs = CachedValue.objects.all() diff --git a/netbox/netbox/search/deferred.py b/netbox/netbox/search/deferred.py new file mode 100644 index 00000000000..dd37625465b --- /dev/null +++ b/netbox/netbox/search/deferred.py @@ -0,0 +1,117 @@ +import logging + +from django.db import DEFAULT_DB_ALIAS, connections, transaction + +# This module is internal plumbing for the search signal handlers; nothing here +# is part of the public/plugin API, so no symbols are exported via __all__. + +logger = logging.getLogger(__name__) + +# Operation markers stored in the per-transaction buffer +OP_CACHE = 'cache' +OP_REMOVE = 'remove' + +# Attributes used to tag a flush callback so we can recognize our own callbacks +# among those registered on a connection and reach the batch they will flush. +_FLUSH_ALIAS_ATTR = '_netbox_search_flush_alias' +_FLUSH_BATCH_ATTR = '_netbox_search_flush_batch' + + +def mark_dirty(object_type_id, pk, op, using=None): + """ + Record a searchable object as dirty for deferred (re)indexing. + + The work is coalesced per database connection and per transaction: repeated + operations on the same object collapse to a single entry (a deletion always + wins over a create/update), and a single flush is scheduled to run after the + transaction commits. When no transaction is open (autocommit), the indexing + runs synchronously. + + Args: + object_type_id: PK of the object's ObjectType/ContentType. + pk: PK of the object. + op: OP_CACHE or OP_REMOVE. + using: The database alias the originating write used. Replayed verbatim + on the deferred write so the cache entries land in the same schema + (e.g. a branch schema under netbox-branching), regardless of any + routing context that may be unset by the time the flush runs. + """ + alias = using or DEFAULT_DB_ALIAS + connection = connections[alias] + + # No transaction in progress: index synchronously. Deferring would have + # nothing to defer past, and transaction.on_commit() in autocommit mode runs + # its callback immediately at registration (before we could populate the + # batch), so handle this case explicitly. + if not connection.in_atomic_block: + _flush({(object_type_id, pk): op}, alias) + return + + # Find the batch for a flush already scheduled for this alias in the current + # transaction. Django clears a connection's run_on_commit list on both commit + # and rollback, so any callback we find there belongs to the current + # (uncommitted) transaction -- no stale state can survive a rollback. + batch = _pending_batch(connection, alias) + if batch is None: + batch = {} + + def flush(batch=batch, alias=alias): + _flush(batch, alias) + + setattr(flush, _FLUSH_ALIAS_ATTR, alias) + setattr(flush, _FLUSH_BATCH_ATTR, batch) + transaction.on_commit(flush, using=alias) + + # Coalesce: a deletion supersedes any pending create/update for the object. + key = (object_type_id, pk) + if op == OP_REMOVE or batch.get(key) != OP_REMOVE: + batch[key] = op + + +def _pending_batch(connection, alias): + """ + Return the batch dict of a flush callback already scheduled for the given + alias on this connection's current transaction, or None if there is none. + + This scans `connection.run_on_commit` on each call rather than caching the + lookup elsewhere. That is intentional: the scan is bounded (run_on_commit + holds only the transaction's registered commit callbacks, not one per saved + object), and reading it fresh each time is what keeps the buffer correctly + scoped to the live transaction. Django clears run_on_commit on both commit + and rollback, so a rolled-back transaction's batch can never be found here. + """ + for _sids, func, _robust in connection.run_on_commit: + if getattr(func, _FLUSH_ALIAS_ATTR, None) == alias: + return getattr(func, _FLUSH_BATCH_ATTR) + return None + + +def _flush(batch, using): + """ + Dispatch a coalesced batch of dirty objects for (re)indexing. Enqueues a + background job when an RQ worker is available, otherwise runs the indexing + synchronously inline (preserving pre-deferral behavior on installs without a + running worker). + """ + if not batch: + return + + # Group object IDs by content type and operation. + cache_groups = {} + remove_groups = {} + for (object_type_id, pk), op in batch.items(): + groups = remove_groups if op == OP_REMOVE else cache_groups + groups.setdefault(object_type_id, []).append(pk) + + # Imported here (not at module load) to avoid an import cycle: backends.py + # connects the search signals at import time, and these pull in netbox.config. + from netbox.constants import RQ_QUEUE_DEFAULT + from netbox.search.tasks import SearchCacheJob, update_search_cache + from utilities.rqworker import any_workers_for_queue + + if any_workers_for_queue(RQ_QUEUE_DEFAULT): + SearchCacheJob.enqueue(using=using, cache_groups=cache_groups, remove_groups=remove_groups) + else: + # No worker available: index synchronously, bypassing the Job framework + # (a Job record would never be picked up without a worker). + update_search_cache(using=using, cache_groups=cache_groups, remove_groups=remove_groups) diff --git a/netbox/netbox/search/tasks.py b/netbox/netbox/search/tasks.py new file mode 100644 index 00000000000..c7685a887d7 --- /dev/null +++ b/netbox/netbox/search/tasks.py @@ -0,0 +1,101 @@ +import logging + +from django.db import DatabaseError, transaction + +from core.models import ObjectType +from netbox.jobs import JobRunner +from netbox.search.backends import search_backend + +# Internal search-indexing machinery; not part of the public/plugin API. + +logger = logging.getLogger(__name__) + +# Postgres SQLSTATEs indicating the target schema/table no longer exists. This +# happens when a branch is merged or deprovisioned (its schema dropped) between +# the time an update was enqueued and when this job runs. Such errors are +# expected and safe to skip; the index is rebuilt on the next reindex. Any other +# DatabaseError (e.g. a deadlock or lost connection) is transient and must +# propagate so the job fails visibly and can be retried, rather than silently +# dropping index updates. +_MISSING_SCHEMA_SQLSTATES = frozenset(( + '3F000', # invalid_schema_name + '42P01', # undefined_table +)) + + +def _is_missing_schema(exc): + """ + Return True if the given DatabaseError was caused by the target schema/table + no longer existing (vs. a transient error that should propagate). + """ + sqlstate = getattr(getattr(exc, '__cause__', None), 'sqlstate', None) + return sqlstate in _MISSING_SCHEMA_SQLSTATES + + +def update_search_cache(using=None, cache_groups=None, remove_groups=None, log=logger): + """ + Apply a coalesced batch of updates to the global search cache. + + The `using` alias captured when each object was saved/deleted is replayed + here so that cache entries are written to the originating database/schema + (e.g. a branch schema under netbox-branching), regardless of any routing + context that is no longer active by the time this runs. + + Args: + using: The database alias to read objects from and write cache entries to. + cache_groups: Mapping of {object_type_id: [pk, ...]} to (re)index. + remove_groups: Mapping of {object_type_id: [pk, ...]} to remove. + log: Logger to use (the job logger when run as a background job). + """ + for object_type_id, pks in (remove_groups or {}).items(): + try: + search_backend._remove_by_id(object_type_id, pks, using=using) + except DatabaseError as e: + if not _is_missing_schema(e): + raise + # The target schema no longer exists (e.g. a branch was merged or + # deprovisioned between enqueue and execution). Skip; the index will + # be rebuilt on the next reindex. + log.warning(f"Skipping search cache removal for object type {object_type_id}: {e}") + + for object_type_id, pks in (cache_groups or {}).items(): + try: + object_type = ObjectType.objects.get(pk=object_type_id) + except ObjectType.DoesNotExist: + continue + model = object_type.model_class() + if model is None: + continue + + try: + # Re-fetch live instances from the originating database. Reading on + # `using` is required: a branch object's PK may be absent (or refer to + # a different object) on the default connection. + queryset = model.objects.using(using).filter(pk__in=pks) + + # Clear any stale entries for these objects, then re-insert. Wrapping + # both in one transaction avoids leaving an object with no cache rows + # if execution fails between the delete and the insert. + with transaction.atomic(using=using): + search_backend._remove_by_id(object_type_id, pks, using=using) + search_backend.cache(queryset, remove_existing=False, using=using) + except DatabaseError as e: + if not _is_missing_schema(e): + raise + log.warning(f"Skipping search cache update for object type {object_type_id}: {e}") + + +class SearchCacheJob(JobRunner): + """ + Background job which applies deferred updates to the global search cache. + """ + class Meta: + name = 'Search cache update' + + def run(self, using=None, cache_groups=None, remove_groups=None, **kwargs): + update_search_cache( + using=using, + cache_groups=cache_groups, + remove_groups=remove_groups, + log=self.logger, + ) From 2954cec92dfc171c62a596721a907007e2669058 Mon Sep 17 00:00:00 2001 From: Jason Novinger Date: Wed, 17 Jun 2026 16:40:37 +0200 Subject: [PATCH 02/11] Fixes #21326: Add tests for deferred search cache updates Cover the deferred indexing behavior introduced for #21326: * The existing save/delete tests now drive the post-commit flush via captureOnCommitCallbacks; with no RQ worker registered they exercise the synchronous fallback. * Coalescing: a bulk save schedules a single flush carrying every object, and a create-then-delete in one transaction collapses to a removal. * Rollback safety: a rolled-back transaction leaves no scheduled flush, and a commit on the same connection after a rollback still indexes (no stale buffer state suppresses it). * Worker path: when a worker is available the flush enqueues a SearchCacheJob, and the originating database alias is forwarded to the job. (Cross-schema routing itself is covered by the netbox-branching test suite; here we assert only that the alias is captured and forwarded.) * The deferred task tolerates an object deleted between enqueue and execution. * AutocommitCachingTestCase uses TransactionTestCase so saves run outside an atomic block, exercising the inline-indexing branch that TestCase masks. --- netbox/netbox/tests/test_search.py | 241 ++++++++++++++++++++++++++++- 1 file changed, 238 insertions(+), 3 deletions(-) diff --git a/netbox/netbox/tests/test_search.py b/netbox/netbox/tests/test_search.py index 1b6fe9eac9a..44fd27d59f4 100644 --- a/netbox/netbox/tests/test_search.py +++ b/netbox/netbox/tests/test_search.py @@ -1,9 +1,13 @@ +from unittest import mock + from django.contrib.contenttypes.models import ContentType -from django.test import TestCase +from django.db import connection, transaction +from django.test import TestCase, TransactionTestCase from dcim.models import Site from dcim.search import SiteIndex from extras.models import CachedValue +from netbox.search import deferred from netbox.search.backends import search_backend @@ -103,7 +107,12 @@ def test_cache_on_save(self): shipping_address='7915 Lilla Plains West Ladariusport TX 19429', comments='Lorem ipsum etcetera' ) - site.save() + + # Caching is deferred to a post-commit task. With no RQ worker running in + # the test environment it falls back to synchronous indexing; execute the + # on_commit callback to drive it within the test's transaction. + with self.captureOnCommitCallbacks(execute=True): + site.save() content_type = ContentType.objects.get_for_model(Site) self.assertEqual( @@ -116,7 +125,9 @@ def test_remove_on_delete(self): Test that any cached value for an object are automatically removed on delete(). """ site = Site.objects.first() - site.delete() + + with self.captureOnCommitCallbacks(execute=True): + site.delete() content_type = ContentType.objects.get_for_model(Site) self.assertFalse( @@ -151,3 +162,227 @@ def test_search(self): self.assertEqual(len(results), 1) results = search_backend.search('xxxxx') self.assertEqual(len(results), 0) + + +class DeferredCachingTestCase(TestCase): + """ + Tests for the deferred (post-commit) search caching machinery in + netbox.search.deferred. + + With no RQ worker registered, deferral falls back to synchronous indexing on + commit, so these tests assert on real CachedValue state and on the real + per-transaction buffer (connection.run_on_commit) rather than mocking the + queue. + """ + + @staticmethod + def _scheduled_flushes(): + # Django stores each registered callback as a (savepoint_ids, func, robust) + # tuple in connection.run_on_commit; return the search flush callbacks. + return [ + entry[1] for entry in connection.run_on_commit + if hasattr(entry[1], deferred._FLUSH_ALIAS_ATTR) + ] + + def _scheduled_flush_aliases(self): + return [getattr(func, deferred._FLUSH_ALIAS_ATTR) for func in self._scheduled_flushes()] + + def _pending_batch(self): + for func in self._scheduled_flushes(): + return getattr(func, deferred._FLUSH_BATCH_ATTR) + return None + + def test_bulk_save_schedules_single_flush(self): + """ + A batch of saves within one transaction coalesces into a single flush + carrying every object, rather than one scheduled flush per object. + """ + site_ct = ContentType.objects.get_for_model(Site) + with transaction.atomic(): + for i in range(20): + Site.objects.create(name=f'Site {i}', slug=f'site-{i}') + + # Exactly one flush is scheduled, and its batch holds all 20 objects. + self.assertEqual(self._scheduled_flush_aliases().count('default'), 1) + batch = self._pending_batch() + site_pks = [pk for (ot_id, pk) in batch if ot_id == site_ct.pk] + self.assertEqual(len(site_pks), 20) + + def test_save_then_delete_coalesces_to_removal(self): + """ + Creating then deleting an object within one transaction coalesces to a + single removal; the object ends up absent from the cache. + """ + site_ct = ContentType.objects.get_for_model(Site) + with self.captureOnCommitCallbacks(execute=True): + site = Site.objects.create(name='Ephemeral', slug='ephemeral') + pk = site.pk + site.delete() + # The coalesced op for this object is a removal, not a cache. + batch = self._pending_batch() + self.assertEqual(batch[(site_ct.pk, pk)], deferred.OP_REMOVE) + + self.assertFalse( + CachedValue.objects.filter(object_type=site_ct, object_id=pk).exists() + ) + + def test_rollback_does_not_leak_buffer(self): + """ + An object dirtied inside a transaction that rolls back leaves no flush + scheduled and no stale buffer behind, so it is never indexed. + """ + content_type = ContentType.objects.get_for_model(Site) + + # A nested atomic block that rolls back: its on_commit callback (and the + # batch it captured) are discarded by Django, so nothing is scheduled on + # the surrounding transaction. + with self.assertRaises(RuntimeError): + with transaction.atomic(): + rolled_back = Site.objects.create(name='RolledBack', slug='rolled-back') + rolled_back_pk = rolled_back.pk + # A flush was scheduled within this savepoint... + self.assertEqual(self._scheduled_flush_aliases().count('default'), 1) + raise RuntimeError('abort') + + # ...and is gone once the savepoint rolled back. + self.assertEqual(self._scheduled_flush_aliases().count('default'), 0) + self.assertFalse( + CachedValue.objects.filter(object_type=content_type, object_id=rolled_back_pk).exists() + ) + + def test_commit_after_rollback_still_indexes(self): + """ + After a rolled-back transaction, a subsequent committed save on the same + connection still indexes correctly (no sticky buffer state survives the + rollback to suppress it). + """ + content_type = ContentType.objects.get_for_model(Site) + + with self.assertRaises(RuntimeError): + with transaction.atomic(): + Site.objects.create(name='RolledBack2', slug='rolled-back-2') + raise RuntimeError('abort') + + with self.captureOnCommitCallbacks(execute=True): + site = Site.objects.create( + name='Committed', + slug='committed', + facility='Echo', + description='Committed after rollback', + physical_address='1 Test Way', + shipping_address='1 Test Way', + comments='Lorem ipsum', + ) + + # The committed object is indexed (no sticky buffer state from the + # rolled-back transaction suppressed it). + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=site.pk).count(), + len(SiteIndex.fields) + ) + + def test_non_searchable_model_schedules_no_flush(self): + """ + Saving a model without a registered search index schedules no deferred + flush. + """ + with transaction.atomic(): + # CachedValue itself has no search indexer. + CachedValue.objects.create( + object_type=ContentType.objects.get_for_model(Site), + object_id=1, + field='name', + type='str', + value='test', + weight=100, + ) + self.assertEqual(self._scheduled_flush_aliases(), []) + + def test_flush_enqueues_job_when_worker_available(self): + """ + When an RQ worker is available, the flush enqueues a SearchCacheJob + carrying the dirty objects (and the originating database alias) rather + than indexing inline. + """ + from django.db import DEFAULT_DB_ALIAS + + from netbox.search.tasks import SearchCacheJob + + site_ct = ContentType.objects.get_for_model(Site) + + # Patching the worker-availability probe is the established pattern for + # worker-gated behavior (cf. extras/tests/test_views.py, test_api.py). + with mock.patch('utilities.rqworker.any_workers_for_queue', return_value=True): + with mock.patch.object(SearchCacheJob, 'enqueue') as enqueue: + with self.captureOnCommitCallbacks(execute=True): + site = Site.objects.create(name='Enqueued', slug='enqueued') + + enqueue.assert_called_once() + kwargs = enqueue.call_args.kwargs + self.assertEqual(kwargs['cache_groups'], {site_ct.pk: [site.pk]}) + self.assertEqual(kwargs['remove_groups'], {}) + # The database alias captured from the post_save signal is forwarded to + # the job verbatim. This is what lets a deferred write target the + # originating schema (e.g. a branch schema under netbox-branching) even + # though the worker has no active routing context; cross-schema routing + # itself is covered by the netbox-branching test suite. + self.assertEqual(kwargs['using'], DEFAULT_DB_ALIAS) + + def test_cache_update_skips_deleted_object(self): + """ + update_search_cache tolerates a pk that no longer exists (object deleted + between enqueue and execution): it must not error or create cache rows. + """ + from netbox.search.tasks import update_search_cache + + site = Site.objects.create(name='Vanished', slug='vanished') + site_ct = ContentType.objects.get_for_model(Site) + pk = site.pk + site.delete() + CachedValue.objects.filter(object_type=site_ct, object_id=pk).delete() + + # No exception, and no rows resurrected for the missing object. + update_search_cache(using=None, cache_groups={site_ct.pk: [pk]}, remove_groups={}) + self.assertFalse( + CachedValue.objects.filter(object_type=site_ct, object_id=pk).exists() + ) + + +class AutocommitCachingTestCase(TransactionTestCase): + """ + Tests for the synchronous (autocommit) indexing path. Uses TransactionTestCase + rather than TestCase so that a save outside an explicit transaction runs in + autocommit (connection.in_atomic_block is False), exercising mark_dirty's + inline-indexing branch rather than the deferred on_commit path. + """ + + def test_autocommit_save_indexes_synchronously(self): + # Saved outside any atomic() block: indexing happens inline, immediately, + # with no background worker and no on_commit deferral. + site = Site.objects.create( + name='Autocommit Site', + slug='autocommit-site', + facility='Foxtrot', + description='Indexed synchronously', + physical_address='2 Test Way', + shipping_address='2 Test Way', + comments='Lorem ipsum', + ) + + content_type = ContentType.objects.get_for_model(Site) + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=site.pk).count(), + len(SiteIndex.fields) + ) + + def test_autocommit_delete_removes_synchronously(self): + site = Site.objects.create(name='Autocommit Del', slug='autocommit-del') + content_type = ContentType.objects.get_for_model(Site) + self.assertTrue( + CachedValue.objects.filter(object_type=content_type, object_id=site.pk).exists() + ) + + site.delete() + self.assertFalse( + CachedValue.objects.filter(object_type=content_type, object_id=site.pk).exists() + ) From 603a7b9c4a864e81d24beb371a163c731f0014c8 Mon Sep 17 00:00:00 2001 From: Jason Novinger Date: Wed, 17 Jun 2026 16:40:57 +0200 Subject: [PATCH 03/11] Fixes #21326: Note eventual consistency of search results Search index updates are now applied by a background task after the response is sent, so a newly created or changed object may not appear in search results for a brief period. Update the global search documentation accordingly (and note the synchronous fallback when no background worker is running). --- docs/features/search.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/features/search.md b/docs/features/search.md index 92422cad950..4b0842bd767 100644 --- a/docs/features/search.md +++ b/docs/features/search.md @@ -2,7 +2,7 @@ ## Global Search -NetBox includes a powerful global search engine, providing a single convenient interface to search across its complex data model. Relevant fields on each model are indexed according to their precedence, so that the most relevant results are returned first. When objects are created or modified, the search index is updated immediately, ensuring real-time accuracy. +NetBox includes a powerful global search engine, providing a single convenient interface to search across its complex data model. Relevant fields on each model are indexed according to their precedence, so that the most relevant results are returned first. When objects are created, modified, or deleted, the search index is updated by a background task shortly afterward. As a result, a newly created or changed object may not appear in search results for a brief period. (When no background worker is running, the index is updated immediately as part of the request.) When entering a search query, the user can choose a specific lookup type: exact match, partial match, etc. When a partial match is found, the matching portion of the applicable field value is included with each result so that the user can easily determine its relevance. From dcfa23a40f9590d7986630c8cf3b53ab19c9f344 Mon Sep 17 00:00:00 2001 From: Jason Novinger Date: Wed, 17 Jun 2026 17:40:59 +0200 Subject: [PATCH 04/11] Fixes #21326: Address review feedback * Register the on_commit flush with robust=True so a failure while flushing (e.g. Redis being unreachable when checking for an available worker) is logged rather than propagated, preventing a committed save from returning a 500. * Guard removal_handler's get_for_model() lookup with the same ProgrammingError handling as caching_handler, for deletes that fire during migrations. * Add a test asserting Django's connection.run_on_commit entry shape (savepoint_ids, func, robust), so a future Django change fails with a clear pointer to _pending_batch() rather than an opaque unpack error. --- netbox/netbox/search/backends.py | 8 +++++++- netbox/netbox/search/deferred.py | 6 +++++- netbox/netbox/tests/test_search.py | 25 +++++++++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/netbox/netbox/search/backends.py b/netbox/netbox/search/backends.py index 6aede1b36b5..863f5f39d5b 100644 --- a/netbox/netbox/search/backends.py +++ b/netbox/netbox/search/backends.py @@ -95,7 +95,13 @@ def removal_handler(self, sender, instance, using=None, **kwargs): except KeyError: return - object_type = ObjectType.objects.get_for_model(indexer.model) + try: + object_type = ObjectType.objects.get_for_model(indexer.model) + except ProgrammingError as e: + # The schema may be incomplete during migrations; skip caching. + logger.warning(f"Skipping search cache update due to schema error: {e}") + return + mark_dirty(object_type.pk, instance.pk, OP_REMOVE, using=using) def cache(self, instances, indexer=None, remove_existing=True): diff --git a/netbox/netbox/search/deferred.py b/netbox/netbox/search/deferred.py index dd37625465b..495e50c2bc1 100644 --- a/netbox/netbox/search/deferred.py +++ b/netbox/netbox/search/deferred.py @@ -60,7 +60,11 @@ def flush(batch=batch, alias=alias): setattr(flush, _FLUSH_ALIAS_ATTR, alias) setattr(flush, _FLUSH_BATCH_ATTR, batch) - transaction.on_commit(flush, using=alias) + # robust=True so a failure while flushing (e.g. Redis being unreachable + # when checking for an available worker) is logged rather than propagated + # to the caller. The originating write has already committed; a search + # cache update must never turn a successful save into a 500. + transaction.on_commit(flush, using=alias, robust=True) # Coalesce: a deletion supersedes any pending create/update for the object. key = (object_type_id, pk) diff --git a/netbox/netbox/tests/test_search.py b/netbox/netbox/tests/test_search.py index 44fd27d59f4..c6e455f3032 100644 --- a/netbox/netbox/tests/test_search.py +++ b/netbox/netbox/tests/test_search.py @@ -192,6 +192,31 @@ def _pending_batch(self): return getattr(func, deferred._FLUSH_BATCH_ATTR) return None + def test_run_on_commit_entry_shape(self): + """ + deferred._pending_batch() relies on Django storing each on_commit + callback as a (savepoint_ids, func, robust) tuple in + connection.run_on_commit. That structure is a Django internal, not a + documented API. Assert its shape explicitly so a change in a future + Django release fails here with a clear pointer, rather than surfacing as + an opaque unpack error inside the deferred-caching machinery. + """ + with transaction.atomic(): + transaction.on_commit(lambda: None) + entries = connection.run_on_commit + self.assertTrue(entries, "expected a registered on_commit callback") + entry = entries[-1] + self.assertEqual( + len(entry), 3, + "Django's connection.run_on_commit entry is no longer a 3-tuple; " + "netbox.search.deferred._pending_batch() unpacks (sids, func, robust) " + "and must be updated to match the new structure." + ) + sids, func, robust = entry + self.assertIsInstance(sids, set) + self.assertTrue(callable(func)) + self.assertIsInstance(robust, bool) + def test_bulk_save_schedules_single_flush(self): """ A batch of saves within one transaction coalesces into a single flush From 816710b3882f8f78d502f324ed57258da081528f Mon Sep 17 00:00:00 2001 From: Jason Novinger Date: Thu, 18 Jun 2026 10:34:45 +0200 Subject: [PATCH 05/11] Fixes #21326: Harden deferred search cache error handling Make _flush the single guarded dispatcher so a search cache update can never turn an already-committed save into an error, on either the transactional or the autocommit path: * Wrap the worker-availability probe and the job enqueue (both of which touch Redis, and a worker can die between them) in one try/except RedisError, falling back to inline indexing. The search index lives in PostgreSQL, so the inline fallback has no Redis dependency. * Guard the autocommit branch of mark_dirty() so a flush failure there is logged rather than propagated to the post_save/post_delete caller. The transactional path keeps transaction.on_commit(..., robust=True) as defense in depth. * Document that a broker failure mid-enqueue can leave a stranded PENDING Job row (the index is still written inline; the row ages out via housekeeping). * Note why the removal loop is not wrapped in a transaction (single DELETE per content type) and that a dropped update is recovered by reindex, not retry. Tests: add coverage for the inline fallback when the broker is unreachable and when enqueue itself fails after the probe succeeds (pinning the decision to guard the whole dispatch, not just the probe). --- netbox/netbox/search/deferred.py | 59 +++++++++++++++++++------- netbox/netbox/search/tasks.py | 6 +++ netbox/netbox/tests/test_search.py | 67 ++++++++++++++++++++++++++---- 3 files changed, 109 insertions(+), 23 deletions(-) diff --git a/netbox/netbox/search/deferred.py b/netbox/netbox/search/deferred.py index 495e50c2bc1..4555a44579c 100644 --- a/netbox/netbox/search/deferred.py +++ b/netbox/netbox/search/deferred.py @@ -1,6 +1,7 @@ import logging from django.db import DEFAULT_DB_ALIAS, connections, transaction +from redis.exceptions import RedisError # This module is internal plumbing for the search signal handlers; nothing here # is part of the public/plugin API, so no symbols are exported via __all__. @@ -43,8 +44,18 @@ def mark_dirty(object_type_id, pk, op, using=None): # nothing to defer past, and transaction.on_commit() in autocommit mode runs # its callback immediately at registration (before we could populate the # batch), so handle this case explicitly. + # + # On the transactional path below, transaction.on_commit(..., robust=True) + # ensures a flush failure can never propagate to the (already-committed) + # caller. The autocommit path has no such backstop, so guard it here: a + # broad catch is deliberate, because the originating write has committed and + # a search cache update must never turn a successful save into an error. The + # error is logged so a genuine indexing defect is still visible. if not connection.in_atomic_block: - _flush({(object_type_id, pk): op}, alias) + try: + _flush({(object_type_id, pk): op}, alias) + except Exception: + logger.exception("Search cache: error while indexing inline") return # Find the batch for a flush already scheduled for this alias in the current @@ -60,10 +71,9 @@ def flush(batch=batch, alias=alias): setattr(flush, _FLUSH_ALIAS_ATTR, alias) setattr(flush, _FLUSH_BATCH_ATTR, batch) - # robust=True so a failure while flushing (e.g. Redis being unreachable - # when checking for an available worker) is logged rather than propagated - # to the caller. The originating write has already committed; a search - # cache update must never turn a successful save into a 500. + # _flush already contains the Redis fault around dispatch; robust=True is + # defense in depth so that, regardless, a flush failure is logged rather + # than propagated to the (already-committed) caller. transaction.on_commit(flush, using=alias, robust=True) # Coalesce: a deletion supersedes any pending create/update for the object. @@ -92,15 +102,26 @@ def _pending_batch(connection, alias): def _flush(batch, using): """ - Dispatch a coalesced batch of dirty objects for (re)indexing. Enqueues a - background job when an RQ worker is available, otherwise runs the indexing - synchronously inline (preserving pre-deferral behavior on installs without a - running worker). + Dispatch a coalesced batch of dirty objects for (re)indexing. + + `_flush` is the single guarded entry point for deferred indexing, reached + either directly (autocommit) or from a transaction.on_commit callback. By the + time it runs the originating write has already committed, so it must never + propagate an error back to the caller and turn a successful save into a 500. + + The inline fallback is safe even during a broker outage: the search index + lives in PostgreSQL (the extras_cachedvalue table), so a Redis outage only + prevents backgrounding, not indexing itself. + + If the broker fails mid-enqueue (after the probe succeeds), Job.enqueue() has + already saved a Job row before the Redis dispatch raised, so the fallback can + leave behind a PENDING Job that no worker will run. The index is still + correct (written inline); the stranded row is cosmetic and ages out via the + housekeeping job. """ if not batch: return - # Group object IDs by content type and operation. cache_groups = {} remove_groups = {} for (object_type_id, pk), op in batch.items(): @@ -113,9 +134,15 @@ def _flush(batch, using): from netbox.search.tasks import SearchCacheJob, update_search_cache from utilities.rqworker import any_workers_for_queue - if any_workers_for_queue(RQ_QUEUE_DEFAULT): - SearchCacheJob.enqueue(using=using, cache_groups=cache_groups, remove_groups=remove_groups) - else: - # No worker available: index synchronously, bypassing the Job framework - # (a Job record would never be picked up without a worker). - update_search_cache(using=using, cache_groups=cache_groups, remove_groups=remove_groups) + try: + # Both the worker-availability check and the job enqueue talk to Redis, + # and a worker can die between the two. Treat any Redis failure across the + # whole dispatch as "no worker available" and fall back to inline + # indexing (a PostgreSQL write that does not depend on Redis). + if any_workers_for_queue(RQ_QUEUE_DEFAULT): + SearchCacheJob.enqueue(using=using, cache_groups=cache_groups, remove_groups=remove_groups) + return + except RedisError: + logger.warning("Search cache: broker unavailable; indexing inline", exc_info=True) + + update_search_cache(using=using, cache_groups=cache_groups, remove_groups=remove_groups) diff --git a/netbox/netbox/search/tasks.py b/netbox/netbox/search/tasks.py index c7685a887d7..0590767fae8 100644 --- a/netbox/netbox/search/tasks.py +++ b/netbox/netbox/search/tasks.py @@ -47,6 +47,12 @@ def update_search_cache(using=None, cache_groups=None, remove_groups=None, log=l remove_groups: Mapping of {object_type_id: [pk, ...]} to remove. log: Logger to use (the job logger when run as a background job). """ + # Removals are a single DELETE per content type, so (unlike the cache loop + # below) there is no multi-step state to wrap in a transaction. A transient + # error here propagates and errors the job; the remaining work is dropped + # rather than retried (NetBox does not retry these jobs by default) and is + # recovered by the next reindex, consistent with how derived state is rebuilt + # elsewhere. for object_type_id, pks in (remove_groups or {}).items(): try: search_backend._remove_by_id(object_type_id, pks, using=using) diff --git a/netbox/netbox/tests/test_search.py b/netbox/netbox/tests/test_search.py index c6e455f3032..5879177a940 100644 --- a/netbox/netbox/tests/test_search.py +++ b/netbox/netbox/tests/test_search.py @@ -1,14 +1,16 @@ from unittest import mock from django.contrib.contenttypes.models import ContentType -from django.db import connection, transaction +from django.db import DEFAULT_DB_ALIAS, connection, transaction from django.test import TestCase, TransactionTestCase +from redis.exceptions import ConnectionError as RedisConnectionError from dcim.models import Site from dcim.search import SiteIndex from extras.models import CachedValue from netbox.search import deferred from netbox.search.backends import search_backend +from netbox.search.tasks import SearchCacheJob, update_search_cache class SearchBackendTestCase(TestCase): @@ -329,10 +331,6 @@ def test_flush_enqueues_job_when_worker_available(self): carrying the dirty objects (and the originating database alias) rather than indexing inline. """ - from django.db import DEFAULT_DB_ALIAS - - from netbox.search.tasks import SearchCacheJob - site_ct = ContentType.objects.get_for_model(Site) # Patching the worker-availability probe is the established pattern for @@ -353,13 +351,68 @@ def test_flush_enqueues_job_when_worker_available(self): # itself is covered by the netbox-branching test suite. self.assertEqual(kwargs['using'], DEFAULT_DB_ALIAS) + def test_flush_falls_back_inline_when_broker_unreachable(self): + """ + If the broker is unreachable (the worker-availability probe raises a + RedisError), the flush must not propagate the error; it falls back to + inline indexing, which is a PostgreSQL write with no Redis dependency. + """ + content_type = ContentType.objects.get_for_model(Site) + + with mock.patch( + 'utilities.rqworker.any_workers_for_queue', + side_effect=RedisConnectionError("broker down"), + ): + with self.captureOnCommitCallbacks(execute=True): + site = Site.objects.create( + name='Broker Down', + slug='broker-down', + facility='Golf', + description='Indexed inline despite broker outage', + physical_address='3 Test Way', + shipping_address='3 Test Way', + comments='Lorem ipsum', + ) + + # The object was indexed inline (no exception propagated, rows written). + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=site.pk).count(), + len(SiteIndex.fields) + ) + + def test_flush_falls_back_inline_when_enqueue_fails(self): + """ + A worker can die (or the broker can drop) between the availability probe + and the enqueue. The flush guards the whole dispatch, not just the probe: + if enqueue raises a RedisError it still falls back to inline indexing. + """ + content_type = ContentType.objects.get_for_model(Site) + + with mock.patch('utilities.rqworker.any_workers_for_queue', return_value=True): + with mock.patch.object( + SearchCacheJob, 'enqueue', side_effect=RedisConnectionError("broker dropped") + ): + with self.captureOnCommitCallbacks(execute=True): + site = Site.objects.create( + name='Enqueue Failed', + slug='enqueue-failed', + facility='Hotel', + description='Indexed inline after enqueue failure', + physical_address='4 Test Way', + shipping_address='4 Test Way', + comments='Lorem ipsum', + ) + + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=site.pk).count(), + len(SiteIndex.fields) + ) + def test_cache_update_skips_deleted_object(self): """ update_search_cache tolerates a pk that no longer exists (object deleted between enqueue and execution): it must not error or create cache rows. """ - from netbox.search.tasks import update_search_cache - site = Site.objects.create(name='Vanished', slug='vanished') site_ct = ContentType.objects.get_for_model(Site) pk = site.pk From 249443642162c3d0fe385239519d7befa7cf9e95 Mon Sep 17 00:00:00 2001 From: Jason Novinger Date: Thu, 18 Jun 2026 12:39:39 +0200 Subject: [PATCH 06/11] Fixes #21326: Reduce nested imports in deferred search caching Move the search deferral imports to module top level wherever possible, leaving a single function-local import for the one genuinely cycle-bound case: * backends.py imports OP_CACHE/OP_REMOVE/mark_dirty from .deferred at module level (the handlers no longer each import them locally). * deferred.py imports RQ_QUEUE_DEFAULT and any_workers_for_queue at module level. * The only remaining function-local import is _flush's `from netbox.search.tasks import SearchCacheJob, update_search_cache`, which is required because tasks.py imports the search_backend singleton defined at the bottom of backends.py (which itself imports this module). A cleaner fix (extracting the singleton) is tracked as a follow-up. Because any_workers_for_queue is now bound in deferred's namespace, the tests patch netbox.search.deferred.any_workers_for_queue (where it is looked up) rather than utilities.rqworker.any_workers_for_queue. --- netbox/netbox/search/backends.py | 5 +---- netbox/netbox/search/deferred.py | 12 ++++++++---- netbox/netbox/tests/test_search.py | 6 +++--- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/netbox/netbox/search/backends.py b/netbox/netbox/search/backends.py index 863f5f39d5b..6aaa842cce2 100644 --- a/netbox/netbox/search/backends.py +++ b/netbox/netbox/search/backends.py @@ -21,6 +21,7 @@ from utilities.string import title from . import FieldTypes, LookupTypes, get_indexer +from .deferred import OP_CACHE, OP_REMOVE, mark_dirty DEFAULT_LOOKUP_TYPE = LookupTypes.PARTIAL MAX_RESULTS = 1000 @@ -66,8 +67,6 @@ def caching_handler(self, sender, instance, created, using=None, **kwargs): """ Receiver for the post_save signal, responsible for caching object creation/changes. """ - from netbox.search.deferred import OP_CACHE, mark_dirty - # Skip non-cacheable objects without scheduling any deferred work. try: indexer = get_indexer(instance) @@ -87,8 +86,6 @@ def removal_handler(self, sender, instance, using=None, **kwargs): """ Receiver for the post_delete signal, responsible for caching object deletion. """ - from netbox.search.deferred import OP_REMOVE, mark_dirty - # Skip non-cacheable objects without scheduling any deferred work. try: indexer = get_indexer(instance) diff --git a/netbox/netbox/search/deferred.py b/netbox/netbox/search/deferred.py index 4555a44579c..e982d88344f 100644 --- a/netbox/netbox/search/deferred.py +++ b/netbox/netbox/search/deferred.py @@ -3,6 +3,9 @@ from django.db import DEFAULT_DB_ALIAS, connections, transaction from redis.exceptions import RedisError +from netbox.constants import RQ_QUEUE_DEFAULT +from utilities.rqworker import any_workers_for_queue + # This module is internal plumbing for the search signal handlers; nothing here # is part of the public/plugin API, so no symbols are exported via __all__. @@ -128,11 +131,12 @@ def _flush(batch, using): groups = remove_groups if op == OP_REMOVE else cache_groups groups.setdefault(object_type_id, []).append(pk) - # Imported here (not at module load) to avoid an import cycle: backends.py - # connects the search signals at import time, and these pull in netbox.config. - from netbox.constants import RQ_QUEUE_DEFAULT + # Imported here, not at module load, to avoid an import cycle: backends.py + # imports this module at module level (for the signal handlers), and + # netbox.search.tasks imports search_backend from backends.py, which is + # defined at the bottom of that module. A proper fix is tracked as a + # follow-up (see the search signal-wiring housekeeping issue). from netbox.search.tasks import SearchCacheJob, update_search_cache - from utilities.rqworker import any_workers_for_queue try: # Both the worker-availability check and the job enqueue talk to Redis, diff --git a/netbox/netbox/tests/test_search.py b/netbox/netbox/tests/test_search.py index 5879177a940..e299e251466 100644 --- a/netbox/netbox/tests/test_search.py +++ b/netbox/netbox/tests/test_search.py @@ -335,7 +335,7 @@ def test_flush_enqueues_job_when_worker_available(self): # Patching the worker-availability probe is the established pattern for # worker-gated behavior (cf. extras/tests/test_views.py, test_api.py). - with mock.patch('utilities.rqworker.any_workers_for_queue', return_value=True): + with mock.patch('netbox.search.deferred.any_workers_for_queue', return_value=True): with mock.patch.object(SearchCacheJob, 'enqueue') as enqueue: with self.captureOnCommitCallbacks(execute=True): site = Site.objects.create(name='Enqueued', slug='enqueued') @@ -360,7 +360,7 @@ def test_flush_falls_back_inline_when_broker_unreachable(self): content_type = ContentType.objects.get_for_model(Site) with mock.patch( - 'utilities.rqworker.any_workers_for_queue', + 'netbox.search.deferred.any_workers_for_queue', side_effect=RedisConnectionError("broker down"), ): with self.captureOnCommitCallbacks(execute=True): @@ -388,7 +388,7 @@ def test_flush_falls_back_inline_when_enqueue_fails(self): """ content_type = ContentType.objects.get_for_model(Site) - with mock.patch('utilities.rqworker.any_workers_for_queue', return_value=True): + with mock.patch('netbox.search.deferred.any_workers_for_queue', return_value=True): with mock.patch.object( SearchCacheJob, 'enqueue', side_effect=RedisConnectionError("broker dropped") ): From 81690ab4893a7e89aeab310868c30a38eed75374 Mon Sep 17 00:00:00 2001 From: Jason Novinger Date: Thu, 18 Jun 2026 13:23:02 +0200 Subject: [PATCH 07/11] Fixes #21326: Rename mark_dirty and clarify the robust=True comment * Rename mark_dirty() to mark_for_deferred_indexing(), which says what it does (schedule deferred search indexing) rather than borrowing vague cache- invalidation vocabulary. Update the docstring's "dirty" wording to match. * Expand the comment on the on_commit robust=True flag to record why it is required rather than optional: Django runs on_commit callbacks synchronously as the atomic block exits (after COMMIT), so an exception escaping the flush would surface as a 500 on an already-committed write. _flush handles the recoverable Redis fault itself; robust=True keeps any other failure from becoming that post-commit 500. --- netbox/netbox/search/backends.py | 6 +++--- netbox/netbox/search/deferred.py | 14 +++++++++----- netbox/netbox/tests/test_search.py | 2 +- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/netbox/netbox/search/backends.py b/netbox/netbox/search/backends.py index 6aaa842cce2..8f97432224b 100644 --- a/netbox/netbox/search/backends.py +++ b/netbox/netbox/search/backends.py @@ -21,7 +21,7 @@ from utilities.string import title from . import FieldTypes, LookupTypes, get_indexer -from .deferred import OP_CACHE, OP_REMOVE, mark_dirty +from .deferred import OP_CACHE, OP_REMOVE, mark_for_deferred_indexing DEFAULT_LOOKUP_TYPE = LookupTypes.PARTIAL MAX_RESULTS = 1000 @@ -80,7 +80,7 @@ def caching_handler(self, sender, instance, created, using=None, **kwargs): logger.warning(f"Skipping search cache update due to schema error: {e}") return - mark_dirty(object_type.pk, instance.pk, OP_CACHE, using=using) + mark_for_deferred_indexing(object_type.pk, instance.pk, OP_CACHE, using=using) def removal_handler(self, sender, instance, using=None, **kwargs): """ @@ -99,7 +99,7 @@ def removal_handler(self, sender, instance, using=None, **kwargs): logger.warning(f"Skipping search cache update due to schema error: {e}") return - mark_dirty(object_type.pk, instance.pk, OP_REMOVE, using=using) + mark_for_deferred_indexing(object_type.pk, instance.pk, OP_REMOVE, using=using) def cache(self, instances, indexer=None, remove_existing=True): """ diff --git a/netbox/netbox/search/deferred.py b/netbox/netbox/search/deferred.py index e982d88344f..44367b5e731 100644 --- a/netbox/netbox/search/deferred.py +++ b/netbox/netbox/search/deferred.py @@ -21,9 +21,9 @@ _FLUSH_BATCH_ATTR = '_netbox_search_flush_batch' -def mark_dirty(object_type_id, pk, op, using=None): +def mark_for_deferred_indexing(object_type_id, pk, op, using=None): """ - Record a searchable object as dirty for deferred (re)indexing. + Schedule a searchable object for deferred (re)indexing. The work is coalesced per database connection and per transaction: repeated operations on the same object collapse to a single entry (a deletion always @@ -74,9 +74,13 @@ def flush(batch=batch, alias=alias): setattr(flush, _FLUSH_ALIAS_ATTR, alias) setattr(flush, _FLUSH_BATCH_ATTR, batch) - # _flush already contains the Redis fault around dispatch; robust=True is - # defense in depth so that, regardless, a flush failure is logged rather - # than propagated to the (already-committed) caller. + # robust=True is required, not just belt-and-suspenders: Django runs + # on_commit callbacks synchronously as the atomic block exits (after the + # COMMIT), so an exception escaping the callback would propagate out of + # the view's transaction and become a 500 on an already-committed write. + # _flush handles the recoverable Redis fault itself; robust=True is the + # only thing that keeps any *other* failure here (logged by Django at + # ERROR) from surfacing as that post-commit 500. transaction.on_commit(flush, using=alias, robust=True) # Coalesce: a deletion supersedes any pending create/update for the object. diff --git a/netbox/netbox/tests/test_search.py b/netbox/netbox/tests/test_search.py index e299e251466..d5633126326 100644 --- a/netbox/netbox/tests/test_search.py +++ b/netbox/netbox/tests/test_search.py @@ -430,7 +430,7 @@ class AutocommitCachingTestCase(TransactionTestCase): """ Tests for the synchronous (autocommit) indexing path. Uses TransactionTestCase rather than TestCase so that a save outside an explicit transaction runs in - autocommit (connection.in_atomic_block is False), exercising mark_dirty's + autocommit (connection.in_atomic_block is False), exercising mark_for_deferred_indexing's inline-indexing branch rather than the deferred on_commit path. """ From bac3d43951ed3e0d1272890de1f80e5e65d9113f Mon Sep 17 00:00:00 2001 From: Jason Novinger Date: Thu, 18 Jun 2026 14:45:26 +0200 Subject: [PATCH 08/11] Fixes #21326: Reference issue #22485 in the deferred-import comment --- netbox/netbox/search/deferred.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/netbox/netbox/search/deferred.py b/netbox/netbox/search/deferred.py index 44367b5e731..ccf8cf1fe08 100644 --- a/netbox/netbox/search/deferred.py +++ b/netbox/netbox/search/deferred.py @@ -138,8 +138,7 @@ def _flush(batch, using): # Imported here, not at module load, to avoid an import cycle: backends.py # imports this module at module level (for the signal handlers), and # netbox.search.tasks imports search_backend from backends.py, which is - # defined at the bottom of that module. A proper fix is tracked as a - # follow-up (see the search signal-wiring housekeeping issue). + # defined at the bottom of that module. A proper fix is tracked in #22485. from netbox.search.tasks import SearchCacheJob, update_search_cache try: From 19ba073c9402be9a3fd090d38c4af190a8d5dc4c Mon Sep 17 00:00:00 2001 From: Jason Novinger Date: Thu, 18 Jun 2026 17:05:51 +0200 Subject: [PATCH 09/11] Fixes #21326: Scope deferred search-cache flush to its savepoint The deferred search-cache buffer used a single on_commit callback per (database alias, transaction), found by scanning run_on_commit for the alias tag and mutating its batch. An op buffered inside a nested atomic() that later rolled back was appended to the outer callback's batch, which Django's savepoint_rollback never inspects (it prunes callbacks by the savepoint-id set captured at registration, not by their closure). The rolled-back op survived and flushed on the outer commit, for example removing from the cache an object whose delete was rolled back. Scope each flush callback to the savepoint stack active when it is registered (tuple(connection.savepoint_ids)); _pending_batch now matches on alias and scope. Each savepoint scope gets its own callback and batch, so a rolled-back savepoint's callback is pruned by Django automatically. Coalescing still holds within a scope; cross-scope ops produce one job per scope, and correctness is preserved because the worker re-fetches live rows at flush time. Add regression coverage for the nested-rollback leak plus committed nested scopes, cross-scope save-then-delete, sibling savepoints, and deep nesting with a middle rollback. --- netbox/netbox/search/deferred.py | 32 +++- netbox/netbox/tests/test_search.py | 256 +++++++++++++++++++++++++++-- 2 files changed, 263 insertions(+), 25 deletions(-) diff --git a/netbox/netbox/search/deferred.py b/netbox/netbox/search/deferred.py index ccf8cf1fe08..d3a325aa8a8 100644 --- a/netbox/netbox/search/deferred.py +++ b/netbox/netbox/search/deferred.py @@ -19,6 +19,9 @@ # among those registered on a connection and reach the batch they will flush. _FLUSH_ALIAS_ATTR = '_netbox_search_flush_alias' _FLUSH_BATCH_ATTR = '_netbox_search_flush_batch' +# The savepoint stack active when a flush callback was registered; see +# _pending_batch() for why buffering is scoped to it. +_FLUSH_SCOPE_ATTR = '_netbox_search_flush_scope' def mark_for_deferred_indexing(object_type_id, pk, op, using=None): @@ -61,11 +64,12 @@ def mark_for_deferred_indexing(object_type_id, pk, op, using=None): logger.exception("Search cache: error while indexing inline") return - # Find the batch for a flush already scheduled for this alias in the current - # transaction. Django clears a connection's run_on_commit list on both commit - # and rollback, so any callback we find there belongs to the current - # (uncommitted) transaction -- no stale state can survive a rollback. - batch = _pending_batch(connection, alias) + # Scope buffering to the current savepoint stack, not just the alias (see + # _pending_batch). May legitimately contain None entries for nested + # atomic(savepoint=False) blocks; matching is by equality, so that is fine. + scope = tuple(connection.savepoint_ids) + + batch = _pending_batch(connection, alias, scope) if batch is None: batch = {} @@ -74,6 +78,7 @@ def flush(batch=batch, alias=alias): setattr(flush, _FLUSH_ALIAS_ATTR, alias) setattr(flush, _FLUSH_BATCH_ATTR, batch) + setattr(flush, _FLUSH_SCOPE_ATTR, scope) # robust=True is required, not just belt-and-suspenders: Django runs # on_commit callbacks synchronously as the atomic block exits (after the # COMMIT), so an exception escaping the callback would propagate out of @@ -89,10 +94,11 @@ def flush(batch=batch, alias=alias): batch[key] = op -def _pending_batch(connection, alias): +def _pending_batch(connection, alias, scope): """ Return the batch dict of a flush callback already scheduled for the given - alias on this connection's current transaction, or None if there is none. + alias and savepoint scope on this connection's current transaction, or None + if there is none. This scans `connection.run_on_commit` on each call rather than caching the lookup elsewhere. That is intentional: the scan is bounded (run_on_commit @@ -100,9 +106,19 @@ def _pending_batch(connection, alias): object), and reading it fresh each time is what keeps the buffer correctly scoped to the live transaction. Django clears run_on_commit on both commit and rollback, so a rolled-back transaction's batch can never be found here. + + Matching on `scope` (the savepoint stack active when the callback was + registered) as well as `alias` keeps each savepoint scope on its own + callback. Django prunes a callback when a savepoint in its registration + snapshot rolls back, so an op buffered inside a nested savepoint is dropped + with its callback if that savepoint rolls back -- it can never be found here + and reused by an outer scope. """ for _sids, func, _robust in connection.run_on_commit: - if getattr(func, _FLUSH_ALIAS_ATTR, None) == alias: + if ( + getattr(func, _FLUSH_ALIAS_ATTR, None) == alias + and getattr(func, _FLUSH_SCOPE_ATTR, None) == scope + ): return getattr(func, _FLUSH_BATCH_ATTR) return None diff --git a/netbox/netbox/tests/test_search.py b/netbox/netbox/tests/test_search.py index d5633126326..006de276bb1 100644 --- a/netbox/netbox/tests/test_search.py +++ b/netbox/netbox/tests/test_search.py @@ -13,6 +13,16 @@ from netbox.search.tasks import SearchCacheJob, update_search_cache +def scheduled_search_flushes(): + # The deferred flush callbacks scheduled on the current connection, + # identified by the alias tag set in netbox.search.deferred. Django stores + # each registered callback as a (savepoint_ids, func, robust) tuple. + return [ + entry[1] for entry in connection.run_on_commit + if hasattr(entry[1], deferred._FLUSH_ALIAS_ATTR) + ] + + class SearchBackendTestCase(TestCase): @classmethod @@ -177,20 +187,11 @@ class DeferredCachingTestCase(TestCase): queue. """ - @staticmethod - def _scheduled_flushes(): - # Django stores each registered callback as a (savepoint_ids, func, robust) - # tuple in connection.run_on_commit; return the search flush callbacks. - return [ - entry[1] for entry in connection.run_on_commit - if hasattr(entry[1], deferred._FLUSH_ALIAS_ATTR) - ] - def _scheduled_flush_aliases(self): - return [getattr(func, deferred._FLUSH_ALIAS_ATTR) for func in self._scheduled_flushes()] + return [getattr(func, deferred._FLUSH_ALIAS_ATTR) for func in scheduled_search_flushes()] def _pending_batch(self): - for func in self._scheduled_flushes(): + for func in scheduled_search_flushes(): return getattr(func, deferred._FLUSH_BATCH_ATTR) return None @@ -219,6 +220,27 @@ def test_run_on_commit_entry_shape(self): self.assertTrue(callable(func)) self.assertIsInstance(robust, bool) + def test_savepoint_ids_shape(self): + """ + deferred.mark_for_deferred_indexing() reads connection.savepoint_ids to + scope each flush callback to its savepoint stack. That is a Django + internal, not a documented API. Assert it is a list inside a nested + atomic() so a future change fails here rather than silently re-leaking + nested-savepoint ops into an outer batch. + """ + with transaction.atomic(): + with transaction.atomic(): + self.assertIsInstance( + connection.savepoint_ids, list, + "Django's connection.savepoint_ids is no longer a list; " + "netbox.search.deferred.mark_for_deferred_indexing() keys its " + "flush callbacks on tuple(savepoint_ids) and must be updated." + ) + self.assertTrue( + connection.savepoint_ids, + "expected at least one savepoint id inside a nested atomic()" + ) + def test_bulk_save_schedules_single_flush(self): """ A batch of saves within one transaction coalesces into a single flush @@ -235,19 +257,31 @@ def test_bulk_save_schedules_single_flush(self): site_pks = [pk for (ot_id, pk) in batch if ot_id == site_ct.pk] self.assertEqual(len(site_pks), 20) - def test_save_then_delete_coalesces_to_removal(self): + def test_save_then_delete_in_same_scope_coalesces_to_removal(self): """ - Creating then deleting an object within one transaction coalesces to a - single removal; the object ends up absent from the cache. + A create and delete buffered in the same savepoint scope coalesce to a + single removal. Model.delete() runs in its own atomic(savepoint=False) + block, which pushes a scope marker, so to exercise coalescing within one + scope the operations are buffered directly rather than via a real + delete(). + """ + site_ct = ContentType.objects.get_for_model(Site) + with transaction.atomic(): + deferred.mark_for_deferred_indexing(site_ct.pk, 1, deferred.OP_CACHE) + deferred.mark_for_deferred_indexing(site_ct.pk, 1, deferred.OP_REMOVE) + batch = self._pending_batch() + self.assertEqual(batch[(site_ct.pk, 1)], deferred.OP_REMOVE) + + def test_save_then_delete_ends_absent_from_cache(self): + """ + Creating then deleting an object within one transaction leaves it absent + from the cache, regardless of how the create and delete ops are scoped. """ site_ct = ContentType.objects.get_for_model(Site) with self.captureOnCommitCallbacks(execute=True): site = Site.objects.create(name='Ephemeral', slug='ephemeral') pk = site.pk site.delete() - # The coalesced op for this object is a removal, not a cache. - batch = self._pending_batch() - self.assertEqual(batch[(site_ct.pk, pk)], deferred.OP_REMOVE) self.assertFalse( CachedValue.objects.filter(object_type=site_ct, object_id=pk).exists() @@ -464,3 +498,191 @@ def test_autocommit_delete_removes_synchronously(self): self.assertFalse( CachedValue.objects.filter(object_type=content_type, object_id=site.pk).exists() ) + + def test_inner_savepoint_rollback_does_not_leak_into_outer_batch(self): + """ + Regression test for the nested-atomic leak: an operation performed inside + an inner atomic() block (savepoint) that rolls back must not affect the + search cache when the outer transaction commits. + + TransactionTestCase is required so the outer atomic() is a real + transaction whose on_commit callbacks actually fire on commit (under + TestCase the test body is already inside a transaction, so nothing + commits and the leak is invisible). + """ + content_type = ContentType.objects.get_for_model(Site) + + # Object that exists before the transaction and stays cached: the inner + # savepoint will (transiently) delete it, then roll back. Capture the pk + # up front; Model.delete() nulls instance.pk in memory. + survivor = Site.objects.create(name='Survivor', slug='survivor') + survivor_pk = survivor.pk + self.assertTrue( + CachedValue.objects.filter(object_type=content_type, object_id=survivor_pk).exists() + ) + + with transaction.atomic(): + # Outer write schedules a search-cache flush for this connection. + outer = self._create_indexed_site('Outer', 'outer') + + # Inner savepoint deletes the survivor, then rolls back. At the DB + # level the survivor still exists after the rollback. + try: + with transaction.atomic(): + survivor.delete() + self.assertFalse(Site.objects.filter(pk=survivor_pk).exists()) + raise RuntimeError('abort inner savepoint') + except RuntimeError: + pass + + # The savepoint rolled back, so the survivor row is still present. + self.assertTrue(Site.objects.filter(pk=survivor_pk).exists()) + + # After the outer transaction commits, the deferred flush runs. The + # rolled-back inner delete must NOT have removed the survivor from the + # cache, and the committed outer object must be indexed. + self.assertTrue( + CachedValue.objects.filter(object_type=content_type, object_id=survivor_pk).exists(), + "rolled-back inner-savepoint delete leaked into the outer flush and " + "removed the survivor from the search cache", + ) + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=outer.pk).count(), + len(SiteIndex.fields), + ) + + @staticmethod + def _create_indexed_site(name, slug): + # SiteIndex.to_cache() emits a row only for non-empty fields, so populate + # every indexed field to get exactly len(SiteIndex.fields) cache rows. + return Site.objects.create( + name=name, + slug=slug, + facility='Facility', + description=f'{name} description', + physical_address='1 Test Way', + shipping_address='1 Test Way', + comments='Lorem ipsum', + ) + + def test_committed_savepoint_indexes_in_its_own_scope(self): + """ + A nested atomic() that commits is scoped to its own flush callback (one + per savepoint scope), and both the outer and the inner-committed object + are indexed. The two-callback count locks in the per-scope behavior so a + future change that re-merges scopes cannot silently reintroduce the + nested-rollback leak. + """ + content_type = ContentType.objects.get_for_model(Site) + + with transaction.atomic(): + outer = self._create_indexed_site('OuterC', 'outer-c') + + with transaction.atomic(): + inner = self._create_indexed_site('InnerC', 'inner-c') + + # Two distinct savepoint scopes dirtied objects, so two flush + # callbacks are scheduled (one per scope), not one coalesced batch. + self.assertEqual(len(scheduled_search_flushes()), 2) + + # Both objects are indexed after the outer transaction commits. + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=outer.pk).count(), + len(SiteIndex.fields), + ) + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=inner.pk).count(), + len(SiteIndex.fields), + ) + + def test_cross_scope_save_then_delete_nets_to_removed(self): + """ + Save an object at the transaction top level, then delete it inside a + committed nested savepoint. The two ops land in separate scopes (no + cross-scope coalescing), so two flush callbacks run in registration + (FIFO) order: the cache pass then the removal pass. The re-fetch at flush + time is the source of truth, so the end state is correctly "removed". + """ + content_type = ContentType.objects.get_for_model(Site) + + with transaction.atomic(): + site = Site.objects.create(name='CrossScope', slug='cross-scope') + site_pk = site.pk + + with transaction.atomic(): + site.delete() + + self.assertFalse( + CachedValue.objects.filter(object_type=content_type, object_id=site_pk).exists(), + "object deleted in a committed nested savepoint should not remain cached", + ) + self.assertFalse(Site.objects.filter(pk=site_pk).exists()) + + def test_sibling_savepoints_do_not_cross_contaminate(self): + """ + Two sequential nested savepoints under one outer transaction: the first + rolls back, the second commits, each dirtying a different object. Only + the committed sibling's object is indexed; the rolled-back sibling's op + is discarded with its own callback. + """ + content_type = ContentType.objects.get_for_model(Site) + + with transaction.atomic(): + try: + with transaction.atomic(): + rolled = Site.objects.create(name='SiblingRollback', slug='sibling-rb') + rolled_pk = rolled.pk + raise RuntimeError('abort first sibling') + except RuntimeError: + pass + + with transaction.atomic(): + committed = self._create_indexed_site('SiblingCommit', 'sibling-commit') + + self.assertFalse( + CachedValue.objects.filter(object_type=content_type, object_id=rolled_pk).exists(), + "rolled-back sibling savepoint's object should not be indexed", + ) + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=committed.pk).count(), + len(SiteIndex.fields), + ) + + def test_deep_nesting_middle_rollback(self): + """ + Three savepoint levels, each dirtying a distinct object; the middle level + rolls back. The top object and the deepest object nested under the middle + are both discarded along with the middle savepoint (its rollback prunes + every callback whose registration snapshot contains the middle sid), so + only the top-level object survives to be indexed. + """ + content_type = ContentType.objects.get_for_model(Site) + + with transaction.atomic(): + top = self._create_indexed_site('DeepTop', 'deep-top') + + try: + with transaction.atomic(): # middle savepoint + middle = Site.objects.create(name='DeepMiddle', slug='deep-middle') + middle_pk = middle.pk + + with transaction.atomic(): # deepest savepoint + deepest = Site.objects.create(name='DeepDeepest', slug='deep-deepest') + deepest_pk = deepest.pk + + raise RuntimeError('abort middle') + except RuntimeError: + pass + + # Only the top-level object survived; the middle savepoint's rollback + # discarded both the middle and the deepest object's flush callbacks. + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=top.pk).count(), + len(SiteIndex.fields), + ) + self.assertFalse( + CachedValue.objects.filter(object_type=content_type, object_id=middle_pk).exists() + ) + self.assertFalse( + CachedValue.objects.filter(object_type=content_type, object_id=deepest_pk).exists() + ) From 4b87d7383c76eb9b2265c7cab18f478a38a6a7d5 Mon Sep 17 00:00:00 2001 From: Jason Novinger Date: Mon, 22 Jun 2026 12:03:41 +0200 Subject: [PATCH 10/11] Fixes #21326: Move search cache job to jobs.py and onto the backend Relocate SearchCacheJob into netbox/search/jobs.py, matching the per-app job convention (core/jobs.py, extras/jobs.py); tasks.py was the lone violation. The job is now thin: it delegates to the backend. Move the deferred-update logic (the remove loop, the re-fetch + atomic remove-then-cache loop, and the missing-schema skip) from the free function update_search_cache onto CachedValueSearchBackend as apply_deferred_updates(), with _is_missing_schema and the Postgres SQLSTATE set as private members of that class. The CachedValue specifics (_remove_by_id, cache(..., using=...)) are now reached only from within the backend that owns them, rather than from the job and the inline fallback reaching into backend internals. SearchBackend gains an abstract apply_deferred_updates() (raising NotImplementedError like its siblings) so the deferred contract is explicit for custom SEARCH_BACKEND classes. deferred._flush now dispatches via SearchCacheJob (worker path) or search_backend.apply_deferred_updates() (inline fallback). This renames and relocates the backends/deferred/jobs import cycle but does not break it; the single function-local import in _flush remains, and the clean fix (extracting the search_backend singleton) is still tracked in #22485. --- netbox/netbox/search/backends.py | 77 ++++++++++++++++++++- netbox/netbox/search/deferred.py | 10 +-- netbox/netbox/search/jobs.py | 24 +++++++ netbox/netbox/search/tasks.py | 107 ----------------------------- netbox/netbox/tests/test_search.py | 9 +-- 5 files changed, 110 insertions(+), 117 deletions(-) create mode 100644 netbox/netbox/search/jobs.py delete mode 100644 netbox/netbox/search/tasks.py diff --git a/netbox/netbox/search/backends.py b/netbox/netbox/search/backends.py index 8f97432224b..f32c88a1344 100644 --- a/netbox/netbox/search/backends.py +++ b/netbox/netbox/search/backends.py @@ -4,7 +4,7 @@ import netaddr from django.conf import settings from django.core.exceptions import ImproperlyConfigured -from django.db import ProgrammingError +from django.db import DatabaseError, ProgrammingError, transaction from django.db.models import F, Q, Window, prefetch_related_objects from django.db.models.fields.related import ForeignKey from django.db.models.functions import window @@ -31,7 +31,8 @@ class SearchBackend: """ - Base class for search backends. Subclasses must extend the `cache()`, `remove()`, and `clear()` methods below. + Base class for search backends. Subclasses must extend the `cache()`, `remove()`, `clear()`, and + `apply_deferred_updates()` methods below. """ _object_types = None @@ -119,6 +120,15 @@ def clear(self, object_types=None): """ raise NotImplementedError + def apply_deferred_updates(self, using=None, cache_groups=None, remove_groups=None, log=None): + """ + Apply a coalesced batch of deferred cache updates (called by the background search cache job + and by the inline fallback). `cache_groups` and `remove_groups` are {object_type_id: [pk, ...]} + maps; `using` is the database alias the originating writes used, replayed here so entries land + in the originating schema. + """ + raise NotImplementedError + def count(self, object_types=None): """ Return a count of all cache entries (optionally filtered by object type). @@ -307,6 +317,69 @@ def remove(self, instance, using=None): return self._remove_by_id(object_type.pk, [instance.pk], using=using) + # Postgres SQLSTATEs indicating the target schema/table no longer exists. This happens when a + # branch is merged or deprovisioned (its schema dropped) between the time an update was enqueued + # and when it is applied. Such errors are expected and safe to skip; the index is rebuilt on the + # next reindex. Any other DatabaseError (e.g. a deadlock or lost connection) is transient and must + # propagate so the work fails visibly, rather than silently dropping index updates. + _MISSING_SCHEMA_SQLSTATES = frozenset(( + '3F000', # invalid_schema_name + '42P01', # undefined_table + )) + + def _is_missing_schema(self, exc): + """ + Return True if the given DatabaseError was caused by the target schema/table no longer existing + (vs. a transient error that should propagate). + """ + sqlstate = getattr(getattr(exc, '__cause__', None), 'sqlstate', None) + return sqlstate in self._MISSING_SCHEMA_SQLSTATES + + def apply_deferred_updates(self, using=None, cache_groups=None, remove_groups=None, log=logger): + """ + Apply a coalesced batch of updates to the search cache. The `using` alias captured when each + object was saved/deleted is replayed here so entries are written to the originating + database/schema (e.g. a branch schema under netbox-branching), regardless of any routing + context that is no longer active by the time this runs. + """ + # Removals are a single DELETE per content type, so (unlike the cache loop below) there is no + # multi-step state to wrap in a transaction. A transient error here propagates and errors the + # caller; the remaining work is dropped rather than retried (NetBox does not retry these jobs + # by default) and is recovered by the next reindex. + for object_type_id, pks in (remove_groups or {}).items(): + try: + self._remove_by_id(object_type_id, pks, using=using) + except DatabaseError as e: + if not self._is_missing_schema(e): + raise + log.warning(f"Skipping search cache removal for object type {object_type_id}: {e}") + + for object_type_id, pks in (cache_groups or {}).items(): + try: + object_type = ObjectType.objects.get(pk=object_type_id) + except ObjectType.DoesNotExist: + continue + model = object_type.model_class() + if model is None: + continue + + try: + # Re-fetch live instances from the originating database. Reading on `using` is + # required: a branch object's PK may be absent (or refer to a different object) on the + # default connection. + queryset = model.objects.using(using).filter(pk__in=pks) + + # Clear any stale entries for these objects, then re-insert. Wrapping both in one + # transaction avoids leaving an object with no cache rows if execution fails between + # the delete and the insert. + with transaction.atomic(using=using): + self._remove_by_id(object_type_id, pks, using=using) + self.cache(queryset, remove_existing=False, using=using) + except DatabaseError as e: + if not self._is_missing_schema(e): + raise + log.warning(f"Skipping search cache update for object type {object_type_id}: {e}") + def clear(self, object_types=None): qs = CachedValue.objects.all() if object_types: diff --git a/netbox/netbox/search/deferred.py b/netbox/netbox/search/deferred.py index d3a325aa8a8..a8b7e70f095 100644 --- a/netbox/netbox/search/deferred.py +++ b/netbox/netbox/search/deferred.py @@ -153,9 +153,11 @@ def _flush(batch, using): # Imported here, not at module load, to avoid an import cycle: backends.py # imports this module at module level (for the signal handlers), and - # netbox.search.tasks imports search_backend from backends.py, which is - # defined at the bottom of that module. A proper fix is tracked in #22485. - from netbox.search.tasks import SearchCacheJob, update_search_cache + # netbox.search.jobs imports the search_backend singleton from backends.py, + # which is bound at the bottom of that module. A proper fix is tracked in + # #22485. + from netbox.search.backends import search_backend + from netbox.search.jobs import SearchCacheJob try: # Both the worker-availability check and the job enqueue talk to Redis, @@ -168,4 +170,4 @@ def _flush(batch, using): except RedisError: logger.warning("Search cache: broker unavailable; indexing inline", exc_info=True) - update_search_cache(using=using, cache_groups=cache_groups, remove_groups=remove_groups) + search_backend.apply_deferred_updates(using=using, cache_groups=cache_groups, remove_groups=remove_groups) diff --git a/netbox/netbox/search/jobs.py b/netbox/netbox/search/jobs.py new file mode 100644 index 00000000000..33e56eaf1a4 --- /dev/null +++ b/netbox/netbox/search/jobs.py @@ -0,0 +1,24 @@ +import logging + +from netbox.jobs import JobRunner +from netbox.search.backends import search_backend + +# Internal search-indexing machinery; not part of the public/plugin API. + +logger = logging.getLogger(__name__) + + +class SearchCacheJob(JobRunner): + """ + Background job which applies deferred updates to the global search cache. + """ + class Meta: + name = 'Search cache update' + + def run(self, using=None, cache_groups=None, remove_groups=None, **kwargs): + search_backend.apply_deferred_updates( + using=using, + cache_groups=cache_groups, + remove_groups=remove_groups, + log=self.logger, + ) diff --git a/netbox/netbox/search/tasks.py b/netbox/netbox/search/tasks.py deleted file mode 100644 index 0590767fae8..00000000000 --- a/netbox/netbox/search/tasks.py +++ /dev/null @@ -1,107 +0,0 @@ -import logging - -from django.db import DatabaseError, transaction - -from core.models import ObjectType -from netbox.jobs import JobRunner -from netbox.search.backends import search_backend - -# Internal search-indexing machinery; not part of the public/plugin API. - -logger = logging.getLogger(__name__) - -# Postgres SQLSTATEs indicating the target schema/table no longer exists. This -# happens when a branch is merged or deprovisioned (its schema dropped) between -# the time an update was enqueued and when this job runs. Such errors are -# expected and safe to skip; the index is rebuilt on the next reindex. Any other -# DatabaseError (e.g. a deadlock or lost connection) is transient and must -# propagate so the job fails visibly and can be retried, rather than silently -# dropping index updates. -_MISSING_SCHEMA_SQLSTATES = frozenset(( - '3F000', # invalid_schema_name - '42P01', # undefined_table -)) - - -def _is_missing_schema(exc): - """ - Return True if the given DatabaseError was caused by the target schema/table - no longer existing (vs. a transient error that should propagate). - """ - sqlstate = getattr(getattr(exc, '__cause__', None), 'sqlstate', None) - return sqlstate in _MISSING_SCHEMA_SQLSTATES - - -def update_search_cache(using=None, cache_groups=None, remove_groups=None, log=logger): - """ - Apply a coalesced batch of updates to the global search cache. - - The `using` alias captured when each object was saved/deleted is replayed - here so that cache entries are written to the originating database/schema - (e.g. a branch schema under netbox-branching), regardless of any routing - context that is no longer active by the time this runs. - - Args: - using: The database alias to read objects from and write cache entries to. - cache_groups: Mapping of {object_type_id: [pk, ...]} to (re)index. - remove_groups: Mapping of {object_type_id: [pk, ...]} to remove. - log: Logger to use (the job logger when run as a background job). - """ - # Removals are a single DELETE per content type, so (unlike the cache loop - # below) there is no multi-step state to wrap in a transaction. A transient - # error here propagates and errors the job; the remaining work is dropped - # rather than retried (NetBox does not retry these jobs by default) and is - # recovered by the next reindex, consistent with how derived state is rebuilt - # elsewhere. - for object_type_id, pks in (remove_groups or {}).items(): - try: - search_backend._remove_by_id(object_type_id, pks, using=using) - except DatabaseError as e: - if not _is_missing_schema(e): - raise - # The target schema no longer exists (e.g. a branch was merged or - # deprovisioned between enqueue and execution). Skip; the index will - # be rebuilt on the next reindex. - log.warning(f"Skipping search cache removal for object type {object_type_id}: {e}") - - for object_type_id, pks in (cache_groups or {}).items(): - try: - object_type = ObjectType.objects.get(pk=object_type_id) - except ObjectType.DoesNotExist: - continue - model = object_type.model_class() - if model is None: - continue - - try: - # Re-fetch live instances from the originating database. Reading on - # `using` is required: a branch object's PK may be absent (or refer to - # a different object) on the default connection. - queryset = model.objects.using(using).filter(pk__in=pks) - - # Clear any stale entries for these objects, then re-insert. Wrapping - # both in one transaction avoids leaving an object with no cache rows - # if execution fails between the delete and the insert. - with transaction.atomic(using=using): - search_backend._remove_by_id(object_type_id, pks, using=using) - search_backend.cache(queryset, remove_existing=False, using=using) - except DatabaseError as e: - if not _is_missing_schema(e): - raise - log.warning(f"Skipping search cache update for object type {object_type_id}: {e}") - - -class SearchCacheJob(JobRunner): - """ - Background job which applies deferred updates to the global search cache. - """ - class Meta: - name = 'Search cache update' - - def run(self, using=None, cache_groups=None, remove_groups=None, **kwargs): - update_search_cache( - using=using, - cache_groups=cache_groups, - remove_groups=remove_groups, - log=self.logger, - ) diff --git a/netbox/netbox/tests/test_search.py b/netbox/netbox/tests/test_search.py index 006de276bb1..87d75bfa03f 100644 --- a/netbox/netbox/tests/test_search.py +++ b/netbox/netbox/tests/test_search.py @@ -10,7 +10,7 @@ from extras.models import CachedValue from netbox.search import deferred from netbox.search.backends import search_backend -from netbox.search.tasks import SearchCacheJob, update_search_cache +from netbox.search.jobs import SearchCacheJob def scheduled_search_flushes(): @@ -444,8 +444,9 @@ def test_flush_falls_back_inline_when_enqueue_fails(self): def test_cache_update_skips_deleted_object(self): """ - update_search_cache tolerates a pk that no longer exists (object deleted - between enqueue and execution): it must not error or create cache rows. + apply_deferred_updates tolerates a pk that no longer exists (object + deleted between enqueue and execution): it must not error or create cache + rows. """ site = Site.objects.create(name='Vanished', slug='vanished') site_ct = ContentType.objects.get_for_model(Site) @@ -454,7 +455,7 @@ def test_cache_update_skips_deleted_object(self): CachedValue.objects.filter(object_type=site_ct, object_id=pk).delete() # No exception, and no rows resurrected for the missing object. - update_search_cache(using=None, cache_groups={site_ct.pk: [pk]}, remove_groups={}) + search_backend.apply_deferred_updates(using=None, cache_groups={site_ct.pk: [pk]}, remove_groups={}) self.assertFalse( CachedValue.objects.filter(object_type=site_ct, object_id=pk).exists() ) From 0c902e5e46fb62ab016fe146d556ff94d3beef00 Mon Sep 17 00:00:00 2001 From: Jason Novinger Date: Wed, 24 Jun 2026 16:24:09 +0200 Subject: [PATCH 11/11] Fixes #21326: Keep search-cache deferral private to the default backend The deferred-indexing path was incorrectly wired into the base SearchBackend: the base caching_handler/removal_handler called mark_for_deferred_indexing, and a new abstract apply_deferred_updates() method on the base forced deferral onto every configured backend. SEARCH_BACKEND is a documented extension point; this silently broke any custom backend implementing only the four documented methods (search/cache/remove/clear). Fix: revert the base handlers to their main behavior (call self.cache()/ self.remove() synchronously through the public contract), and move the deferral logic into CachedValueSearchBackend handler overrides. The deferring overrides capture the `using` alias from the Django signal and call mark_for_deferred_indexing -- the rest of the deferred pipeline (deferred.py, jobs.py) is unchanged except renaming apply_deferred_updates to _apply_deferred_updates to make clear it is not part of the contract. The base public contract is now identical to main: search/cache/remove/ clear, with no `using` parameter and no new required method. A custom backend keeps working synchronously through the unchanged base handlers after upgrade. Branch-correct background indexing is preserved: `using` is captured in the subclass handler override and replayed end-to-end through _apply_deferred_updates to the terminal ORM writes. Also addresses pheus's test asks: capture object pk before delete() in removal tests (asserting object_id=site.pk after delete queries None); inline-seed CachedValue rows in test_remove_on_delete so the seeding is visible in the test body; add CustomBackendContractTestCase proving a four-method custom backend indexes synchronously and the default backend defers through the same call path. --- netbox/netbox/search/backends.py | 111 ++++++++++++++++++----------- netbox/netbox/search/deferred.py | 8 ++- netbox/netbox/search/jobs.py | 2 +- netbox/netbox/tests/test_search.py | 93 +++++++++++++++++++++--- 4 files changed, 162 insertions(+), 52 deletions(-) diff --git a/netbox/netbox/search/backends.py b/netbox/netbox/search/backends.py index f32c88a1344..6f36555d341 100644 --- a/netbox/netbox/search/backends.py +++ b/netbox/netbox/search/backends.py @@ -31,8 +31,8 @@ class SearchBackend: """ - Base class for search backends. Subclasses must extend the `cache()`, `remove()`, `clear()`, and - `apply_deferred_updates()` methods below. + Base class for search backends. Subclasses must extend the `cache()`, `remove()`, and `clear()` + methods below. """ _object_types = None @@ -64,43 +64,27 @@ def search(self, value, user=None, object_types=None, lookup=DEFAULT_LOOKUP_TYPE """ raise NotImplementedError - def caching_handler(self, sender, instance, created, using=None, **kwargs): + # caching_handler() and removal_handler() are the default, synchronous signal receivers connected + # to post_save/post_delete at module load. They are internal plumbing for signal dispatch, not a + # documented extension point: the public backend contract is cache()/remove()/clear(). A backend + # that needs to do something other than index inline (e.g. defer the work) overrides these in its + # subclass; see CachedValueSearchBackend. + def caching_handler(self, sender, instance, created, **kwargs): """ Receiver for the post_save signal, responsible for caching object creation/changes. """ - # Skip non-cacheable objects without scheduling any deferred work. try: - indexer = get_indexer(instance) - except KeyError: - return - - try: - object_type = ObjectType.objects.get_for_model(indexer.model) + self.cache(instance, remove_existing=not created) except ProgrammingError as e: # The schema may be incomplete during migrations; skip caching. logger.warning(f"Skipping search cache update due to schema error: {e}") - return + pass - mark_for_deferred_indexing(object_type.pk, instance.pk, OP_CACHE, using=using) - - def removal_handler(self, sender, instance, using=None, **kwargs): + def removal_handler(self, sender, instance, **kwargs): """ Receiver for the post_delete signal, responsible for caching object deletion. """ - # Skip non-cacheable objects without scheduling any deferred work. - try: - indexer = get_indexer(instance) - except KeyError: - return - - try: - object_type = ObjectType.objects.get_for_model(indexer.model) - except ProgrammingError as e: - # The schema may be incomplete during migrations; skip caching. - logger.warning(f"Skipping search cache update due to schema error: {e}") - return - - mark_for_deferred_indexing(object_type.pk, instance.pk, OP_REMOVE, using=using) + self.remove(instance) def cache(self, instances, indexer=None, remove_existing=True): """ @@ -120,15 +104,6 @@ def clear(self, object_types=None): """ raise NotImplementedError - def apply_deferred_updates(self, using=None, cache_groups=None, remove_groups=None, log=None): - """ - Apply a coalesced batch of deferred cache updates (called by the background search cache job - and by the inline fallback). `cache_groups` and `remove_groups` are {object_type_id: [pk, ...]} - maps; `using` is the database alias the originating writes used, replayed here so entries land - in the originating schema. - """ - raise NotImplementedError - def count(self, object_types=None): """ Return a count of all cache entries (optionally filtered by object type). @@ -146,6 +121,50 @@ def size(self): class CachedValueSearchBackend(SearchBackend): + # These override the base's synchronous receivers to defer indexing past the response. They are + # the seam where this backend captures the `using` alias Django passes to post_save/post_delete: + # the deferred write runs after the transaction commits (and possibly in a worker), by which point + # the originating routing context is gone, so the alias must be captured here and replayed on the + # deferred write to keep cache entries in the originating schema (e.g. a branch schema under + # netbox-branching). Deferral is internal to this backend; the public contract is unchanged. + def caching_handler(self, sender, instance, created, using=None, **kwargs): + """ + Receiver for the post_save signal, responsible for caching object creation/changes. + """ + # Skip non-cacheable objects without scheduling any deferred work. + try: + indexer = get_indexer(instance) + except KeyError: + return + + try: + object_type = ObjectType.objects.get_for_model(indexer.model) + except ProgrammingError as e: + # The schema may be incomplete during migrations; skip caching. + logger.warning(f"Skipping search cache update due to schema error: {e}") + return + + mark_for_deferred_indexing(object_type.pk, instance.pk, OP_CACHE, using=using) + + def removal_handler(self, sender, instance, using=None, **kwargs): + """ + Receiver for the post_delete signal, responsible for caching object deletion. + """ + # Skip non-cacheable objects without scheduling any deferred work. + try: + indexer = get_indexer(instance) + except KeyError: + return + + try: + object_type = ObjectType.objects.get_for_model(indexer.model) + except ProgrammingError as e: + # The schema may be incomplete during migrations; skip caching. + logger.warning(f"Skipping search cache update due to schema error: {e}") + return + + mark_for_deferred_indexing(object_type.pk, instance.pk, OP_REMOVE, using=using) + def search(self, value, user=None, object_types=None, lookup=DEFAULT_LOOKUP_TYPE): # Build the filter used to find relevant CachedValue records @@ -227,6 +246,11 @@ def search(self, value, user=None, object_types=None, lookup=DEFAULT_LOOKUP_TYPE return ret + # `using` here is a PostgreSQL/schema concern specific to this backend's deferred-write path (it + # replays the originating alias so branch writes land in the branch schema). It is deliberately + # NOT on the base cache()/remove() contract: a non-PostgreSQL backend (Redis, Solr, etc.) has no + # such concept. Do not lift `using` onto the base for symmetry; doing so would leak this backend's + # storage model into the generic contract. def cache(self, instances, indexer=None, remove_existing=True, using=None): custom_fields = None @@ -335,12 +359,15 @@ def _is_missing_schema(self, exc): sqlstate = getattr(getattr(exc, '__cause__', None), 'sqlstate', None) return sqlstate in self._MISSING_SCHEMA_SQLSTATES - def apply_deferred_updates(self, using=None, cache_groups=None, remove_groups=None, log=logger): + def _apply_deferred_updates(self, using=None, cache_groups=None, remove_groups=None, log=logger): """ - Apply a coalesced batch of updates to the search cache. The `using` alias captured when each - object was saved/deleted is replayed here so entries are written to the originating - database/schema (e.g. a branch schema under netbox-branching), regardless of any routing - context that is no longer active by the time this runs. + Apply a coalesced batch of updates to the search cache. Private to this backend; called by the + deferred-flush machinery (netbox.search.deferred) and the background job + (netbox.search.jobs.SearchCacheJob), not part of the public backend contract. + + The `using` alias captured when each object was saved/deleted is replayed here so entries are + written to the originating database/schema (e.g. a branch schema under netbox-branching), + regardless of any routing context that is no longer active by the time this runs. """ # Removals are a single DELETE per content type, so (unlike the cache loop below) there is no # multi-step state to wrap in a transaction. A transient error here propagates and errors the diff --git a/netbox/netbox/search/deferred.py b/netbox/netbox/search/deferred.py index a8b7e70f095..c59502a4635 100644 --- a/netbox/netbox/search/deferred.py +++ b/netbox/netbox/search/deferred.py @@ -43,6 +43,12 @@ def mark_for_deferred_indexing(object_type_id, pk, op, using=None): (e.g. a branch schema under netbox-branching), regardless of any routing context that may be unset by the time the flush runs. """ + # Fall back to the default alias when no originating alias was captured. This is correct for the + # common case (autocommit / non-branch writes route to the default connection), but if a write + # under a branch schema ever reached here without its alias, the deferred write would silently + # land in the main schema. Log at debug so that case is observable rather than invisible. + if not using: + logger.debug("Search cache: no originating DB alias for object %s/%s; using default", object_type_id, pk) alias = using or DEFAULT_DB_ALIAS connection = connections[alias] @@ -170,4 +176,4 @@ def _flush(batch, using): except RedisError: logger.warning("Search cache: broker unavailable; indexing inline", exc_info=True) - search_backend.apply_deferred_updates(using=using, cache_groups=cache_groups, remove_groups=remove_groups) + search_backend._apply_deferred_updates(using=using, cache_groups=cache_groups, remove_groups=remove_groups) diff --git a/netbox/netbox/search/jobs.py b/netbox/netbox/search/jobs.py index 33e56eaf1a4..96b66886fc0 100644 --- a/netbox/netbox/search/jobs.py +++ b/netbox/netbox/search/jobs.py @@ -16,7 +16,7 @@ class Meta: name = 'Search cache update' def run(self, using=None, cache_groups=None, remove_groups=None, **kwargs): - search_backend.apply_deferred_updates( + search_backend._apply_deferred_updates( using=using, cache_groups=cache_groups, remove_groups=remove_groups, diff --git a/netbox/netbox/tests/test_search.py b/netbox/netbox/tests/test_search.py index 87d75bfa03f..b13f9924a92 100644 --- a/netbox/netbox/tests/test_search.py +++ b/netbox/netbox/tests/test_search.py @@ -2,6 +2,7 @@ from django.contrib.contenttypes.models import ContentType from django.db import DEFAULT_DB_ALIAS, connection, transaction +from django.db.models.signals import post_delete, post_save from django.test import TestCase, TransactionTestCase from redis.exceptions import ConnectionError as RedisConnectionError @@ -9,7 +10,7 @@ from dcim.search import SiteIndex from extras.models import CachedValue from netbox.search import deferred -from netbox.search.backends import search_backend +from netbox.search.backends import SearchBackend, search_backend from netbox.search.jobs import SearchCacheJob @@ -136,14 +137,24 @@ def test_remove_on_delete(self): """ Test that any cached value for an object are automatically removed on delete(). """ - site = Site.objects.first() + content_type = ContentType.objects.get_for_model(Site) + + # Seed an object with cached entries, then delete it. Capture the pk before delete() (which + # nulls instance.pk in memory) so the post-delete assertion queries the real id rather than + # object_id=None. The create is wrapped in captureOnCommitCallbacks so the deferred caching + # actually runs (and writes rows) before we assert it was seeded. + with self.captureOnCommitCallbacks(execute=True): + site = Site.objects.create(name='Site Delete', slug='site-delete', facility='Foxtrot') + site_pk = site.pk + self.assertTrue( + CachedValue.objects.filter(object_type=content_type, object_id=site_pk).exists() + ) with self.captureOnCommitCallbacks(execute=True): site.delete() - content_type = ContentType.objects.get_for_model(Site) self.assertFalse( - CachedValue.objects.filter(object_type=content_type, object_id=site.pk).exists() + CachedValue.objects.filter(object_type=content_type, object_id=site_pk).exists() ) def test_clear_all(self): @@ -444,7 +455,7 @@ def test_flush_falls_back_inline_when_enqueue_fails(self): def test_cache_update_skips_deleted_object(self): """ - apply_deferred_updates tolerates a pk that no longer exists (object + _apply_deferred_updates tolerates a pk that no longer exists (object deleted between enqueue and execution): it must not error or create cache rows. """ @@ -455,7 +466,7 @@ def test_cache_update_skips_deleted_object(self): CachedValue.objects.filter(object_type=site_ct, object_id=pk).delete() # No exception, and no rows resurrected for the missing object. - search_backend.apply_deferred_updates(using=None, cache_groups={site_ct.pk: [pk]}, remove_groups={}) + search_backend._apply_deferred_updates(using=None, cache_groups={site_ct.pk: [pk]}, remove_groups={}) self.assertFalse( CachedValue.objects.filter(object_type=site_ct, object_id=pk).exists() ) @@ -491,13 +502,15 @@ def test_autocommit_save_indexes_synchronously(self): def test_autocommit_delete_removes_synchronously(self): site = Site.objects.create(name='Autocommit Del', slug='autocommit-del') content_type = ContentType.objects.get_for_model(Site) + # Capture the pk before delete() nulls instance.pk in memory. + site_pk = site.pk self.assertTrue( - CachedValue.objects.filter(object_type=content_type, object_id=site.pk).exists() + CachedValue.objects.filter(object_type=content_type, object_id=site_pk).exists() ) site.delete() self.assertFalse( - CachedValue.objects.filter(object_type=content_type, object_id=site.pk).exists() + CachedValue.objects.filter(object_type=content_type, object_id=site_pk).exists() ) def test_inner_savepoint_rollback_does_not_leak_into_outer_batch(self): @@ -687,3 +700,67 @@ def test_deep_nesting_middle_rollback(self): self.assertFalse( CachedValue.objects.filter(object_type=content_type, object_id=deepest_pk).exists() ) + + +class _MinimalSearchBackend(SearchBackend): + """ + A backend implementing only the documented contract (search/cache/remove/clear), with no + knowledge of deferral. Records cache()/remove() calls in memory so a test can assert it indexed + synchronously. Used to prove a custom SEARCH_BACKEND keeps working after this change. + """ + + def __init__(self): + self.cached = [] + self.removed = [] + + def search(self, value, user=None, object_types=None, lookup=None): + return [] + + def cache(self, instances, indexer=None, remove_existing=True): + if not hasattr(instances, '__iter__'): + instances = [instances] + self.cached.extend(instances) + + def remove(self, instance): + self.removed.append(instance) + + def clear(self, object_types=None): + self.cached.clear() + self.removed.clear() + + +class CustomBackendContractTestCase(TransactionTestCase): + """ + Proves the deferred-indexing work did not change the public SearchBackend contract: a custom + backend implementing only search/cache/remove/clear still indexes synchronously through the base + signal handlers, and never schedules deferred (on_commit) work. Deferral is private to + CachedValueSearchBackend; it must not leak onto a backend that didn't opt into it. + """ + + def test_custom_backend_indexes_synchronously(self): + backend = _MinimalSearchBackend() + + # Connect the custom backend's (inherited, synchronous) handlers, exactly as + # backends.py connects the configured backend at import. Same call site, no type-check. + post_save.connect(backend.caching_handler, sender=Site) + post_delete.connect(backend.removal_handler, sender=Site) + self.addCleanup(post_save.disconnect, backend.caching_handler, sender=Site) + self.addCleanup(post_delete.disconnect, backend.removal_handler, sender=Site) + + # A backend implementing only the documented contract indexes through the base handlers + # inline (the handler calls self.cache()/self.remove() synchronously). It is never routed + # into the deferral machinery, which is private to CachedValueSearchBackend. + site = Site.objects.create(name='Custom Backend', slug='custom-backend') + self.assertIn(site, backend.cached) + + site.delete() + self.assertEqual(len(backend.removed), 1) + + def test_default_backend_defers_via_same_call_path(self): + # The default backend (CachedValueSearchBackend) IS connected at import, and reaches the + # SAME caching_handler call path -- but its override defers instead of indexing inline. + # This contrasts with the custom backend above: identical dispatch, polymorphic behavior. + with transaction.atomic(): + Site.objects.create(name='Default Defers', slug='default-defers') + scheduled = scheduled_search_flushes() + self.assertEqual(len(scheduled), 1)