diff --git a/docs/plugins/development/models.md b/docs/plugins/development/models.md index 9a8256d6db2..cb3e6b8a2d8 100644 --- a/docs/plugins/development/models.md +++ b/docs/plugins/development/models.md @@ -162,6 +162,13 @@ For more information about database migrations, see the [Django documentation](h ::: netbox.models.features.TagsMixin +!!! note "Tagged objects and the REST API" + The `/api/extras/tagged-objects/` endpoint returns only tagged objects the requesting user is + permitted to view, which requires the target model's manager to provide `RestrictedQuerySet` (as + `NetBoxModel` does). A model that uses `TagsMixin` without `NetBoxModel` has a plain manager, so + its tagged objects are omitted from that endpoint unless its view permission is exempted via + [`EXEMPT_VIEW_PERMISSIONS`](../../configuration/security.md#exempt_view_permissions). + ## Custom Model Features In addition to utilizing the model features provided natively by NetBox (listed above), plugins can register their own model features. This is done using the `register_model_feature()` function from `netbox.utils`. This function takes two arguments: a feature name, and a callable which accepts a model class. The callable must return a boolean value indicting whether the given model supports the named feature. diff --git a/netbox/extras/api/serializers_/tags.py b/netbox/extras/api/serializers_/tags.py index 75ca4e9d268..c42c185b3b1 100644 --- a/netbox/extras/api/serializers_/tags.py +++ b/netbox/extras/api/serializers_/tags.py @@ -58,11 +58,15 @@ class Meta: def get_object(self, obj): """ Serialize a nested representation of the tagged object. + + TaggedItemViewSet filters list/detail responses to targets the requesting user may view, so + the None check here guards against a missing or concurrently deleted target. """ + content_object = obj.content_object + if content_object is None: + return None try: - serializer = get_serializer_for_model(obj.content_object) + serializer = get_serializer_for_model(content_object) except SerializerNotFound: return obj.object_repr - data = serializer(obj.content_object, nested=True, context={'request': self.context['request']}).data - - return data + return serializer(content_object, nested=True, context={'request': self.context['request']}).data diff --git a/netbox/extras/api/views.py b/netbox/extras/api/views.py index faf8a9becb2..e9efa3d9aa5 100644 --- a/netbox/extras/api/views.py +++ b/netbox/extras/api/views.py @@ -22,6 +22,7 @@ from netbox.api.viewsets import BaseViewSet, NetBoxModelViewSet from netbox.api.viewsets.mixins import ObjectValidationMixin from utilities.exceptions import RQWorkerNotRunningException +from utilities.permissions import restrict_queryset_by_gfk from utilities.request import copy_safe_request from utilities.rqworker import any_workers_for_queue @@ -192,6 +193,12 @@ class TaggedItemViewSet(RetrieveModelMixin, ListModelMixin, BaseViewSet): serializer_class = serializers.TaggedItemSerializer filterset_class = filtersets.TaggedItemFilterSet + def filter_queryset(self, queryset): + # Limit results to tagged items whose target object the requesting user may view, so the + # endpoint does not disclose objects (or their identity) outside the user's permissions. + queryset = super().filter_queryset(queryset) + return restrict_queryset_by_gfk(queryset, self.request.user, action='view') + # # Image attachments diff --git a/netbox/extras/tests/query_counts.json b/netbox/extras/tests/query_counts.json index a6807bd29a7..c1e45233768 100644 --- a/netbox/extras/tests/query_counts.json +++ b/netbox/extras/tests/query_counts.json @@ -31,7 +31,7 @@ "tableconfig:list_objects_with_permission": 19, "tag:api_list_objects": 10, "tag:list_objects_with_permission": 18, - "taggeditem:api_list_objects": 12, + "taggeditem:api_list_objects": 13, "webhook:api_list_objects": 13, "webhook:list_objects_with_permission": 20 } diff --git a/netbox/extras/tests/test_api.py b/netbox/extras/tests/test_api.py index 04ff9d424fc..518a5528c2f 100644 --- a/netbox/extras/tests/test_api.py +++ b/netbox/extras/tests/test_api.py @@ -5,6 +5,7 @@ from django.contrib.contenttypes.models import ContentType from django.core.files.uploadedfile import SimpleUploadedFile +from django.test import override_settings from django.urls import reverse from django.utils.timezone import make_aware, now from rest_framework import status @@ -18,7 +19,7 @@ from extras.scripts import BooleanVar, IntegerVar, StringVar from extras.scripts import Script as PythonClass from users.constants import TOKEN_PREFIX -from users.models import Group, Token, User +from users.models import Group, ObjectPermission, Token, User from utilities.tables import get_table_for_model from utilities.testing import APITestCase, APIViewTestCases @@ -750,6 +751,149 @@ def setUpTestData(cls): sites[1].tags.set([tags[1], tags[2]]) sites[2].tags.set([tags[2], tags[0]]) + def setUp(self): + super().setUp() + # Tagged-object visibility now requires view permission on the target model. + self.add_permissions('dcim.view_site') + + +class TaggedObjectPermissionTestCase(APITestCase): + """The tagged-objects endpoint must not disclose objects outside the user's view permissions.""" + + @classmethod + def setUpTestData(cls): + cls.tag = Tag.objects.create(name='Tag 1', slug='tag-1') + # Two distinct target models: one the test user will be granted view on, one not. + cls.visible_obj = Site.objects.create(name='Visible Site', slug='visible-site') + cls.hidden_obj = Manufacturer.objects.create(name='Hidden Manufacturer', slug='hidden-manufacturer') + cls.visible_obj.tags.set([cls.tag]) + cls.hidden_obj.tags.set([cls.tag]) + + def _list_url(self): + # Scope to the tag, matching the reported GET /api/extras/tagged-objects/?tag= + return f"{reverse('extras-api:taggeditem-list')}?tag={self.tag.slug}" + + def test_hidden_target_excluded_from_list(self): + """A tagged object the user cannot view is absent from both results and count.""" + self.add_permissions('extras.view_taggeditem', 'dcim.view_site') + + response = self.client.get(self._list_url(), **self.header) + self.assertHttpStatus(response, status.HTTP_200_OK) + + self.assertEqual(response.data['count'], 1) + object_types = {r['object_type'] for r in response.data['results']} + self.assertIn('dcim.site', object_types) + self.assertNotIn('dcim.manufacturer', object_types) + + visible_row = response.data['results'][0] + self.assertIsNotNone(visible_row['object']) + self.assertEqual(visible_row['object']['id'], self.visible_obj.pk) + + def test_no_target_permission_returns_no_rows(self): + """With view_taggeditem but no target view permission, results and count are empty (not 403).""" + self.add_permissions('extras.view_taggeditem') + + response = self.client.get(self._list_url(), **self.header) + self.assertHttpStatus(response, status.HTTP_200_OK) + self.assertEqual(response.data['count'], 0) + self.assertEqual(response.data['results'], []) + + def test_target_object_constraints_are_honored(self): + """Object-level constraints on the target model gate tagged rows of the same model.""" + # A second Site, tagged the same, that the constrained permission must exclude. + hidden_site = Site.objects.create(name='Hidden Site', slug='hidden-site') + hidden_site.tags.set([self.tag]) + + obj_perm = ObjectPermission( + name='View one site', + constraints={'pk': self.visible_obj.pk}, + actions=['view'], + ) + obj_perm.save() + obj_perm.users.add(self.user) + obj_perm.object_types.add(ObjectType.objects.get_for_model(Site)) + self.add_permissions('extras.view_taggeditem') + + response = self.client.get(self._list_url(), **self.header) + self.assertHttpStatus(response, status.HTTP_200_OK) + + self.assertEqual(response.data['count'], 1) + row = response.data['results'][0] + self.assertEqual(row['object_type'], 'dcim.site') + self.assertEqual(row['object_id'], self.visible_obj.pk) + + def test_hidden_target_detail_returns_404(self): + """A tagged-item row for a hidden target is not retrievable by id; a visible one is.""" + self.add_permissions('extras.view_taggeditem', 'dcim.view_site') + + visible_item = TaggedItem.objects.get( + content_type__model='site', object_id=self.visible_obj.pk, tag=self.tag + ) + hidden_item = TaggedItem.objects.get( + content_type__model='manufacturer', object_id=self.hidden_obj.pk, tag=self.tag + ) + + hidden_url = reverse('extras-api:taggeditem-detail', kwargs={'pk': hidden_item.pk}) + self.assertHttpStatus(self.client.get(hidden_url, **self.header), status.HTTP_404_NOT_FOUND) + + visible_url = reverse('extras-api:taggeditem-detail', kwargs={'pk': visible_item.pk}) + self.assertHttpStatus(self.client.get(visible_url, **self.header), status.HTTP_200_OK) + + def test_authorized_target_includes_nested_object(self): + """With view permission on all targets, every row exposes its nested object.""" + self.add_permissions('extras.view_taggeditem', 'dcim.view_site', 'dcim.view_manufacturer') + + response = self.client.get(self._list_url(), **self.header) + self.assertHttpStatus(response, status.HTTP_200_OK) + + self.assertEqual(response.data['count'], 2) + object_types = {r['object_type'] for r in response.data['results']} + self.assertIn('dcim.site', object_types) + self.assertIn('dcim.manufacturer', object_types) + for r in response.data['results']: + self.assertIsNotNone(r['object']) + self.assertEqual(r['object']['id'], r['object_id']) + + def test_superuser_sees_all_rows(self): + """A superuser is unaffected by target-object filtering.""" + self.user.is_superuser = True + self.user.save() + + response = self.client.get(self._list_url(), **self.header) + self.assertHttpStatus(response, status.HTTP_200_OK) + self.assertEqual(response.data['count'], 2) + object_types = {r['object_type'] for r in response.data['results']} + self.assertIn('dcim.site', object_types) + self.assertIn('dcim.manufacturer', object_types) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['dcim.manufacturer']) + def test_exempt_target_visible_without_object_permission(self): + """A target whose view permission is exempt is included without an explicit grant.""" + self.add_permissions('extras.view_taggeditem', 'dcim.view_site') + + response = self.client.get(self._list_url(), **self.header) + self.assertHttpStatus(response, status.HTTP_200_OK) + + self.assertEqual(response.data['count'], 2) + object_types = {r['object_type'] for r in response.data['results']} + self.assertIn('dcim.manufacturer', object_types) + manufacturer_row = next(r for r in response.data['results'] if r['object_type'] == 'dcim.manufacturer') + self.assertEqual(manufacturer_row['object_id'], self.hidden_obj.pk) + self.assertIsNotNone(manufacturer_row['object']) + + @override_settings(EXEMPT_VIEW_PERMISSIONS=['*']) + def test_blanket_exempt_includes_all_targets(self): + """With EXEMPT_VIEW_PERMISSIONS=['*'], every tagged target is visible without object perms.""" + self.add_permissions('extras.view_taggeditem') + + response = self.client.get(self._list_url(), **self.header) + self.assertHttpStatus(response, status.HTTP_200_OK) + + self.assertEqual(response.data['count'], 2) + object_types = {r['object_type'] for r in response.data['results']} + self.assertIn('dcim.site', object_types) + self.assertIn('dcim.manufacturer', object_types) + # TODO: Standardize to APIViewTestCase (needs create & update tests) class ImageAttachmentTestCase( diff --git a/netbox/utilities/permissions.py b/netbox/utilities/permissions.py index 0cee1db9dbf..fe459c6db3c 100644 --- a/netbox/utilities/permissions.py +++ b/netbox/utilities/permissions.py @@ -15,6 +15,7 @@ 'qs_filter_from_constraints', 'resolve_permission', 'resolve_permission_type', + 'restrict_queryset_by_gfk', ) @@ -159,3 +160,61 @@ def _replace_tokens(value, tokens): return Q() return params + + +def restrict_queryset_by_gfk( + queryset, + user, + action='view', + content_type_field='content_type', + object_id_field='object_id', +): + """ + Restrict a queryset carrying a GenericForeignKey-style (content_type, object_id) pair so that + only rows whose related target object the user is permitted to perform `action` on are returned. + + Each target's visibility is evaluated against that target model's own restricted queryset + (RestrictedQuerySet.restrict()), so per-model object permissions, exempt views, anonymous + access, and superuser access are all honored. Rows whose target model does not participate in + NetBox's object-permission system (no restrict() method) are excluded (fail closed). + """ + # Superusers may view everything; skip the (potentially per-type) filtering for efficiency. + if user and user.is_superuser: + return queryset + + ContentType = apps.get_model('contenttypes', 'ContentType') + content_type_id_field = f'{content_type_field}_id' + + # Resolve the distinct content types referenced by the (already filtered) queryset + content_type_ids = queryset.order_by().values_list(content_type_id_field, flat=True).distinct() + + query = Q() + matched = False + for ct_id in content_type_ids: + # get_for_id() is process-cached, avoiding a DB hit once each content type is warm. + model = ContentType.objects.get_for_id(ct_id).model_class() + if model is None: + continue + + # Exempt-view models are visible to everyone; match the content type without a subquery. + if permission_is_exempt(get_permission_for_model(model, action)): + query |= Q(**{content_type_id_field: ct_id}) + matched = True + continue + + target_queryset = model._default_manager.all() + if not hasattr(target_queryset, 'restrict'): + # Fail closed: target model has no object-level permission enforcement. + continue + visible_pks = target_queryset.restrict(user, action).values('pk') + query |= Q(**{ + content_type_id_field: ct_id, + f'{object_id_field}__in': visible_pks, + }) + matched = True + + if not matched: + # Fail closed: an empty Q() would match every row, so return nothing instead. + return queryset.none() + + return queryset.filter(query)