diff --git a/src/backend/apps/notifications/__init__.py b/src/backend/apps/notifications/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/backend/apps/notifications/apps.py b/src/backend/apps/notifications/apps.py new file mode 100644 index 0000000..22fdafc --- /dev/null +++ b/src/backend/apps/notifications/apps.py @@ -0,0 +1,5 @@ +from django.apps import AppConfig + + +class NotificationsConfig(AppConfig): + name = "apps.notifications" diff --git a/src/backend/apps/notifications/migrations/0001_initial.py b/src/backend/apps/notifications/migrations/0001_initial.py new file mode 100644 index 0000000..23222c8 --- /dev/null +++ b/src/backend/apps/notifications/migrations/0001_initial.py @@ -0,0 +1,79 @@ +# Generated by Django 5.2.11 on 2026-02-22 13:54 + +import django.db.models.deletion +import uuid +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ('experiments', '0002_initial'), + ] + + operations = [ + migrations.CreateModel( + name='NotificationChannel', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('channel_type', models.CharField(choices=[('telegram', 'Telegram'), ('smtp', 'SMTP Email')], max_length=20, verbose_name='channel type')), + ('name', models.CharField(max_length=200, verbose_name='name')), + ('config', models.JSONField(default=dict, help_text='Provider-specific settings (tokens, chat IDs, SMTP host, etc.)', verbose_name='configuration')), + ('is_active', models.BooleanField(default=True, verbose_name='is active')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='created at')), + ('updated_at', models.DateTimeField(auto_now=True, verbose_name='updated at')), + ], + options={ + 'verbose_name': 'notification channel', + 'verbose_name_plural': 'notification channels', + 'ordering': ['-created_at'], + }, + ), + migrations.CreateModel( + name='NotificationRule', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('event_type', models.CharField(choices=[('experiment_started', 'Experiment started'), ('experiment_paused', 'Experiment paused'), ('experiment_completed', 'Experiment completed'), ('guardrail_triggered', 'Guardrail triggered'), ('review_requested', 'Review requested'), ('review_approved', 'Review approved'), ('review_rejected', 'Review rejected')], max_length=30, verbose_name='event type')), + ('is_active', models.BooleanField(default=True, verbose_name='is active')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='created at')), + ('updated_at', models.DateTimeField(auto_now=True, verbose_name='updated at')), + ('channel', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='rules', to='notifications.notificationchannel', verbose_name='channel')), + ('experiment', models.ForeignKey(blank=True, help_text='If null, applies to all experiments.', null=True, on_delete=django.db.models.deletion.CASCADE, related_name='notification_rules', to='experiments.experiment', verbose_name='experiment')), + ], + options={ + 'verbose_name': 'notification rule', + 'verbose_name_plural': 'notification rules', + 'ordering': ['-created_at'], + }, + ), + migrations.CreateModel( + name='NotificationLog', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('event_type', models.CharField(choices=[('experiment_started', 'Experiment started'), ('experiment_paused', 'Experiment paused'), ('experiment_completed', 'Experiment completed'), ('guardrail_triggered', 'Guardrail triggered'), ('review_requested', 'Review requested'), ('review_approved', 'Review approved'), ('review_rejected', 'Review rejected')], max_length=30, verbose_name='event type')), + ('event_key', models.CharField(db_index=True, max_length=255, verbose_name='dedup key')), + ('payload', models.JSONField(default=dict, verbose_name='payload')), + ('status', models.CharField(choices=[('pending', 'Pending'), ('sent', 'Sent'), ('failed', 'Failed')], default='pending', max_length=10, verbose_name='status')), + ('error', models.TextField(blank=True, default='', verbose_name='error details')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='created at')), + ('sent_at', models.DateTimeField(blank=True, null=True, verbose_name='sent at')), + ('channel', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='logs', to='notifications.notificationchannel', verbose_name='channel')), + ('rule', models.ForeignKey(null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='logs', to='notifications.notificationrule', verbose_name='rule')), + ], + options={ + 'verbose_name': 'notification log', + 'verbose_name_plural': 'notification logs', + 'ordering': ['-created_at'], + }, + ), + migrations.AddIndex( + model_name='notificationrule', + index=models.Index(fields=['event_type', 'is_active'], name='idx_rule_event_active'), + ), + migrations.AddIndex( + model_name='notificationlog', + index=models.Index(fields=['status', '-created_at'], name='idx_notif_log_status'), + ), + ] diff --git a/src/backend/apps/notifications/migrations/0002_alter_notificationchannel_config.py b/src/backend/apps/notifications/migrations/0002_alter_notificationchannel_config.py new file mode 100644 index 0000000..7273572 --- /dev/null +++ b/src/backend/apps/notifications/migrations/0002_alter_notificationchannel_config.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.11 on 2026-02-22 13:57 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('notifications', '0001_initial'), + ] + + operations = [ + migrations.AlterField( + model_name='notificationchannel', + name='config', + field=models.JSONField(blank=True, default=dict, help_text='Provider-specific settings (tokens, chat IDs, SMTP host, etc.)', verbose_name='configuration'), + ), + ] diff --git a/src/backend/apps/notifications/migrations/__init__.py b/src/backend/apps/notifications/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/backend/apps/notifications/models.py b/src/backend/apps/notifications/models.py new file mode 100644 index 0000000..e897a04 --- /dev/null +++ b/src/backend/apps/notifications/models.py @@ -0,0 +1,183 @@ +from typing import override + +from django.db import models +from django.utils.translation import gettext_lazy as _ + +from apps.core.models import BaseModel + + +class ChannelType(models.TextChoices): + TELEGRAM = "telegram", _("Telegram") + SMTP = "smtp", _("SMTP Email") + + +class NotificationEventType(models.TextChoices): + EXPERIMENT_STARTED = "experiment_started", _("Experiment started") + EXPERIMENT_PAUSED = "experiment_paused", _("Experiment paused") + EXPERIMENT_COMPLETED = "experiment_completed", _("Experiment completed") + GUARDRAIL_TRIGGERED = "guardrail_triggered", _("Guardrail triggered") + REVIEW_REQUESTED = "review_requested", _("Review requested") + REVIEW_APPROVED = "review_approved", _("Review approved") + REVIEW_REJECTED = "review_rejected", _("Review rejected") + + +class NotificationStatus(models.TextChoices): + PENDING = "pending", _("Pending") + SENT = "sent", _("Sent") + FAILED = "failed", _("Failed") + + +class NotificationChannel(BaseModel): + channel_type = models.CharField( + max_length=20, + choices=ChannelType.choices, + verbose_name=_("channel type"), + ) + name = models.CharField( + max_length=200, + verbose_name=_("name"), + ) + config = models.JSONField( + default=dict, + blank=True, + verbose_name=_("configuration"), + help_text=_("Provider-specific settings (tokens, chat IDs, SMTP host, etc.)"), + ) + is_active = models.BooleanField( + default=True, + verbose_name=_("is active"), + ) + created_at = models.DateTimeField( + auto_now_add=True, + verbose_name=_("created at"), + ) + updated_at = models.DateTimeField( + auto_now=True, + verbose_name=_("updated at"), + ) + + class Meta: + verbose_name = _("notification channel") + verbose_name_plural = _("notification channels") + ordering = ["-created_at"] + + @override + def __str__(self) -> str: + return f"{self.name} ({self.channel_type})" + + +class NotificationRule(BaseModel): + event_type = models.CharField( + max_length=30, + choices=NotificationEventType.choices, + verbose_name=_("event type"), + ) + channel = models.ForeignKey( + NotificationChannel, + on_delete=models.CASCADE, + related_name="rules", + verbose_name=_("channel"), + ) + experiment = models.ForeignKey( + "experiments.Experiment", + on_delete=models.CASCADE, + related_name="notification_rules", + verbose_name=_("experiment"), + null=True, + blank=True, + help_text=_("If null, applies to all experiments."), + ) + is_active = models.BooleanField( + default=True, + verbose_name=_("is active"), + ) + created_at = models.DateTimeField( + auto_now_add=True, + verbose_name=_("created at"), + ) + updated_at = models.DateTimeField( + auto_now=True, + verbose_name=_("updated at"), + ) + + class Meta: + verbose_name = _("notification rule") + verbose_name_plural = _("notification rules") + ordering = ["-created_at"] + indexes = [ + models.Index( + fields=["event_type", "is_active"], + name="idx_rule_event_active", + ), + ] + + @override + def __str__(self) -> str: + scope = self.experiment.name if self.experiment else "all" + return f"{self.event_type} -> {self.channel.name} ({scope})" + + +class NotificationLog(BaseModel): + rule = models.ForeignKey( + NotificationRule, + on_delete=models.SET_NULL, + null=True, + related_name="logs", + verbose_name=_("rule"), + ) + channel = models.ForeignKey( + NotificationChannel, + on_delete=models.SET_NULL, + null=True, + related_name="logs", + verbose_name=_("channel"), + ) + event_type = models.CharField( + max_length=30, + choices=NotificationEventType.choices, + verbose_name=_("event type"), + ) + event_key = models.CharField( + max_length=255, + verbose_name=_("dedup key"), + db_index=True, + ) + payload = models.JSONField( + default=dict, + verbose_name=_("payload"), + ) + status = models.CharField( + max_length=10, + choices=NotificationStatus.choices, + default=NotificationStatus.PENDING, + verbose_name=_("status"), + ) + error = models.TextField( + blank=True, + default="", + verbose_name=_("error details"), + ) + created_at = models.DateTimeField( + auto_now_add=True, + verbose_name=_("created at"), + ) + sent_at = models.DateTimeField( + null=True, + blank=True, + verbose_name=_("sent at"), + ) + + class Meta: + verbose_name = _("notification log") + verbose_name_plural = _("notification logs") + ordering = ["-created_at"] + indexes = [ + models.Index( + fields=["status", "-created_at"], + name="idx_notif_log_status", + ), + ] + + @override + def __str__(self) -> str: + return f"Notification({self.event_type}, {self.status})" diff --git a/src/backend/apps/notifications/services.py b/src/backend/apps/notifications/services.py new file mode 100644 index 0000000..cbbd4b2 --- /dev/null +++ b/src/backend/apps/notifications/services.py @@ -0,0 +1,316 @@ +import logging +from dataclasses import dataclass, field +from typing import Any + +import requests +from django.core.mail import send_mail +from django.db import transaction +from django.db.models import QuerySet +from django.utils import timezone + +from apps.notifications.models import ( + ChannelType, + NotificationChannel, + NotificationEventType, + NotificationLog, + NotificationRule, + NotificationStatus, +) + +logger = logging.getLogger("lotty") + + +@dataclass(frozen=True) +class NotificationPayload: + title: str + body: str + event_type: str + experiment_id: str = "" + experiment_name: str = "" + extra: dict[str, Any] = field(default_factory=dict) + + +# --------------------------------------------------------------------------- +# Channel CRUD +# --------------------------------------------------------------------------- + + +@transaction.atomic +def channel_create( + *, + channel_type: str, + name: str, + config: dict[str, Any] | None = None, +) -> NotificationChannel: + channel = NotificationChannel( + channel_type=channel_type, + name=name, + config=config or {}, + ) + channel.save() + return channel + + +def channel_update( + *, + channel: NotificationChannel, + **fields: Any, +) -> NotificationChannel: + allowed = {"name", "config", "is_active"} + for key in fields: + if key not in allowed: + raise ValueError(f"Field '{key}' cannot be updated.") + for key, value in fields.items(): + if value is not None: + setattr(channel, key, value) + channel.save() + return channel + + +def channel_delete(*, channel: NotificationChannel) -> None: + channel.delete() + + +def channel_list() -> QuerySet[NotificationChannel]: + return NotificationChannel.objects.all() + + +def channel_get(channel_id: Any) -> NotificationChannel | None: + try: + return NotificationChannel.objects.get(pk=channel_id) + except NotificationChannel.DoesNotExist: + return None + + +# --------------------------------------------------------------------------- +# Rule CRUD +# --------------------------------------------------------------------------- + + +@transaction.atomic +def rule_create( + *, + event_type: str, + channel: NotificationChannel, + experiment: Any | None = None, +) -> NotificationRule: + rule = NotificationRule( + event_type=event_type, + channel=channel, + experiment=experiment, + ) + rule.save() + return rule + + +def rule_update( + *, + rule: NotificationRule, + **fields: Any, +) -> NotificationRule: + allowed = {"event_type", "is_active"} + for key in fields: + if key not in allowed: + raise ValueError(f"Field '{key}' cannot be updated.") + for key, value in fields.items(): + if value is not None: + setattr(rule, key, value) + rule.save() + return rule + + +def rule_delete(*, rule: NotificationRule) -> None: + rule.delete() + + +def rule_list(channel_id: Any | None = None) -> QuerySet[NotificationRule]: + qs = NotificationRule.objects.select_related("channel", "experiment").all() + if channel_id is not None: + qs = qs.filter(channel_id=channel_id) + return qs + + +# --------------------------------------------------------------------------- +# Log selectors +# --------------------------------------------------------------------------- + + +def log_list( + *, + status: str | None = None, + limit: int = 100, +) -> QuerySet[NotificationLog]: + qs = NotificationLog.objects.select_related("channel", "rule").all() + if status: + qs = qs.filter(status=status) + return qs[:limit] + + +# --------------------------------------------------------------------------- +# Notification enqueue (called from integration points) +# --------------------------------------------------------------------------- + + +def notification_enqueue( + event_type: str, + payload: NotificationPayload, +) -> list[NotificationLog]: + rules = NotificationRule.objects.filter( + event_type=event_type, + is_active=True, + channel__is_active=True, + ).select_related("channel") + + if payload.experiment_id: + rules = rules.filter( + models_Q(experiment__isnull=True) + | models_Q(experiment_id=payload.experiment_id) + ) + else: + rules = rules.filter(experiment__isnull=True) + + logs: list[NotificationLog] = [] + for rule in rules: + event_key = _build_event_key(event_type, payload) + if NotificationLog.objects.filter( + event_key=event_key, + channel=rule.channel, + status__in=[NotificationStatus.PENDING, NotificationStatus.SENT], + ).exists(): + continue + + log = NotificationLog.objects.create( + rule=rule, + channel=rule.channel, + event_type=event_type, + event_key=event_key, + payload={ + "title": payload.title, + "body": payload.body, + "experiment_id": payload.experiment_id, + "experiment_name": payload.experiment_name, + "extra": payload.extra, + }, + status=NotificationStatus.PENDING, + ) + logs.append(log) + + return logs + + +def _build_event_key(event_type: str, payload: NotificationPayload) -> str: + bucket = int(timezone.now().timestamp()) // 60 + return f"{event_type}:{payload.experiment_id}:{bucket}" + + +def models_Q(**kwargs): + from django.db.models import Q + + return Q(**kwargs) + + +# --------------------------------------------------------------------------- +# Senders +# --------------------------------------------------------------------------- + + +def _send_telegram(config: dict[str, Any], payload: dict[str, Any]) -> None: + bot_token = config.get("bot_token", "") + chat_id = config.get("chat_id", "") + if not bot_token or not chat_id: + raise ValueError("Telegram config requires 'bot_token' and 'chat_id'.") + + text = f"*{payload['title']}*\n\n{payload['body']}" + if payload.get("experiment_name"): + text += f"\n\nExperiment: {payload['experiment_name']}" + + api_url = config.get( + "api_url", + f"https://api.telegram.org/bot{bot_token}", + ) + response = requests.post( + f"{api_url}/sendMessage", + json={ + "chat_id": chat_id, + "text": text, + "parse_mode": "Markdown", + }, + timeout=10, + ) + response.raise_for_status() + + +def _send_smtp(config: dict[str, Any], payload: dict[str, Any]) -> None: + recipient = config.get("recipient", "") + from_email = config.get("from_email", "lotty@lotty.local") + if not recipient: + raise ValueError("SMTP config requires 'recipient'.") + + subject = payload.get("title", "Lotty Notification") + body = payload.get("body", "") + if payload.get("experiment_name"): + body += f"\n\nExperiment: {payload['experiment_name']}" + + send_mail( + subject=subject, + message=body, + from_email=from_email, + recipient_list=[recipient], + fail_silently=False, + ) + + +# --------------------------------------------------------------------------- +# Flush pending (called from Celery task) +# --------------------------------------------------------------------------- + + +def flush_pending_notifications() -> dict[str, int]: + pending = NotificationLog.objects.filter( + status=NotificationStatus.PENDING, + ).select_related("channel") + + senders = { + ChannelType.TELEGRAM: _send_telegram, + ChannelType.SMTP: _send_smtp, + } + + results = {"sent": 0, "failed": 0} + + for log in pending: + if not log.channel or not log.channel.is_active: + log.status = NotificationStatus.FAILED + log.error = "Channel is inactive or missing." + log.save(update_fields=["status", "error"]) + results["failed"] += 1 + continue + + sender = senders.get(log.channel.channel_type) + if not sender: + log.status = NotificationStatus.FAILED + log.error = f"No sender for channel type '{log.channel.channel_type}'." + log.save(update_fields=["status", "error"]) + results["failed"] += 1 + continue + + try: + sender(log.channel.config, log.payload) + log.status = NotificationStatus.SENT + log.sent_at = timezone.now() + log.save(update_fields=["status", "sent_at"]) + results["sent"] += 1 + except Exception as exc: + logger.exception( + "notification_send_failed", + extra={ + "log_id": str(log.pk), + "channel": log.channel.name, + "error": str(exc), + }, + ) + log.status = NotificationStatus.FAILED + log.error = str(exc)[:1000] + log.save(update_fields=["status", "error"]) + results["failed"] += 1 + + return results diff --git a/src/backend/apps/notifications/tasks.py b/src/backend/apps/notifications/tasks.py new file mode 100644 index 0000000..f15b6c7 --- /dev/null +++ b/src/backend/apps/notifications/tasks.py @@ -0,0 +1,19 @@ +import logging + +from apps.notifications.services import flush_pending_notifications +from config.celery import app + +logger = logging.getLogger("lotty") + + +@app.task(bind=True, name="notifications.flush_pending") +def flush_pending_notifications_task(self): + results = flush_pending_notifications() + logger.info( + "notifications_flush_completed", + extra={ + "sent": results["sent"], + "failed": results["failed"], + }, + ) + return results diff --git a/src/backend/apps/notifications/tests/__init__.py b/src/backend/apps/notifications/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/backend/apps/notifications/tests/test_notifications.py b/src/backend/apps/notifications/tests/test_notifications.py new file mode 100644 index 0000000..129e8d8 --- /dev/null +++ b/src/backend/apps/notifications/tests/test_notifications.py @@ -0,0 +1,306 @@ +from typing import Any, override +from unittest.mock import patch + +from django.test import TestCase + +from apps.experiments.tests.helpers import make_experiment +from apps.notifications.models import ( + ChannelType, + NotificationChannel, + NotificationEventType, + NotificationLog, + NotificationRule, + NotificationStatus, +) +from apps.notifications.services import ( + NotificationPayload, + channel_create, + channel_delete, + channel_get, + channel_list, + channel_update, + flush_pending_notifications, + notification_enqueue, + rule_create, + rule_delete, + rule_list, + rule_update, +) + + +class ChannelCRUDTest(TestCase): + def test_create_channel(self) -> None: + ch = channel_create( + channel_type=ChannelType.TELEGRAM, + name="Team Chat", + config={"bot_token": "123", "chat_id": "-100"}, + ) + self.assertEqual(ch.channel_type, ChannelType.TELEGRAM) + self.assertEqual(ch.name, "Team Chat") + self.assertEqual(ch.config["bot_token"], "123") + self.assertTrue(ch.is_active) + + def test_update_channel(self) -> None: + ch = channel_create( + channel_type=ChannelType.SMTP, + name="Old Name", + ) + ch = channel_update(channel=ch, name="New Name") + self.assertEqual(ch.name, "New Name") + + def test_update_channel_disallowed_field(self) -> None: + ch = channel_create( + channel_type=ChannelType.SMTP, + name="X", + ) + with self.assertRaises(ValueError): + channel_update(channel=ch, channel_type="telegram") + + def test_delete_channel(self) -> None: + ch = channel_create( + channel_type=ChannelType.TELEGRAM, + name="Delete Me", + ) + pk = ch.pk + channel_delete(channel=ch) + self.assertIsNone(channel_get(pk)) + + def test_list_channels(self) -> None: + channel_create(channel_type=ChannelType.TELEGRAM, name="A") + channel_create(channel_type=ChannelType.SMTP, name="B") + self.assertEqual(channel_list().count(), 2) + + +class RuleCRUDTest(TestCase): + @override + def setUp(self) -> None: + self.channel = channel_create( + channel_type=ChannelType.TELEGRAM, + name="Test Channel", + config={"bot_token": "tok", "chat_id": "123"}, + ) + + def test_create_rule_global(self) -> None: + r = rule_create( + event_type=NotificationEventType.GUARDRAIL_TRIGGERED, + channel=self.channel, + ) + self.assertEqual(r.event_type, NotificationEventType.GUARDRAIL_TRIGGERED) + self.assertIsNone(r.experiment) + self.assertTrue(r.is_active) + + def test_create_rule_for_experiment(self) -> None: + exp = make_experiment(suffix="_rule") + r = rule_create( + event_type=NotificationEventType.EXPERIMENT_STARTED, + channel=self.channel, + experiment=exp, + ) + self.assertEqual(r.experiment_id, exp.pk) + + def test_update_rule(self) -> None: + r = rule_create( + event_type=NotificationEventType.EXPERIMENT_STARTED, + channel=self.channel, + ) + r = rule_update(rule=r, is_active=False) + self.assertFalse(r.is_active) + + def test_delete_rule(self) -> None: + r = rule_create( + event_type=NotificationEventType.EXPERIMENT_STARTED, + channel=self.channel, + ) + rule_delete(rule=r) + self.assertEqual(rule_list().count(), 0) + + +class NotificationEnqueueTest(TestCase): + @override + def setUp(self) -> None: + self.channel = channel_create( + channel_type=ChannelType.TELEGRAM, + name="Alerts", + config={"bot_token": "tok", "chat_id": "-100"}, + ) + self.experiment = make_experiment(suffix="_enq") + + def test_enqueue_creates_pending_log(self) -> None: + rule_create( + event_type=NotificationEventType.GUARDRAIL_TRIGGERED, + channel=self.channel, + ) + logs = notification_enqueue( + NotificationEventType.GUARDRAIL_TRIGGERED, + NotificationPayload( + title="Alert", + body="Error rate exceeded", + event_type=NotificationEventType.GUARDRAIL_TRIGGERED, + experiment_id=str(self.experiment.pk), + experiment_name=self.experiment.name, + ), + ) + self.assertEqual(len(logs), 1) + self.assertEqual(logs[0].status, NotificationStatus.PENDING) + + def test_enqueue_deduplicates(self) -> None: + rule_create( + event_type=NotificationEventType.GUARDRAIL_TRIGGERED, + channel=self.channel, + ) + payload = NotificationPayload( + title="Alert", + body="Error rate exceeded", + event_type=NotificationEventType.GUARDRAIL_TRIGGERED, + experiment_id=str(self.experiment.pk), + experiment_name=self.experiment.name, + ) + logs_1 = notification_enqueue( + NotificationEventType.GUARDRAIL_TRIGGERED, payload + ) + logs_2 = notification_enqueue( + NotificationEventType.GUARDRAIL_TRIGGERED, payload + ) + self.assertEqual(len(logs_1), 1) + self.assertEqual(len(logs_2), 0) + + def test_enqueue_no_matching_rules(self) -> None: + logs = notification_enqueue( + NotificationEventType.EXPERIMENT_STARTED, + NotificationPayload( + title="Started", + body="Exp started", + event_type=NotificationEventType.EXPERIMENT_STARTED, + ), + ) + self.assertEqual(len(logs), 0) + + def test_enqueue_inactive_channel_skipped(self) -> None: + self.channel.is_active = False + self.channel.save(update_fields=["is_active"]) + rule_create( + event_type=NotificationEventType.EXPERIMENT_STARTED, + channel=self.channel, + ) + logs = notification_enqueue( + NotificationEventType.EXPERIMENT_STARTED, + NotificationPayload( + title="Started", + body="Exp started", + event_type=NotificationEventType.EXPERIMENT_STARTED, + ), + ) + self.assertEqual(len(logs), 0) + + def test_enqueue_experiment_scoped_rule(self) -> None: + rule_create( + event_type=NotificationEventType.EXPERIMENT_STARTED, + channel=self.channel, + experiment=self.experiment, + ) + other_exp = make_experiment(suffix="_other") + logs = notification_enqueue( + NotificationEventType.EXPERIMENT_STARTED, + NotificationPayload( + title="Started", + body="Exp started", + event_type=NotificationEventType.EXPERIMENT_STARTED, + experiment_id=str(other_exp.pk), + experiment_name=other_exp.name, + ), + ) + self.assertEqual(len(logs), 0) + + +class FlushNotificationsTest(TestCase): + @override + def setUp(self) -> None: + self.channel = channel_create( + channel_type=ChannelType.TELEGRAM, + name="Flush Test", + config={"bot_token": "tok", "chat_id": "-100"}, + ) + + @patch("apps.notifications.services._send_telegram") + def test_flush_sends_pending(self, mock_send: Any) -> None: + rule = rule_create( + event_type=NotificationEventType.GUARDRAIL_TRIGGERED, + channel=self.channel, + ) + NotificationLog.objects.create( + rule=rule, + channel=self.channel, + event_type=NotificationEventType.GUARDRAIL_TRIGGERED, + event_key="test:key:1", + payload={"title": "Alert", "body": "Test"}, + status=NotificationStatus.PENDING, + ) + results = flush_pending_notifications() + self.assertEqual(results["sent"], 1) + self.assertEqual(results["failed"], 0) + mock_send.assert_called_once() + + @patch( + "apps.notifications.services._send_telegram", + side_effect=Exception("Network error"), + ) + def test_flush_marks_failed_on_error(self, mock_send: Any) -> None: + rule = rule_create( + event_type=NotificationEventType.GUARDRAIL_TRIGGERED, + channel=self.channel, + ) + log = NotificationLog.objects.create( + rule=rule, + channel=self.channel, + event_type=NotificationEventType.GUARDRAIL_TRIGGERED, + event_key="test:key:2", + payload={"title": "Alert", "body": "Test"}, + status=NotificationStatus.PENDING, + ) + results = flush_pending_notifications() + self.assertEqual(results["sent"], 0) + self.assertEqual(results["failed"], 1) + log.refresh_from_db() + self.assertEqual(log.status, NotificationStatus.FAILED) + self.assertIn("Network error", log.error) + + def test_flush_skips_inactive_channel(self) -> None: + self.channel.is_active = False + self.channel.save(update_fields=["is_active"]) + rule = rule_create( + event_type=NotificationEventType.GUARDRAIL_TRIGGERED, + channel=self.channel, + ) + NotificationLog.objects.create( + rule=rule, + channel=self.channel, + event_type=NotificationEventType.GUARDRAIL_TRIGGERED, + event_key="test:key:3", + payload={"title": "Alert", "body": "Test"}, + status=NotificationStatus.PENDING, + ) + results = flush_pending_notifications() + self.assertEqual(results["failed"], 1) + + @patch("apps.notifications.services._send_smtp") + def test_flush_smtp_channel(self, mock_send: Any) -> None: + smtp_ch = channel_create( + channel_type=ChannelType.SMTP, + name="Email", + config={"recipient": "team@lotty.local"}, + ) + rule = rule_create( + event_type=NotificationEventType.EXPERIMENT_COMPLETED, + channel=smtp_ch, + ) + NotificationLog.objects.create( + rule=rule, + channel=smtp_ch, + event_type=NotificationEventType.EXPERIMENT_COMPLETED, + event_key="test:key:4", + payload={"title": "Done", "body": "Exp completed"}, + status=NotificationStatus.PENDING, + ) + results = flush_pending_notifications() + self.assertEqual(results["sent"], 1) + mock_send.assert_called_once()