diff --git a/src/backend/apps/events/__init__.py b/src/backend/apps/events/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/backend/apps/events/apps.py b/src/backend/apps/events/apps.py new file mode 100644 index 0000000..aebaf1e --- /dev/null +++ b/src/backend/apps/events/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class EventsConfig(AppConfig): + name = "apps.events" + verbose_name = "Events" diff --git a/src/backend/apps/events/migrations/0001_initial.py b/src/backend/apps/events/migrations/0001_initial.py new file mode 100644 index 0000000..919589e --- /dev/null +++ b/src/backend/apps/events/migrations/0001_initial.py @@ -0,0 +1,114 @@ +# Generated by Django 5.2.11 on 2026-02-14 09:55 + +import django.core.validators +import django.db.models.deletion +import uuid +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ] + + operations = [ + migrations.CreateModel( + name='EventType', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('name', models.CharField(max_length=100, unique=True, validators=[django.core.validators.RegexValidator(message='Event type name must start with a lowercase letter and contain only lowercase letters, digits, and underscores.', regex='^[a-z][a-z0-9_]*$')], verbose_name='name')), + ('display_name', models.CharField(max_length=200, verbose_name='display name')), + ('description', models.TextField(blank=True, verbose_name='description')), + ('requires_exposure', models.BooleanField(default=False, help_text='When True, events of this type are only attributed if a matching exposure exists for the same decision_id.', verbose_name='requires exposure')), + ('required_fields', models.JSONField(blank=True, default=list, help_text='List of property field names that must be present in event properties.', verbose_name='required fields')), + ('is_active', models.BooleanField(db_index=True, default=True, verbose_name='is active')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='created at')), + ('updated_at', models.DateTimeField(auto_now=True, verbose_name='updated at')), + ], + options={ + 'verbose_name': 'event type', + 'verbose_name_plural': 'event types', + 'ordering': ['name'], + }, + ), + migrations.CreateModel( + name='Decision', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('decision_id', models.CharField(max_length=100, unique=True, verbose_name='decision ID')), + ('flag_key', models.CharField(max_length=100, verbose_name='flag key')), + ('subject_id', models.CharField(db_index=True, max_length=200, verbose_name='subject ID')), + ('experiment_id', models.UUIDField(blank=True, null=True, verbose_name='experiment ID')), + ('variant_id', models.UUIDField(blank=True, null=True, verbose_name='variant ID')), + ('value', models.CharField(blank=True, max_length=500, verbose_name='resolved value')), + ('reason', models.CharField(max_length=50, verbose_name='reason')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='created at')), + ], + options={ + 'verbose_name': 'decision', + 'verbose_name_plural': 'decisions', + 'ordering': ['-created_at'], + 'indexes': [models.Index(fields=['flag_key', 'subject_id'], name='idx_decision_flag_subject')], + }, + ), + migrations.CreateModel( + name='Exposure', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('decision_id', models.CharField(max_length=100, unique=True, verbose_name='decision ID')), + ('experiment_id', models.UUIDField(blank=True, db_index=True, null=True, verbose_name='experiment ID')), + ('variant_id', models.UUIDField(blank=True, null=True, verbose_name='variant ID')), + ('subject_id', models.CharField(db_index=True, max_length=200, verbose_name='subject ID')), + ('timestamp', models.DateTimeField(verbose_name='event timestamp')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='created at')), + ], + options={ + 'verbose_name': 'exposure', + 'verbose_name_plural': 'exposures', + 'ordering': ['-timestamp'], + 'indexes': [models.Index(fields=['experiment_id', 'variant_id', 'timestamp'], name='idx_exposure_exp_var_ts')], + }, + ), + migrations.CreateModel( + name='Event', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('event_id', models.CharField(help_text='Client-provided idempotency key', max_length=200, unique=True, verbose_name='event ID')), + ('decision_id', models.CharField(db_index=True, max_length=100, verbose_name='decision ID')), + ('subject_id', models.CharField(db_index=True, max_length=200, verbose_name='subject ID')), + ('timestamp', models.DateTimeField(verbose_name='event timestamp')), + ('properties', models.JSONField(blank=True, default=dict, verbose_name='properties')), + ('is_attributed', models.BooleanField(db_index=True, default=True, help_text='False when event requires exposure but none was found yet.', verbose_name='is attributed')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='created at')), + ('event_type', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, related_name='events', to='events.eventtype', verbose_name='event type')), + ], + options={ + 'verbose_name': 'event', + 'verbose_name_plural': 'events', + 'ordering': ['-timestamp'], + 'indexes': [models.Index(fields=['decision_id', 'event_type'], name='idx_event_decision_type'), models.Index(fields=['event_type', 'subject_id', 'timestamp'], name='idx_event_type_subj_ts')], + }, + ), + migrations.CreateModel( + name='PendingEvent', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('event_id', models.CharField(max_length=200, unique=True, verbose_name='event ID')), + ('decision_id', models.CharField(db_index=True, max_length=100, verbose_name='decision ID')), + ('subject_id', models.CharField(max_length=200, verbose_name='subject ID')), + ('timestamp', models.DateTimeField(verbose_name='event timestamp')), + ('properties', models.JSONField(blank=True, default=dict, verbose_name='properties')), + ('expires_at', models.DateTimeField(db_index=True, verbose_name='expires at')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='created at')), + ('event_type', models.ForeignKey(on_delete=django.db.models.deletion.PROTECT, related_name='pending_events', to='events.eventtype', verbose_name='event type')), + ], + options={ + 'verbose_name': 'pending event', + 'verbose_name_plural': 'pending events', + 'ordering': ['-created_at'], + 'indexes': [models.Index(fields=['decision_id'], name='idx_pending_decision')], + }, + ), + ] diff --git a/src/backend/apps/events/migrations/0002_add_is_exposure_to_event_type.py b/src/backend/apps/events/migrations/0002_add_is_exposure_to_event_type.py new file mode 100644 index 0000000..ff9fbe4 --- /dev/null +++ b/src/backend/apps/events/migrations/0002_add_is_exposure_to_event_type.py @@ -0,0 +1,24 @@ +# Generated by Django 5.2.11 on 2026-02-20 21:08 + +import django.core.validators +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('events', '0001_initial'), + ] + + operations = [ + migrations.AddField( + model_name='eventtype', + name='is_exposure', + field=models.BooleanField(default=False, help_text='When True, this event type represents an exposure (fact of showing a variant to a user).', verbose_name='is exposure'), + ), + migrations.AlterField( + model_name='eventtype', + name='name', + field=models.CharField(max_length=100, unique=True, validators=[django.core.validators.RegexValidator(message='Event type name must follow snake_case, camelCase, or PascalCase.', regex='^[A-Za-z][A-Za-z0-9_]*$')], verbose_name='name'), + ), + ] diff --git a/src/backend/apps/events/migrations/__init__.py b/src/backend/apps/events/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/backend/apps/events/models.py b/src/backend/apps/events/models.py new file mode 100644 index 0000000..e5fc591 --- /dev/null +++ b/src/backend/apps/events/models.py @@ -0,0 +1,300 @@ +from typing import override + +from django.core.validators import RegexValidator +from django.db import models +from django.utils.translation import gettext_lazy as _ + +from apps.core.models import BaseModel + +EVENT_TYPE_KEY_PATTERN = r"^[A-Za-z][A-Za-z0-9_]*$" + + +class EventType(BaseModel): + name = models.CharField( + max_length=100, + unique=True, + verbose_name=_("name"), + validators=[ + RegexValidator( + regex=EVENT_TYPE_KEY_PATTERN, + message=( + "Event type name must follow snake_case, " + "camelCase, or PascalCase." + ), + ) + ], + ) + display_name = models.CharField( + max_length=200, + verbose_name=_("display name"), + ) + description = models.TextField( + blank=True, + verbose_name=_("description"), + ) + is_exposure = models.BooleanField( + default=False, + verbose_name=_("is exposure"), + help_text=_( + "When True, this event type represents an exposure " + "(fact of showing a variant to a user)." + ), + ) + requires_exposure = models.BooleanField( + default=False, + verbose_name=_("requires exposure"), + help_text=_( + "When True, events of this type are only attributed " + "if a matching exposure exists for the same decision_id." + ), + ) + required_fields = models.JSONField( + default=list, + blank=True, + verbose_name=_("required fields"), + help_text=_( + "List of property field names that must be present " + "in event properties." + ), + ) + is_active = models.BooleanField( + default=True, + db_index=True, + verbose_name=_("is active"), + ) + created_at = models.DateTimeField( + auto_now_add=True, + verbose_name=_("created at"), + ) + updated_at = models.DateTimeField( + auto_now=True, + verbose_name=_("updated at"), + ) + + class Meta: + verbose_name = _("event type") + verbose_name_plural = _("event types") + ordering = ["name"] + + @override + def __str__(self) -> str: + return self.name + + +class Exposure(BaseModel): + decision_id = models.CharField( + max_length=100, + unique=True, + verbose_name=_("decision ID"), + ) + experiment_id = models.UUIDField( + null=True, + blank=True, + verbose_name=_("experiment ID"), + db_index=True, + ) + variant_id = models.UUIDField( + null=True, + blank=True, + verbose_name=_("variant ID"), + ) + subject_id = models.CharField( + max_length=200, + db_index=True, + verbose_name=_("subject ID"), + ) + timestamp = models.DateTimeField( + verbose_name=_("event timestamp"), + ) + created_at = models.DateTimeField( + auto_now_add=True, + verbose_name=_("created at"), + ) + + class Meta: + verbose_name = _("exposure") + verbose_name_plural = _("exposures") + ordering = ["-timestamp"] + indexes = [ + models.Index( + fields=["experiment_id", "variant_id", "timestamp"], + name="idx_exposure_exp_var_ts", + ), + ] + + @override + def __str__(self) -> str: + return f"Exposure({self.decision_id})" + + +class Decision(BaseModel): + decision_id = models.CharField( + max_length=100, + unique=True, + verbose_name=_("decision ID"), + ) + flag_key = models.CharField( + max_length=100, + verbose_name=_("flag key"), + ) + subject_id = models.CharField( + max_length=200, + db_index=True, + verbose_name=_("subject ID"), + ) + experiment_id = models.UUIDField( + null=True, + blank=True, + verbose_name=_("experiment ID"), + ) + variant_id = models.UUIDField( + null=True, + blank=True, + verbose_name=_("variant ID"), + ) + value = models.CharField( + max_length=500, + blank=True, + verbose_name=_("resolved value"), + ) + reason = models.CharField( + max_length=50, + verbose_name=_("reason"), + ) + created_at = models.DateTimeField( + auto_now_add=True, + verbose_name=_("created at"), + ) + + class Meta: + verbose_name = _("decision") + verbose_name_plural = _("decisions") + ordering = ["-created_at"] + indexes = [ + models.Index( + fields=["flag_key", "subject_id"], + name="idx_decision_flag_subject", + ), + ] + + @override + def __str__(self) -> str: + return f"Decision({self.decision_id}, {self.flag_key})" + + +class Event(BaseModel): + event_id = models.CharField( + max_length=200, + unique=True, + verbose_name=_("event ID"), + help_text=_("Client-provided idempotency key"), + ) + event_type = models.ForeignKey( + EventType, + on_delete=models.PROTECT, + related_name="events", + verbose_name=_("event type"), + ) + decision_id = models.CharField( + max_length=100, + db_index=True, + verbose_name=_("decision ID"), + ) + subject_id = models.CharField( + max_length=200, + db_index=True, + verbose_name=_("subject ID"), + ) + timestamp = models.DateTimeField( + verbose_name=_("event timestamp"), + ) + properties = models.JSONField( + default=dict, + blank=True, + verbose_name=_("properties"), + ) + is_attributed = models.BooleanField( + default=True, + db_index=True, + verbose_name=_("is attributed"), + help_text=_( + "False when event requires exposure but none was found yet." + ), + ) + created_at = models.DateTimeField( + auto_now_add=True, + verbose_name=_("created at"), + ) + + class Meta: + verbose_name = _("event") + verbose_name_plural = _("events") + ordering = ["-timestamp"] + indexes = [ + models.Index( + fields=["decision_id", "event_type"], + name="idx_event_decision_type", + ), + models.Index( + fields=["event_type", "subject_id", "timestamp"], + name="idx_event_type_subj_ts", + ), + ] + + @override + def __str__(self) -> str: + return f"Event({self.event_id}, {self.event_type})" + + +class PendingEvent(BaseModel): + event_id = models.CharField( + max_length=200, + unique=True, + verbose_name=_("event ID"), + ) + event_type = models.ForeignKey( + EventType, + on_delete=models.PROTECT, + related_name="pending_events", + verbose_name=_("event type"), + ) + decision_id = models.CharField( + max_length=100, + db_index=True, + verbose_name=_("decision ID"), + ) + subject_id = models.CharField( + max_length=200, + verbose_name=_("subject ID"), + ) + timestamp = models.DateTimeField( + verbose_name=_("event timestamp"), + ) + properties = models.JSONField( + default=dict, + blank=True, + verbose_name=_("properties"), + ) + expires_at = models.DateTimeField( + db_index=True, + verbose_name=_("expires at"), + ) + created_at = models.DateTimeField( + auto_now_add=True, + verbose_name=_("created at"), + ) + + class Meta: + verbose_name = _("pending event") + verbose_name_plural = _("pending events") + ordering = ["-created_at"] + indexes = [ + models.Index( + fields=["decision_id"], + name="idx_pending_decision", + ), + ] + + @override + def __str__(self) -> str: + return f"PendingEvent({self.event_id}, {self.decision_id})" diff --git a/src/backend/apps/events/selectors.py b/src/backend/apps/events/selectors.py new file mode 100644 index 0000000..705c646 --- /dev/null +++ b/src/backend/apps/events/selectors.py @@ -0,0 +1,90 @@ +from uuid import UUID + +from django.db.models import QuerySet +from django.utils import timezone + +from apps.events.models import ( + Decision, + Event, + EventType, + Exposure, + PendingEvent, +) + + +def event_type_list( + *, + is_active: bool | None = None, +) -> QuerySet[EventType]: + qs = EventType.objects.all() + if is_active is not None: + qs = qs.filter(is_active=is_active) + return qs + + +def event_type_get_by_name(name: str) -> EventType | None: + return EventType.objects.filter(name=name).first() + + +def event_type_get(event_type_id: UUID) -> EventType | None: + return EventType.objects.filter(pk=event_type_id).first() + + +def exposure_exists(decision_id: str) -> bool: + return Exposure.objects.filter(decision_id=decision_id).exists() + + +def exposure_get(decision_id: str) -> Exposure | None: + return Exposure.objects.filter(decision_id=decision_id).first() + + +def decision_get(decision_id: str) -> Decision | None: + return Decision.objects.filter(decision_id=decision_id).first() + + +def pending_events_for_decision( + decision_id: str, +) -> QuerySet[PendingEvent]: + return PendingEvent.objects.filter( + decision_id=decision_id, + expires_at__gt=timezone.now(), + ) + + +def event_exists(event_id: str) -> bool: + return Event.objects.filter(event_id=event_id).exists() + + +def pending_event_exists(event_id: str) -> bool: + return PendingEvent.objects.filter(event_id=event_id).exists() + + +def events_for_decision(decision_id: str) -> QuerySet[Event]: + return Event.objects.filter(decision_id=decision_id).select_related( + "event_type" + ) + + +def events_for_experiment( + experiment_id: UUID, + *, + event_type_name: str | None = None, + start_date: str | None = None, + end_date: str | None = None, +) -> QuerySet[Event]: + exposure_decision_ids = Exposure.objects.filter( + experiment_id=experiment_id, + ).values_list("decision_id", flat=True) + + qs = Event.objects.filter( + decision_id__in=exposure_decision_ids, + is_attributed=True, + ).select_related("event_type") + + if event_type_name: + qs = qs.filter(event_type__name=event_type_name) + if start_date: + qs = qs.filter(timestamp__gte=start_date) + if end_date: + qs = qs.filter(timestamp__lt=end_date) + return qs diff --git a/src/backend/apps/events/services.py b/src/backend/apps/events/services.py new file mode 100644 index 0000000..ee2a646 --- /dev/null +++ b/src/backend/apps/events/services.py @@ -0,0 +1,321 @@ +from contextlib import suppress +from datetime import timedelta +from typing import Any + +from django.core.exceptions import ValidationError +from django.db import IntegrityError, transaction +from django.utils import timezone +from django.utils.dateparse import parse_datetime + +from apps.events.models import ( + Decision, + Event, + EventType, + Exposure, + PendingEvent, +) +from apps.events.selectors import ( + decision_get, + event_exists, + event_type_get_by_name, + exposure_exists, + pending_event_exists, + pending_events_for_decision, +) + +PENDING_TTL_DAYS = 7 + + +def event_type_create( + *, + name: str, + display_name: str, + description: str = "", + is_exposure: bool = False, + requires_exposure: bool = False, + required_fields: list[str] | None = None, +) -> EventType: + event_type = EventType( + name=name, + display_name=display_name, + description=description, + is_exposure=is_exposure, + requires_exposure=requires_exposure, + required_fields=required_fields or [], + ) + event_type.save() + return event_type + + +def event_type_update( + *, + event_type: EventType, + **fields: Any, +) -> EventType: + allowed = { + "display_name", + "description", + "is_exposure", + "requires_exposure", + "required_fields", + "is_active", + } + for key in fields: + if key not in allowed: + raise ValidationError({key: f"Field '{key}' cannot be updated."}) + for key, value in fields.items(): + if value is not None: + setattr(event_type, key, value) + event_type.save() + return event_type + + +def _validate_event_payload( + event_data: dict[str, Any], + event_type: EventType, +) -> list[str]: + errors = [] + required_base = [ + "event_id", + "event_type", + "decision_id", + "subject_id", + "timestamp", + ] + errors.extend( + f"Missing required field: {field}" + for field in required_base + if not event_data.get(field) + ) + + if not isinstance(event_data.get("event_id"), str): + errors.append("Field 'event_id' must be a string") + + if not isinstance(event_data.get("decision_id"), str): + errors.append("Field 'decision_id' must be a string") + + if not isinstance(event_data.get("subject_id"), str): + errors.append("Field 'subject_id' must be a string") + + if not isinstance(event_data.get("timestamp"), str): + errors.append("Field 'timestamp' must be an ISO 8601 string") + + properties = event_data.get("properties", {}) + if not isinstance(properties, dict): + errors.append("Field 'properties' must be an object") + else: + errors.extend( + f"Missing required property: {req_field}" + for req_field in event_type.required_fields + if req_field not in properties + ) + + return errors + + +def _is_duplicate(event_id: str) -> bool: + return event_exists(event_id) or pending_event_exists(event_id) + + +@transaction.atomic +def _process_exposure_event( + event_data: dict[str, Any], + event_type_obj: EventType, +) -> None: + decision_id = event_data["decision_id"] + subject_id = event_data["subject_id"] + timestamp = parse_datetime(event_data["timestamp"]) or timezone.now() + + decision = decision_get(decision_id) + experiment_id = None + variant_id = None + if decision: + experiment_id = decision.experiment_id + variant_id = decision.variant_id + + with suppress(IntegrityError): + Exposure.objects.create( + decision_id=decision_id, + experiment_id=experiment_id, + variant_id=variant_id, + subject_id=subject_id, + timestamp=timestamp, + ) + + Event.objects.create( + event_id=event_data["event_id"], + event_type=event_type_obj, + decision_id=decision_id, + subject_id=subject_id, + timestamp=timestamp, + properties=event_data.get("properties", {}), + is_attributed=True, + ) + + _promote_pending_events(decision_id) + + +def _promote_pending_events(decision_id: str) -> None: + pending = pending_events_for_decision(decision_id) + for pe in pending: + with suppress(IntegrityError): + Event.objects.create( + event_id=pe.event_id, + event_type=pe.event_type, + decision_id=pe.decision_id, + subject_id=pe.subject_id, + timestamp=pe.timestamp, + properties=pe.properties, + is_attributed=True, + ) + pending.delete() + + +@transaction.atomic +def _process_conversion_event( + event_data: dict[str, Any], + event_type_obj: EventType, +) -> None: + decision_id = event_data["decision_id"] + subject_id = event_data["subject_id"] + timestamp = parse_datetime(event_data["timestamp"]) or timezone.now() + properties = event_data.get("properties", {}) + + if not event_type_obj.requires_exposure or exposure_exists(decision_id): + Event.objects.create( + event_id=event_data["event_id"], + event_type=event_type_obj, + decision_id=decision_id, + subject_id=subject_id, + timestamp=timestamp, + properties=properties, + is_attributed=True, + ) + else: + PendingEvent.objects.create( + event_id=event_data["event_id"], + event_type=event_type_obj, + decision_id=decision_id, + subject_id=subject_id, + timestamp=timestamp, + properties=properties, + expires_at=timezone.now() + timedelta(days=PENDING_TTL_DAYS), + ) + + +class BatchResult: + __slots__ = ("accepted", "duplicates", "errors", "rejected") + + def __init__(self) -> None: + self.accepted: int = 0 + self.duplicates: int = 0 + self.rejected: int = 0 + self.errors: list[dict[str, Any]] = [] + + +def process_events_batch( + events: list[dict[str, Any]], +) -> BatchResult: + result = BatchResult() + + for idx, event_data in enumerate(events): + event_type_name = event_data.get("event_type") + + if not event_type_name or not isinstance(event_type_name, str): + result.rejected += 1 + result.errors.append( + { + "index": idx, + "event_id": event_data.get("event_id"), + "error": "Missing or invalid 'event_type' field", + } + ) + continue + + event_type_obj = event_type_get_by_name(event_type_name) + if not event_type_obj: + result.rejected += 1 + result.errors.append( + { + "index": idx, + "event_id": event_data.get("event_id"), + "error": f"Unknown event type: {event_type_name}", + } + ) + continue + + if not event_type_obj.is_active: + result.rejected += 1 + result.errors.append( + { + "index": idx, + "event_id": event_data.get("event_id"), + "error": f"Event type '{event_type_name}' is archived", + } + ) + continue + + validation_errors = _validate_event_payload(event_data, event_type_obj) + if validation_errors: + result.rejected += 1 + result.errors.append( + { + "index": idx, + "event_id": event_data.get("event_id"), + "error": "; ".join(validation_errors), + } + ) + continue + + event_id = event_data["event_id"] + if _is_duplicate(event_id): + result.duplicates += 1 + continue + + try: + if event_type_obj.is_exposure: + _process_exposure_event(event_data, event_type_obj) + else: + _process_conversion_event(event_data, event_type_obj) + result.accepted += 1 + except IntegrityError: + result.duplicates += 1 + except Exception as exc: + result.rejected += 1 + result.errors.append( + { + "index": idx, + "event_id": event_id, + "error": str(exc), + } + ) + + return result + + +def decision_create( + *, + decision_id: str, + flag_key: str, + subject_id: str, + experiment_id: str | None = None, + variant_id: str | None = None, + value: str, + reason: str, +) -> Decision: + return Decision.objects.create( + decision_id=decision_id, + flag_key=flag_key, + subject_id=subject_id, + experiment_id=experiment_id, + variant_id=variant_id, + value=str(value), + reason=reason, + ) + + +def cleanup_expired_pending_events() -> int: + deleted_count, _ = PendingEvent.objects.filter( + expires_at__lte=timezone.now(), + ).delete() + return deleted_count diff --git a/src/backend/apps/events/tasks.py b/src/backend/apps/events/tasks.py new file mode 100644 index 0000000..1320410 --- /dev/null +++ b/src/backend/apps/events/tasks.py @@ -0,0 +1,16 @@ +import logging + +from apps.events.services import cleanup_expired_pending_events +from config.celery import app + +logger = logging.getLogger("lotty") + + +@app.task(bind=True, name="events.cleanup_expired_pending") +def cleanup_expired_pending_events_task(self): + deleted = cleanup_expired_pending_events() + logger.info( + "pending_events_cleanup_completed", + extra={"deleted": deleted}, + ) + return deleted diff --git a/src/backend/apps/events/tests/__init__.py b/src/backend/apps/events/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/backend/apps/events/tests/helpers.py b/src/backend/apps/events/tests/helpers.py new file mode 100644 index 0000000..edf10ac --- /dev/null +++ b/src/backend/apps/events/tests/helpers.py @@ -0,0 +1,29 @@ +from apps.events.models import EventType +from apps.events.services import event_type_create + + +def make_event_type( + name: str = "test_event", + display_name: str = "Test Event", + is_exposure: bool = False, # noqa: FBT001, FBT002 + requires_exposure: bool = False, # noqa: FBT001, FBT002 + required_fields: list[str] | None = None, + **kwargs, +) -> EventType: + return event_type_create( + name=name, + display_name=display_name, + is_exposure=is_exposure, + requires_exposure=requires_exposure, + required_fields=required_fields or [], + **kwargs, + ) + + +def make_exposure_type(name: str = "exposure") -> EventType: + return make_event_type( + name=name, + display_name="Exposure", + is_exposure=True, + requires_exposure=False, + ) diff --git a/src/backend/apps/events/tests/test_models.py b/src/backend/apps/events/tests/test_models.py new file mode 100644 index 0000000..15bc219 --- /dev/null +++ b/src/backend/apps/events/tests/test_models.py @@ -0,0 +1,51 @@ +from django.core.exceptions import ValidationError +from django.test import TestCase + +from apps.events.services import event_type_update +from apps.events.tests.helpers import make_event_type + + +class EventTypeModelTest(TestCase): + def test_create_event_type(self) -> None: + et = make_event_type(name="page_view", display_name="Page View") + self.assertEqual(et.name, "page_view") + self.assertEqual(et.display_name, "Page View") + self.assertFalse(et.requires_exposure) + self.assertTrue(et.is_active) + + def test_create_event_type_with_required_fields(self) -> None: + et = make_event_type( + name="click", + display_name="Click", + required_fields=["screen", "element"], + ) + self.assertEqual(et.required_fields, ["screen", "element"]) + + def test_unique_name_constraint(self) -> None: + make_event_type(name="unique_evt") + with self.assertRaises((ValidationError, Exception)): + make_event_type(name="unique_evt") + + def test_invalid_name_rejected(self) -> None: + with self.assertRaises(ValidationError): + make_event_type(name="Invalid Name!") + + def test_update_event_type(self) -> None: + et = make_event_type(name="updatable") + updated = event_type_update( + event_type=et, + display_name="Updated Name", + requires_exposure=True, + ) + self.assertEqual(updated.display_name, "Updated Name") + self.assertTrue(updated.requires_exposure) + + def test_update_disallowed_field(self) -> None: + et = make_event_type(name="no_rename") + with self.assertRaises(ValidationError): + event_type_update(event_type=et, name="renamed") + + def test_archive_event_type(self) -> None: + et = make_event_type(name="archivable") + updated = event_type_update(event_type=et, is_active=False) + self.assertFalse(updated.is_active) diff --git a/src/backend/apps/events/tests/test_services.py b/src/backend/apps/events/tests/test_services.py new file mode 100644 index 0000000..7c92481 --- /dev/null +++ b/src/backend/apps/events/tests/test_services.py @@ -0,0 +1,368 @@ +import uuid + +from django.test import TestCase +from django.utils import timezone + +from apps.events.models import Event, Exposure, PendingEvent +from apps.events.services import ( + decision_create, + process_events_batch, +) +from apps.events.tests.helpers import make_event_type, make_exposure_type + + +class EventValidationTest(TestCase): + def setUp(self) -> None: + self.exposure_type = make_exposure_type() + self.click_type = make_event_type( + name="button_clicked", + display_name="Button Clicked", + requires_exposure=True, + required_fields=["screen"], + ) + + def test_reject_unknown_event_type(self) -> None: + result = process_events_batch( + [ + { + "event_id": "e1", + "event_type": "nonexistent_type", + "decision_id": "d1", + "subject_id": "u1", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + self.assertEqual(result.rejected, 1) + self.assertEqual(result.accepted, 0) + + def test_reject_missing_required_field(self) -> None: + result = process_events_batch( + [ + { + "event_id": "e2", + "event_type": "button_clicked", + "decision_id": "d1", + "subject_id": "u1", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + self.assertEqual(result.rejected, 1) + self.assertIn("screen", result.errors[0]["error"]) + + def test_reject_missing_decision_id(self) -> None: + result = process_events_batch( + [ + { + "event_id": "e3", + "event_type": "button_clicked", + "decision_id": "", + "subject_id": "u1", + "timestamp": timezone.now().isoformat(), + "properties": {"screen": "checkout"}, + } + ] + ) + self.assertEqual(result.rejected, 1) + + def test_reject_invalid_event_type_field(self) -> None: + result = process_events_batch( + [ + { + "event_id": "e4", + "event_type": 12345, + "decision_id": "d1", + "subject_id": "u1", + "timestamp": timezone.now().isoformat(), + } + ] + ) + self.assertEqual(result.rejected, 1) + + def test_reject_archived_event_type(self) -> None: + archived = make_event_type( + name="archived_evt", + display_name="Archived", + ) + archived.is_active = False + archived.save() + + result = process_events_batch( + [ + { + "event_id": "e5", + "event_type": "archived_evt", + "decision_id": "d1", + "subject_id": "u1", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + self.assertEqual(result.rejected, 1) + + +class EventDeduplicationTest(TestCase): + def setUp(self) -> None: + self.exposure_type = make_exposure_type() + + def test_duplicate_event_counted_once(self) -> None: + decision_create( + decision_id="dec1", + flag_key="flag", + subject_id="u1", + value="v", + reason="test", + ) + + event_data = { + "event_id": "dup_evt_1", + "event_type": self.exposure_type.name, + "decision_id": "dec1", + "subject_id": "u1", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + + r1 = process_events_batch([event_data]) + self.assertEqual(r1.accepted, 1) + self.assertEqual(r1.duplicates, 0) + + r2 = process_events_batch([event_data]) + self.assertEqual(r2.accepted, 0) + self.assertEqual(r2.duplicates, 1) + + def test_duplicate_in_same_batch(self) -> None: + decision_create( + decision_id="dec2", + flag_key="flag", + subject_id="u1", + value="v", + reason="test", + ) + + event_data = { + "event_id": "batch_dup", + "event_type": self.exposure_type.name, + "decision_id": "dec2", + "subject_id": "u1", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + + result = process_events_batch([event_data, event_data]) + self.assertEqual(result.accepted + result.duplicates, 2) + self.assertGreaterEqual(result.duplicates, 1) + + +class ExposureAttributionTest(TestCase): + def setUp(self) -> None: + self.exposure_type = make_exposure_type() + self.click_type = make_event_type( + name="button_clicked", + display_name="Button Clicked", + requires_exposure=True, + ) + self.exp_id = str(uuid.uuid4()) + self.var_id = str(uuid.uuid4()) + self.decision_id = "attr_dec_1" + + decision_create( + decision_id=self.decision_id, + flag_key="button_color", + subject_id="u42", + experiment_id=self.exp_id, + variant_id=self.var_id, + value="blue", + reason="experiment_assigned", + ) + + def test_exposure_creates_exposure_record(self) -> None: + result = process_events_batch( + [ + { + "event_id": "exp_evt_1", + "event_type": self.exposure_type.name, + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + self.assertEqual(result.accepted, 1) + self.assertTrue( + Exposure.objects.filter(decision_id=self.decision_id).exists() + ) + + def test_conversion_with_exposure_is_attributed(self) -> None: + process_events_batch( + [ + { + "event_id": "exp_evt_2", + "event_type": self.exposure_type.name, + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + result = process_events_batch( + [ + { + "event_id": "click_evt_1", + "event_type": "button_clicked", + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + self.assertEqual(result.accepted, 1) + event = Event.objects.get(event_id="click_evt_1") + self.assertTrue(event.is_attributed) + + def test_conversion_without_exposure_goes_pending(self) -> None: + result = process_events_batch( + [ + { + "event_id": "click_no_exp", + "event_type": "button_clicked", + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + self.assertEqual(result.accepted, 1) + self.assertFalse( + Event.objects.filter(event_id="click_no_exp").exists() + ) + self.assertTrue( + PendingEvent.objects.filter(event_id="click_no_exp").exists() + ) + + def test_late_exposure_promotes_pending_events(self) -> None: + process_events_batch( + [ + { + "event_id": "early_click", + "event_type": "button_clicked", + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + self.assertTrue( + PendingEvent.objects.filter(event_id="early_click").exists() + ) + + process_events_batch( + [ + { + "event_id": "late_exposure", + "event_type": self.exposure_type.name, + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + self.assertTrue(Event.objects.filter(event_id="early_click").exists()) + self.assertFalse( + PendingEvent.objects.filter(event_id="early_click").exists() + ) + + def test_event_not_requiring_exposure_always_attributed(self) -> None: + make_event_type( + name="technical_event", + display_name="Technical", + requires_exposure=False, + ) + result = process_events_batch( + [ + { + "event_id": "tech_evt_1", + "event_type": "technical_event", + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + self.assertEqual(result.accepted, 1) + event = Event.objects.get(event_id="tech_evt_1") + self.assertTrue(event.is_attributed) + + +class BatchResponseTest(TestCase): + def setUp(self) -> None: + self.exposure_type = make_exposure_type() + self.click_type = make_event_type( + name="button_clicked", + display_name="Button Clicked", + requires_exposure=True, + ) + decision_create( + decision_id="batch_dec", + flag_key="flag", + subject_id="u1", + value="v", + reason="test", + ) + + def test_mixed_batch_response(self) -> None: + process_events_batch( + [ + { + "event_id": "exp_for_batch", + "event_type": self.exposure_type.name, + "decision_id": "batch_dec", + "subject_id": "u1", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + + result = process_events_batch( + [ + { + "event_id": "valid_1", + "event_type": "button_clicked", + "decision_id": "batch_dec", + "subject_id": "u1", + "timestamp": timezone.now().isoformat(), + "properties": {}, + }, + { + "event_id": "invalid_1", + "event_type": "unknown_type", + "decision_id": "batch_dec", + "subject_id": "u1", + "timestamp": timezone.now().isoformat(), + }, + { + "event_id": "exp_for_batch", + "event_type": self.exposure_type.name, + "decision_id": "batch_dec", + "subject_id": "u1", + "timestamp": timezone.now().isoformat(), + "properties": {}, + }, + ] + ) + self.assertEqual(result.accepted, 1) + self.assertEqual(result.rejected, 1) + self.assertEqual(result.duplicates, 1) + self.assertEqual(len(result.errors), 1)