diff --git a/openwisp_firmware_upgrader/admin.py b/openwisp_firmware_upgrader/admin.py index b55955bd..b016809a 100644 --- a/openwisp_firmware_upgrader/admin.py +++ b/openwisp_firmware_upgrader/admin.py @@ -53,6 +53,48 @@ DeviceGroup = swapper.load_model("config", "DeviceGroup") +class CascadeDeletePermissionMixin: + """ + Mixin that allows cascade/bulk deletions while blocking + single-row deletion in the change view. + """ + + def has_delete_permission(self, request, obj=None): + # Superusers can always delete + if getattr(getattr(request, "user", None), "is_superuser", False) is True: + return True + # Allow if obj is None (general permission check or cascade confirmation) + if obj is None: + return True + # Check if this is a cascade delete from a parent view or bulk action + if request.resolver_match: + url_name = request.resolver_match.url_name + own_delete_url_name = ( + f"{self.model._meta.app_label}_{self.model._meta.model_name}_delete" + ) + # Allow cascade deletions from parent delete views + if ( + url_name + and url_name.endswith("_delete") + and url_name != own_delete_url_name + ): + return True + # Allow bulk actions from the changelist + own_changelist_url_name = ( + f"{self.model._meta.app_label}_{self.model._meta.model_name}_changelist" + ) + + if ( + url_name + and url_name.endswith("_changelist") + and url_name != own_changelist_url_name + and request.POST.get("action") == "delete_selected" + ): + return True + + return False + + class BaseAdmin(MultitenantAdminMixin, TimeReadonlyAdminMixin, admin.ModelAdmin): save_on_top = True @@ -317,15 +359,12 @@ class Meta: labels = {"modified": _("last updated")} -class UpgradeOperationInline(admin.StackedInline): +class UpgradeOperationInline(CascadeDeletePermissionMixin, admin.StackedInline): model = UpgradeOperation form = UpgradeOperationForm readonly_fields = UpgradeOperationForm.Meta.fields extra = 0 - def has_delete_permission(self, request, obj): - return False - def has_add_permission(self, request, obj): return False @@ -452,7 +491,12 @@ def has_delete_permission(self, request, obj=None): @admin.register(BatchUpgradeOperation) -class BatchUpgradeOperationAdmin(ReadonlyUpgradeOptionsMixin, ReadOnlyAdmin, BaseAdmin): +class BatchUpgradeOperationAdmin( + CascadeDeletePermissionMixin, + ReadonlyUpgradeOptionsMixin, + ReadOnlyAdmin, + BaseAdmin, +): list_display = ["build", "organization", "status", "created", "modified"] list_filter = [ BuildCategoryOrganizationFilter, diff --git a/openwisp_firmware_upgrader/tests/test_admin.py b/openwisp_firmware_upgrader/tests/test_admin.py index 1ce34c8c..a3209a8c 100644 --- a/openwisp_firmware_upgrader/tests/test_admin.py +++ b/openwisp_firmware_upgrader/tests/test_admin.py @@ -1,9 +1,11 @@ import json from datetime import timedelta from unittest import mock +from unittest.mock import MagicMock import django import swapper +from django.contrib.admin import AdminSite from django.contrib.admin.helpers import ACTION_CHECKBOX_NAME from django.contrib.auth import get_user_model from django.contrib.auth.models import Permission @@ -21,6 +23,7 @@ DeviceFirmwareInline, DeviceUpgradeOperationInline, FirmwareImageInline, + UpgradeOperationInline, admin, ) from openwisp_users.tests.utils import TestMultitenantAdminMixin @@ -40,6 +43,7 @@ UpgradeOperation = load_model("UpgradeOperation") BatchUpgradeOperation = load_model("BatchUpgradeOperation") Device = swapper.load_model("config", "Device") +Organization = swapper.load_model("openwisp_users", "Organization") Location = swapper.load_model("geo", "Location") DeviceLocation = swapper.load_model("geo", "DeviceLocation") DeviceConnection = swapper.load_model("connection", "DeviceConnection") @@ -1546,4 +1550,96 @@ def test_batch_upgrade_operation_list_location_filter(self, *args): self.assertNotContains(response, str(batch3.pk)) +class TestUpgradeOperationInlineDeletePermission(BaseTestAdmin, TestCase): + def test_upgrade_operation_inline_delete_permission(self): + inline = UpgradeOperationInline(BatchUpgradeOperation, AdminSite()) + + with self.subTest("cascade delete via parent delete view"): + request = MagicMock() + request.resolver_match.url_name = ( + f"{Organization._meta.app_label}_{Organization._meta.model_name}_delete" + ) + request.POST.get.return_value = None + self.assertTrue(inline.has_delete_permission(request, obj=MagicMock())) + + with self.subTest("bulk delete via delete_selected action"): + request = MagicMock() + request.resolver_match.url_name = ( + f"{Organization._meta.app_label}_" + f"{Organization._meta.model_name}_changelist" + ) + request.POST.get.side_effect = lambda key, default=None: ( + "delete_selected" if key == "action" else default + ) + self.assertTrue(inline.has_delete_permission(request, obj=MagicMock())) + + with self.subTest("own delete view must be blocked"): + request = MagicMock() + request.resolver_match.url_name = ( + f"{UpgradeOperation._meta.app_label}_" + f"{UpgradeOperation._meta.model_name}_delete" + ) + request.POST.get.return_value = None + self.assertFalse(inline.has_delete_permission(request, obj=MagicMock())) + + with self.subTest("normal change view — delete must be blocked"): + request = MagicMock() + request.resolver_match.url_name = ( + f"{BatchUpgradeOperation._meta.app_label}_" + f"{BatchUpgradeOperation._meta.model_name}_change" + ) + request.POST.get.return_value = None + self.assertFalse(inline.has_delete_permission(request, obj=MagicMock())) + + with self.subTest("superuser always allowed"): + request = MagicMock() + request.user.is_superuser = True + request.resolver_match.url_name = "any_random_view" + self.assertTrue(inline.has_delete_permission(request, obj=MagicMock())) + + with self.subTest("obj is None fallback (internal Django check)"): + request = MagicMock() + request.user.is_superuser = False + request.resolver_match = None + self.assertTrue(inline.has_delete_permission(request, obj=None)) + + def test_cascade_delete_integration(self): + self._login() + org = self._create_org(name="cascade-org", slug="cascade-org") + category = self._create_category(name="Cascade Category", organization=org) + build = self._create_build(category=category, version="9.9") + batch = BatchUpgradeOperation.objects.create(build=build) + delete_url = reverse( + f"admin:{Organization._meta.app_label}" + f"_{Organization._meta.model_name}_delete", + args=[org.pk], + ) + response = self.client.post(delete_url, data={"post": "yes"}, follow=True) + self.assertEqual(response.status_code, 200) + self.assertFalse(Organization.objects.filter(pk=org.pk).exists()) + self.assertFalse(BatchUpgradeOperation.objects.filter(pk=batch.pk).exists()) + + def test_cascade_delete_integration_non_superuser(self): + org = self._create_org( + name="cascade-non-superuser", slug="cascade-non-superuser" + ) + user = self._create_administrator(organizations=[org]) + delete_perm = Permission.objects.get(codename="delete_organization") + user.user_permissions.add(delete_perm) + + category = self._create_category(name="Cat", organization=org) + build = self._create_build(category=category, version="1.0") + batch = BatchUpgradeOperation.objects.create(build=build) + + self.client.force_login(user) + delete_url = reverse( + f"admin:{org._meta.app_label}_{org._meta.model_name}_delete", + args=[org.pk], + ) + response = self.client.post(delete_url, data={"post": "yes"}, follow=True) + self.assertEqual(response.status_code, 403) + self.assertTrue(Organization.objects.filter(pk=org.pk).exists()) + self.assertTrue(BatchUpgradeOperation.objects.filter(pk=batch.pk).exists()) + + del TestConfigAdmin