From cda60bb05758203816676da54c5dd4ddb37799a8 Mon Sep 17 00:00:00 2001 From: ITQ Date: Tue, 24 Feb 2026 17:55:14 +0300 Subject: [PATCH] chore(): added async decision persistence --- src/backend/.env.template | 2 + .../tests/test_notifications_api.py | 23 + src/backend/apps/decision/services.py | 393 +++++++++++++----- .../apps/decision/tests/test_decide.py | 152 ++++++- src/backend/apps/events/tasks.py | 38 +- .../apps/guardrails/tests/test_guardrails.py | 4 +- .../apps/metrics/tests/test_metrics.py | 1 - src/backend/config/settings/base.py | 10 + 8 files changed, 515 insertions(+), 108 deletions(-) diff --git a/src/backend/.env.template b/src/backend/.env.template index ba1f251..38a5d58 100644 --- a/src/backend/.env.template +++ b/src/backend/.env.template @@ -12,6 +12,8 @@ DJANGO_STATIC_URL=static/ REDIS_URI= DJANGO_DB_URI=sqlite:///db.sqlite3 DJANGO_CONN_MAX_AGE=300 +DECISION_RESULT_CACHE_TTL_SECONDS=60 +DECISION_WRITE_MODE=sync DJANGO_SILKY_ENABLED=False DJANGO_SILKY_PYTHON_PROFILER=False diff --git a/src/backend/api/v1/notifications/tests/test_notifications_api.py b/src/backend/api/v1/notifications/tests/test_notifications_api.py index de83a03..51ef1ed 100644 --- a/src/backend/api/v1/notifications/tests/test_notifications_api.py +++ b/src/backend/api/v1/notifications/tests/test_notifications_api.py @@ -54,6 +54,18 @@ class ChannelAPITest(TestCase): self.assertEqual(resp.status_code, 201) self.assertEqual(resp.json()["channel_type"], ChannelType.SMTP) + def test_create_telegram_channel_invalid_config(self) -> None: + resp = self._create_channel(config={"bot_token": "tok"}) + self.assertEqual(resp.status_code, 422) + + def test_create_smtp_channel_invalid_config(self) -> None: + resp = self._create_channel( + channel_type=ChannelType.SMTP, + name="Email", + config={}, + ) + self.assertEqual(resp.status_code, 422) + def test_list_channels(self) -> None: self._create_channel() resp = self.client.get( @@ -92,6 +104,17 @@ class ChannelAPITest(TestCase): self.assertEqual(resp.status_code, 200) self.assertEqual(resp.json()["name"], "Updated") + def test_update_channel_invalid_config(self) -> None: + create_resp = self._create_channel() + ch_id = create_resp.json()["id"] + resp = self.client.patch( + reverse("api-1:update_channel", args=[ch_id]), + data=json.dumps({"config": {"bot_token": "tok"}}), + content_type="application/json", + HTTP_AUTHORIZATION=self.auth, + ) + self.assertEqual(resp.status_code, 422) + def test_delete_channel(self) -> None: create_resp = self._create_channel() ch_id = create_resp.json()["id"] diff --git a/src/backend/apps/decision/services.py b/src/backend/apps/decision/services.py index f297de6..af2b9a3 100644 --- a/src/backend/apps/decision/services.py +++ b/src/backend/apps/decision/services.py @@ -1,9 +1,11 @@ import hashlib +import json import logging import uuid from datetime import timedelta from decimal import Decimal +from django.conf import settings from django.core.cache import cache from django.utils import timezone from prometheus_client import Counter @@ -12,6 +14,7 @@ from apps.conflicts.models import ExperimentConflictDomain from apps.conflicts.services import resolve_domain_conflict from apps.events.models import Decision from apps.events.services import decision_create +from apps.events.tasks import persist_decision_task from apps.experiments.models import ( ACTIVE_STATUSES, Experiment, @@ -36,6 +39,25 @@ FLAG_CACHE_TTL = 300 EXPERIMENT_CACHE_TTL = 60 MAX_CONCURRENT_EXPERIMENTS = 3 COOLDOWN_DAYS = 7 +DECISION_WRITE_MODE_SYNC = "sync" +DECISION_WRITE_MODE_ASYNC = "async" +DECISION_WRITE_MODE_DISABLED = "disabled" +DECISION_WRITE_MODE_VALUES = { + DECISION_WRITE_MODE_SYNC, + DECISION_WRITE_MODE_ASYNC, + DECISION_WRITE_MODE_DISABLED, +} +DECISION_FORCE_SYNC_REASON = "experiment_assigned" +DECISION_CACHEABLE_REASONS = frozenset( + { + "flag_not_found", + "no_active_experiment", + "targeting_mismatch", + "outside_traffic_allocation", + "no_variants", + "experiment_assigned", + } +) def _hash_subject(subject_id: str, experiment_id: str, salt: str) -> Decimal: @@ -56,16 +78,156 @@ def _select_variant( return variants[-1] if variants else None -def _persist_decision(result: dict, subject_id: str) -> None: - decision_create( - decision_id=result["decision_id"], - flag_key=result["flag"], - subject_id=subject_id, - experiment_id=result.get("experiment_id"), - variant_id=result.get("variant_id"), - value=str(result["value"]) if result["value"] is not None else "", - reason=result["reason"], +def _decision_result_cache_ttl() -> int: + ttl = int( + getattr(settings, "DECISION_RESULT_CACHE_TTL_SECONDS", 60), ) + return max(ttl, 0) + + +def _decision_write_mode() -> str: + mode = str( + getattr( + settings, + "DECISION_WRITE_MODE", + DECISION_WRITE_MODE_SYNC, + ) + ).lower() + if mode in DECISION_WRITE_MODE_VALUES: + return mode + return DECISION_WRITE_MODE_SYNC + + +def _subject_attributes_digest(subject_attributes: dict) -> str: + encoded = json.dumps( + subject_attributes, + sort_keys=True, + separators=(",", ":"), + default=str, + ).encode() + return hashlib.sha256(encoded).hexdigest() + + +def _decision_cache_key( + *, + flag_key: str, + subject_id: str, + subject_attributes_digest: str, + flag: FeatureFlag | None, + experiment: Experiment | None, +) -> str: + flag_revision = flag.updated_at.isoformat() if flag else "missing" + experiment_id = str(experiment.pk) if experiment else "none" + experiment_status = experiment.status if experiment else "none" + experiment_version = str(experiment.version) if experiment else "none" + experiment_revision = ( + experiment.updated_at.isoformat() if experiment else "none" + ) + return ( + "decide_result:" + f"{flag_key}:" + f"{subject_id}:" + f"{subject_attributes_digest}:" + f"{flag_revision}:" + f"{experiment_id}:" + f"{experiment_status}:" + f"{experiment_version}:" + f"{experiment_revision}" + ) + + +def _decision_template(result: dict) -> dict: + return { + "flag": result["flag"], + "value": result["value"], + "experiment_id": result.get("experiment_id"), + "variant_id": result.get("variant_id"), + "reason": result["reason"], + } + + +def _decision_from_template(template: dict) -> dict: + return { + "flag": template["flag"], + "value": template["value"], + "decision_id": str(uuid.uuid4()), + "experiment_id": template.get("experiment_id"), + "variant_id": template.get("variant_id"), + "reason": template["reason"], + } + + +def _cache_get_decision(cache_key: str) -> dict | None: + cached = cache.get(cache_key) + if not isinstance(cached, dict): + return None + required = {"flag", "value", "reason"} + if not required.issubset(cached.keys()): + return None + return _decision_from_template(cached) + + +def _cache_set_decision(cache_key: str, result: dict) -> None: + if result["reason"] not in DECISION_CACHEABLE_REASONS: + return + ttl = _decision_result_cache_ttl() + if ttl == 0: + return + cache.set(cache_key, _decision_template(result), ttl) + + +def _build_result( + *, + flag_key: str, + value: str | int | float | bool | None, + reason: str, + experiment_id: str | None = None, + variant_id: str | None = None, +) -> dict: + return { + "flag": flag_key, + "value": value, + "decision_id": str(uuid.uuid4()), + "experiment_id": experiment_id, + "variant_id": variant_id, + "reason": reason, + } + + +def _decision_payload(result: dict, subject_id: str) -> dict: + return { + "decision_id": result["decision_id"], + "flag_key": result["flag"], + "subject_id": subject_id, + "experiment_id": result.get("experiment_id"), + "variant_id": result.get("variant_id"), + "value": str(result["value"]) + if result["value"] is not None + else "", + "reason": result["reason"], + } + + +def _persist_decision(result: dict, subject_id: str) -> None: + payload = _decision_payload(result, subject_id) + mode = _decision_write_mode() + is_force_sync = result["reason"] == DECISION_FORCE_SYNC_REASON + + if mode == DECISION_WRITE_MODE_DISABLED and not is_force_sync: + return + + if mode == DECISION_WRITE_MODE_ASYNC and not is_force_sync: + try: + persist_decision_task.delay(**payload) + except Exception: + logger.exception( + "decision_async_persist_failed", + extra={"reason": result["reason"]}, + ) + else: + return + + decision_create(**payload) def _cached_flag_get(flag_key: str) -> FeatureFlag | None: @@ -171,77 +333,107 @@ def _check_domain_conflicts( return True +def _finalize_result( + *, + result: dict, + subject_id: str, + cache_key: str, +) -> dict: + DECIDE_REQUESTS.labels(reason=result["reason"]).inc() + _cache_set_decision(cache_key, result) + _persist_decision(result, subject_id) + return result + + def decide_for_flag( flag_key: str, subject_id: str, subject_attributes: dict, ) -> dict: - flag = _cached_flag_get(flag_key) - if not flag: - DECIDE_REQUESTS.labels(reason="flag_not_found").inc() - result = { - "flag": flag_key, - "value": None, - "decision_id": str(uuid.uuid4()), - "experiment_id": None, - "variant_id": None, - "reason": "flag_not_found", - } - _persist_decision(result, subject_id) - return result + subject_attributes_digest = _subject_attributes_digest( + subject_attributes, + ) + + flag = _cached_flag_get(flag_key) + experiment = _cached_active_experiment(flag.pk) if flag else None + cache_key = _decision_cache_key( + flag_key=flag_key, + subject_id=subject_id, + subject_attributes_digest=subject_attributes_digest, + flag=flag, + experiment=experiment, + ) + + cached_result = _cache_get_decision(cache_key) + if cached_result is not None: + return _finalize_result( + result=cached_result, + subject_id=subject_id, + cache_key=cache_key, + ) + + if not flag: + result = _build_result( + flag_key=flag_key, + value=None, + reason="flag_not_found", + ) + return _finalize_result( + result=result, + subject_id=subject_id, + cache_key=cache_key, + ) - experiment = _cached_active_experiment(flag.pk) if not experiment or experiment.status != ExperimentStatus.RUNNING: - DECIDE_REQUESTS.labels(reason="no_active_experiment").inc() - result = { - "flag": flag_key, - "value": flag.default_value, - "decision_id": str(uuid.uuid4()), - "experiment_id": None, - "variant_id": None, - "reason": "no_active_experiment", - } - _persist_decision(result, subject_id) - return result + result = _build_result( + flag_key=flag_key, + value=flag.default_value, + reason="no_active_experiment", + ) + return _finalize_result( + result=result, + subject_id=subject_id, + cache_key=cache_key, + ) if not _check_targeting(experiment.targeting_rules, subject_attributes): - DECIDE_REQUESTS.labels(reason="targeting_mismatch").inc() - result = { - "flag": flag_key, - "value": flag.default_value, - "decision_id": str(uuid.uuid4()), - "experiment_id": str(experiment.pk), - "variant_id": None, - "reason": "targeting_mismatch", - } - _persist_decision(result, subject_id) - return result + result = _build_result( + flag_key=flag_key, + value=flag.default_value, + reason="targeting_mismatch", + experiment_id=str(experiment.pk), + ) + return _finalize_result( + result=result, + subject_id=subject_id, + cache_key=cache_key, + ) if not _check_participation_limits(subject_id, experiment.pk): - DECIDE_REQUESTS.labels(reason="participation_limit").inc() - result = { - "flag": flag_key, - "value": flag.default_value, - "decision_id": str(uuid.uuid4()), - "experiment_id": str(experiment.pk), - "variant_id": None, - "reason": "participation_limit", - } - _persist_decision(result, subject_id) - return result + result = _build_result( + flag_key=flag_key, + value=flag.default_value, + reason="participation_limit", + experiment_id=str(experiment.pk), + ) + return _finalize_result( + result=result, + subject_id=subject_id, + cache_key=cache_key, + ) if not _check_domain_conflicts(experiment, subject_id): - DECIDE_REQUESTS.labels(reason="domain_conflict").inc() - result = { - "flag": flag_key, - "value": flag.default_value, - "decision_id": str(uuid.uuid4()), - "experiment_id": str(experiment.pk), - "variant_id": None, - "reason": "domain_conflict", - } - _persist_decision(result, subject_id) - return result + result = _build_result( + flag_key=flag_key, + value=flag.default_value, + reason="domain_conflict", + experiment_id=str(experiment.pk), + ) + return _finalize_result( + result=result, + subject_id=subject_id, + cache_key=cache_key, + ) allocation_hash = _hash_subject( subject_id, @@ -249,31 +441,31 @@ def decide_for_flag( "allocation", ) if allocation_hash >= experiment.traffic_allocation: - DECIDE_REQUESTS.labels(reason="outside_traffic_allocation").inc() - result = { - "flag": flag_key, - "value": flag.default_value, - "decision_id": str(uuid.uuid4()), - "experiment_id": str(experiment.pk), - "variant_id": None, - "reason": "outside_traffic_allocation", - } - _persist_decision(result, subject_id) - return result + result = _build_result( + flag_key=flag_key, + value=flag.default_value, + reason="outside_traffic_allocation", + experiment_id=str(experiment.pk), + ) + return _finalize_result( + result=result, + subject_id=subject_id, + cache_key=cache_key, + ) variants = list(experiment.variants.all()) if not variants: - DECIDE_REQUESTS.labels(reason="no_variants").inc() - result = { - "flag": flag_key, - "value": flag.default_value, - "decision_id": str(uuid.uuid4()), - "experiment_id": str(experiment.pk), - "variant_id": None, - "reason": "no_variants", - } - _persist_decision(result, subject_id) - return result + result = _build_result( + flag_key=flag_key, + value=flag.default_value, + reason="no_variants", + experiment_id=str(experiment.pk), + ) + return _finalize_result( + result=result, + subject_id=subject_id, + cache_key=cache_key, + ) variant_hash = _hash_subject( subject_id, @@ -284,14 +476,15 @@ def decide_for_flag( normalized_hash = variant_hash * total_weight / Decimal(100) selected = _select_variant(variants, normalized_hash) - DECIDE_REQUESTS.labels(reason="experiment_assigned").inc() - result = { - "flag": flag_key, - "value": selected.value if selected else flag.default_value, - "decision_id": str(uuid.uuid4()), - "experiment_id": str(experiment.pk), - "variant_id": str(selected.pk) if selected else None, - "reason": "experiment_assigned", - } - _persist_decision(result, subject_id) - return result + result = _build_result( + flag_key=flag_key, + value=selected.value if selected else flag.default_value, + reason="experiment_assigned", + experiment_id=str(experiment.pk), + variant_id=str(selected.pk) if selected else None, + ) + return _finalize_result( + result=result, + subject_id=subject_id, + cache_key=cache_key, + ) diff --git a/src/backend/apps/decision/tests/test_decide.py b/src/backend/apps/decision/tests/test_decide.py index c5bd969..79a70cb 100644 --- a/src/backend/apps/decision/tests/test_decide.py +++ b/src/backend/apps/decision/tests/test_decide.py @@ -1,8 +1,9 @@ from decimal import Decimal from typing import override +from unittest.mock import patch from django.core.cache import cache -from django.test import TestCase +from django.test import TestCase, override_settings from apps.decision.services import ( MAX_CONCURRENT_EXPERIMENTS, @@ -10,6 +11,7 @@ from apps.decision.services import ( _select_variant, decide_for_flag, ) +from apps.events.models import Decision from apps.experiments.models import Experiment, ExperimentStatus from apps.experiments.services import variant_create from apps.experiments.tests.helpers import make_experiment, make_flag @@ -37,11 +39,12 @@ class HashSubjectTest(TestCase): class SelectVariantTest(TestCase): def test_selects_by_weight(self) -> None: class FV: - def __init__(self, n, w): + def __init__(self, n, w, i): self.name = n self.weight = Decimal(str(w)) + self.id = i - variants = [FV("a", 50), FV("b", 50)] + variants = [FV("a", 50, "uuid-a"), FV("b", 50, "uuid-b")] self.assertEqual(_select_variant(variants, Decimal(10)).name, "a") self.assertEqual(_select_variant(variants, Decimal(60)).name, "b") @@ -370,3 +373,146 @@ class PartialTrafficVariantDistributionTest(TestCase): ratio = counts["ctrl"] / total self.assertGreater(ratio, 0.3) self.assertLess(ratio, 0.7) + + +class DecisionCachingTest(TestCase): + @override + def setUp(self) -> None: + cache.clear() + self.owner = make_user( + username="cache_owner", + email="cache_owner@lotty.local", + ) + self.flag = make_flag(suffix="_cache", default="default_val") + self.experiment = make_experiment( + flag=self.flag, + owner=self.owner, + suffix="_cache", + traffic_allocation=Decimal("100.00"), + ) + variant_create( + experiment=self.experiment, + user=self.owner, + name="control", + value="ctrl", + weight=Decimal("50.00"), + is_control=True, + ) + variant_create( + experiment=self.experiment, + user=self.owner, + name="treatment", + value="treat", + weight=Decimal("50.00"), + ) + Experiment.objects.filter(pk=self.experiment.pk).update( + status=ExperimentStatus.RUNNING, + ) + + def test_cached_result_skips_recalculation(self) -> None: + first = decide_for_flag( + self.flag.key, + "cached_subject", + {"country": "US"}, + ) + with patch( + "apps.decision.services._check_targeting", + side_effect=AssertionError("cache miss"), + ): + second = decide_for_flag( + self.flag.key, + "cached_subject", + {"country": "US"}, + ) + self.assertEqual(first["reason"], "experiment_assigned") + self.assertEqual(second["reason"], "experiment_assigned") + self.assertEqual(first["value"], second["value"]) + self.assertNotEqual(first["decision_id"], second["decision_id"]) + + +class DecisionPersistenceModeTest(TestCase): + @override + def setUp(self) -> None: + cache.clear() + self.owner = make_user( + username="persist_owner", + email="persist_owner@lotty.local", + ) + self.flag = make_flag(suffix="_persist", default="default_val") + + def _make_running_experiment(self): + experiment = make_experiment( + flag=self.flag, + owner=self.owner, + suffix="_persist", + traffic_allocation=Decimal("100.00"), + ) + variant_create( + experiment=experiment, + user=self.owner, + name="control", + value="ctrl", + weight=Decimal("50.00"), + is_control=True, + ) + variant_create( + experiment=experiment, + user=self.owner, + name="treatment", + value="treat", + weight=Decimal("50.00"), + ) + Experiment.objects.filter(pk=experiment.pk).update( + status=ExperimentStatus.RUNNING, + ) + return experiment + + @override_settings(DECISION_WRITE_MODE="async") + def test_async_mode_enqueues_non_assigned_result(self) -> None: + with patch( + "apps.decision.services.persist_decision_task.delay" + ) as delay_mock, patch( + "apps.decision.services.decision_create" + ) as create_mock: + result = decide_for_flag(self.flag.key, "subj_async", {}) + self.assertEqual(result["reason"], "no_active_experiment") + delay_mock.assert_called_once() + create_mock.assert_not_called() + + @override_settings(DECISION_WRITE_MODE="async") + def test_async_mode_keeps_assigned_result_sync(self) -> None: + self._make_running_experiment() + with patch( + "apps.decision.services.persist_decision_task.delay" + ) as delay_mock, patch( + "apps.decision.services.decision_create" + ) as create_mock: + result = decide_for_flag( + self.flag.key, + "subj_async_assigned", + {}, + ) + self.assertEqual(result["reason"], "experiment_assigned") + delay_mock.assert_not_called() + create_mock.assert_called_once() + + @override_settings(DECISION_WRITE_MODE="disabled") + def test_disabled_mode_skips_non_assigned_persist(self) -> None: + before = Decision.objects.count() + result = decide_for_flag(self.flag.key, "subj_disabled", {}) + after = Decision.objects.count() + self.assertEqual(result["reason"], "no_active_experiment") + self.assertEqual(before, after) + + @override_settings(DECISION_WRITE_MODE="disabled") + def test_disabled_mode_keeps_assigned_persist(self) -> None: + self._make_running_experiment() + before = Decision.objects.count() + result = decide_for_flag( + self.flag.key, + "subj_disabled_assigned", + {}, + ) + after = Decision.objects.count() + self.assertEqual(result["reason"], "experiment_assigned") + self.assertEqual(after, before + 1) diff --git a/src/backend/apps/events/tasks.py b/src/backend/apps/events/tasks.py index 1320410..9aa0ed3 100644 --- a/src/backend/apps/events/tasks.py +++ b/src/backend/apps/events/tasks.py @@ -1,7 +1,11 @@ import logging -from apps.events.services import cleanup_expired_pending_events +from apps.events.services import ( + cleanup_expired_pending_events, + decision_create, +) from config.celery import app +from config.errors import ConflictError logger = logging.getLogger("lotty") @@ -14,3 +18,35 @@ def cleanup_expired_pending_events_task(self): extra={"deleted": deleted}, ) return deleted + + +@app.task(bind=True, name="events.persist_decision") +def persist_decision_task( + self, + *, + decision_id: str, + flag_key: str, + subject_id: str, + experiment_id: str | None = None, + variant_id: str | None = None, + value: str, + reason: str, +): + try: + decision_create( + decision_id=decision_id, + flag_key=flag_key, + subject_id=subject_id, + experiment_id=experiment_id, + variant_id=variant_id, + value=value, + reason=reason, + ) + except ConflictError: + logger.info( + "decision_persist_skipped_conflict", + extra={"decision_id": decision_id}, + ) + return {"status": "conflict", "decision_id": decision_id} + + return {"status": "created", "decision_id": decision_id} diff --git a/src/backend/apps/guardrails/tests/test_guardrails.py b/src/backend/apps/guardrails/tests/test_guardrails.py index 9c7dac5..e9e2cc2 100644 --- a/src/backend/apps/guardrails/tests/test_guardrails.py +++ b/src/backend/apps/guardrails/tests/test_guardrails.py @@ -589,9 +589,7 @@ class GuardrailHigherIsBetterTest(TestCase): self.v_treatment, ) for i in range(3): - self._send_purchase( - f"pur_hib_ok_{i}", f"dec_hib_ok_{i}", f"u{i}" - ) + self._send_purchase(f"pur_hib_ok_{i}", f"dec_hib_ok_{i}", f"u{i}") triggers = check_experiment_guardrails(self.experiment) diff --git a/src/backend/apps/metrics/tests/test_metrics.py b/src/backend/apps/metrics/tests/test_metrics.py index b79fa06..a1f7a18 100644 --- a/src/backend/apps/metrics/tests/test_metrics.py +++ b/src/backend/apps/metrics/tests/test_metrics.py @@ -1,4 +1,3 @@ - from django.core.exceptions import ValidationError from django.test import TestCase diff --git a/src/backend/config/settings/base.py b/src/backend/config/settings/base.py index 8d2fbba..7dd1dcf 100644 --- a/src/backend/config/settings/base.py +++ b/src/backend/config/settings/base.py @@ -54,6 +54,16 @@ else: }, } +DECISION_RESULT_CACHE_TTL_SECONDS = env.int( + "DECISION_RESULT_CACHE_TTL_SECONDS", + default=60, +) + +DECISION_WRITE_MODE = env( + "DECISION_WRITE_MODE", + default="sync", +) + # Celery