diff --git a/docs/features/search.md b/docs/features/search.md index 92422cad950..4b0842bd767 100644 --- a/docs/features/search.md +++ b/docs/features/search.md @@ -2,7 +2,7 @@ ## Global Search -NetBox includes a powerful global search engine, providing a single convenient interface to search across its complex data model. Relevant fields on each model are indexed according to their precedence, so that the most relevant results are returned first. When objects are created or modified, the search index is updated immediately, ensuring real-time accuracy. +NetBox includes a powerful global search engine, providing a single convenient interface to search across its complex data model. Relevant fields on each model are indexed according to their precedence, so that the most relevant results are returned first. When objects are created, modified, or deleted, the search index is updated by a background task shortly afterward. As a result, a newly created object may not appear in search results, and recent changes to an existing object may not yet be reflected, for a brief period. (When no background worker is running, the index is updated immediately as part of the request.) When entering a search query, the user can choose a specific lookup type: exact match, partial match, etc. When a partial match is found, the matching portion of the applicable field value is included with each result so that the user can easily determine its relevance. 
diff --git a/netbox/netbox/search/backends.py b/netbox/netbox/search/backends.py index 661134c9512..6f36555d341 100644 --- a/netbox/netbox/search/backends.py +++ b/netbox/netbox/search/backends.py @@ -3,9 +3,8 @@ import netaddr from django.conf import settings -from django.contrib.contenttypes.models import ContentType from django.core.exceptions import ImproperlyConfigured -from django.db import ProgrammingError +from django.db import DatabaseError, ProgrammingError, transaction from django.db.models import F, Q, Window, prefetch_related_objects from django.db.models.fields.related import ForeignKey from django.db.models.functions import window @@ -22,6 +21,7 @@ from utilities.string import title from . import FieldTypes, LookupTypes, get_indexer +from .deferred import OP_CACHE, OP_REMOVE, mark_for_deferred_indexing DEFAULT_LOOKUP_TYPE = LookupTypes.PARTIAL MAX_RESULTS = 1000 @@ -31,7 +31,8 @@ class SearchBackend: """ - Base class for search backends. Subclasses must extend the `cache()`, `remove()`, and `clear()` methods below. + Base class for search backends. Subclasses must extend the `cache()`, `remove()`, and `clear()` + methods below. """ _object_types = None @@ -63,6 +64,11 @@ def search(self, value, user=None, object_types=None, lookup=DEFAULT_LOOKUP_TYPE """ raise NotImplementedError + # caching_handler() and removal_handler() are the default, synchronous signal receivers connected + # to post_save/post_delete at module load. They are internal plumbing for signal dispatch, not a + # documented extension point: the public backend contract is cache()/remove()/clear(). A backend + # that needs to do something other than index inline (e.g. defer the work) overrides these in its + # subclass; see CachedValueSearchBackend. def caching_handler(self, sender, instance, created, **kwargs): """ Receiver for the post_save signal, responsible for caching object creation/changes. 
@@ -115,6 +121,50 @@ def size(self): class CachedValueSearchBackend(SearchBackend): + # These override the base's synchronous receivers to defer indexing past the response. They are + # the seam where this backend captures the `using` alias Django passes to post_save/post_delete: + # the deferred write runs after the transaction commits (and possibly in a worker), by which point + # the originating routing context is gone, so the alias must be captured here and replayed on the + # deferred write to keep cache entries in the originating schema (e.g. a branch schema under + # netbox-branching). Deferral is internal to this backend; the public contract is unchanged. + def caching_handler(self, sender, instance, created, using=None, **kwargs): + """ + Receiver for the post_save signal, responsible for caching object creation/changes. + """ + # Skip non-cacheable objects without scheduling any deferred work. + try: + indexer = get_indexer(instance) + except KeyError: + return + + try: + object_type = ObjectType.objects.get_for_model(indexer.model) + except ProgrammingError as e: + # The schema may be incomplete during migrations; skip caching. + logger.warning(f"Skipping search cache update due to schema error: {e}") + return + + mark_for_deferred_indexing(object_type.pk, instance.pk, OP_CACHE, using=using) + + def removal_handler(self, sender, instance, using=None, **kwargs): + """ + Receiver for the post_delete signal, responsible for caching object deletion. + """ + # Skip non-cacheable objects without scheduling any deferred work. + try: + indexer = get_indexer(instance) + except KeyError: + return + + try: + object_type = ObjectType.objects.get_for_model(indexer.model) + except ProgrammingError as e: + # The schema may be incomplete during migrations; skip caching. 
+ logger.warning(f"Skipping search cache update due to schema error: {e}") + return + + mark_for_deferred_indexing(object_type.pk, instance.pk, OP_REMOVE, using=using) + def search(self, value, user=None, object_types=None, lookup=DEFAULT_LOOKUP_TYPE): # Build the filter used to find relevant CachedValue records @@ -196,13 +246,26 @@ def search(self, value, user=None, object_types=None, lookup=DEFAULT_LOOKUP_TYPE return ret - def cache(self, instances, indexer=None, remove_existing=True): + # `using` here is a PostgreSQL/schema concern specific to this backend's deferred-write path (it + # replays the originating alias so branch writes land in the branch schema). It is deliberately + # NOT on the base cache()/remove() contract: a non-PostgreSQL backend (Redis, Solr, etc.) has no + # such concept. Do not lift `using` onto the base for symmetry; doing so would leak this backend's + # storage model into the generic contract. + def cache(self, instances, indexer=None, remove_existing=True, using=None): custom_fields = None # Convert a single instance to an iterable if not hasattr(instances, '__iter__'): instances = [instances] + # Determine the queryset manager used to write cache entries. When a + # database alias is provided (e.g. by a deferred task replaying the alias + # the originating write used), entries are written to that connection; + # otherwise the configured router decides. `using` is expected to be a + # concrete alias or falsy (None) per the caller's contract; a falsy value + # defers to the router, which is the correct behavior either way. 
+ manager = CachedValue.objects.using(using) if using else CachedValue.objects + buffer = [] counter = 0 for instance in instances: @@ -225,7 +288,7 @@ def cache(self, instances, indexer=None, remove_existing=True): # Wipe out any previously cached values for the object if remove_existing: - self.remove(instance) + self.remove(instance, using=using) # Generate cache data object_type = ObjectType.objects.get_for_model(indexer.model) @@ -243,27 +306,106 @@ def cache(self, instances, indexer=None, remove_existing=True): # Check whether the buffer needs to be flushed if len(buffer) >= 2000: - counter += len(CachedValue.objects.bulk_create(buffer)) + counter += len(manager.bulk_create(buffer)) buffer = [] # Final buffer flush if buffer: - counter += len(CachedValue.objects.bulk_create(buffer)) + counter += len(manager.bulk_create(buffer)) return counter - def remove(self, instance): + def _remove_by_id(self, object_type_id, object_ids, using=None): + """ + Delete cached values for the given content type and object IDs using a + single raw DELETE. Shared by remove() and the deferred search task. + """ + if not object_ids: + return None + + qs = CachedValue.objects.filter(object_type_id=object_type_id, object_id__in=object_ids) + + # Call _raw_delete() on the queryset to avoid first loading instances into memory + return qs._raw_delete(using=using or qs.db) + + def remove(self, instance, using=None): # Avoid attempting to query for non-cacheable objects try: - get_indexer(instance) + indexer = get_indexer(instance) except KeyError: return None - ct = ContentType.objects.get_for_model(instance) - qs = CachedValue.objects.filter(object_type=ct, object_id=instance.pk) + # Use the indexer's (concrete) model to resolve the object type, matching + # the content type that cache() writes entries under. 
+ object_type = ObjectType.objects.get_for_model(indexer.model) - # Call _raw_delete() on the queryset to avoid first loading instances into memory - return qs._raw_delete(using=qs.db) + return self._remove_by_id(object_type.pk, [instance.pk], using=using) + + # Postgres SQLSTATEs indicating the target schema/table no longer exists. This happens when a + # branch is merged or deprovisioned (its schema dropped) between the time an update was enqueued + # and when it is applied. Such errors are expected and safe to skip; the index is rebuilt on the + # next reindex. Any other DatabaseError (e.g. a deadlock or lost connection) is transient and must + # propagate so the work fails visibly, rather than silently dropping index updates. + _MISSING_SCHEMA_SQLSTATES = frozenset(( + '3F000', # invalid_schema_name + '42P01', # undefined_table + )) + + def _is_missing_schema(self, exc): + """ + Return True if the given DatabaseError was caused by the target schema/table no longer existing + (vs. a transient error that should propagate). + """ + sqlstate = getattr(getattr(exc, '__cause__', None), 'sqlstate', None) + return sqlstate in self._MISSING_SCHEMA_SQLSTATES + + def _apply_deferred_updates(self, using=None, cache_groups=None, remove_groups=None, log=logger): + """ + Apply a coalesced batch of updates to the search cache. Private to this backend; called by the + deferred-flush machinery (netbox.search.deferred) and the background job + (netbox.search.jobs.SearchCacheJob), not part of the public backend contract. + + The `using` alias captured when each object was saved/deleted is replayed here so entries are + written to the originating database/schema (e.g. a branch schema under netbox-branching), + regardless of any routing context that is no longer active by the time this runs. + """ + # Removals are a single DELETE per content type, so (unlike the cache loop below) there is no + # multi-step state to wrap in a transaction. 
A transient error here propagates and errors the + # caller; the remaining work is dropped rather than retried (NetBox does not retry these jobs + # by default) and is recovered by the next reindex. + for object_type_id, pks in (remove_groups or {}).items(): + try: + self._remove_by_id(object_type_id, pks, using=using) + except DatabaseError as e: + if not self._is_missing_schema(e): + raise + log.warning(f"Skipping search cache removal for object type {object_type_id}: {e}") + + for object_type_id, pks in (cache_groups or {}).items(): + try: + object_type = ObjectType.objects.get(pk=object_type_id) + except ObjectType.DoesNotExist: + continue + model = object_type.model_class() + if model is None: + continue + + try: + # Re-fetch live instances from the originating database. Reading on `using` is + # required: a branch object's PK may be absent (or refer to a different object) on the + # default connection. + queryset = model.objects.using(using).filter(pk__in=pks) + + # Clear any stale entries for these objects, then re-insert. Wrapping both in one + # transaction avoids leaving an object with no cache rows if execution fails between + # the delete and the insert. 
+ with transaction.atomic(using=using): + self._remove_by_id(object_type_id, pks, using=using) + self.cache(queryset, remove_existing=False, using=using) + except DatabaseError as e: + if not self._is_missing_schema(e): + raise + log.warning(f"Skipping search cache update for object type {object_type_id}: {e}") def clear(self, object_types=None): qs = CachedValue.objects.all() diff --git a/netbox/netbox/search/deferred.py b/netbox/netbox/search/deferred.py new file mode 100644 index 00000000000..c59502a4635 --- /dev/null +++ b/netbox/netbox/search/deferred.py @@ -0,0 +1,179 @@ +import logging + +from django.db import DEFAULT_DB_ALIAS, connections, transaction +from redis.exceptions import RedisError + +from netbox.constants import RQ_QUEUE_DEFAULT +from utilities.rqworker import any_workers_for_queue + +# This module is internal plumbing for the search signal handlers; nothing here +# is part of the public/plugin API, so no symbols are exported via __all__. + +logger = logging.getLogger(__name__) + +# Operation markers stored in the per-transaction buffer +OP_CACHE = 'cache' +OP_REMOVE = 'remove' + +# Attributes used to tag a flush callback so we can recognize our own callbacks +# among those registered on a connection and reach the batch they will flush. +_FLUSH_ALIAS_ATTR = '_netbox_search_flush_alias' +_FLUSH_BATCH_ATTR = '_netbox_search_flush_batch' +# The savepoint stack active when a flush callback was registered; see +# _pending_batch() for why buffering is scoped to it. +_FLUSH_SCOPE_ATTR = '_netbox_search_flush_scope' + + +def mark_for_deferred_indexing(object_type_id, pk, op, using=None): + """ + Schedule a searchable object for deferred (re)indexing. + + The work is coalesced per database connection and per transaction: repeated + operations on the same object collapse to a single entry (a deletion always + wins over a create/update), and a single flush is scheduled to run after the + transaction commits. 
When no transaction is open (autocommit), the indexing + runs synchronously. + + Args: + object_type_id: PK of the object's ObjectType/ContentType. + pk: PK of the object. + op: OP_CACHE or OP_REMOVE. + using: The database alias the originating write used. Replayed verbatim + on the deferred write so the cache entries land in the same schema + (e.g. a branch schema under netbox-branching), regardless of any + routing context that may be unset by the time the flush runs. + """ + # Fall back to the default alias when no originating alias was captured. This is correct for the + # common case (autocommit / non-branch writes route to the default connection), but if a write + # under a branch schema ever reached here without its alias, the deferred write would silently + # land in the main schema. Log at debug so that case is observable rather than invisible. + if not using: + logger.debug("Search cache: no originating DB alias for object %s/%s; using default", object_type_id, pk) + alias = using or DEFAULT_DB_ALIAS + connection = connections[alias] + + # No transaction in progress: index synchronously. Deferring would have + # nothing to defer past, and transaction.on_commit() in autocommit mode runs + # its callback immediately at registration (before we could populate the + # batch), so handle this case explicitly. + # + # On the transactional path below, transaction.on_commit(..., robust=True) + # ensures a flush failure can never propagate to the (already-committed) + # caller. The autocommit path has no such backstop, so guard it here: a + # broad catch is deliberate, because the originating write has committed and + # a search cache update must never turn a successful save into an error. The + # error is logged so a genuine indexing defect is still visible. 
+ if not connection.in_atomic_block: + try: + _flush({(object_type_id, pk): op}, alias) + except Exception: + logger.exception("Search cache: error while indexing inline") + return + + # Scope buffering to the current savepoint stack, not just the alias (see + # _pending_batch). May legitimately contain None entries for nested + # atomic(savepoint=False) blocks; matching is by equality, so that is fine. + scope = tuple(connection.savepoint_ids) + + batch = _pending_batch(connection, alias, scope) + if batch is None: + batch = {} + + def flush(batch=batch, alias=alias): + _flush(batch, alias) + + setattr(flush, _FLUSH_ALIAS_ATTR, alias) + setattr(flush, _FLUSH_BATCH_ATTR, batch) + setattr(flush, _FLUSH_SCOPE_ATTR, scope) + # robust=True is required, not just belt-and-suspenders: Django runs + # on_commit callbacks synchronously as the atomic block exits (after the + # COMMIT), so an exception escaping the callback would propagate out of + # the view's transaction and become a 500 on an already-committed write. + # _flush handles the recoverable Redis fault itself; robust=True is the + # only thing that keeps any *other* failure here (logged by Django at + # ERROR) from surfacing as that post-commit 500. + transaction.on_commit(flush, using=alias, robust=True) + + # Coalesce: a deletion supersedes any pending create/update for the object. + key = (object_type_id, pk) + if op == OP_REMOVE or batch.get(key) != OP_REMOVE: + batch[key] = op + + +def _pending_batch(connection, alias, scope): + """ + Return the batch dict of a flush callback already scheduled for the given + alias and savepoint scope on this connection's current transaction, or None + if there is none. + + This scans `connection.run_on_commit` on each call rather than caching the + lookup elsewhere. 
That is intentional: the scan is bounded (run_on_commit + holds only the transaction's registered commit callbacks, not one per saved + object), and reading it fresh each time is what keeps the buffer correctly + scoped to the live transaction. Django clears run_on_commit on both commit + and rollback, so a rolled-back transaction's batch can never be found here. + + Matching on `scope` (the savepoint stack active when the callback was + registered) as well as `alias` keeps each savepoint scope on its own + callback. Django prunes a callback when a savepoint in its registration + snapshot rolls back, so an op buffered inside a nested savepoint is dropped + with its callback if that savepoint rolls back -- it can never be found here + and reused by an outer scope. + """ + for _sids, func, _robust in connection.run_on_commit: + if ( + getattr(func, _FLUSH_ALIAS_ATTR, None) == alias + and getattr(func, _FLUSH_SCOPE_ATTR, None) == scope + ): + return getattr(func, _FLUSH_BATCH_ATTR) + return None + + +def _flush(batch, using): + """ + Dispatch a coalesced batch of dirty objects for (re)indexing. + + `_flush` is the single guarded entry point for deferred indexing, reached + either directly (autocommit) or from a transaction.on_commit callback. By the + time it runs the originating write has already committed, so it must never + propagate an error back to the caller and turn a successful save into a 500. + + The inline fallback is safe even during a broker outage: the search index + lives in PostgreSQL (the extras_cachedvalue table), so a Redis outage only + prevents backgrounding, not indexing itself. + + If the broker fails mid-enqueue (after the probe succeeds), Job.enqueue() has + already saved a Job row before the Redis dispatch raised, so the fallback can + leave behind a PENDING Job that no worker will run. The index is still + correct (written inline); the stranded row is cosmetic and ages out via the + housekeeping job. 
+ """ + if not batch: + return + + cache_groups = {} + remove_groups = {} + for (object_type_id, pk), op in batch.items(): + groups = remove_groups if op == OP_REMOVE else cache_groups + groups.setdefault(object_type_id, []).append(pk) + + # Imported here, not at module load, to avoid an import cycle: backends.py + # imports this module at module level (for the signal handlers), and + # netbox.search.jobs imports the search_backend singleton from backends.py, + # which is bound at the bottom of that module. A proper fix is tracked in + # #22485. + from netbox.search.backends import search_backend + from netbox.search.jobs import SearchCacheJob + + try: + # Both the worker-availability check and the job enqueue talk to Redis, + # and a worker can die between the two. Treat any Redis failure across the + # whole dispatch as "no worker available" and fall back to inline + # indexing (a PostgreSQL write that does not depend on Redis). + if any_workers_for_queue(RQ_QUEUE_DEFAULT): + SearchCacheJob.enqueue(using=using, cache_groups=cache_groups, remove_groups=remove_groups) + return + except RedisError: + logger.warning("Search cache: broker unavailable; indexing inline", exc_info=True) + + search_backend._apply_deferred_updates(using=using, cache_groups=cache_groups, remove_groups=remove_groups) diff --git a/netbox/netbox/search/jobs.py b/netbox/netbox/search/jobs.py new file mode 100644 index 00000000000..96b66886fc0 --- /dev/null +++ b/netbox/netbox/search/jobs.py @@ -0,0 +1,24 @@ +import logging + +from netbox.jobs import JobRunner +from netbox.search.backends import search_backend + +# Internal search-indexing machinery; not part of the public/plugin API. + +logger = logging.getLogger(__name__) + + +class SearchCacheJob(JobRunner): + """ + Background job which applies deferred updates to the global search cache. 
+ """ + class Meta: + name = 'Search cache update' + + def run(self, using=None, cache_groups=None, remove_groups=None, **kwargs): + search_backend._apply_deferred_updates( + using=using, + cache_groups=cache_groups, + remove_groups=remove_groups, + log=self.logger, + ) diff --git a/netbox/netbox/tests/test_search.py b/netbox/netbox/tests/test_search.py index 1b6fe9eac9a..b13f9924a92 100644 --- a/netbox/netbox/tests/test_search.py +++ b/netbox/netbox/tests/test_search.py @@ -1,10 +1,27 @@ +from unittest import mock + from django.contrib.contenttypes.models import ContentType -from django.test import TestCase +from django.db import DEFAULT_DB_ALIAS, connection, transaction +from django.db.models.signals import post_delete, post_save +from django.test import TestCase, TransactionTestCase +from redis.exceptions import ConnectionError as RedisConnectionError from dcim.models import Site from dcim.search import SiteIndex from extras.models import CachedValue -from netbox.search.backends import search_backend +from netbox.search import deferred +from netbox.search.backends import SearchBackend, search_backend +from netbox.search.jobs import SearchCacheJob + + +def scheduled_search_flushes(): + # The deferred flush callbacks scheduled on the current connection, + # identified by the alias tag set in netbox.search.deferred. Django stores + # each registered callback as a (savepoint_ids, func, robust) tuple. + return [ + entry[1] for entry in connection.run_on_commit + if hasattr(entry[1], deferred._FLUSH_ALIAS_ATTR) + ] class SearchBackendTestCase(TestCase): @@ -103,7 +120,12 @@ def test_cache_on_save(self): shipping_address='7915 Lilla Plains West Ladariusport TX 19429', comments='Lorem ipsum etcetera' ) - site.save() + + # Caching is deferred to a post-commit task. With no RQ worker running in + # the test environment it falls back to synchronous indexing; execute the + # on_commit callback to drive it within the test's transaction. 
+ with self.captureOnCommitCallbacks(execute=True): + site.save() content_type = ContentType.objects.get_for_model(Site) self.assertEqual( @@ -115,12 +137,24 @@ def test_remove_on_delete(self): """ Test that any cached value for an object are automatically removed on delete(). """ - site = Site.objects.first() - site.delete() - content_type = ContentType.objects.get_for_model(Site) + + # Seed an object with cached entries, then delete it. Capture the pk before delete() (which + # nulls instance.pk in memory) so the post-delete assertion queries the real id rather than + # object_id=None. The create is wrapped in captureOnCommitCallbacks so the deferred caching + # actually runs (and writes rows) before we assert it was seeded. + with self.captureOnCommitCallbacks(execute=True): + site = Site.objects.create(name='Site Delete', slug='site-delete', facility='Foxtrot') + site_pk = site.pk + self.assertTrue( + CachedValue.objects.filter(object_type=content_type, object_id=site_pk).exists() + ) + + with self.captureOnCommitCallbacks(execute=True): + site.delete() + self.assertFalse( - CachedValue.objects.filter(object_type=content_type, object_id=site.pk).exists() + CachedValue.objects.filter(object_type=content_type, object_id=site_pk).exists() ) def test_clear_all(self): @@ -151,3 +185,582 @@ def test_search(self): self.assertEqual(len(results), 1) results = search_backend.search('xxxxx') self.assertEqual(len(results), 0) + + +class DeferredCachingTestCase(TestCase): + """ + Tests for the deferred (post-commit) search caching machinery in + netbox.search.deferred. + + With no RQ worker registered, deferral falls back to synchronous indexing on + commit, so these tests assert on real CachedValue state and on the real + per-transaction buffer (connection.run_on_commit) rather than mocking the + queue. 
+ """ + + def _scheduled_flush_aliases(self): + return [getattr(func, deferred._FLUSH_ALIAS_ATTR) for func in scheduled_search_flushes()] + + def _pending_batch(self): + for func in scheduled_search_flushes(): + return getattr(func, deferred._FLUSH_BATCH_ATTR) + return None + + def test_run_on_commit_entry_shape(self): + """ + deferred._pending_batch() relies on Django storing each on_commit + callback as a (savepoint_ids, func, robust) tuple in + connection.run_on_commit. That structure is a Django internal, not a + documented API. Assert its shape explicitly so a change in a future + Django release fails here with a clear pointer, rather than surfacing as + an opaque unpack error inside the deferred-caching machinery. + """ + with transaction.atomic(): + transaction.on_commit(lambda: None) + entries = connection.run_on_commit + self.assertTrue(entries, "expected a registered on_commit callback") + entry = entries[-1] + self.assertEqual( + len(entry), 3, + "Django's connection.run_on_commit entry is no longer a 3-tuple; " + "netbox.search.deferred._pending_batch() unpacks (sids, func, robust) " + "and must be updated to match the new structure." + ) + sids, func, robust = entry + self.assertIsInstance(sids, set) + self.assertTrue(callable(func)) + self.assertIsInstance(robust, bool) + + def test_savepoint_ids_shape(self): + """ + deferred.mark_for_deferred_indexing() reads connection.savepoint_ids to + scope each flush callback to its savepoint stack. That is a Django + internal, not a documented API. Assert it is a list inside a nested + atomic() so a future change fails here rather than silently re-leaking + nested-savepoint ops into an outer batch. + """ + with transaction.atomic(): + with transaction.atomic(): + self.assertIsInstance( + connection.savepoint_ids, list, + "Django's connection.savepoint_ids is no longer a list; " + "netbox.search.deferred.mark_for_deferred_indexing() keys its " + "flush callbacks on tuple(savepoint_ids) and must be updated." 
+ ) + self.assertTrue( + connection.savepoint_ids, + "expected at least one savepoint id inside a nested atomic()" + ) + + def test_bulk_save_schedules_single_flush(self): + """ + A batch of saves within one transaction coalesces into a single flush + carrying every object, rather than one scheduled flush per object. + """ + site_ct = ContentType.objects.get_for_model(Site) + with transaction.atomic(): + for i in range(20): + Site.objects.create(name=f'Site {i}', slug=f'site-{i}') + + # Exactly one flush is scheduled, and its batch holds all 20 objects. + self.assertEqual(self._scheduled_flush_aliases().count('default'), 1) + batch = self._pending_batch() + site_pks = [pk for (ot_id, pk) in batch if ot_id == site_ct.pk] + self.assertEqual(len(site_pks), 20) + + def test_save_then_delete_in_same_scope_coalesces_to_removal(self): + """ + A create and delete buffered in the same savepoint scope coalesce to a + single removal. Model.delete() runs in its own atomic(savepoint=False) + block, which pushes a scope marker, so to exercise coalescing within one + scope the operations are buffered directly rather than via a real + delete(). + """ + site_ct = ContentType.objects.get_for_model(Site) + with transaction.atomic(): + deferred.mark_for_deferred_indexing(site_ct.pk, 1, deferred.OP_CACHE) + deferred.mark_for_deferred_indexing(site_ct.pk, 1, deferred.OP_REMOVE) + batch = self._pending_batch() + self.assertEqual(batch[(site_ct.pk, 1)], deferred.OP_REMOVE) + + def test_save_then_delete_ends_absent_from_cache(self): + """ + Creating then deleting an object within one transaction leaves it absent + from the cache, regardless of how the create and delete ops are scoped. 
+ """ + site_ct = ContentType.objects.get_for_model(Site) + with self.captureOnCommitCallbacks(execute=True): + site = Site.objects.create(name='Ephemeral', slug='ephemeral') + pk = site.pk + site.delete() + + self.assertFalse( + CachedValue.objects.filter(object_type=site_ct, object_id=pk).exists() + ) + + def test_rollback_does_not_leak_buffer(self): + """ + An object dirtied inside a transaction that rolls back leaves no flush + scheduled and no stale buffer behind, so it is never indexed. + """ + content_type = ContentType.objects.get_for_model(Site) + + # A nested atomic block that rolls back: its on_commit callback (and the + # batch it captured) are discarded by Django, so nothing is scheduled on + # the surrounding transaction. + with self.assertRaises(RuntimeError): + with transaction.atomic(): + rolled_back = Site.objects.create(name='RolledBack', slug='rolled-back') + rolled_back_pk = rolled_back.pk + # A flush was scheduled within this savepoint... + self.assertEqual(self._scheduled_flush_aliases().count('default'), 1) + raise RuntimeError('abort') + + # ...and is gone once the savepoint rolled back. + self.assertEqual(self._scheduled_flush_aliases().count('default'), 0) + self.assertFalse( + CachedValue.objects.filter(object_type=content_type, object_id=rolled_back_pk).exists() + ) + + def test_commit_after_rollback_still_indexes(self): + """ + After a rolled-back transaction, a subsequent committed save on the same + connection still indexes correctly (no sticky buffer state survives the + rollback to suppress it). 
+ """ + content_type = ContentType.objects.get_for_model(Site) + + with self.assertRaises(RuntimeError): + with transaction.atomic(): + Site.objects.create(name='RolledBack2', slug='rolled-back-2') + raise RuntimeError('abort') + + with self.captureOnCommitCallbacks(execute=True): + site = Site.objects.create( + name='Committed', + slug='committed', + facility='Echo', + description='Committed after rollback', + physical_address='1 Test Way', + shipping_address='1 Test Way', + comments='Lorem ipsum', + ) + + # The committed object is indexed (no sticky buffer state from the + # rolled-back transaction suppressed it). + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=site.pk).count(), + len(SiteIndex.fields) + ) + + def test_non_searchable_model_schedules_no_flush(self): + """ + Saving a model without a registered search index schedules no deferred + flush. + """ + with transaction.atomic(): + # CachedValue itself has no search indexer. + CachedValue.objects.create( + object_type=ContentType.objects.get_for_model(Site), + object_id=1, + field='name', + type='str', + value='test', + weight=100, + ) + self.assertEqual(self._scheduled_flush_aliases(), []) + + def test_flush_enqueues_job_when_worker_available(self): + """ + When an RQ worker is available, the flush enqueues a SearchCacheJob + carrying the dirty objects (and the originating database alias) rather + than indexing inline. + """ + site_ct = ContentType.objects.get_for_model(Site) + + # Patching the worker-availability probe is the established pattern for + # worker-gated behavior (cf. extras/tests/test_views.py, test_api.py). 
+ with mock.patch('netbox.search.deferred.any_workers_for_queue', return_value=True): + with mock.patch.object(SearchCacheJob, 'enqueue') as enqueue: + with self.captureOnCommitCallbacks(execute=True): + site = Site.objects.create(name='Enqueued', slug='enqueued') + + enqueue.assert_called_once() + kwargs = enqueue.call_args.kwargs + self.assertEqual(kwargs['cache_groups'], {site_ct.pk: [site.pk]}) + self.assertEqual(kwargs['remove_groups'], {}) + # The database alias captured from the post_save signal is forwarded to + # the job verbatim. This is what lets a deferred write target the + # originating schema (e.g. a branch schema under netbox-branching) even + # though the worker has no active routing context; cross-schema routing + # itself is covered by the netbox-branching test suite. + self.assertEqual(kwargs['using'], DEFAULT_DB_ALIAS) + + def test_flush_falls_back_inline_when_broker_unreachable(self): + """ + If the broker is unreachable (the worker-availability probe raises a + RedisError), the flush must not propagate the error; it falls back to + inline indexing, which is a PostgreSQL write with no Redis dependency. + """ + content_type = ContentType.objects.get_for_model(Site) + + with mock.patch( + 'netbox.search.deferred.any_workers_for_queue', + side_effect=RedisConnectionError("broker down"), + ): + with self.captureOnCommitCallbacks(execute=True): + site = Site.objects.create( + name='Broker Down', + slug='broker-down', + facility='Golf', + description='Indexed inline despite broker outage', + physical_address='3 Test Way', + shipping_address='3 Test Way', + comments='Lorem ipsum', + ) + + # The object was indexed inline (no exception propagated, rows written). + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=site.pk).count(), + len(SiteIndex.fields) + ) + + def test_flush_falls_back_inline_when_enqueue_fails(self): + """ + A worker can die (or the broker can drop) between the availability probe + and the enqueue. 
The flush guards the whole dispatch, not just the probe: + if enqueue raises a RedisError it still falls back to inline indexing. + """ + content_type = ContentType.objects.get_for_model(Site) + + with mock.patch('netbox.search.deferred.any_workers_for_queue', return_value=True): + with mock.patch.object( + SearchCacheJob, 'enqueue', side_effect=RedisConnectionError("broker dropped") + ): + with self.captureOnCommitCallbacks(execute=True): + site = Site.objects.create( + name='Enqueue Failed', + slug='enqueue-failed', + facility='Hotel', + description='Indexed inline after enqueue failure', + physical_address='4 Test Way', + shipping_address='4 Test Way', + comments='Lorem ipsum', + ) + + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=site.pk).count(), + len(SiteIndex.fields) + ) + + def test_cache_update_skips_deleted_object(self): + """ + _apply_deferred_updates tolerates a pk that no longer exists (object + deleted between enqueue and execution): it must not error or create cache + rows. + """ + site = Site.objects.create(name='Vanished', slug='vanished') + site_ct = ContentType.objects.get_for_model(Site) + pk = site.pk + site.delete() + CachedValue.objects.filter(object_type=site_ct, object_id=pk).delete() + + # No exception, and no rows resurrected for the missing object. + search_backend._apply_deferred_updates(using=None, cache_groups={site_ct.pk: [pk]}, remove_groups={}) + self.assertFalse( + CachedValue.objects.filter(object_type=site_ct, object_id=pk).exists() + ) + + +class AutocommitCachingTestCase(TransactionTestCase): + """ + Tests for the synchronous (autocommit) indexing path. Uses TransactionTestCase + rather than TestCase so that a save outside an explicit transaction runs in + autocommit (connection.in_atomic_block is False), exercising mark_for_deferred_indexing's + inline-indexing branch rather than the deferred on_commit path. 
+ """ + + def test_autocommit_save_indexes_synchronously(self): + # Saved outside any atomic() block: indexing happens inline, immediately, + # with no background worker and no on_commit deferral. + site = Site.objects.create( + name='Autocommit Site', + slug='autocommit-site', + facility='Foxtrot', + description='Indexed synchronously', + physical_address='2 Test Way', + shipping_address='2 Test Way', + comments='Lorem ipsum', + ) + + content_type = ContentType.objects.get_for_model(Site) + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=site.pk).count(), + len(SiteIndex.fields) + ) + + def test_autocommit_delete_removes_synchronously(self): + site = Site.objects.create(name='Autocommit Del', slug='autocommit-del') + content_type = ContentType.objects.get_for_model(Site) + # Capture the pk before delete() nulls instance.pk in memory. + site_pk = site.pk + self.assertTrue( + CachedValue.objects.filter(object_type=content_type, object_id=site_pk).exists() + ) + + site.delete() + self.assertFalse( + CachedValue.objects.filter(object_type=content_type, object_id=site_pk).exists() + ) + + def test_inner_savepoint_rollback_does_not_leak_into_outer_batch(self): + """ + Regression test for the nested-atomic leak: an operation performed inside + an inner atomic() block (savepoint) that rolls back must not affect the + search cache when the outer transaction commits. + + TransactionTestCase is required so the outer atomic() is a real + transaction whose on_commit callbacks actually fire on commit (under + TestCase the test body is already inside a transaction, so nothing + commits and the leak is invisible). + """ + content_type = ContentType.objects.get_for_model(Site) + + # Object that exists before the transaction and stays cached: the inner + # savepoint will (transiently) delete it, then roll back. Capture the pk + # up front; Model.delete() nulls instance.pk in memory. 
+ survivor = Site.objects.create(name='Survivor', slug='survivor') + survivor_pk = survivor.pk + self.assertTrue( + CachedValue.objects.filter(object_type=content_type, object_id=survivor_pk).exists() + ) + + with transaction.atomic(): + # Outer write schedules a search-cache flush for this connection. + outer = self._create_indexed_site('Outer', 'outer') + + # Inner savepoint deletes the survivor, then rolls back. At the DB + # level the survivor still exists after the rollback. + try: + with transaction.atomic(): + survivor.delete() + self.assertFalse(Site.objects.filter(pk=survivor_pk).exists()) + raise RuntimeError('abort inner savepoint') + except RuntimeError: + pass + + # The savepoint rolled back, so the survivor row is still present. + self.assertTrue(Site.objects.filter(pk=survivor_pk).exists()) + + # After the outer transaction commits, the deferred flush runs. The + # rolled-back inner delete must NOT have removed the survivor from the + # cache, and the committed outer object must be indexed. + self.assertTrue( + CachedValue.objects.filter(object_type=content_type, object_id=survivor_pk).exists(), + "rolled-back inner-savepoint delete leaked into the outer flush and " + "removed the survivor from the search cache", + ) + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=outer.pk).count(), + len(SiteIndex.fields), + ) + + @staticmethod + def _create_indexed_site(name, slug): + # SiteIndex.to_cache() emits a row only for non-empty fields, so populate + # every indexed field to get exactly len(SiteIndex.fields) cache rows. 
+ return Site.objects.create( + name=name, + slug=slug, + facility='Facility', + description=f'{name} description', + physical_address='1 Test Way', + shipping_address='1 Test Way', + comments='Lorem ipsum', + ) + + def test_committed_savepoint_indexes_in_its_own_scope(self): + """ + A nested atomic() that commits is scoped to its own flush callback (one + per savepoint scope), and both the outer and the inner-committed object + are indexed. The two-callback count locks in the per-scope behavior so a + future change that re-merges scopes cannot silently reintroduce the + nested-rollback leak. + """ + content_type = ContentType.objects.get_for_model(Site) + + with transaction.atomic(): + outer = self._create_indexed_site('OuterC', 'outer-c') + + with transaction.atomic(): + inner = self._create_indexed_site('InnerC', 'inner-c') + + # Two distinct savepoint scopes dirtied objects, so two flush + # callbacks are scheduled (one per scope), not one coalesced batch. + self.assertEqual(len(scheduled_search_flushes()), 2) + + # Both objects are indexed after the outer transaction commits. + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=outer.pk).count(), + len(SiteIndex.fields), + ) + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=inner.pk).count(), + len(SiteIndex.fields), + ) + + def test_cross_scope_save_then_delete_nets_to_removed(self): + """ + Save an object at the transaction top level, then delete it inside a + committed nested savepoint. The two ops land in separate scopes (no + cross-scope coalescing), so two flush callbacks run in registration + (FIFO) order: the cache pass then the removal pass. The re-fetch at flush + time is the source of truth, so the end state is correctly "removed". 
+ """ + content_type = ContentType.objects.get_for_model(Site) + + with transaction.atomic(): + site = Site.objects.create(name='CrossScope', slug='cross-scope') + site_pk = site.pk + + with transaction.atomic(): + site.delete() + + self.assertFalse( + CachedValue.objects.filter(object_type=content_type, object_id=site_pk).exists(), + "object deleted in a committed nested savepoint should not remain cached", + ) + self.assertFalse(Site.objects.filter(pk=site_pk).exists()) + + def test_sibling_savepoints_do_not_cross_contaminate(self): + """ + Two sequential nested savepoints under one outer transaction: the first + rolls back, the second commits, each dirtying a different object. Only + the committed sibling's object is indexed; the rolled-back sibling's op + is discarded with its own callback. + """ + content_type = ContentType.objects.get_for_model(Site) + + with transaction.atomic(): + try: + with transaction.atomic(): + rolled = Site.objects.create(name='SiblingRollback', slug='sibling-rb') + rolled_pk = rolled.pk + raise RuntimeError('abort first sibling') + except RuntimeError: + pass + + with transaction.atomic(): + committed = self._create_indexed_site('SiblingCommit', 'sibling-commit') + + self.assertFalse( + CachedValue.objects.filter(object_type=content_type, object_id=rolled_pk).exists(), + "rolled-back sibling savepoint's object should not be indexed", + ) + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=committed.pk).count(), + len(SiteIndex.fields), + ) + + def test_deep_nesting_middle_rollback(self): + """ + Three savepoint levels, each dirtying a distinct object; the middle level + rolls back. The middle object and the deepest object nested under the middle + are both discarded along with the middle savepoint (its rollback prunes + every callback whose registration snapshot contains the middle sid), so + only the top-level object survives to be indexed. 
+ """ + content_type = ContentType.objects.get_for_model(Site) + + with transaction.atomic(): + top = self._create_indexed_site('DeepTop', 'deep-top') + + try: + with transaction.atomic(): # middle savepoint + middle = Site.objects.create(name='DeepMiddle', slug='deep-middle') + middle_pk = middle.pk + + with transaction.atomic(): # deepest savepoint + deepest = Site.objects.create(name='DeepDeepest', slug='deep-deepest') + deepest_pk = deepest.pk + + raise RuntimeError('abort middle') + except RuntimeError: + pass + + # Only the top-level object survived; the middle savepoint's rollback + # discarded both the middle and the deepest object's flush callbacks. + self.assertEqual( + CachedValue.objects.filter(object_type=content_type, object_id=top.pk).count(), + len(SiteIndex.fields), + ) + self.assertFalse( + CachedValue.objects.filter(object_type=content_type, object_id=middle_pk).exists() + ) + self.assertFalse( + CachedValue.objects.filter(object_type=content_type, object_id=deepest_pk).exists() + ) + + +class _MinimalSearchBackend(SearchBackend): + """ + A backend implementing only the documented contract (search/cache/remove/clear), with no + knowledge of deferral. Records cache()/remove() calls in memory so a test can assert it indexed + synchronously. Used to prove a custom SEARCH_BACKEND keeps working after this change. 
+ """ + + def __init__(self): + self.cached = [] + self.removed = [] + + def search(self, value, user=None, object_types=None, lookup=None): + return [] + + def cache(self, instances, indexer=None, remove_existing=True): + if not hasattr(instances, '__iter__'): + instances = [instances] + self.cached.extend(instances) + + def remove(self, instance): + self.removed.append(instance) + + def clear(self, object_types=None): + self.cached.clear() + self.removed.clear() + + +class CustomBackendContractTestCase(TransactionTestCase): + """ + Proves the deferred-indexing work did not change the public SearchBackend contract: a custom + backend implementing only search/cache/remove/clear still indexes synchronously through the base + signal handlers, and never schedules deferred (on_commit) work. Deferral is private to + CachedValueSearchBackend; it must not leak onto a backend that didn't opt into it. + """ + + def test_custom_backend_indexes_synchronously(self): + backend = _MinimalSearchBackend() + + # Connect the custom backend's (inherited, synchronous) handlers, exactly as + # backends.py connects the configured backend at import. Same call site, no type-check. + post_save.connect(backend.caching_handler, sender=Site) + post_delete.connect(backend.removal_handler, sender=Site) + self.addCleanup(post_save.disconnect, backend.caching_handler, sender=Site) + self.addCleanup(post_delete.disconnect, backend.removal_handler, sender=Site) + + # A backend implementing only the documented contract indexes through the base handlers + # inline (the handler calls self.cache()/self.remove() synchronously). It is never routed + # into the deferral machinery, which is private to CachedValueSearchBackend. 
+ site = Site.objects.create(name='Custom Backend', slug='custom-backend') + self.assertIn(site, backend.cached) + + site.delete() + self.assertEqual(len(backend.removed), 1) + + def test_default_backend_defers_via_same_call_path(self): + # The default backend (CachedValueSearchBackend) IS connected at import, and reaches the + # SAME caching_handler call path -- but its override defers instead of indexing inline. + # This contrasts with the custom backend above: identical dispatch, polymorphic behavior. + with transaction.atomic(): + Site.objects.create(name='Default Defers', slug='default-defers') + scheduled = scheduled_search_flushes() + self.assertEqual(len(scheduled), 1)