feat(events): added events business logic
This commit is contained in:
@@ -0,0 +1,321 @@
|
||||
from contextlib import suppress
|
||||
from datetime import timedelta
|
||||
from typing import Any
|
||||
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.db import IntegrityError, transaction
|
||||
from django.utils import timezone
|
||||
from django.utils.dateparse import parse_datetime
|
||||
|
||||
from apps.events.models import (
|
||||
Decision,
|
||||
Event,
|
||||
EventType,
|
||||
Exposure,
|
||||
PendingEvent,
|
||||
)
|
||||
from apps.events.selectors import (
|
||||
decision_get,
|
||||
event_exists,
|
||||
event_type_get_by_name,
|
||||
exposure_exists,
|
||||
pending_event_exists,
|
||||
pending_events_for_decision,
|
||||
)
|
||||
|
||||
PENDING_TTL_DAYS = 7
|
||||
|
||||
|
||||
def event_type_create(
|
||||
*,
|
||||
name: str,
|
||||
display_name: str,
|
||||
description: str = "",
|
||||
is_exposure: bool = False,
|
||||
requires_exposure: bool = False,
|
||||
required_fields: list[str] | None = None,
|
||||
) -> EventType:
|
||||
event_type = EventType(
|
||||
name=name,
|
||||
display_name=display_name,
|
||||
description=description,
|
||||
is_exposure=is_exposure,
|
||||
requires_exposure=requires_exposure,
|
||||
required_fields=required_fields or [],
|
||||
)
|
||||
event_type.save()
|
||||
return event_type
|
||||
|
||||
|
||||
def event_type_update(
|
||||
*,
|
||||
event_type: EventType,
|
||||
**fields: Any,
|
||||
) -> EventType:
|
||||
allowed = {
|
||||
"display_name",
|
||||
"description",
|
||||
"is_exposure",
|
||||
"requires_exposure",
|
||||
"required_fields",
|
||||
"is_active",
|
||||
}
|
||||
for key in fields:
|
||||
if key not in allowed:
|
||||
raise ValidationError({key: f"Field '{key}' cannot be updated."})
|
||||
for key, value in fields.items():
|
||||
if value is not None:
|
||||
setattr(event_type, key, value)
|
||||
event_type.save()
|
||||
return event_type
|
||||
|
||||
|
||||
def _validate_event_payload(
|
||||
event_data: dict[str, Any],
|
||||
event_type: EventType,
|
||||
) -> list[str]:
|
||||
errors = []
|
||||
required_base = [
|
||||
"event_id",
|
||||
"event_type",
|
||||
"decision_id",
|
||||
"subject_id",
|
||||
"timestamp",
|
||||
]
|
||||
errors.extend(
|
||||
f"Missing required field: {field}"
|
||||
for field in required_base
|
||||
if not event_data.get(field)
|
||||
)
|
||||
|
||||
if not isinstance(event_data.get("event_id"), str):
|
||||
errors.append("Field 'event_id' must be a string")
|
||||
|
||||
if not isinstance(event_data.get("decision_id"), str):
|
||||
errors.append("Field 'decision_id' must be a string")
|
||||
|
||||
if not isinstance(event_data.get("subject_id"), str):
|
||||
errors.append("Field 'subject_id' must be a string")
|
||||
|
||||
if not isinstance(event_data.get("timestamp"), str):
|
||||
errors.append("Field 'timestamp' must be an ISO 8601 string")
|
||||
|
||||
properties = event_data.get("properties", {})
|
||||
if not isinstance(properties, dict):
|
||||
errors.append("Field 'properties' must be an object")
|
||||
else:
|
||||
errors.extend(
|
||||
f"Missing required property: {req_field}"
|
||||
for req_field in event_type.required_fields
|
||||
if req_field not in properties
|
||||
)
|
||||
|
||||
return errors
|
||||
|
||||
|
||||
def _is_duplicate(event_id: str) -> bool:
|
||||
return event_exists(event_id) or pending_event_exists(event_id)
|
||||
|
||||
|
||||
@transaction.atomic
|
||||
def _process_exposure_event(
|
||||
event_data: dict[str, Any],
|
||||
event_type_obj: EventType,
|
||||
) -> None:
|
||||
decision_id = event_data["decision_id"]
|
||||
subject_id = event_data["subject_id"]
|
||||
timestamp = parse_datetime(event_data["timestamp"]) or timezone.now()
|
||||
|
||||
decision = decision_get(decision_id)
|
||||
experiment_id = None
|
||||
variant_id = None
|
||||
if decision:
|
||||
experiment_id = decision.experiment_id
|
||||
variant_id = decision.variant_id
|
||||
|
||||
with suppress(IntegrityError):
|
||||
Exposure.objects.create(
|
||||
decision_id=decision_id,
|
||||
experiment_id=experiment_id,
|
||||
variant_id=variant_id,
|
||||
subject_id=subject_id,
|
||||
timestamp=timestamp,
|
||||
)
|
||||
|
||||
Event.objects.create(
|
||||
event_id=event_data["event_id"],
|
||||
event_type=event_type_obj,
|
||||
decision_id=decision_id,
|
||||
subject_id=subject_id,
|
||||
timestamp=timestamp,
|
||||
properties=event_data.get("properties", {}),
|
||||
is_attributed=True,
|
||||
)
|
||||
|
||||
_promote_pending_events(decision_id)
|
||||
|
||||
|
||||
def _promote_pending_events(decision_id: str) -> None:
|
||||
pending = pending_events_for_decision(decision_id)
|
||||
for pe in pending:
|
||||
with suppress(IntegrityError):
|
||||
Event.objects.create(
|
||||
event_id=pe.event_id,
|
||||
event_type=pe.event_type,
|
||||
decision_id=pe.decision_id,
|
||||
subject_id=pe.subject_id,
|
||||
timestamp=pe.timestamp,
|
||||
properties=pe.properties,
|
||||
is_attributed=True,
|
||||
)
|
||||
pending.delete()
|
||||
|
||||
|
||||
@transaction.atomic
|
||||
def _process_conversion_event(
|
||||
event_data: dict[str, Any],
|
||||
event_type_obj: EventType,
|
||||
) -> None:
|
||||
decision_id = event_data["decision_id"]
|
||||
subject_id = event_data["subject_id"]
|
||||
timestamp = parse_datetime(event_data["timestamp"]) or timezone.now()
|
||||
properties = event_data.get("properties", {})
|
||||
|
||||
if not event_type_obj.requires_exposure or exposure_exists(decision_id):
|
||||
Event.objects.create(
|
||||
event_id=event_data["event_id"],
|
||||
event_type=event_type_obj,
|
||||
decision_id=decision_id,
|
||||
subject_id=subject_id,
|
||||
timestamp=timestamp,
|
||||
properties=properties,
|
||||
is_attributed=True,
|
||||
)
|
||||
else:
|
||||
PendingEvent.objects.create(
|
||||
event_id=event_data["event_id"],
|
||||
event_type=event_type_obj,
|
||||
decision_id=decision_id,
|
||||
subject_id=subject_id,
|
||||
timestamp=timestamp,
|
||||
properties=properties,
|
||||
expires_at=timezone.now() + timedelta(days=PENDING_TTL_DAYS),
|
||||
)
|
||||
|
||||
|
||||
class BatchResult:
|
||||
__slots__ = ("accepted", "duplicates", "errors", "rejected")
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.accepted: int = 0
|
||||
self.duplicates: int = 0
|
||||
self.rejected: int = 0
|
||||
self.errors: list[dict[str, Any]] = []
|
||||
|
||||
|
||||
def process_events_batch(
|
||||
events: list[dict[str, Any]],
|
||||
) -> BatchResult:
|
||||
result = BatchResult()
|
||||
|
||||
for idx, event_data in enumerate(events):
|
||||
event_type_name = event_data.get("event_type")
|
||||
|
||||
if not event_type_name or not isinstance(event_type_name, str):
|
||||
result.rejected += 1
|
||||
result.errors.append(
|
||||
{
|
||||
"index": idx,
|
||||
"event_id": event_data.get("event_id"),
|
||||
"error": "Missing or invalid 'event_type' field",
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
event_type_obj = event_type_get_by_name(event_type_name)
|
||||
if not event_type_obj:
|
||||
result.rejected += 1
|
||||
result.errors.append(
|
||||
{
|
||||
"index": idx,
|
||||
"event_id": event_data.get("event_id"),
|
||||
"error": f"Unknown event type: {event_type_name}",
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
if not event_type_obj.is_active:
|
||||
result.rejected += 1
|
||||
result.errors.append(
|
||||
{
|
||||
"index": idx,
|
||||
"event_id": event_data.get("event_id"),
|
||||
"error": f"Event type '{event_type_name}' is archived",
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
validation_errors = _validate_event_payload(event_data, event_type_obj)
|
||||
if validation_errors:
|
||||
result.rejected += 1
|
||||
result.errors.append(
|
||||
{
|
||||
"index": idx,
|
||||
"event_id": event_data.get("event_id"),
|
||||
"error": "; ".join(validation_errors),
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
event_id = event_data["event_id"]
|
||||
if _is_duplicate(event_id):
|
||||
result.duplicates += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
if event_type_obj.is_exposure:
|
||||
_process_exposure_event(event_data, event_type_obj)
|
||||
else:
|
||||
_process_conversion_event(event_data, event_type_obj)
|
||||
result.accepted += 1
|
||||
except IntegrityError:
|
||||
result.duplicates += 1
|
||||
except Exception as exc:
|
||||
result.rejected += 1
|
||||
result.errors.append(
|
||||
{
|
||||
"index": idx,
|
||||
"event_id": event_id,
|
||||
"error": str(exc),
|
||||
}
|
||||
)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def decision_create(
|
||||
*,
|
||||
decision_id: str,
|
||||
flag_key: str,
|
||||
subject_id: str,
|
||||
experiment_id: str | None = None,
|
||||
variant_id: str | None = None,
|
||||
value: str,
|
||||
reason: str,
|
||||
) -> Decision:
|
||||
return Decision.objects.create(
|
||||
decision_id=decision_id,
|
||||
flag_key=flag_key,
|
||||
subject_id=subject_id,
|
||||
experiment_id=experiment_id,
|
||||
variant_id=variant_id,
|
||||
value=str(value),
|
||||
reason=reason,
|
||||
)
|
||||
|
||||
|
||||
def cleanup_expired_pending_events() -> int:
|
||||
deleted_count, _ = PendingEvent.objects.filter(
|
||||
expires_at__lte=timezone.now(),
|
||||
).delete()
|
||||
return deleted_count
|
||||
Reference in New Issue
Block a user