Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 49 additions & 5 deletions openwisp_firmware_upgrader/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,48 @@
DeviceGroup = swapper.load_model("config", "DeviceGroup")


class CascadeDeletePermissionMixin:
"""
Mixin that allows cascade/bulk deletions while blocking
single-row deletion in the change view.
"""

def has_delete_permission(self, request, obj=None):
# Superusers can always delete
if getattr(getattr(request, "user", None), "is_superuser", False) is True:
return True
# Allow if obj is None (general permission check or cascade confirmation)
if obj is None:
return True
# Check if this is a cascade delete from a parent view or bulk action
if request.resolver_match:
url_name = request.resolver_match.url_name
own_delete_url_name = (
f"{self.model._meta.app_label}_{self.model._meta.model_name}_delete"
)
# Allow cascade deletions from parent delete views
if (
url_name
and url_name.endswith("_delete")
and url_name != own_delete_url_name
):
return True
# Allow bulk actions from the changelist
own_changelist_url_name = (
f"{self.model._meta.app_label}_{self.model._meta.model_name}_changelist"
)

if (
url_name
and url_name.endswith("_changelist")
and url_name != own_changelist_url_name
and request.POST.get("action") == "delete_selected"
):
return True

return False


class BaseAdmin(MultitenantAdminMixin, TimeReadonlyAdminMixin, admin.ModelAdmin):
save_on_top = True

Expand Down Expand Up @@ -317,15 +359,12 @@ class Meta:
labels = {"modified": _("last updated")}


class UpgradeOperationInline(admin.StackedInline):
class UpgradeOperationInline(CascadeDeletePermissionMixin, admin.StackedInline):
model = UpgradeOperation
form = UpgradeOperationForm
readonly_fields = UpgradeOperationForm.Meta.fields
extra = 0

def has_delete_permission(self, request, obj):
return False

def has_add_permission(self, request, obj):
return False

Expand Down Expand Up @@ -452,7 +491,12 @@ def has_delete_permission(self, request, obj=None):


@admin.register(BatchUpgradeOperation)
class BatchUpgradeOperationAdmin(ReadonlyUpgradeOptionsMixin, ReadOnlyAdmin, BaseAdmin):
class BatchUpgradeOperationAdmin(
CascadeDeletePermissionMixin,
ReadonlyUpgradeOptionsMixin,
ReadOnlyAdmin,
BaseAdmin,
):
list_display = ["build", "organization", "status", "created", "modified"]
list_filter = [
BuildCategoryOrganizationFilter,
Expand Down
96 changes: 96 additions & 0 deletions openwisp_firmware_upgrader/tests/test_admin.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import json
from datetime import timedelta
from unittest import mock
from unittest.mock import MagicMock

import django
import swapper
from django.contrib.admin import AdminSite
from django.contrib.admin.helpers import ACTION_CHECKBOX_NAME
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission
Expand All @@ -21,6 +23,7 @@
DeviceFirmwareInline,
DeviceUpgradeOperationInline,
FirmwareImageInline,
UpgradeOperationInline,
admin,
)
from openwisp_users.tests.utils import TestMultitenantAdminMixin
Expand All @@ -40,6 +43,7 @@
UpgradeOperation = load_model("UpgradeOperation")
BatchUpgradeOperation = load_model("BatchUpgradeOperation")
Device = swapper.load_model("config", "Device")
Organization = swapper.load_model("openwisp_users", "Organization")
Location = swapper.load_model("geo", "Location")
DeviceLocation = swapper.load_model("geo", "DeviceLocation")
DeviceConnection = swapper.load_model("connection", "DeviceConnection")
Expand Down Expand Up @@ -1546,4 +1550,96 @@ def test_batch_upgrade_operation_list_location_filter(self, *args):
self.assertNotContains(response, str(batch3.pk))


class TestUpgradeOperationInlineDeletePermission(BaseTestAdmin, TestCase):
def test_upgrade_operation_inline_delete_permission(self):
inline = UpgradeOperationInline(BatchUpgradeOperation, AdminSite())

with self.subTest("cascade delete via parent delete view"):
request = MagicMock()
request.resolver_match.url_name = (
f"{Organization._meta.app_label}_{Organization._meta.model_name}_delete"
)
request.POST.get.return_value = None
self.assertTrue(inline.has_delete_permission(request, obj=MagicMock()))

with self.subTest("bulk delete via delete_selected action"):
request = MagicMock()
request.resolver_match.url_name = (
f"{Organization._meta.app_label}_"
f"{Organization._meta.model_name}_changelist"
)
request.POST.get.side_effect = lambda key, default=None: (
"delete_selected" if key == "action" else default
)
self.assertTrue(inline.has_delete_permission(request, obj=MagicMock()))

with self.subTest("own delete view must be blocked"):
request = MagicMock()
request.resolver_match.url_name = (
f"{UpgradeOperation._meta.app_label}_"
f"{UpgradeOperation._meta.model_name}_delete"
)
request.POST.get.return_value = None
self.assertFalse(inline.has_delete_permission(request, obj=MagicMock()))

with self.subTest("normal change view — delete must be blocked"):
request = MagicMock()
request.resolver_match.url_name = (
f"{BatchUpgradeOperation._meta.app_label}_"
f"{BatchUpgradeOperation._meta.model_name}_change"
)
request.POST.get.return_value = None
self.assertFalse(inline.has_delete_permission(request, obj=MagicMock()))

with self.subTest("superuser always allowed"):
request = MagicMock()
request.user.is_superuser = True
request.resolver_match.url_name = "any_random_view"
self.assertTrue(inline.has_delete_permission(request, obj=MagicMock()))

with self.subTest("obj is None fallback (internal Django check)"):
request = MagicMock()
request.user.is_superuser = False
request.resolver_match = None
self.assertTrue(inline.has_delete_permission(request, obj=None))

def test_cascade_delete_integration(self):
self._login()
org = self._create_org(name="cascade-org", slug="cascade-org")
category = self._create_category(name="Cascade Category", organization=org)
build = self._create_build(category=category, version="9.9")
batch = BatchUpgradeOperation.objects.create(build=build)
delete_url = reverse(
f"admin:{Organization._meta.app_label}"
f"_{Organization._meta.model_name}_delete",
args=[org.pk],
)
response = self.client.post(delete_url, data={"post": "yes"}, follow=True)
self.assertEqual(response.status_code, 200)
self.assertFalse(Organization.objects.filter(pk=org.pk).exists())
self.assertFalse(BatchUpgradeOperation.objects.filter(pk=batch.pk).exists())

def test_cascade_delete_integration_non_superuser(self):
org = self._create_org(
name="cascade-non-superuser", slug="cascade-non-superuser"
)
user = self._create_administrator(organizations=[org])
delete_perm = Permission.objects.get(codename="delete_organization")
user.user_permissions.add(delete_perm)

category = self._create_category(name="Cat", organization=org)
build = self._create_build(category=category, version="1.0")
batch = BatchUpgradeOperation.objects.create(build=build)

self.client.force_login(user)
delete_url = reverse(
f"admin:{org._meta.app_label}_{org._meta.model_name}_delete",
args=[org.pk],
)
response = self.client.post(delete_url, data={"post": "yes"}, follow=True)
self.assertEqual(response.status_code, 403)
self.assertTrue(Organization.objects.filter(pk=org.pk).exists())
self.assertTrue(BatchUpgradeOperation.objects.filter(pk=batch.pk).exists())


del TestConfigAdmin
Loading