Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/plugins/development/models.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ For more information about database migrations, see the [Django documentation](h

::: netbox.models.features.TagsMixin

!!! note "Tagged objects and the REST API"
The `/api/extras/tagged-objects/` endpoint returns only tagged objects the requesting user is
permitted to view, which requires the target model's manager to provide `RestrictedQuerySet` (as
`NetBoxModel` does). A model that uses `TagsMixin` without `NetBoxModel` has a plain manager, so
its tagged objects are omitted from that endpoint unless its view permission is exempted via
[`EXEMPT_VIEW_PERMISSIONS`](../../configuration/security.md#exempt_view_permissions).

## Custom Model Features

In addition to utilizing the model features provided natively by NetBox (listed above), plugins can register their own model features. This is done using the `register_model_feature()` function from `netbox.utils`. This function takes two arguments: a feature name, and a callable which accepts a model class. The callable must return a boolean value indicting whether the given model supports the named feature.
Expand Down
12 changes: 8 additions & 4 deletions netbox/extras/api/serializers_/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@ class Meta:
def get_object(self, obj):
"""
Serialize a nested representation of the tagged object.

TaggedItemViewSet filters list/detail responses to targets the requesting user may view, so
the None check here guards against a missing or concurrently deleted target.
"""
content_object = obj.content_object
if content_object is None:
return None
try:
serializer = get_serializer_for_model(obj.content_object)
serializer = get_serializer_for_model(content_object)
except SerializerNotFound:
return obj.object_repr
data = serializer(obj.content_object, nested=True, context={'request': self.context['request']}).data

return data
return serializer(content_object, nested=True, context={'request': self.context['request']}).data
7 changes: 7 additions & 0 deletions netbox/extras/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from netbox.api.viewsets import BaseViewSet, NetBoxModelViewSet
from netbox.api.viewsets.mixins import ObjectValidationMixin
from utilities.exceptions import RQWorkerNotRunningException
from utilities.permissions import restrict_queryset_by_gfk
from utilities.request import copy_safe_request
from utilities.rqworker import any_workers_for_queue

Expand Down Expand Up @@ -192,6 +193,12 @@ class TaggedItemViewSet(RetrieveModelMixin, ListModelMixin, BaseViewSet):
serializer_class = serializers.TaggedItemSerializer
filterset_class = filtersets.TaggedItemFilterSet

def filter_queryset(self, queryset):
# Limit results to tagged items whose target object the requesting user may view, so the
# endpoint does not disclose objects (or their identity) outside the user's permissions.
queryset = super().filter_queryset(queryset)
return restrict_queryset_by_gfk(queryset, self.request.user, action='view')


#
# Image attachments
Expand Down
2 changes: 1 addition & 1 deletion netbox/extras/tests/query_counts.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
"tableconfig:list_objects_with_permission": 19,
"tag:api_list_objects": 10,
"tag:list_objects_with_permission": 18,
"taggeditem:api_list_objects": 12,
"taggeditem:api_list_objects": 13,
"webhook:api_list_objects": 13,
"webhook:list_objects_with_permission": 20
}
146 changes: 145 additions & 1 deletion netbox/extras/tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from django.contrib.contenttypes.models import ContentType
from django.core.files.uploadedfile import SimpleUploadedFile
from django.test import override_settings
from django.urls import reverse
from django.utils.timezone import make_aware, now
from rest_framework import status
Expand All @@ -18,7 +19,7 @@
from extras.scripts import BooleanVar, IntegerVar, StringVar
from extras.scripts import Script as PythonClass
from users.constants import TOKEN_PREFIX
from users.models import Group, Token, User
from users.models import Group, ObjectPermission, Token, User
from utilities.tables import get_table_for_model
from utilities.testing import APITestCase, APIViewTestCases

Expand Down Expand Up @@ -750,6 +751,149 @@ def setUpTestData(cls):
sites[1].tags.set([tags[1], tags[2]])
sites[2].tags.set([tags[2], tags[0]])

def setUp(self):
super().setUp()
# Tagged-object visibility now requires view permission on the target model.
self.add_permissions('dcim.view_site')


class TaggedObjectPermissionTestCase(APITestCase):
"""The tagged-objects endpoint must not disclose objects outside the user's view permissions."""

@classmethod
def setUpTestData(cls):
cls.tag = Tag.objects.create(name='Tag 1', slug='tag-1')
# Two distinct target models: one the test user will be granted view on, one not.
cls.visible_obj = Site.objects.create(name='Visible Site', slug='visible-site')
cls.hidden_obj = Manufacturer.objects.create(name='Hidden Manufacturer', slug='hidden-manufacturer')
cls.visible_obj.tags.set([cls.tag])
cls.hidden_obj.tags.set([cls.tag])

def _list_url(self):
# Scope to the tag, matching the reported GET /api/extras/tagged-objects/?tag=<slug>
return f"{reverse('extras-api:taggeditem-list')}?tag={self.tag.slug}"

def test_hidden_target_excluded_from_list(self):
"""A tagged object the user cannot view is absent from both results and count."""
self.add_permissions('extras.view_taggeditem', 'dcim.view_site')

response = self.client.get(self._list_url(), **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)

self.assertEqual(response.data['count'], 1)
object_types = {r['object_type'] for r in response.data['results']}
self.assertIn('dcim.site', object_types)
self.assertNotIn('dcim.manufacturer', object_types)

visible_row = response.data['results'][0]
self.assertIsNotNone(visible_row['object'])
self.assertEqual(visible_row['object']['id'], self.visible_obj.pk)

def test_no_target_permission_returns_no_rows(self):
"""With view_taggeditem but no target view permission, results and count are empty (not 403)."""
self.add_permissions('extras.view_taggeditem')

response = self.client.get(self._list_url(), **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['count'], 0)
self.assertEqual(response.data['results'], [])

def test_target_object_constraints_are_honored(self):
"""Object-level constraints on the target model gate tagged rows of the same model."""
# A second Site, tagged the same, that the constrained permission must exclude.
hidden_site = Site.objects.create(name='Hidden Site', slug='hidden-site')
hidden_site.tags.set([self.tag])

obj_perm = ObjectPermission(
name='View one site',
constraints={'pk': self.visible_obj.pk},
actions=['view'],
)
obj_perm.save()
obj_perm.users.add(self.user)
obj_perm.object_types.add(ObjectType.objects.get_for_model(Site))
self.add_permissions('extras.view_taggeditem')

response = self.client.get(self._list_url(), **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)

self.assertEqual(response.data['count'], 1)
row = response.data['results'][0]
self.assertEqual(row['object_type'], 'dcim.site')
self.assertEqual(row['object_id'], self.visible_obj.pk)

def test_hidden_target_detail_returns_404(self):
"""A tagged-item row for a hidden target is not retrievable by id; a visible one is."""
self.add_permissions('extras.view_taggeditem', 'dcim.view_site')

visible_item = TaggedItem.objects.get(
content_type__model='site', object_id=self.visible_obj.pk, tag=self.tag
)
hidden_item = TaggedItem.objects.get(
content_type__model='manufacturer', object_id=self.hidden_obj.pk, tag=self.tag
)

hidden_url = reverse('extras-api:taggeditem-detail', kwargs={'pk': hidden_item.pk})
self.assertHttpStatus(self.client.get(hidden_url, **self.header), status.HTTP_404_NOT_FOUND)

visible_url = reverse('extras-api:taggeditem-detail', kwargs={'pk': visible_item.pk})
self.assertHttpStatus(self.client.get(visible_url, **self.header), status.HTTP_200_OK)

def test_authorized_target_includes_nested_object(self):
"""With view permission on all targets, every row exposes its nested object."""
self.add_permissions('extras.view_taggeditem', 'dcim.view_site', 'dcim.view_manufacturer')

response = self.client.get(self._list_url(), **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)

self.assertEqual(response.data['count'], 2)
object_types = {r['object_type'] for r in response.data['results']}
self.assertIn('dcim.site', object_types)
self.assertIn('dcim.manufacturer', object_types)
for r in response.data['results']:
self.assertIsNotNone(r['object'])
self.assertEqual(r['object']['id'], r['object_id'])

def test_superuser_sees_all_rows(self):
"""A superuser is unaffected by target-object filtering."""
self.user.is_superuser = True
self.user.save()

response = self.client.get(self._list_url(), **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)
self.assertEqual(response.data['count'], 2)
object_types = {r['object_type'] for r in response.data['results']}
self.assertIn('dcim.site', object_types)
self.assertIn('dcim.manufacturer', object_types)

@override_settings(EXEMPT_VIEW_PERMISSIONS=['dcim.manufacturer'])
def test_exempt_target_visible_without_object_permission(self):
"""A target whose view permission is exempt is included without an explicit grant."""
self.add_permissions('extras.view_taggeditem', 'dcim.view_site')

response = self.client.get(self._list_url(), **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)

self.assertEqual(response.data['count'], 2)
object_types = {r['object_type'] for r in response.data['results']}
self.assertIn('dcim.manufacturer', object_types)
manufacturer_row = next(r for r in response.data['results'] if r['object_type'] == 'dcim.manufacturer')
self.assertEqual(manufacturer_row['object_id'], self.hidden_obj.pk)
self.assertIsNotNone(manufacturer_row['object'])

@override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
def test_blanket_exempt_includes_all_targets(self):
"""With EXEMPT_VIEW_PERMISSIONS=['*'], every tagged target is visible without object perms."""
self.add_permissions('extras.view_taggeditem')

response = self.client.get(self._list_url(), **self.header)
self.assertHttpStatus(response, status.HTTP_200_OK)

self.assertEqual(response.data['count'], 2)
object_types = {r['object_type'] for r in response.data['results']}
self.assertIn('dcim.site', object_types)
self.assertIn('dcim.manufacturer', object_types)


# TODO: Standardize to APIViewTestCase (needs create & update tests)
class ImageAttachmentTestCase(
Expand Down
59 changes: 59 additions & 0 deletions netbox/utilities/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
'qs_filter_from_constraints',
'resolve_permission',
'resolve_permission_type',
'restrict_queryset_by_gfk',
)


Expand Down Expand Up @@ -159,3 +160,61 @@ def _replace_tokens(value, tokens):
return Q()

return params


def restrict_queryset_by_gfk(
queryset,
user,
action='view',
content_type_field='content_type',
object_id_field='object_id',
):
"""
Restrict a queryset carrying a GenericForeignKey-style (content_type, object_id) pair so that
only rows whose related target object the user is permitted to perform `action` on are returned.

Each target's visibility is evaluated against that target model's own restricted queryset
(RestrictedQuerySet.restrict()), so per-model object permissions, exempt views, anonymous
access, and superuser access are all honored. Rows whose target model does not participate in
NetBox's object-permission system (no restrict() method) are excluded (fail closed).
"""
# Superusers may view everything; skip the (potentially per-type) filtering for efficiency.
if user and user.is_superuser:
return queryset

ContentType = apps.get_model('contenttypes', 'ContentType')
content_type_id_field = f'{content_type_field}_id'

# Resolve the distinct content types referenced by the (already filtered) queryset
content_type_ids = queryset.order_by().values_list(content_type_id_field, flat=True).distinct()

query = Q()
matched = False
for ct_id in content_type_ids:
# get_for_id() is process-cached, avoiding a DB hit once each content type is warm.
model = ContentType.objects.get_for_id(ct_id).model_class()
if model is None:
continue

# Exempt-view models are visible to everyone; match the content type without a subquery.
if permission_is_exempt(get_permission_for_model(model, action)):
query |= Q(**{content_type_id_field: ct_id})
matched = True
continue

target_queryset = model._default_manager.all()
if not hasattr(target_queryset, 'restrict'):
# Fail closed: target model has no object-level permission enforcement.
continue
visible_pks = target_queryset.restrict(user, action).values('pk')
query |= Q(**{
content_type_id_field: ct_id,
f'{object_id_field}__in': visible_pks,
})
matched = True

if not matched:
# Fail closed: an empty Q() would match every row, so return nothing instead.
return queryset.none()

return queryset.filter(query)