diff --git a/src/backend/api/v1/events/__init__.py b/src/backend/api/v1/events/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/backend/api/v1/events/apps.py b/src/backend/api/v1/events/apps.py new file mode 100644 index 0000000..9b89f44 --- /dev/null +++ b/src/backend/api/v1/events/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class EventsApiConfig(AppConfig): + name = "api.v1.events" + label = "api_v1_events" diff --git a/src/backend/api/v1/events/endpoints.py b/src/backend/api/v1/events/endpoints.py new file mode 100644 index 0000000..5d2163f --- /dev/null +++ b/src/backend/api/v1/events/endpoints.py @@ -0,0 +1,128 @@ +from http import HTTPStatus +from uuid import UUID + +from django.http import Http404, HttpRequest +from ninja import Router +from prometheus_client import Counter + +from api.v1.events.schemas import ( + EventErrorOut, + EventTypeCreateIn, + EventTypeOut, + EventTypeUpdateIn, + EventsBatchIn, + EventsBatchOut, +) +from apps.events.selectors import event_type_get, event_type_list +from apps.events.services import ( + event_type_create, + event_type_update, + process_events_batch, +) +from api.v1.auth.endpoints import jwt_bearer +from api.v1.experiments.endpoints import require_admin_or_experimenter + +EVENTS_INGESTED = Counter( + "lotty_events_ingested_total", + "Total number of events ingested", + ["status"], +) + +router = Router(tags=["events"], auth=jwt_bearer) + + +@router.post( + "/event-types", + response={HTTPStatus.CREATED: EventTypeOut}, + summary="Create an event type", +) +@require_admin_or_experimenter +def create_event_type( + request: HttpRequest, + payload: EventTypeCreateIn, +) -> tuple[HTTPStatus, EventTypeOut]: + et = event_type_create( + name=payload.name, + display_name=payload.display_name, + description=payload.description, + requires_exposure=payload.requires_exposure, + required_fields=payload.required_fields, + ) + return HTTPStatus.CREATED, EventTypeOut.model_validate(et) + + +@router.get( + "/event-types", + response={HTTPStatus.OK: list[EventTypeOut]}, + summary="List event types", +) +def list_event_types( + request: HttpRequest, + is_active: bool | None = None, # noqa: FBT001 +) -> tuple[int, list[EventTypeOut]]: + qs = event_type_list(is_active=is_active) + return HTTPStatus.OK, [ + EventTypeOut.model_validate(metric) for metric in qs + ] + + +@router.get( + "/event-types/{event_type_id}", + response={HTTPStatus.OK: EventTypeOut}, + summary="Get event type details", +) +def get_event_type( + request: HttpRequest, + event_type_id: UUID, +) -> tuple[int, EventTypeOut]: + et = event_type_get(event_type_id) + if not et: + raise Http404 + return HTTPStatus.OK, EventTypeOut.model_validate(et) + + +@router.patch( + "/event-types/{event_type_id}", + response={HTTPStatus.OK: EventTypeOut}, + summary="Update an event type", +) +@require_admin_or_experimenter +def update_event_type( + request: HttpRequest, + event_type_id: UUID, + payload: EventTypeUpdateIn, +) -> tuple[int, EventTypeOut]: + et = event_type_get(event_type_id) + if not et: + raise Http404 + fields = payload.dict(exclude_unset=True) + updated = event_type_update(event_type=et, **fields) + return HTTPStatus.OK, EventTypeOut.model_validate(updated) + + +@router.post( + "", + response={HTTPStatus.OK: EventsBatchOut}, + auth=None, + summary="Ingest a batch of events", + description=( + "Accepts a batch of events, validates each event, " + "deduplicates, and processes attribution. " + "Returns counts of accepted, duplicate, and rejected events." + ), +) +def ingest_events( + request: HttpRequest, + payload: EventsBatchIn, +) -> tuple[int, EventsBatchOut]: + events_data = [e.dict() for e in payload.events] + batch = process_events_batch(events_data) + EVENTS_INGESTED.labels(status="accepted").inc(batch.accepted) + EVENTS_INGESTED.labels(status="duplicate").inc(batch.duplicates) + EVENTS_INGESTED.labels(status="rejected").inc(batch.rejected) + return HTTPStatus.OK, EventsBatchOut( + accepted=batch.accepted, + duplicates=batch.duplicates, + rejected=batch.rejected, + errors=[EventErrorOut.model_validate(e) for e in batch.errors], + ) diff --git a/src/backend/api/v1/events/schemas.py b/src/backend/api/v1/events/schemas.py new file mode 100644 index 0000000..8057699 --- /dev/null +++ b/src/backend/api/v1/events/schemas.py @@ -0,0 +1,82 @@ +from typing import Any, ClassVar + +from ninja import Field, ModelSchema, Schema + +from apps.events.models import EventType + + +class EventTypeCreateIn(ModelSchema): + description: str = "" + + class Meta: + model = EventType + fields: ClassVar[tuple[str, ...]] = ( + EventType.name.field.name, + EventType.display_name.field.name, + EventType.description.field.name, + EventType.requires_exposure.field.name, + EventType.required_fields.field.name, + ) + + +class EventTypeUpdateIn(ModelSchema): + display_name: str | None = None + description: str | None = None + requires_exposure: bool | None = None + required_fields: list[str] | None = None + is_active: bool | None = None + + class Meta: + model = EventType + fields: ClassVar[tuple[str, ...]] = ( + EventType.display_name.field.name, + EventType.description.field.name, + EventType.requires_exposure.field.name, + EventType.required_fields.field.name, + EventType.is_active.field.name, + ) + + +class EventTypeOut(ModelSchema): + class Meta: + model = EventType + fields: ClassVar[tuple[str, ...]] = ( + EventType.id.field.name, + EventType.name.field.name, + EventType.display_name.field.name, + EventType.description.field.name, + EventType.requires_exposure.field.name, + EventType.required_fields.field.name, + EventType.is_active.field.name, + EventType.created_at.field.name, + EventType.updated_at.field.name, + ) + + +class SingleEventIn(Schema): + event_id: str = Field(min_length=1, max_length=200) + event_type: str = Field(min_length=1, max_length=100) + decision_id: str = Field(min_length=1, max_length=100) + subject_id: str = Field(min_length=1, max_length=200) + timestamp: str = Field( + min_length=1, + description="ISO 8601 timestamp", + ) + properties: dict[str, Any] = Field(default_factory=dict) + + +class EventsBatchIn(Schema): + events: list[SingleEventIn] = Field(min_length=1) + + +class EventErrorOut(Schema): + index: int + event_id: str | None = None + error: str + + +class EventsBatchOut(Schema): + accepted: int + duplicates: int + rejected: int + errors: list[EventErrorOut] diff --git a/src/backend/api/v1/events/tests/__init__.py b/src/backend/api/v1/events/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/backend/api/v1/events/tests/test_events_api.py b/src/backend/api/v1/events/tests/test_events_api.py new file mode 100644 index 0000000..9dce979 --- /dev/null +++ b/src/backend/api/v1/events/tests/test_events_api.py @@ -0,0 +1,269 @@ +import json +import uuid + +from django.test import Client, TestCase +from django.urls import reverse +from django.utils import timezone + +from apps.events.models import Event, PendingEvent +from apps.events.services import decision_create +from apps.events.tests.helpers import make_event_type, make_exposure_type +from apps.reviews.tests.helpers import ( + make_admin, +) +from apps.users.tests.helpers import auth_header + + +class EventTypeAPITest(TestCase): + def setUp(self) -> None: + self.client = Client() + self.admin = make_admin(suffix="_events") + self.admin_auth = auth_header(self.admin) + self.client.defaults["HTTP_AUTHORIZATION"] = self.admin_auth + + def test_create_event_type(self) -> None: + resp = self.client.post( + reverse("api-1:create_event_type"), + data=json.dumps( + { + "name": "page_view", + "display_name": "Page View", + "requires_exposure": False, + } + ), + content_type="application/json", + ) + self.assertEqual(resp.status_code, 201) + data = resp.json() + self.assertEqual(data["name"], "page_view") + self.assertEqual(data["display_name"], "Page View") + self.assertFalse(data["requires_exposure"]) + + def test_create_event_type_with_required_fields(self) -> None: + resp = self.client.post( + reverse("api-1:create_event_type"), + data=json.dumps( + { + "name": "click", + "display_name": "Click", + "required_fields": ["screen", "element"], + "requires_exposure": True, + } + ), + content_type="application/json", + ) + self.assertEqual(resp.status_code, 201) + data = resp.json() + self.assertEqual(data["required_fields"], ["screen", "element"]) + self.assertTrue(data["requires_exposure"]) + + def test_list_event_types(self) -> None: + make_event_type(name="evt_a", display_name="A") + make_event_type(name="evt_b", display_name="B") + + resp = self.client.get(reverse("api-1:list_event_types")) + self.assertEqual(resp.status_code, 200) + self.assertGreaterEqual(len(resp.json()), 2) + + def test_list_active_only(self) -> None: + make_event_type(name="active_evt", display_name="Active") + et = make_event_type(name="inactive_evt", display_name="Inactive") + et.is_active = False + et.save() + + resp = self.client.get( + reverse("api-1:list_event_types"), + {"is_active": "true"}, + ) + self.assertEqual(resp.status_code, 200) + names = [e["name"] for e in resp.json()] + self.assertIn("active_evt", names) + self.assertNotIn("inactive_evt", names) + + def test_get_event_type(self) -> None: + et = make_event_type(name="detail_evt", display_name="Detail") + resp = self.client.get( + reverse("api-1:get_event_type", args=[et.pk]), + ) + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.json()["name"], "detail_evt") + + def test_get_nonexistent_returns_404(self) -> None: + resp = self.client.get( + reverse("api-1:get_event_type", args=[uuid.uuid4()]), + ) + self.assertEqual(resp.status_code, 404) + + def test_update_event_type(self) -> None: + et = make_event_type(name="update_evt", display_name="Old") + resp = self.client.patch( + reverse("api-1:update_event_type", args=[et.pk]), + data=json.dumps({"display_name": "New"}), + content_type="application/json", + ) + self.assertEqual(resp.status_code, 200) + self.assertEqual(resp.json()["display_name"], "New") + + +class EventsIngestAPITest(TestCase): + def setUp(self) -> None: + self.client = Client() + self.exposure_type = make_exposure_type() + self.click_type = make_event_type( + name="button_clicked", + display_name="Button Clicked", + requires_exposure=True, + ) + self.decision_id = "api_dec_1" + self.exp_id = str(uuid.uuid4()) + self.var_id = str(uuid.uuid4()) + decision_create( + decision_id=self.decision_id, + flag_key="test_flag", + subject_id="u42", + experiment_id=self.exp_id, + variant_id=self.var_id, + value="blue", + reason="experiment_assigned", + ) + + def _ingest(self, events): + return self.client.post( + reverse("api-1:ingest_events"), + data=json.dumps({"events": events}), + content_type="application/json", + ) + + def test_accept_valid_exposure(self) -> None: + resp = self._ingest( + [ + { + "event_id": "api_exp_1", + "event_type": "exposure", + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + self.assertEqual(resp.status_code, 200) + data = resp.json() + self.assertEqual(data["accepted"], 1) + self.assertEqual(data["duplicates"], 0) + self.assertEqual(data["rejected"], 0) + + def test_reject_unknown_event_type(self) -> None: + resp = self._ingest( + [ + { + "event_id": "api_bad_1", + "event_type": "nonexistent", + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + data = resp.json() + self.assertEqual(data["accepted"], 0) + self.assertEqual(data["rejected"], 1) + self.assertTrue(len(data["errors"]) > 0) + + def test_deduplicate(self) -> None: + event = { + "event_id": "api_dup_1", + "event_type": "exposure", + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + self._ingest([event]) + resp = self._ingest([event]) + data = resp.json() + self.assertEqual(data["duplicates"], 1) + self.assertEqual(data["accepted"], 0) + + def test_attribution_flow(self) -> None: + self._ingest( + [ + { + "event_id": "api_click_before", + "event_type": "button_clicked", + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + self.assertTrue( + PendingEvent.objects.filter(event_id="api_click_before").exists() + ) + + self._ingest( + [ + { + "event_id": "api_exposure_late", + "event_type": "exposure", + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + self.assertTrue( + Event.objects.filter(event_id="api_click_before").exists() + ) + self.assertFalse( + PendingEvent.objects.filter(event_id="api_click_before").exists() + ) + + def test_mixed_batch(self) -> None: + self._ingest( + [ + { + "event_id": "pre_exp", + "event_type": "exposure", + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + "properties": {}, + } + ] + ) + + resp = self._ingest( + [ + { + "event_id": "good_click", + "event_type": "button_clicked", + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + "properties": {}, + }, + { + "event_id": "bad_type", + "event_type": "unknown", + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + }, + { + "event_id": "pre_exp", + "event_type": "exposure", + "decision_id": self.decision_id, + "subject_id": "u42", + "timestamp": timezone.now().isoformat(), + "properties": {}, + }, + ] + ) + data = resp.json() + self.assertEqual(data["accepted"], 1) + self.assertEqual(data["rejected"], 1) + self.assertEqual(data["duplicates"], 1)