Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/features/search.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Global Search

NetBox includes a powerful global search engine, providing a single convenient interface to search across its complex data model. Relevant fields on each model are indexed according to their precedence, so that the most relevant results are returned first. When objects are created or modified, the search index is updated immediately, ensuring real-time accuracy.
NetBox includes a powerful global search engine, providing a single convenient interface to search across its complex data model. Relevant fields on each model are indexed according to their precedence, so that the most relevant results are returned first. When objects are created, modified, or deleted, the search index is updated by a background task shortly afterward. As a result, a newly created or changed object may not appear in search results for a brief period. (When no background worker is running, the index is updated immediately as part of the request.)

When entering a search query, the user can choose a specific lookup type: exact match, partial match, etc. When a partial match is found, the matching portion of the applicable field value is included with each result so that the user can easily determine its relevance.

Expand Down
151 changes: 133 additions & 18 deletions netbox/netbox/search/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@

import netaddr
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ImproperlyConfigured
from django.db import ProgrammingError
from django.db import DatabaseError, ProgrammingError, transaction
from django.db.models import F, Q, Window, prefetch_related_objects
from django.db.models.fields.related import ForeignKey
from django.db.models.functions import window
Expand All @@ -22,6 +21,7 @@
from utilities.string import title

from . import FieldTypes, LookupTypes, get_indexer
from .deferred import OP_CACHE, OP_REMOVE, mark_for_deferred_indexing

DEFAULT_LOOKUP_TYPE = LookupTypes.PARTIAL
MAX_RESULTS = 1000
Expand All @@ -31,7 +31,8 @@

class SearchBackend:
"""
Base class for search backends. Subclasses must extend the `cache()`, `remove()`, and `clear()` methods below.
Base class for search backends. Subclasses must extend the `cache()`, `remove()`, `clear()`, and
`apply_deferred_updates()` methods below.
"""
_object_types = None

Expand Down Expand Up @@ -63,22 +64,43 @@ def search(self, value, user=None, object_types=None, lookup=DEFAULT_LOOKUP_TYPE
"""
raise NotImplementedError

def caching_handler(self, sender, instance, created, **kwargs):
def caching_handler(self, sender, instance, created, using=None, **kwargs):
"""
Receiver for the post_save signal, responsible for caching object creation/changes.
"""
# Skip non-cacheable objects without scheduling any deferred work.
try:
self.cache(instance, remove_existing=not created)
indexer = get_indexer(instance)
except KeyError:
return

try:
object_type = ObjectType.objects.get_for_model(indexer.model)
except ProgrammingError as e:
# The schema may be incomplete during migrations; skip caching.
logger.warning(f"Skipping search cache update due to schema error: {e}")
pass
return

mark_for_deferred_indexing(object_type.pk, instance.pk, OP_CACHE, using=using)

def removal_handler(self, sender, instance, **kwargs):
def removal_handler(self, sender, instance, using=None, **kwargs):
"""
Receiver for the post_delete signal, responsible for caching object deletion.
"""
self.remove(instance)
# Skip non-cacheable objects without scheduling any deferred work.
try:
indexer = get_indexer(instance)
except KeyError:
return

try:
object_type = ObjectType.objects.get_for_model(indexer.model)
except ProgrammingError as e:
# The schema may be incomplete during migrations; skip caching.
logger.warning(f"Skipping search cache update due to schema error: {e}")
return

mark_for_deferred_indexing(object_type.pk, instance.pk, OP_REMOVE, using=using)

def cache(self, instances, indexer=None, remove_existing=True):
"""
Expand All @@ -98,6 +120,15 @@ def clear(self, object_types=None):
"""
raise NotImplementedError

def apply_deferred_updates(self, using=None, cache_groups=None, remove_groups=None, log=None):
"""
Apply a coalesced batch of deferred cache updates (called by the background search cache job
and by the inline fallback). `cache_groups` and `remove_groups` are {object_type_id: [pk, ...]}
maps; `using` is the database alias the originating writes used, replayed here so entries land
in the originating schema.
"""
raise NotImplementedError

def count(self, object_types=None):
"""
Return a count of all cache entries (optionally filtered by object type).
Expand Down Expand Up @@ -196,13 +227,21 @@ def search(self, value, user=None, object_types=None, lookup=DEFAULT_LOOKUP_TYPE

return ret

def cache(self, instances, indexer=None, remove_existing=True):
def cache(self, instances, indexer=None, remove_existing=True, using=None):
custom_fields = None

# Convert a single instance to an iterable
if not hasattr(instances, '__iter__'):
instances = [instances]

# Determine the queryset manager used to write cache entries. When a
# database alias is provided (e.g. by a deferred task replaying the alias
# the originating write used), entries are written to that connection;
# otherwise the configured router decides. `using` is expected to be a
# concrete alias or falsy (None) per the caller's contract; a falsy value
# defers to the router, which is the correct behavior either way.
manager = CachedValue.objects.using(using) if using else CachedValue.objects

buffer = []
counter = 0
for instance in instances:
Expand All @@ -225,7 +264,7 @@ def cache(self, instances, indexer=None, remove_existing=True):

# Wipe out any previously cached values for the object
if remove_existing:
self.remove(instance)
self.remove(instance, using=using)

# Generate cache data
object_type = ObjectType.objects.get_for_model(indexer.model)
Expand All @@ -243,27 +282,103 @@ def cache(self, instances, indexer=None, remove_existing=True):

# Check whether the buffer needs to be flushed
if len(buffer) >= 2000:
counter += len(CachedValue.objects.bulk_create(buffer))
counter += len(manager.bulk_create(buffer))
buffer = []

# Final buffer flush
if buffer:
counter += len(CachedValue.objects.bulk_create(buffer))
counter += len(manager.bulk_create(buffer))

return counter

def remove(self, instance):
def _remove_by_id(self, object_type_id, object_ids, using=None):
"""
Delete cached values for the given content type and object IDs using a
single raw DELETE. Shared by remove() and the deferred search task.
"""
if not object_ids:
return None

qs = CachedValue.objects.filter(object_type_id=object_type_id, object_id__in=object_ids)

# Call _raw_delete() on the queryset to avoid first loading instances into memory
return qs._raw_delete(using=using or qs.db)

def remove(self, instance, using=None):
# Avoid attempting to query for non-cacheable objects
try:
get_indexer(instance)
indexer = get_indexer(instance)
except KeyError:
return None

ct = ContentType.objects.get_for_model(instance)
qs = CachedValue.objects.filter(object_type=ct, object_id=instance.pk)
# Use the indexer's (concrete) model to resolve the object type, matching
# the content type that cache() writes entries under.
object_type = ObjectType.objects.get_for_model(indexer.model)

# Call _raw_delete() on the queryset to avoid first loading instances into memory
return qs._raw_delete(using=qs.db)
return self._remove_by_id(object_type.pk, [instance.pk], using=using)

# Postgres SQLSTATEs indicating the target schema/table no longer exists. This happens when a
# branch is merged or deprovisioned (its schema dropped) between the time an update was enqueued
# and when it is applied. Such errors are expected and safe to skip; the index is rebuilt on the
# next reindex. Any other DatabaseError (e.g. a deadlock or lost connection) is transient and must
# propagate so the work fails visibly, rather than silently dropping index updates.
_MISSING_SCHEMA_SQLSTATES = frozenset((
'3F000', # invalid_schema_name
'42P01', # undefined_table
))

def _is_missing_schema(self, exc):
"""
Return True if the given DatabaseError was caused by the target schema/table no longer existing
(vs. a transient error that should propagate).
"""
sqlstate = getattr(getattr(exc, '__cause__', None), 'sqlstate', None)
return sqlstate in self._MISSING_SCHEMA_SQLSTATES

def apply_deferred_updates(self, using=None, cache_groups=None, remove_groups=None, log=logger):
"""
Apply a coalesced batch of updates to the search cache. The `using` alias captured when each
object was saved/deleted is replayed here so entries are written to the originating
database/schema (e.g. a branch schema under netbox-branching), regardless of any routing
context that is no longer active by the time this runs.
"""
# Removals are a single DELETE per content type, so (unlike the cache loop below) there is no
# multi-step state to wrap in a transaction. A transient error here propagates and errors the
# caller; the remaining work is dropped rather than retried (NetBox does not retry these jobs
# by default) and is recovered by the next reindex.
for object_type_id, pks in (remove_groups or {}).items():
try:
self._remove_by_id(object_type_id, pks, using=using)
except DatabaseError as e:
if not self._is_missing_schema(e):
raise
log.warning(f"Skipping search cache removal for object type {object_type_id}: {e}")

for object_type_id, pks in (cache_groups or {}).items():
try:
object_type = ObjectType.objects.get(pk=object_type_id)
except ObjectType.DoesNotExist:
continue
model = object_type.model_class()
if model is None:
continue

try:
# Re-fetch live instances from the originating database. Reading on `using` is
# required: a branch object's PK may be absent (or refer to a different object) on the
# default connection.
queryset = model.objects.using(using).filter(pk__in=pks)

# Clear any stale entries for these objects, then re-insert. Wrapping both in one
# transaction avoids leaving an object with no cache rows if execution fails between
# the delete and the insert.
with transaction.atomic(using=using):
self._remove_by_id(object_type_id, pks, using=using)
self.cache(queryset, remove_existing=False, using=using)
except DatabaseError as e:
if not self._is_missing_schema(e):
raise
log.warning(f"Skipping search cache update for object type {object_type_id}: {e}")

def clear(self, object_types=None):
qs = CachedValue.objects.all()
Expand Down
Loading