328 lines
9.1 KiB
Python
328 lines
9.1 KiB
Python
from contextlib import suppress
|
|
from datetime import timedelta
|
|
from typing import Any
|
|
|
|
from django.core.exceptions import ValidationError
|
|
from django.db import IntegrityError, transaction
|
|
from django.utils import timezone
|
|
from django.utils.dateparse import parse_datetime
|
|
|
|
from apps.events.models import (
|
|
Decision,
|
|
Event,
|
|
EventType,
|
|
Exposure,
|
|
PendingEvent,
|
|
)
|
|
from apps.events.selectors import (
|
|
decision_get,
|
|
event_exists,
|
|
event_type_get_by_name,
|
|
exposure_exists,
|
|
pending_event_exists,
|
|
pending_events_for_decision,
|
|
)
|
|
from config.errors import ConflictError
|
|
|
|
PENDING_TTL_DAYS = 7
|
|
|
|
|
|
def event_type_create(
|
|
*,
|
|
name: str,
|
|
display_name: str,
|
|
description: str = "",
|
|
is_exposure: bool = False,
|
|
requires_exposure: bool = False,
|
|
required_fields: list[str] | None = None,
|
|
) -> EventType:
|
|
event_type = EventType(
|
|
name=name,
|
|
display_name=display_name,
|
|
description=description,
|
|
is_exposure=is_exposure,
|
|
requires_exposure=requires_exposure,
|
|
required_fields=required_fields or [],
|
|
)
|
|
event_type.save()
|
|
return event_type
|
|
|
|
|
|
def event_type_update(
|
|
*,
|
|
event_type: EventType,
|
|
**fields: Any,
|
|
) -> EventType:
|
|
allowed = {
|
|
"display_name",
|
|
"description",
|
|
"is_exposure",
|
|
"requires_exposure",
|
|
"required_fields",
|
|
"is_active",
|
|
}
|
|
for key in fields:
|
|
if key not in allowed:
|
|
raise ValidationError({key: f"Field '{key}' cannot be updated."})
|
|
for key, value in fields.items():
|
|
if value is not None:
|
|
setattr(event_type, key, value)
|
|
event_type.save()
|
|
return event_type
|
|
|
|
|
|
def _validate_event_payload(
|
|
event_data: dict[str, Any],
|
|
event_type: EventType,
|
|
) -> list[str]:
|
|
errors = []
|
|
required_base = [
|
|
"event_id",
|
|
"event_type",
|
|
"decision_id",
|
|
"subject_id",
|
|
"timestamp",
|
|
]
|
|
errors.extend(
|
|
f"Missing required field: {field}"
|
|
for field in required_base
|
|
if not event_data.get(field)
|
|
)
|
|
|
|
if not isinstance(event_data.get("event_id"), str):
|
|
errors.append("Field 'event_id' must be a string")
|
|
|
|
if not isinstance(event_data.get("decision_id"), str):
|
|
errors.append("Field 'decision_id' must be a string")
|
|
|
|
if not isinstance(event_data.get("subject_id"), str):
|
|
errors.append("Field 'subject_id' must be a string")
|
|
|
|
if not isinstance(event_data.get("timestamp"), str):
|
|
errors.append("Field 'timestamp' must be an ISO 8601 string")
|
|
|
|
properties = event_data.get("properties", {})
|
|
if not isinstance(properties, dict):
|
|
errors.append("Field 'properties' must be an object")
|
|
else:
|
|
errors.extend(
|
|
f"Missing required property: {req_field}"
|
|
for req_field in event_type.required_fields
|
|
if req_field not in properties
|
|
)
|
|
|
|
return errors
|
|
|
|
|
|
def _is_duplicate(event_id: str) -> bool:
|
|
return event_exists(event_id) or pending_event_exists(event_id)
|
|
|
|
|
|
@transaction.atomic
|
|
def _process_exposure_event(
|
|
event_data: dict[str, Any],
|
|
event_type_obj: EventType,
|
|
) -> None:
|
|
decision_id = event_data["decision_id"]
|
|
subject_id = event_data["subject_id"]
|
|
timestamp = parse_datetime(event_data["timestamp"])
|
|
if timestamp is None:
|
|
raise ValidationError(
|
|
"Field 'timestamp' must be a valid ISO 8601 datetime."
|
|
)
|
|
|
|
decision = decision_get(decision_id)
|
|
has_decision = decision is not None
|
|
if has_decision:
|
|
with suppress(ConflictError):
|
|
Exposure.objects.create(
|
|
decision_id=decision_id,
|
|
experiment_id=decision.experiment_id,
|
|
variant_id=decision.variant_id,
|
|
subject_id=subject_id,
|
|
timestamp=timestamp,
|
|
)
|
|
|
|
Event.objects.create(
|
|
event_id=event_data["event_id"],
|
|
event_type=event_type_obj,
|
|
decision_id=decision_id,
|
|
subject_id=subject_id,
|
|
timestamp=timestamp,
|
|
properties=event_data.get("properties", {}),
|
|
is_attributed=has_decision,
|
|
)
|
|
|
|
if has_decision:
|
|
_promote_pending_events(decision_id)
|
|
|
|
|
|
def _promote_pending_events(decision_id: str) -> None:
|
|
pending = pending_events_for_decision(decision_id)
|
|
for pe in pending:
|
|
with suppress(ConflictError):
|
|
Event.objects.create(
|
|
event_id=pe.event_id,
|
|
event_type=pe.event_type,
|
|
decision_id=pe.decision_id,
|
|
subject_id=pe.subject_id,
|
|
timestamp=pe.timestamp,
|
|
properties=pe.properties,
|
|
is_attributed=True,
|
|
)
|
|
pending.delete()
|
|
|
|
|
|
@transaction.atomic
|
|
def _process_conversion_event(
|
|
event_data: dict[str, Any],
|
|
event_type_obj: EventType,
|
|
) -> None:
|
|
decision_id = event_data["decision_id"]
|
|
subject_id = event_data["subject_id"]
|
|
timestamp = parse_datetime(event_data["timestamp"])
|
|
if timestamp is None:
|
|
raise ValidationError(
|
|
"Field 'timestamp' must be a valid ISO 8601 datetime."
|
|
)
|
|
properties = event_data.get("properties", {})
|
|
|
|
if not event_type_obj.requires_exposure or exposure_exists(decision_id):
|
|
Event.objects.create(
|
|
event_id=event_data["event_id"],
|
|
event_type=event_type_obj,
|
|
decision_id=decision_id,
|
|
subject_id=subject_id,
|
|
timestamp=timestamp,
|
|
properties=properties,
|
|
is_attributed=True,
|
|
)
|
|
else:
|
|
PendingEvent.objects.create(
|
|
event_id=event_data["event_id"],
|
|
event_type=event_type_obj,
|
|
decision_id=decision_id,
|
|
subject_id=subject_id,
|
|
timestamp=timestamp,
|
|
properties=properties,
|
|
expires_at=timezone.now() + timedelta(days=PENDING_TTL_DAYS),
|
|
)
|
|
|
|
|
|
class BatchResult:
|
|
__slots__ = ("accepted", "duplicates", "errors", "rejected")
|
|
|
|
def __init__(self) -> None:
|
|
self.accepted: int = 0
|
|
self.duplicates: int = 0
|
|
self.rejected: int = 0
|
|
self.errors: list[dict[str, Any]] = []
|
|
|
|
|
|
def process_events_batch(
|
|
events: list[dict[str, Any]],
|
|
) -> BatchResult:
|
|
result = BatchResult()
|
|
|
|
for idx, event_data in enumerate(events):
|
|
event_type_name = event_data.get("event_type")
|
|
|
|
if not event_type_name or not isinstance(event_type_name, str):
|
|
result.rejected += 1
|
|
result.errors.append(
|
|
{
|
|
"index": idx,
|
|
"event_id": event_data.get("event_id"),
|
|
"error": "Missing or invalid 'event_type' field",
|
|
}
|
|
)
|
|
continue
|
|
|
|
event_type_obj = event_type_get_by_name(event_type_name)
|
|
if not event_type_obj:
|
|
result.rejected += 1
|
|
result.errors.append(
|
|
{
|
|
"index": idx,
|
|
"event_id": event_data.get("event_id"),
|
|
"error": f"Unknown event type: {event_type_name}",
|
|
}
|
|
)
|
|
continue
|
|
|
|
if not event_type_obj.is_active:
|
|
result.rejected += 1
|
|
result.errors.append(
|
|
{
|
|
"index": idx,
|
|
"event_id": event_data.get("event_id"),
|
|
"error": f"Event type '{event_type_name}' is archived",
|
|
}
|
|
)
|
|
continue
|
|
|
|
validation_errors = _validate_event_payload(event_data, event_type_obj)
|
|
if validation_errors:
|
|
result.rejected += 1
|
|
result.errors.append(
|
|
{
|
|
"index": idx,
|
|
"event_id": event_data.get("event_id"),
|
|
"error": "; ".join(validation_errors),
|
|
}
|
|
)
|
|
continue
|
|
|
|
event_id = event_data["event_id"]
|
|
if _is_duplicate(event_id):
|
|
result.duplicates += 1
|
|
continue
|
|
|
|
try:
|
|
if event_type_obj.is_exposure:
|
|
_process_exposure_event(event_data, event_type_obj)
|
|
else:
|
|
_process_conversion_event(event_data, event_type_obj)
|
|
result.accepted += 1
|
|
except IntegrityError:
|
|
result.duplicates += 1
|
|
except Exception as exc:
|
|
result.rejected += 1
|
|
result.errors.append(
|
|
{
|
|
"index": idx,
|
|
"event_id": event_id,
|
|
"error": str(exc),
|
|
}
|
|
)
|
|
|
|
return result
|
|
|
|
|
|
def decision_create(
|
|
*,
|
|
decision_id: str,
|
|
flag_key: str,
|
|
subject_id: str,
|
|
experiment_id: str | None = None,
|
|
variant_id: str | None = None,
|
|
value: str,
|
|
reason: str,
|
|
) -> Decision:
|
|
return Decision.objects.create(
|
|
decision_id=decision_id,
|
|
flag_key=flag_key,
|
|
subject_id=subject_id,
|
|
experiment_id=experiment_id,
|
|
variant_id=variant_id,
|
|
value=str(value),
|
|
reason=reason,
|
|
)
|
|
|
|
|
|
def cleanup_expired_pending_events() -> int:
|
|
deleted_count, _ = PendingEvent.objects.filter(
|
|
expires_at__lte=timezone.now(),
|
|
).delete()
|
|
return deleted_count
|