-
-
Notifications
You must be signed in to change notification settings - Fork 631
feat(notifications): add EDA-based idempotent processing and DLQ for chapter/event updates #4454
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Shofikul-Isl4m
wants to merge
24
commits into
OWASP:main
Choose a base branch
from
Shofikul-Isl4m:feature/entity-subscriptions
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 23 commits
Commits
Show all changes
24 commits
Select commit
Hold shift + click to select a range
db3a176
implement initial pipeline for snapshot notifications
Shofikul-Isl4m de606f4
notify all users on snapshot publish
Shofikul-Isl4m 27b2984
feat: add exponential backoff retry and DLQ storage to notification w…
Shofikul-Isl4m 8105383
feat:enhance notification worker reliability and DLQ logic
Shofikul-Isl4m 1fecdc7
adress coderabbit and cubic
Shofikul-Isl4m 409e1d3
fix(worker): prevent duplicate emails and recover stuck messages
Shofikul-Isl4m 534007e
merge upstream/main
Shofikul-Isl4m 262eee8
adress code rabit
Shofikul-Isl4m 042d95d
feat:improve notification worker robustness
Shofikul-Isl4m cd92fd0
feat: implement robust notification recovery and transition-aware sig…
Shofikul-Isl4m a83a18d
add unit test
Shofikul-Isl4m 5dd54c9
merge upstream/main
Shofikul-Isl4m c6b47f2
fix sonarcloud issue
Shofikul-Isl4m 3780cf8
adress coderabbit issues
Shofikul-Isl4m a5ddf38
adress coderabbit
Shofikul-Isl4m 9350dfd
feat(notifications): add idempotent processing and DLQ management
Shofikul-Isl4m 4eadda6
run make check-test
Shofikul-Isl4m fbc4663
fix: route failed messages to DLQ immediately and preserve raw data
Shofikul-Isl4m 2de837a
fix: re-raise exceptions in entity notification handler
Shofikul-Isl4m f74a9d4
refactor: use shared notification utility for worker and DLQ idempotency
Shofikul-Isl4m 5ca4224
fix: address feedback for signal robustness and test coverage
Shofikul-Isl4m beb6da7
fix: refine signal reliability and DLQ error handling
Shofikul-Isl4m 2f11ec0
fix: harden DLQ command safety and signal execution reliability
Shofikul-Isl4m aba550e
fix: resolve Ruff linting errors in DLQ management command
Shofikul-Isl4m File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| """Admin registration for notification models.""" | ||
|
|
||
| from django.contrib import admin | ||
|
|
||
| from apps.owasp.models.notification import Notification, Subscription | ||
|
|
||
|
|
||
| @admin.register(Subscription) | ||
| class SubscriptionAdmin(admin.ModelAdmin): | ||
| """Admin for Subscription model.""" | ||
|
|
||
| list_display = ("user", "content_type", "object_id", "created_at") | ||
| list_filter = ("content_type", "created_at") | ||
| search_fields = ("user__email", "user__username") | ||
|
|
||
|
|
||
| @admin.register(Notification) | ||
| class NotificationAdmin(admin.ModelAdmin): | ||
| """Admin for Notification model.""" | ||
|
|
||
| list_display = ("recipient", "type", "title", "is_read", "created_at") | ||
| list_filter = ("type", "is_read", "created_at") | ||
| search_fields = ("recipient__email", "recipient__username", "title", "message") | ||
| readonly_fields = ("created_at",) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
36 changes: 36 additions & 0 deletions
36
backend/apps/owasp/management/commands/owasp_check_event_deadlines.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| """Management command to check for approaching event deadlines.""" | ||
|
|
||
| import logging | ||
|
|
||
| from django.core.management.base import BaseCommand | ||
| from django.utils import timezone | ||
|
|
||
| from apps.owasp.models.event import Event | ||
| from apps.owasp.utils.notifications import publish_event_notification | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| REMINDER_DAYS = (7, 3, 1) | ||
|
|
||
|
|
||
| class Command(BaseCommand): | ||
| """Check for events with approaching deadlines and queue reminder notifications.""" | ||
|
|
||
| help = "Check for events with approaching deadlines and send reminder notifications." | ||
|
|
||
| def handle(self, *args, **options): | ||
| """Handle execution.""" | ||
| self.stdout.write("Checking for approaching event deadlines...") | ||
| today = timezone.now().date() | ||
| total_reminders = 0 | ||
|
|
||
| for days in REMINDER_DAYS: | ||
| target_date = today + timezone.timedelta(days=days) | ||
| events = Event.objects.filter(start_date=target_date) | ||
|
|
||
| for event in events: | ||
| self.stdout.write(f" Event '{event.name}' starts in {days} days ({target_date})") | ||
| publish_event_notification(event, "deadline_reminder", days_remaining=days) | ||
| total_reminders += 1 | ||
|
|
||
| self.stdout.write(self.style.SUCCESS(f"Queued {total_reminders} deadline reminder(s).")) |
198 changes: 198 additions & 0 deletions
198
backend/apps/owasp/management/commands/owasp_notification_dlq.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,198 @@ | ||
| """Management command to manage notification DLQ.""" | ||
|
|
||
| from django.core.mail import send_mail | ||
| from django.core.management.base import BaseCommand, CommandError | ||
| from django_redis import get_redis_connection | ||
|
|
||
| from apps.nest.models import User | ||
| from apps.owasp.utils.notifications import send_notification | ||
|
|
||
|
|
||
| class Command(BaseCommand): | ||
| """Manage notification DLQ manually.""" | ||
|
|
||
| help = "Manage notification DLQ: list, retry, or remove failed notifications" | ||
|
|
||
| DLQ_STREAM_KEY = "owasp_notifications_dlq" | ||
|
|
||
| def add_arguments(self, parser): | ||
| parser.add_argument( | ||
| "action", | ||
| type=str, | ||
| choices=["list", "retry", "remove"], | ||
| help="Action to perform: list, retry, or remove", | ||
| ) | ||
| group = parser.add_mutually_exclusive_group() | ||
| group.add_argument( | ||
| "--id", | ||
| type=str, | ||
| help="Specific message ID to act on (required for retry/remove unless --all is used)", | ||
| ) | ||
| group.add_argument( | ||
| "--all", | ||
| action="store_true", | ||
| help="Apply action to all messages", | ||
| ) | ||
|
|
||
| def handle(self, *args, **options): | ||
| action = options["action"] | ||
| message_id = options.get("id") | ||
| all_messages = options.get("all") | ||
|
|
||
| redis_conn = get_redis_connection("default") | ||
|
|
||
| if action == "list": | ||
| self.list_dlq(redis_conn) | ||
| elif action == "retry": | ||
| if not message_id and not all_messages: | ||
| raise CommandError("Error: --id or --all is required for retry") | ||
| self.retry_dlq(redis_conn, message_id, all_messages) | ||
| elif action == "remove": | ||
| if not message_id and not all_messages: | ||
| raise CommandError("Error: --id or --all is required for remove") | ||
| self.remove_dlq(redis_conn, message_id, all_messages) | ||
|
|
||
| def list_dlq(self, redis_conn): | ||
| """List all failed notifications in DLQ.""" | ||
| messages = redis_conn.xrange(self.DLQ_STREAM_KEY, "-", "+") | ||
|
|
||
| if not messages: | ||
| self.stdout.write(self.style.SUCCESS("DLQ is empty - no failed notifications")) | ||
| return | ||
|
|
||
| self.stdout.write("\n" + "=" * 100) | ||
| self.stdout.write( | ||
| f"{'ID':<20} | {'Email':<25} | {'Type':<18} | {'Entity':<15} | {'Retries':<8}" | ||
| ) | ||
| self.stdout.write("=" * 100) | ||
|
|
||
| for msg_id, data in messages: | ||
| msg_id_str = msg_id.decode("utf-8") if isinstance(msg_id, bytes) else msg_id | ||
| user_email = self._get_value(data, b"user_email", "unknown") | ||
| notif_type = self._get_value(data, b"notification_type", "unknown") | ||
| entity_name = self._get_value(data, b"entity_name", "unknown")[:15] | ||
| retries = self._get_value(data, b"dlq_retries", "0") | ||
|
|
||
| self.stdout.write( | ||
| f"{msg_id_str:<20} | {user_email:<25} | " | ||
| f"{notif_type:<18} | {entity_name:<15} | {retries:<8}" | ||
| ) | ||
|
|
||
| self.stdout.write("=" * 100) | ||
| self.stdout.write(f"Total: {len(messages)} failed notification(s)\n") | ||
|
|
||
| def retry_dlq(self, redis_conn, message_id, all_messages): | ||
| """Retry failed notification(s).""" | ||
| if all_messages: | ||
| messages = redis_conn.xrange(self.DLQ_STREAM_KEY, "-", "+") | ||
| else: | ||
| result = redis_conn.xrange(self.DLQ_STREAM_KEY, message_id, message_id) | ||
| messages = result or [] | ||
|
|
||
| if not messages: | ||
| raise CommandError("Message(s) not found") | ||
|
|
||
| success_count = 0 | ||
| error_count = 0 | ||
|
|
||
| for msg_id, data in messages: | ||
| if not data: | ||
| continue | ||
|
|
||
| try: | ||
| user_email = self._get_value(data, b"user_email") | ||
| title = self._get_value(data, b"title") | ||
| message = self._get_value(data, b"message") | ||
| related_link = self._get_value(data, b"related_link") | ||
|
|
||
| user_id = self._get_value(data, b"user_id") | ||
| notification_type = self._get_value(data, b"notification_type") | ||
|
|
||
| if user_email and title and message: | ||
| if user_id and notification_type: | ||
| try: | ||
| user = User.objects.get(id=int(user_id)) | ||
| send_notification( | ||
| user=user, | ||
| title=title, | ||
| message=message, | ||
| notification_type=notification_type, | ||
| related_link=related_link or "", | ||
| ) | ||
| except User.DoesNotExist: | ||
| self.stdout.write( | ||
| self.style.WARNING(f"User {user_id} not found: {msg_id}") | ||
| ) | ||
| error_count += 1 | ||
| continue | ||
|
cubic-dev-ai[bot] marked this conversation as resolved.
|
||
| else: | ||
| # Fallback for old DLQ format | ||
| full_message = ( | ||
| f"{message}\n\nView: {related_link}" if related_link else message | ||
| ) | ||
| send_mail( | ||
| subject=title, | ||
| message=full_message, | ||
| from_email="noreply@owasp.org", | ||
| recipient_list=[user_email], | ||
| fail_silently=False, | ||
| ) | ||
|
|
||
| redis_conn.xdel(self.DLQ_STREAM_KEY, msg_id) | ||
|
cubic-dev-ai[bot] marked this conversation as resolved.
|
||
| success_count += 1 | ||
| self.stdout.write(f"Retried: {msg_id} -> {user_email}") | ||
| else: | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| msg_type = self._get_value(data, b"type") | ||
| if msg_type in ["processing_failed", "recovery_failed"]: | ||
| self.stdout.write( | ||
| self.style.WARNING( | ||
| f"Skipped: {msg_id} is a system processing failure. " | ||
| "Requires manual admin review." | ||
| ) | ||
| ) | ||
| else: | ||
| self.stdout.write(self.style.WARNING(f"Skipped (missing data): {msg_id}")) | ||
| error_count += 1 | ||
|
|
||
| except Exception as e: # noqa: BLE001 | ||
| error_count += 1 | ||
| retries = int(self._get_value(data, b"dlq_retries", "0")) | ||
| new_retries = str(retries + 1) | ||
| data[b"dlq_retries"] = new_retries.encode() | ||
| new_msg = {k.decode(): v.decode() for k, v in data.items()} | ||
| redis_conn.xdel(self.DLQ_STREAM_KEY, msg_id) | ||
| redis_conn.xadd(self.DLQ_STREAM_KEY, new_msg) | ||
| self.stdout.write( | ||
| self.style.ERROR(f"Failed to retry {msg_id}: {e}, incremented retries") | ||
| ) | ||
|
|
||
| self.stdout.write( | ||
| self.style.SUCCESS( | ||
| f"\nRetry complete: {success_count} succeeded, {error_count} failed/retried" | ||
| ) | ||
| ) | ||
|
|
||
| def remove_dlq(self, redis_conn, message_id, all_messages): | ||
| """Remove failed notification(s) from DLQ.""" | ||
| if all_messages: | ||
| messages = redis_conn.xrange(self.DLQ_STREAM_KEY, "-", "+") | ||
| else: | ||
| messages = redis_conn.xrange(self.DLQ_STREAM_KEY, message_id, message_id) | ||
|
|
||
| if not messages: | ||
| raise CommandError("No messages found") | ||
|
|
||
| count = 0 | ||
| for msg_id, _ in messages: | ||
| redis_conn.xdel(self.DLQ_STREAM_KEY, msg_id) | ||
| count += 1 | ||
| self.stdout.write(f"Removed: {msg_id}") | ||
|
|
||
| self.stdout.write(self.style.SUCCESS(f"\nRemoved {count} message(s) from DLQ")) | ||
|
|
||
| def _get_value(self, data, key, default=None): | ||
| """Get value from message data.""" | ||
| value = data.get(key.encode() if isinstance(key, str) else key) | ||
| if value: | ||
| return value.decode("utf-8") | ||
| return default | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.