"""Write-side services for event types, events, exposures and decisions."""

from contextlib import suppress
from datetime import datetime, timedelta
from typing import Any

from django.core.exceptions import ValidationError
from django.db import IntegrityError, transaction
from django.utils import timezone
from django.utils.dateparse import parse_datetime

from apps.events.models import (
    Decision,
    Event,
    EventType,
    Exposure,
    PendingEvent,
)
from apps.events.selectors import (
    decision_get,
    event_exists,
    event_type_get_by_name,
    exposure_exists,
    pending_event_exists,
    pending_events_for_decision,
)
from config.errors import ConflictError

# Number of days added to "now" to compute ``PendingEvent.expires_at``.
PENDING_TTL_DAYS = 7


def event_type_create(
    *,
    name: str,
    display_name: str,
    description: str = "",
    is_exposure: bool = False,
    requires_exposure: bool = False,
    required_fields: list[str] | None = None,
) -> EventType:
    """Build an ``EventType`` from the given fields, save it and return it.

    ``required_fields`` defaults to an empty list when omitted (or falsy).
    """
    event_type = EventType(
        name=name,
        display_name=display_name,
        description=description,
        is_exposure=is_exposure,
        requires_exposure=requires_exposure,
        required_fields=required_fields or [],
    )
    event_type.save()
    return event_type


def event_type_update(
    *,
    event_type: EventType,
    **fields: Any,
) -> EventType:
    """Apply a partial update to ``event_type`` and save it.

    Only the whitelisted attributes below may be passed. Every key is
    checked before anything is assigned, so a rejected call leaves the
    instance untouched. Fields whose value is ``None`` are skipped, which
    means this function cannot be used to set an attribute to ``None``.

    Raises:
        ValidationError: if any key in ``fields`` is not updatable.
    """
    allowed = {
        "display_name",
        "description",
        "is_exposure",
        "requires_exposure",
        "required_fields",
        "is_active",
    }
    for key in fields:
        if key not in allowed:
            raise ValidationError({key: f"Field '{key}' cannot be updated."})

    for key, value in fields.items():
        if value is not None:
            setattr(event_type, key, value)

    event_type.save()
    return event_type


def _validate_event_payload(
    event_data: dict[str, Any],
    event_type: EventType,
) -> list[str]:
    """Return human-readable validation errors for one raw event payload.

    An empty list means the payload passed. Checks performed:

    * the base fields are present and truthy;
    * ``event_id``, ``decision_id``, ``subject_id`` and ``timestamp`` are
      strings (the timestamp *format* is not checked here);
    * ``properties`` (default ``{}``) is a dict containing every key listed
      in ``event_type.required_fields``.

    A field that is absent is reported both as missing and as having the
    wrong type; callers join all messages together.
    """
    errors = []

    required_base = [
        "event_id",
        "event_type",
        "decision_id",
        "subject_id",
        "timestamp",
    ]
    errors.extend(
        f"Missing required field: {field}"
        for field in required_base
        if not event_data.get(field)
    )

    if not isinstance(event_data.get("event_id"), str):
        errors.append("Field 'event_id' must be a string")
    if not isinstance(event_data.get("decision_id"), str):
        errors.append("Field 'decision_id' must be a string")
    if not isinstance(event_data.get("subject_id"), str):
        errors.append("Field 'subject_id' must be a string")
    if not isinstance(event_data.get("timestamp"), str):
        errors.append("Field 'timestamp' must be an ISO 8601 string")

    properties = event_data.get("properties", {})
    if not isinstance(properties, dict):
        errors.append("Field 'properties' must be an object")
    else:
        errors.extend(
            f"Missing required property: {req_field}"
            for req_field in event_type.required_fields
            if req_field not in properties
        )

    return errors


def _is_duplicate(event_id: str) -> bool:
    """Return True if ``event_id`` already exists as an event or pending event."""
    return event_exists(event_id) or pending_event_exists(event_id)


def _parse_event_timestamp(raw: str) -> datetime:
    """Parse the payload ``timestamp`` string into a ``datetime``.

    Raises:
        ValidationError: if ``parse_datetime`` returns ``None`` (the string
            is not in a format it recognises).

    Any exception raised by ``parse_datetime`` itself is propagated
    unchanged.
    """
    timestamp = parse_datetime(raw)
    if timestamp is None:
        raise ValidationError(
            "Field 'timestamp' must be a valid ISO 8601 datetime."
        )
    return timestamp


@transaction.atomic
def _process_exposure_event(
    event_data: dict[str, Any],
    event_type_obj: EventType,
) -> None:
    """Persist an exposure-type event inside a single transaction.

    If a decision is found for ``decision_id``, an ``Exposure`` row is
    created from it (a ``ConflictError`` from that create is ignored), the
    event is stored as attributed, and any pending events for the decision
    are promoted. Without a decision only the event is stored, with
    ``is_attributed=False``.

    Raises:
        ValidationError: if the timestamp cannot be parsed.
    """
    decision_id = event_data["decision_id"]
    subject_id = event_data["subject_id"]
    timestamp = _parse_event_timestamp(event_data["timestamp"])

    # NOTE(review): relies on decision_get returning None when no decision
    # exists for this id - confirm against the selector.
    decision = decision_get(decision_id)
    has_decision = decision is not None

    if has_decision:
        # Presumably tolerates an exposure that was already recorded for
        # this decision - verify which layer raises ConflictError.
        with suppress(ConflictError):
            Exposure.objects.create(
                decision_id=decision_id,
                experiment_id=decision.experiment_id,
                variant_id=decision.variant_id,
                subject_id=subject_id,
                timestamp=timestamp,
            )

    Event.objects.create(
        event_id=event_data["event_id"],
        event_type=event_type_obj,
        decision_id=decision_id,
        subject_id=subject_id,
        timestamp=timestamp,
        properties=event_data.get("properties", {}),
        is_attributed=has_decision,
    )

    if has_decision:
        _promote_pending_events(decision_id)


def _promote_pending_events(decision_id: str) -> None:
    """Turn the pending events of ``decision_id`` into attributed events.

    Each pending event is copied into ``Event`` with ``is_attributed=True``;
    a ``ConflictError`` on an individual copy is ignored. Afterwards
    ``delete()`` is called on the pending-events result.
    """
    pending = pending_events_for_decision(decision_id)
    for pe in pending:
        with suppress(ConflictError):
            Event.objects.create(
                event_id=pe.event_id,
                event_type=pe.event_type,
                decision_id=pe.decision_id,
                subject_id=pe.subject_id,
                timestamp=pe.timestamp,
                properties=pe.properties,
                is_attributed=True,
            )
    pending.delete()


@transaction.atomic
def _process_conversion_event(
    event_data: dict[str, Any],
    event_type_obj: EventType,
) -> None:
    """Persist a non-exposure event inside a single transaction.

    The event is stored directly (attributed) when its type does not
    require an exposure or when an exposure already exists for the
    decision. Otherwise it is parked as a ``PendingEvent`` that expires
    ``PENDING_TTL_DAYS`` days from now.

    Raises:
        ValidationError: if the timestamp cannot be parsed.
    """
    decision_id = event_data["decision_id"]
    subject_id = event_data["subject_id"]
    timestamp = _parse_event_timestamp(event_data["timestamp"])
    properties = event_data.get("properties", {})

    if not event_type_obj.requires_exposure or exposure_exists(decision_id):
        Event.objects.create(
            event_id=event_data["event_id"],
            event_type=event_type_obj,
            decision_id=decision_id,
            subject_id=subject_id,
            timestamp=timestamp,
            properties=properties,
            is_attributed=True,
        )
    else:
        PendingEvent.objects.create(
            event_id=event_data["event_id"],
            event_type=event_type_obj,
            decision_id=decision_id,
            subject_id=subject_id,
            timestamp=timestamp,
            properties=properties,
            expires_at=timezone.now() + timedelta(days=PENDING_TTL_DAYS),
        )


class BatchResult:
    """Mutable tally describing the outcome of ``process_events_batch``."""

    __slots__ = ("accepted", "duplicates", "errors", "rejected")

    def __init__(self) -> None:
        # Events that were stored (as an event or a pending event).
        self.accepted: int = 0
        # Events skipped because their id was already known.
        self.duplicates: int = 0
        # Events that failed validation or processing.
        self.rejected: int = 0
        # One {"index", "event_id", "error"} dict per rejected event.
        self.errors: list[dict[str, Any]] = []


def _reject(
    result: BatchResult,
    idx: int,
    event_id: Any,
    message: str,
) -> None:
    """Count one rejected event on ``result`` and record why it failed."""
    result.rejected += 1
    result.errors.append(
        {
            "index": idx,
            "event_id": event_id,
            "error": message,
        }
    )


def process_events_batch(
    events: list[dict[str, Any]],
) -> BatchResult:
    """Validate and persist a batch of raw event payloads.

    Events are handled one by one, in order; a failure of one event never
    aborts the rest of the batch. For each event the outcome is counted on
    the returned ``BatchResult`` as accepted, duplicate or rejected, and
    every rejection adds an entry to ``errors`` carrying the event's index
    in ``events``.

    Args:
        events: raw payloads; entries that are not dicts are rejected.

    Returns:
        The populated ``BatchResult``.
    """
    result = BatchResult()

    for idx, event_data in enumerate(events):
        # A non-dict entry has no ``.get``; reject it instead of letting an
        # AttributeError abort the whole batch.
        if not isinstance(event_data, dict):
            _reject(result, idx, None, "Event must be an object")
            continue

        event_type_name = event_data.get("event_type")
        if not event_type_name or not isinstance(event_type_name, str):
            _reject(
                result,
                idx,
                event_data.get("event_id"),
                "Missing or invalid 'event_type' field",
            )
            continue

        event_type_obj = event_type_get_by_name(event_type_name)
        if not event_type_obj:
            _reject(
                result,
                idx,
                event_data.get("event_id"),
                f"Unknown event type: {event_type_name}",
            )
            continue

        if not event_type_obj.is_active:
            _reject(
                result,
                idx,
                event_data.get("event_id"),
                f"Event type '{event_type_name}' is archived",
            )
            continue

        validation_errors = _validate_event_payload(event_data, event_type_obj)
        if validation_errors:
            _reject(
                result,
                idx,
                event_data.get("event_id"),
                "; ".join(validation_errors),
            )
            continue

        event_id = event_data["event_id"]

        if _is_duplicate(event_id):
            result.duplicates += 1
            continue

        try:
            if event_type_obj.is_exposure:
                _process_exposure_event(event_data, event_type_obj)
            else:
                _process_conversion_event(event_data, event_type_obj)
            result.accepted += 1
        except IntegrityError:
            # Counted as a duplicate - presumably a uniqueness violation on
            # an id inserted after the _is_duplicate check; confirm against
            # the model constraints.
            result.duplicates += 1
        except Exception as exc:
            # Batch boundary: any other failure (e.g. an unparseable
            # timestamp) rejects this event only.
            _reject(result, idx, event_id, str(exc))

    return result


def decision_create(
    *,
    decision_id: str,
    flag_key: str,
    subject_id: str,
    experiment_id: str | None = None,
    variant_id: str | None = None,
    value: str,
    reason: str,
) -> Decision:
    """Create and return a ``Decision`` row; ``value`` is stored as ``str(value)``."""
    return Decision.objects.create(
        decision_id=decision_id,
        flag_key=flag_key,
        subject_id=subject_id,
        experiment_id=experiment_id,
        variant_id=variant_id,
        value=str(value),
        reason=reason,
    )


def cleanup_expired_pending_events() -> int:
    """Delete pending events whose ``expires_at`` is now or in the past.

    Returns the first element of the tuple returned by ``QuerySet.delete()``
    (the total number of objects deleted).
    """
    deleted_count, _ = PendingEvent.objects.filter(
        expires_at__lte=timezone.now(),
    ).delete()
    return deleted_count