feat(events): added events API

This commit is contained in:
ITQ
2026-02-18 15:22:03 +03:00
parent 66bff9e5d4
commit 8994f8fb96
6 changed files with 485 additions and 0 deletions
+6
View File
@@ -0,0 +1,6 @@
from django.apps import AppConfig
class EventsApiConfig(AppConfig):
name = "api.v1.events"
label = "api_v1_events"
+128
View File
@@ -0,0 +1,128 @@
from http import HTTPStatus
from uuid import UUID
from django.http import Http404, HttpRequest
from ninja import Router
from prometheus_client import Counter
from api.v1.events.schemas import (
EventErrorOut,
EventTypeCreateIn,
EventTypeOut,
EventTypeUpdateIn,
EventsBatchIn,
EventsBatchOut,
)
from apps.events.selectors import event_type_get, event_type_list
from apps.events.services import (
event_type_create,
event_type_update,
process_events_batch,
)
from api.v1.auth.endpoints import jwt_bearer
from api.v1.experiments.endpoints import require_admin_or_experimenter
EVENTS_INGESTED = Counter(
"lotty_events_ingested_total",
"Total number of events ingested",
["status"],
)
router = Router(tags=["events"], auth=jwt_bearer)
@router.post(
"/event-types",
response={HTTPStatus.CREATED: EventTypeOut},
summary="Create an event type",
)
@require_admin_or_experimenter
def create_event_type(
request: HttpRequest,
payload: EventTypeCreateIn,
) -> tuple[HTTPStatus, EventTypeOut]:
et = event_type_create(
name=payload.name,
display_name=payload.display_name,
description=payload.description,
requires_exposure=payload.requires_exposure,
required_fields=payload.required_fields,
)
return HTTPStatus.CREATED, EventTypeOut.model_validate(et)
@router.get(
"/event-types",
response={HTTPStatus.OK: list[EventTypeOut]},
summary="List event types",
)
def list_event_types(
request: HttpRequest,
is_active: bool | None = None, # noqa: FBT001
) -> tuple[int, list[EventTypeOut]]:
qs = event_type_list(is_active=is_active)
return HTTPStatus.OK, [
EventTypeOut.model_validate(metric) for metric in qs
]
@router.get(
"/event-types/{event_type_id}",
response={HTTPStatus.OK: EventTypeOut},
summary="Get event type details",
)
def get_event_type(
request: HttpRequest,
event_type_id: UUID,
) -> tuple[int, EventTypeOut]:
et = event_type_get(event_type_id)
if not et:
raise Http404
return HTTPStatus.OK, EventTypeOut.model_validate(et)
@router.patch(
"/event-types/{event_type_id}",
response={HTTPStatus.OK: EventTypeOut},
summary="Update an event type",
)
@require_admin_or_experimenter
def update_event_type(
request: HttpRequest,
event_type_id: UUID,
payload: EventTypeUpdateIn,
) -> tuple[int, EventTypeOut]:
et = event_type_get(event_type_id)
if not et:
raise Http404
fields = payload.dict(exclude_unset=True)
updated = event_type_update(event_type=et, **fields)
return HTTPStatus.OK, EventTypeOut.model_validate(updated)
@router.post(
"",
response={HTTPStatus.OK: EventsBatchOut},
auth=None,
summary="Ingest a batch of events",
description=(
"Accepts a batch of events, validates each event, "
"deduplicates, and processes attribution. "
"Returns counts of accepted, duplicate, and rejected events."
),
)
def ingest_events(
request: HttpRequest,
payload: EventsBatchIn,
) -> tuple[int, EventsBatchOut]:
events_data = [e.dict() for e in payload.events]
batch = process_events_batch(events_data)
EVENTS_INGESTED.labels(status="accepted").inc(batch.accepted)
EVENTS_INGESTED.labels(status="duplicate").inc(batch.duplicates)
EVENTS_INGESTED.labels(status="rejected").inc(batch.rejected)
return HTTPStatus.OK, EventsBatchOut(
accepted=batch.accepted,
duplicates=batch.duplicates,
rejected=batch.rejected,
errors=[EventErrorOut.model_validate(e) for e in batch.errors],
)
+82
View File
@@ -0,0 +1,82 @@
from typing import Any, ClassVar
from ninja import Field, ModelSchema, Schema
from apps.events.models import EventType
class EventTypeCreateIn(ModelSchema):
description: str = ""
class Meta:
model = EventType
fields: ClassVar[tuple[str, ...]] = (
EventType.name.field.name,
EventType.display_name.field.name,
EventType.description.field.name,
EventType.requires_exposure.field.name,
EventType.required_fields.field.name,
)
class EventTypeUpdateIn(ModelSchema):
display_name: str | None = None
description: str | None = None
requires_exposure: bool | None = None
required_fields: list[str] | None = None
is_active: bool | None = None
class Meta:
model = EventType
fields: ClassVar[tuple[str, ...]] = (
EventType.display_name.field.name,
EventType.description.field.name,
EventType.requires_exposure.field.name,
EventType.required_fields.field.name,
EventType.is_active.field.name,
)
class EventTypeOut(ModelSchema):
class Meta:
model = EventType
fields: ClassVar[tuple[str, ...]] = (
EventType.id.field.name,
EventType.name.field.name,
EventType.display_name.field.name,
EventType.description.field.name,
EventType.requires_exposure.field.name,
EventType.required_fields.field.name,
EventType.is_active.field.name,
EventType.created_at.field.name,
EventType.updated_at.field.name,
)
class SingleEventIn(Schema):
event_id: str = Field(min_length=1, max_length=200)
event_type: str = Field(min_length=1, max_length=100)
decision_id: str = Field(min_length=1, max_length=100)
subject_id: str = Field(min_length=1, max_length=200)
timestamp: str = Field(
min_length=1,
description="ISO 8601 timestamp",
)
properties: dict[str, Any] = Field(default_factory=dict)
class EventsBatchIn(Schema):
events: list[SingleEventIn] = Field(min_length=1)
class EventErrorOut(Schema):
index: int
event_id: str | None = None
error: str
class EventsBatchOut(Schema):
accepted: int
duplicates: int
rejected: int
errors: list[EventErrorOut]
@@ -0,0 +1,269 @@
import json
import uuid
from django.test import Client, TestCase
from django.urls import reverse
from django.utils import timezone
from apps.events.models import Event, PendingEvent
from apps.events.services import decision_create
from apps.events.tests.helpers import make_event_type, make_exposure_type
from apps.reviews.tests.helpers import (
make_admin,
)
from apps.users.tests.helpers import auth_header
class EventTypeAPITest(TestCase):
def setUp(self) -> None:
self.client = Client()
self.admin = make_admin(suffix="_events")
self.admin_auth = auth_header(self.admin)
self.client.defaults["HTTP_AUTHORIZATION"] = self.admin_auth
def test_create_event_type(self) -> None:
resp = self.client.post(
reverse("api-1:create_event_type"),
data=json.dumps(
{
"name": "page_view",
"display_name": "Page View",
"requires_exposure": False,
}
),
content_type="application/json",
)
self.assertEqual(resp.status_code, 201)
data = resp.json()
self.assertEqual(data["name"], "page_view")
self.assertEqual(data["display_name"], "Page View")
self.assertFalse(data["requires_exposure"])
def test_create_event_type_with_required_fields(self) -> None:
resp = self.client.post(
reverse("api-1:create_event_type"),
data=json.dumps(
{
"name": "click",
"display_name": "Click",
"required_fields": ["screen", "element"],
"requires_exposure": True,
}
),
content_type="application/json",
)
self.assertEqual(resp.status_code, 201)
data = resp.json()
self.assertEqual(data["required_fields"], ["screen", "element"])
self.assertTrue(data["requires_exposure"])
def test_list_event_types(self) -> None:
make_event_type(name="evt_a", display_name="A")
make_event_type(name="evt_b", display_name="B")
resp = self.client.get(reverse("api-1:list_event_types"))
self.assertEqual(resp.status_code, 200)
self.assertGreaterEqual(len(resp.json()), 2)
def test_list_active_only(self) -> None:
make_event_type(name="active_evt", display_name="Active")
et = make_event_type(name="inactive_evt", display_name="Inactive")
et.is_active = False
et.save()
resp = self.client.get(
reverse("api-1:list_event_types"),
{"is_active": "true"},
)
self.assertEqual(resp.status_code, 200)
names = [e["name"] for e in resp.json()]
self.assertIn("active_evt", names)
self.assertNotIn("inactive_evt", names)
def test_get_event_type(self) -> None:
et = make_event_type(name="detail_evt", display_name="Detail")
resp = self.client.get(
reverse("api-1:get_event_type", args=[et.pk]),
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json()["name"], "detail_evt")
def test_get_nonexistent_returns_404(self) -> None:
resp = self.client.get(
reverse("api-1:get_event_type", args=[uuid.uuid4()]),
)
self.assertEqual(resp.status_code, 404)
def test_update_event_type(self) -> None:
et = make_event_type(name="update_evt", display_name="Old")
resp = self.client.patch(
reverse("api-1:update_event_type", args=[et.pk]),
data=json.dumps({"display_name": "New"}),
content_type="application/json",
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json()["display_name"], "New")
class EventsIngestAPITest(TestCase):
def setUp(self) -> None:
self.client = Client()
self.exposure_type = make_exposure_type()
self.click_type = make_event_type(
name="button_clicked",
display_name="Button Clicked",
requires_exposure=True,
)
self.decision_id = "api_dec_1"
self.exp_id = str(uuid.uuid4())
self.var_id = str(uuid.uuid4())
decision_create(
decision_id=self.decision_id,
flag_key="test_flag",
subject_id="u42",
experiment_id=self.exp_id,
variant_id=self.var_id,
value="blue",
reason="experiment_assigned",
)
def _ingest(self, events):
return self.client.post(
reverse("api-1:ingest_events"),
data=json.dumps({"events": events}),
content_type="application/json",
)
def test_accept_valid_exposure(self) -> None:
resp = self._ingest(
[
{
"event_id": "api_exp_1",
"event_type": "exposure",
"decision_id": self.decision_id,
"subject_id": "u42",
"timestamp": timezone.now().isoformat(),
"properties": {},
}
]
)
self.assertEqual(resp.status_code, 200)
data = resp.json()
self.assertEqual(data["accepted"], 1)
self.assertEqual(data["duplicates"], 0)
self.assertEqual(data["rejected"], 0)
def test_reject_unknown_event_type(self) -> None:
resp = self._ingest(
[
{
"event_id": "api_bad_1",
"event_type": "nonexistent",
"decision_id": self.decision_id,
"subject_id": "u42",
"timestamp": timezone.now().isoformat(),
"properties": {},
}
]
)
data = resp.json()
self.assertEqual(data["accepted"], 0)
self.assertEqual(data["rejected"], 1)
self.assertTrue(len(data["errors"]) > 0)
def test_deduplicate(self) -> None:
event = {
"event_id": "api_dup_1",
"event_type": "exposure",
"decision_id": self.decision_id,
"subject_id": "u42",
"timestamp": timezone.now().isoformat(),
"properties": {},
}
self._ingest([event])
resp = self._ingest([event])
data = resp.json()
self.assertEqual(data["duplicates"], 1)
self.assertEqual(data["accepted"], 0)
def test_attribution_flow(self) -> None:
self._ingest(
[
{
"event_id": "api_click_before",
"event_type": "button_clicked",
"decision_id": self.decision_id,
"subject_id": "u42",
"timestamp": timezone.now().isoformat(),
"properties": {},
}
]
)
self.assertTrue(
PendingEvent.objects.filter(event_id="api_click_before").exists()
)
self._ingest(
[
{
"event_id": "api_exposure_late",
"event_type": "exposure",
"decision_id": self.decision_id,
"subject_id": "u42",
"timestamp": timezone.now().isoformat(),
"properties": {},
}
]
)
self.assertTrue(
Event.objects.filter(event_id="api_click_before").exists()
)
self.assertFalse(
PendingEvent.objects.filter(event_id="api_click_before").exists()
)
def test_mixed_batch(self) -> None:
self._ingest(
[
{
"event_id": "pre_exp",
"event_type": "exposure",
"decision_id": self.decision_id,
"subject_id": "u42",
"timestamp": timezone.now().isoformat(),
"properties": {},
}
]
)
resp = self._ingest(
[
{
"event_id": "good_click",
"event_type": "button_clicked",
"decision_id": self.decision_id,
"subject_id": "u42",
"timestamp": timezone.now().isoformat(),
"properties": {},
},
{
"event_id": "bad_type",
"event_type": "unknown",
"decision_id": self.decision_id,
"subject_id": "u42",
"timestamp": timezone.now().isoformat(),
},
{
"event_id": "pre_exp",
"event_type": "exposure",
"decision_id": self.decision_id,
"subject_id": "u42",
"timestamp": timezone.now().isoformat(),
"properties": {},
},
]
)
data = resp.json()
self.assertEqual(data["accepted"], 1)
self.assertEqual(data["rejected"], 1)
self.assertEqual(data["duplicates"], 1)