import logging from dataclasses import dataclass, field from typing import Any import requests from django.core.mail import send_mail from django.db import transaction from django.db.models import QuerySet from django.utils import timezone from apps.notifications.models import ( ChannelType, NotificationChannel, NotificationEventType, NotificationLog, NotificationRule, NotificationStatus, ) logger = logging.getLogger("lotty") @dataclass(frozen=True) class NotificationPayload: title: str body: str event_type: str experiment_id: str = "" experiment_name: str = "" extra: dict[str, Any] = field(default_factory=dict) # --------------------------------------------------------------------------- # Channel CRUD # --------------------------------------------------------------------------- @transaction.atomic def channel_create( *, channel_type: str, name: str, config: dict[str, Any] | None = None, ) -> NotificationChannel: channel = NotificationChannel( channel_type=channel_type, name=name, config=config or {}, ) channel.save() return channel def channel_update( *, channel: NotificationChannel, **fields: Any, ) -> NotificationChannel: allowed = {"name", "config", "is_active"} for key in fields: if key not in allowed: raise ValueError(f"Field '{key}' cannot be updated.") for key, value in fields.items(): if value is not None: setattr(channel, key, value) channel.save() return channel def channel_delete(*, channel: NotificationChannel) -> None: channel.delete() def channel_list() -> QuerySet[NotificationChannel]: return NotificationChannel.objects.all() def channel_get(channel_id: Any) -> NotificationChannel | None: try: return NotificationChannel.objects.get(pk=channel_id) except NotificationChannel.DoesNotExist: return None # --------------------------------------------------------------------------- # Rule CRUD # --------------------------------------------------------------------------- @transaction.atomic def rule_create( *, event_type: str, channel: NotificationChannel, experiment: Any | None = None, ) -> NotificationRule: rule = NotificationRule( event_type=event_type, channel=channel, experiment=experiment, ) rule.save() return rule def rule_update( *, rule: NotificationRule, **fields: Any, ) -> NotificationRule: allowed = {"event_type", "is_active"} for key in fields: if key not in allowed: raise ValueError(f"Field '{key}' cannot be updated.") for key, value in fields.items(): if value is not None: setattr(rule, key, value) rule.save() return rule def rule_delete(*, rule: NotificationRule) -> None: rule.delete() def rule_list(channel_id: Any | None = None) -> QuerySet[NotificationRule]: qs = NotificationRule.objects.select_related("channel", "experiment").all() if channel_id is not None: qs = qs.filter(channel_id=channel_id) return qs # --------------------------------------------------------------------------- # Log selectors # --------------------------------------------------------------------------- def log_list( *, status: str | None = None, limit: int = 100, ) -> QuerySet[NotificationLog]: qs = NotificationLog.objects.select_related("channel", "rule").all() if status: qs = qs.filter(status=status) return qs[:limit] # --------------------------------------------------------------------------- # Notification enqueue (called from integration points) # --------------------------------------------------------------------------- def notification_enqueue( event_type: str, payload: NotificationPayload, ) -> list[NotificationLog]: rules = NotificationRule.objects.filter( event_type=event_type, is_active=True, channel__is_active=True, ).select_related("channel") if payload.experiment_id: rules = rules.filter( models_Q(experiment__isnull=True) | models_Q(experiment_id=payload.experiment_id) ) else: rules = rules.filter(experiment__isnull=True) logs: list[NotificationLog] = [] for rule in rules: event_key = _build_event_key(event_type, payload) if NotificationLog.objects.filter( event_key=event_key, channel=rule.channel, status__in=[NotificationStatus.PENDING, NotificationStatus.SENT], ).exists(): continue log = NotificationLog.objects.create( rule=rule, channel=rule.channel, event_type=event_type, event_key=event_key, payload={ "title": payload.title, "body": payload.body, "experiment_id": payload.experiment_id, "experiment_name": payload.experiment_name, "extra": payload.extra, }, status=NotificationStatus.PENDING, ) logs.append(log) return logs def _build_event_key(event_type: str, payload: NotificationPayload) -> str: bucket = int(timezone.now().timestamp()) // 60 return f"{event_type}:{payload.experiment_id}:{bucket}" def models_Q(**kwargs): from django.db.models import Q return Q(**kwargs) # --------------------------------------------------------------------------- # Senders # --------------------------------------------------------------------------- def _send_telegram(config: dict[str, Any], payload: dict[str, Any]) -> None: bot_token = config.get("bot_token", "") chat_id = config.get("chat_id", "") if not bot_token or not chat_id: raise ValueError("Telegram config requires 'bot_token' and 'chat_id'.") text = f"*{payload['title']}*\n\n{payload['body']}" if payload.get("experiment_name"): text += f"\n\nExperiment: {payload['experiment_name']}" api_url = config.get( "api_url", f"https://api.telegram.org/bot{bot_token}", ) response = requests.post( f"{api_url}/sendMessage", json={ "chat_id": chat_id, "text": text, "parse_mode": "Markdown", }, timeout=10, ) response.raise_for_status() def _send_smtp(config: dict[str, Any], payload: dict[str, Any]) -> None: recipient = config.get("recipient", "") from_email = config.get("from_email", "lotty@lotty.local") if not recipient: raise ValueError("SMTP config requires 'recipient'.") subject = payload.get("title", "Lotty Notification") body = payload.get("body", "") if payload.get("experiment_name"): body += f"\n\nExperiment: {payload['experiment_name']}" send_mail( subject=subject, message=body, from_email=from_email, recipient_list=[recipient], fail_silently=False, ) # --------------------------------------------------------------------------- # Flush pending (called from Celery task) # --------------------------------------------------------------------------- def flush_pending_notifications() -> dict[str, int]: pending = NotificationLog.objects.filter( status=NotificationStatus.PENDING, ).select_related("channel") senders = { ChannelType.TELEGRAM: _send_telegram, ChannelType.SMTP: _send_smtp, } results = {"sent": 0, "failed": 0} for log in pending: if not log.channel or not log.channel.is_active: log.status = NotificationStatus.FAILED log.error = "Channel is inactive or missing." log.save(update_fields=["status", "error"]) results["failed"] += 1 continue sender = senders.get(log.channel.channel_type) if not sender: log.status = NotificationStatus.FAILED log.error = f"No sender for channel type '{log.channel.channel_type}'." log.save(update_fields=["status", "error"]) results["failed"] += 1 continue try: sender(log.channel.config, log.payload) log.status = NotificationStatus.SENT log.sent_at = timezone.now() log.save(update_fields=["status", "sent_at"]) results["sent"] += 1 except Exception as exc: logger.exception( "notification_send_failed", extra={ "log_id": str(log.pk), "channel": log.channel.name, "error": str(exc), }, ) log.status = NotificationStatus.FAILED log.error = str(exc)[:1000] log.save(update_fields=["status", "error"]) results["failed"] += 1 return results