chore(): final fixes before deadline
- added images of notification feature working - added seed data script - race condition fix for notifications worker - small improvements
This commit is contained in:
@@ -5,7 +5,7 @@ from typing import Any
|
||||
import requests
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.core.mail import EmailMessage, get_connection
|
||||
from django.db import transaction
|
||||
from django.db import connection, transaction
|
||||
from django.db.models import Q, QuerySet
|
||||
from django.utils import timezone
|
||||
|
||||
@@ -334,15 +334,64 @@ def _send_smtp(config: dict[str, Any], payload: dict[str, Any]) -> None:
|
||||
email.send(fail_silently=False)
|
||||
|
||||
|
||||
def flush_pending_notifications() -> dict[str, int]:
|
||||
def _pending_notifications_for_flush() -> QuerySet[NotificationLog]:
|
||||
pending = (
|
||||
NotificationLog.objects.filter(
|
||||
status=NotificationStatus.PENDING,
|
||||
)
|
||||
.select_related("channel")
|
||||
.order_by("created_at")
|
||||
)
|
||||
if not connection.features.has_select_for_update:
|
||||
return pending
|
||||
if connection.features.has_select_for_update_skip_locked:
|
||||
return pending.select_for_update(skip_locked=True)
|
||||
return pending.select_for_update()
|
||||
|
||||
|
||||
def _flush_notification_log(
|
||||
*,
|
||||
log: NotificationLog,
|
||||
senders: dict[str, Any],
|
||||
results: dict[str, int],
|
||||
) -> None:
|
||||
channel = log.channel
|
||||
if not channel or not channel.is_active:
|
||||
log.status = NotificationStatus.FAILED
|
||||
log.error = "Channel is inactive or missing."
|
||||
log.save(update_fields=["status", "error"])
|
||||
results["failed"] += 1
|
||||
return
|
||||
|
||||
sender = senders.get(channel.channel_type)
|
||||
if not sender:
|
||||
log.status = NotificationStatus.FAILED
|
||||
log.error = f"No sender for channel type '{channel.channel_type}'."
|
||||
log.save(update_fields=["status", "error"])
|
||||
results["failed"] += 1
|
||||
return
|
||||
|
||||
try:
|
||||
sender(channel.config, log.payload)
|
||||
log.status = NotificationStatus.SENT
|
||||
log.sent_at = timezone.now()
|
||||
log.save(update_fields=["status", "sent_at"])
|
||||
results["sent"] += 1
|
||||
except Exception as exc:
|
||||
logger.exception(
|
||||
"notification_send_failed",
|
||||
extra={
|
||||
"log_id": str(log.pk),
|
||||
"channel": channel.name,
|
||||
"error": str(exc),
|
||||
},
|
||||
)
|
||||
log.status = NotificationStatus.FAILED
|
||||
log.error = str(exc)[:1000]
|
||||
log.save(update_fields=["status", "error"])
|
||||
results["failed"] += 1
|
||||
|
||||
|
||||
def flush_pending_notifications() -> dict[str, int]:
|
||||
senders = {
|
||||
ChannelType.TELEGRAM: _send_telegram,
|
||||
ChannelType.SMTP: _send_smtp,
|
||||
@@ -350,42 +399,15 @@ def flush_pending_notifications() -> dict[str, int]:
|
||||
|
||||
results = {"sent": 0, "failed": 0}
|
||||
|
||||
for log in pending:
|
||||
if not log.channel or not log.channel.is_active:
|
||||
log.status = NotificationStatus.FAILED
|
||||
log.error = "Channel is inactive or missing."
|
||||
log.save(update_fields=["status", "error"])
|
||||
results["failed"] += 1
|
||||
continue
|
||||
|
||||
sender = senders.get(log.channel.channel_type)
|
||||
if not sender:
|
||||
log.status = NotificationStatus.FAILED
|
||||
log.error = (
|
||||
f"No sender for channel type '{log.channel.channel_type}'."
|
||||
while True:
|
||||
with transaction.atomic():
|
||||
log = _pending_notifications_for_flush().first()
|
||||
if log is None:
|
||||
break
|
||||
_flush_notification_log(
|
||||
log=log,
|
||||
senders=senders,
|
||||
results=results,
|
||||
)
|
||||
log.save(update_fields=["status", "error"])
|
||||
results["failed"] += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
sender(log.channel.config, log.payload)
|
||||
log.status = NotificationStatus.SENT
|
||||
log.sent_at = timezone.now()
|
||||
log.save(update_fields=["status", "sent_at"])
|
||||
results["sent"] += 1
|
||||
except Exception as exc:
|
||||
logger.exception(
|
||||
"notification_send_failed",
|
||||
extra={
|
||||
"log_id": str(log.pk),
|
||||
"channel": log.channel.name,
|
||||
"error": str(exc),
|
||||
},
|
||||
)
|
||||
log.status = NotificationStatus.FAILED
|
||||
log.error = str(exc)[:1000]
|
||||
log.save(update_fields=["status", "error"])
|
||||
results["failed"] += 1
|
||||
|
||||
return results
|
||||
|
||||
Reference in New Issue
Block a user