Skip to content
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
db3a176
implement initial pipeline for snapshot notifications
Shofikul-Isl4m Jan 29, 2026
de606f4
notify all users on snapshot publish
Shofikul-Isl4m Jan 31, 2026
27b2984
feat: add exponential backoff retry and DLQ storage to notification w…
Shofikul-Isl4m Feb 2, 2026
8105383
feat: enhance notification worker reliability and DLQ logic
Shofikul-Isl4m Feb 4, 2026
1fecdc7
address coderabbit and cubic
Shofikul-Isl4m Feb 4, 2026
409e1d3
fix(worker): prevent duplicate emails and recover stuck messages
Shofikul-Isl4m Feb 5, 2026
534007e
merge upstream/main
Shofikul-Isl4m Feb 5, 2026
262eee8
address coderabbit
Shofikul-Isl4m Feb 5, 2026
042d95d
feat: improve notification worker robustness
Shofikul-Isl4m Feb 5, 2026
cd92fd0
feat: implement robust notification recovery and transition-aware sig…
Shofikul-Isl4m Feb 6, 2026
a83a18d
add unit test
Shofikul-Isl4m Feb 7, 2026
5dd54c9
merge upstream/main
Shofikul-Isl4m Feb 7, 2026
c6b47f2
fix sonarcloud issue
Shofikul-Isl4m Feb 7, 2026
3780cf8
address coderabbit issues
Shofikul-Isl4m Feb 7, 2026
a5ddf38
address coderabbit
Shofikul-Isl4m Feb 7, 2026
9350dfd
feat(notifications): add idempotent processing and DLQ management
Shofikul-Isl4m Feb 26, 2026
4eadda6
run make check-test
Shofikul-Isl4m Feb 27, 2026
fbc4663
fix: route failed messages to DLQ immediately and preserve raw data
Shofikul-Isl4m Apr 5, 2026
2de837a
fix: re-raise exceptions in entity notification handler
Shofikul-Isl4m Apr 5, 2026
f74a9d4
refactor: use shared notification utility for worker and DLQ idempotency
Shofikul-Isl4m Apr 6, 2026
5ca4224
fix: address feedback for signal robustness and test coverage
Shofikul-Isl4m Apr 7, 2026
beb6da7
fix: refine signal reliability and DLQ error handling
Shofikul-Isl4m Apr 7, 2026
2f11ec0
fix: harden DLQ command safety and signal execution reliability
Shofikul-Isl4m Apr 8, 2026
aba550e
fix: resolve Ruff linting errors in DLQ management command
Shofikul-Isl4m Apr 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions backend/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,9 @@ run-backend-fuzz:
@COMPOSE_BAKE=true DOCKER_BUILDKIT=1 \
docker compose --project-name nest-fuzz -f docker-compose/fuzz/compose.yaml up --build --remove-orphans --abort-on-container-exit backend db cache

# Run the notification worker interactively inside the backend container
# (exec-backend-command-it allocates a TTY so the worker's output streams live).
run-notification-worker:
@CMD="python manage.py owasp_run_notification_worker" $(MAKE) exec-backend-command-it
Comment thread
Shofikul-Isl4m marked this conversation as resolved.

save-backup:
@echo "Saving Nest backup"
@CMD="python manage.py dumpdata --natural-primary --natural-foreign --indent=2" $(MAKE) exec-backend-command > backend/data/backup.json
Expand Down
4 changes: 4 additions & 0 deletions backend/apps/owasp/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ owasp-aggregate-member-contributions:
@echo "Aggregating OWASP community member contributions"
@CMD="python manage.py owasp_aggregate_member_contributions" $(MAKE) exec-backend-command

# Queue deadline reminder notifications for upcoming OWASP events.
owasp-check-event-deadlines:
@echo "Checking OWASP event deadlines"
@CMD="python manage.py owasp_check_event_deadlines" $(MAKE) exec-backend-command

owasp-create-project-metadata-file:
@echo "Generating metadata"
@CMD="python manage.py owasp_create_project_metadata_file $(entity_key)" $(MAKE) exec-backend-command
Expand Down
24 changes: 24 additions & 0 deletions backend/apps/owasp/admin/notification.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"""Admin registration for notification models."""

from django.contrib import admin

from apps.owasp.models.notification import Notification, Subscription


@admin.register(Subscription)
class SubscriptionAdmin(admin.ModelAdmin):
    """Admin for Subscription model."""

    # Columns shown in the changelist; content_type/object_id identify the
    # subscribed-to entity via the generic relation.
    list_display = ("user", "content_type", "object_id", "created_at")
    # Sidebar filters for narrowing down subscriptions.
    list_filter = ("content_type", "created_at")
    # Admin search box matches against the subscribing user's email/username.
    search_fields = ("user__email", "user__username")


@admin.register(Notification)
class NotificationAdmin(admin.ModelAdmin):
    """Admin for Notification model."""

    # Columns shown in the changelist view.
    list_display = ("recipient", "type", "title", "is_read", "created_at")
    # Sidebar filters for triaging notifications.
    list_filter = ("type", "is_read", "created_at")
    # Search across recipient identity and notification content.
    search_fields = ("recipient__email", "recipient__username", "title", "message")
    # Creation timestamp is set automatically; never editable from the admin.
    readonly_fields = ("created_at",)
6 changes: 6 additions & 0 deletions backend/apps/owasp/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ class OwaspConfig(AppConfig):
"""Owasp app config."""

name = "apps.owasp"

def ready(self):
    """Import signal modules so their receivers register at app startup.

    The imports are needed only for their side effects (connecting Django
    signal handlers). Each one carries ``# noqa: F401``; previously only the
    last import did, so Ruff would flag — and an autofix could remove — the
    chapter/event imports, silently disconnecting those signal handlers.
    """
    import apps.owasp.signals.chapter  # noqa: F401
    import apps.owasp.signals.event  # noqa: F401
    import apps.owasp.signals.snapshot  # noqa: F401
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
"""Management command to check for approaching event deadlines."""

import logging

from django.core.management.base import BaseCommand
from django.utils import timezone

from apps.owasp.models.event import Event
from apps.owasp.utils.notifications import publish_event_notification

logger = logging.getLogger(__name__)

# Days before an event's start date at which a reminder notification is queued.
REMINDER_DAYS = (7, 3, 1)


class Command(BaseCommand):
    """Check for events with approaching deadlines and queue reminder notifications."""

    help = "Check for events with approaching deadlines and send reminder notifications."

    def handle(self, *args, **options):
        """Scan each reminder horizon and queue a notification per matching event.

        For every offset in ``REMINDER_DAYS``, events whose ``start_date``
        falls exactly that many days from today get a ``deadline_reminder``
        notification published.
        """
        self.stdout.write("Checking for approaching event deadlines...")
        reference_date = timezone.now().date()
        queued = 0

        for days_remaining in REMINDER_DAYS:
            deadline = reference_date + timezone.timedelta(days=days_remaining)
            for upcoming in Event.objects.filter(start_date=deadline):
                self.stdout.write(f" Event '{upcoming.name}' starts in {days_remaining} days ({deadline})")
                publish_event_notification(
                    upcoming, "deadline_reminder", days_remaining=days_remaining
                )
                queued += 1

        self.stdout.write(self.style.SUCCESS(f"Queued {queued} deadline reminder(s)."))
169 changes: 169 additions & 0 deletions backend/apps/owasp/management/commands/owasp_notification_dlq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
"""Management command to manage notification DLQ."""

import sys

from django.conf import settings
from django.core.mail import send_mail
from django.core.management.base import BaseCommand, CommandError
from django_redis import get_redis_connection


class Command(BaseCommand):
    """Manage notification DLQ manually.

    Operates on a Redis stream holding notifications that exhausted the
    worker's retries. Supports listing entries, retrying (re-sending the
    stored email), and removing entries.
    """

    help = "Manage notification DLQ: list, retry, or remove failed notifications"

    # Redis stream key where the worker parks permanently-failed notifications.
    DLQ_STREAM_KEY = "owasp_notifications_dlq"

    def add_arguments(self, parser):
        """Register CLI arguments: a positional action plus --id / --all selectors."""
        parser.add_argument(
            "action",
            type=str,
            choices=["list", "retry", "remove"],
            help="Action to perform: list, retry, or remove",
        )
        parser.add_argument(
            "--id",
            type=str,
            help="Specific message ID to act on (required for retry/remove unless --all is used)",
        )
        parser.add_argument(
            "--all",
            action="store_true",
            help="Apply action to all messages",
        )

    def handle(self, *args, **options):
        """Dispatch to the requested DLQ action.

        Raises:
            CommandError: If retry/remove is invoked without --id or --all.
                (Replaces the previous stdout-message + sys.exit(1) pattern;
                CommandError reports to stderr and exits with status 1.)
        """
        action = options["action"]
        message_id = options.get("id")
        all_messages = options.get("all")

        redis_conn = get_redis_connection("default")

        if action == "list":
            self.list_dlq(redis_conn)
        elif action == "retry":
            if not message_id and not all_messages:
                raise CommandError("--id or --all is required for retry")
            self.retry_dlq(redis_conn, message_id, all_messages)
        elif action == "remove":
            if not message_id and not all_messages:
                raise CommandError("--id or --all is required for remove")
            self.remove_dlq(redis_conn, message_id, all_messages)

    def list_dlq(self, redis_conn):
        """List all failed notifications in the DLQ as a fixed-width table."""
        messages = redis_conn.xrange(self.DLQ_STREAM_KEY, "-", "+")

        if not messages:
            self.stdout.write(self.style.SUCCESS("DLQ is empty - no failed notifications"))
            return

        self.stdout.write("\n" + "=" * 100)
        self.stdout.write(
            f"{'ID':<20} | {'Email':<25} | {'Type':<18} | {'Entity':<15} | {'Retries':<8}"
        )
        self.stdout.write("=" * 100)

        for msg_id, data in messages:
            msg_id_str = self._to_str(msg_id)
            user_email = self._get_value(data, b"user_email", "unknown")
            notif_type = self._get_value(data, b"notification_type", "unknown")
            # Truncate long entity names so the table columns stay aligned.
            entity_name = self._get_value(data, b"entity_name", "unknown")[:15]
            retries = self._get_value(data, b"dlq_retries", "0")

            self.stdout.write(
                f"{msg_id_str:<20} | {user_email:<25} | "
                f"{notif_type:<18} | {entity_name:<15} | {retries:<8}"
            )

        self.stdout.write("=" * 100)
        self.stdout.write(f"Total: {len(messages)} failed notification(s)\n")

    def retry_dlq(self, redis_conn, message_id, all_messages):
        """Retry failed notification(s) by re-sending the stored email.

        Successful sends are deleted from the DLQ. Sends that raise keep the
        message in the DLQ with an incremented ``dlq_retries`` counter.
        """
        if all_messages:
            messages = redis_conn.xrange(self.DLQ_STREAM_KEY, "-", "+")
        else:
            messages = redis_conn.xrange(self.DLQ_STREAM_KEY, message_id, message_id) or []

        if not messages:
            self.stdout.write(self.style.ERROR("Message(s) not found"))
            return

        success_count = 0
        error_count = 0

        for msg_id, data in messages:
            if not data:
                continue

            try:
                user_email = self._get_value(data, b"user_email")
                title = self._get_value(data, b"title")
                message = self._get_value(data, b"message")
                related_link = self._get_value(data, b"related_link")

                if user_email and title and message:
                    full_message = (
                        f"{message}\n\nView: {related_link}" if related_link else message
                    )
                    send_mail(
                        subject=title,
                        message=full_message,
                        # Use the configured sender rather than a hard-coded address.
                        from_email=settings.DEFAULT_FROM_EMAIL,
                        recipient_list=[user_email],
                        fail_silently=False,
                    )
                    redis_conn.xdel(self.DLQ_STREAM_KEY, msg_id)
                    success_count += 1
                    self.stdout.write(f"Retried: {msg_id} -> {user_email}")
                else:
                    self.stdout.write(self.style.WARNING(f"Skipped (missing data): {msg_id}"))
                    error_count += 1

            except Exception as e:  # noqa: BLE001
                error_count += 1
                retries = int(self._get_value(data, b"dlq_retries", "0"))
                # Normalize keys/values to str so the entry round-trips correctly
                # whether or not the connection uses decode_responses.
                new_msg = {self._to_str(k): self._to_str(v) for k, v in data.items()}
                new_msg["dlq_retries"] = str(retries + 1)
                # Add the updated entry BEFORE deleting the old one so a failure
                # between the two calls can duplicate but never lose the message.
                redis_conn.xadd(self.DLQ_STREAM_KEY, new_msg)
                redis_conn.xdel(self.DLQ_STREAM_KEY, msg_id)
                self.stdout.write(
                    self.style.ERROR(f"Failed to retry {msg_id}: {e}, incremented retries")
                )

        self.stdout.write(
            self.style.SUCCESS(
                f"\nRetry complete: {success_count} succeeded, {error_count} failed/retried"
            )
        )

    def remove_dlq(self, redis_conn, message_id, all_messages):
        """Remove failed notification(s) from the DLQ without re-sending them."""
        if all_messages:
            messages = redis_conn.xrange(self.DLQ_STREAM_KEY, "-", "+")
        else:
            messages = redis_conn.xrange(self.DLQ_STREAM_KEY, message_id, message_id)

        if not messages:
            self.stdout.write(self.style.ERROR("No messages found"))
            return

        count = 0
        for msg_id, _ in messages:
            redis_conn.xdel(self.DLQ_STREAM_KEY, msg_id)
            count += 1
            self.stdout.write(f"Removed: {msg_id}")

        self.stdout.write(self.style.SUCCESS(f"\nRemoved {count} message(s) from DLQ"))

    @staticmethod
    def _to_str(value):
        """Decode bytes to str; pass anything else through unchanged."""
        return value.decode("utf-8") if isinstance(value, bytes) else value

    def _get_value(self, data, key, default=None):
        """Get a decoded value from message data, tolerating str or bytes keys.

        Redis clients return bytes keys by default but str keys when the
        connection is configured with decode_responses=True; probe both so the
        command works either way. Falsy values (e.g. empty string) fall back
        to *default*, matching the original behavior.
        """
        bytes_key = key.encode() if isinstance(key, str) else key
        value = data.get(bytes_key)
        if value is None:
            value = data.get(bytes_key.decode("utf-8"))
        if value:
            return self._to_str(value)
        return default
Loading