import hashlib import json import logging import uuid from datetime import timedelta from decimal import Decimal from django.conf import settings from django.core.cache import cache from django.utils import timezone from prometheus_client import Counter from apps.conflicts.models import ExperimentConflictDomain from apps.conflicts.services import resolve_domain_conflict from apps.events.models import Decision from apps.events.services import decision_create from apps.events.tasks import persist_decision_task from apps.experiments.models import ( ACTIVE_STATUSES, Experiment, ExperimentStatus, Variant, ) from apps.experiments.selectors import active_experiment_for_flag from apps.flags.models import FeatureFlag from apps.flags.selectors import feature_flag_get_by_key from libs.dsl import evaluate from libs.dsl.exceptions import EvaluationError, LexerError, ParserError logger = logging.getLogger("lotty") DECIDE_REQUESTS = Counter( "lotty_decide_requests_total", "Total number of flag decision requests", ["reason"], ) FLAG_CACHE_TTL = 300 EXPERIMENT_CACHE_TTL = 60 MAX_CONCURRENT_EXPERIMENTS = 3 COOLDOWN_DAYS = 7 DECISION_WRITE_MODE_SYNC = "sync" DECISION_WRITE_MODE_ASYNC = "async" DECISION_WRITE_MODE_DISABLED = "disabled" DECISION_WRITE_MODE_VALUES = { DECISION_WRITE_MODE_SYNC, DECISION_WRITE_MODE_ASYNC, DECISION_WRITE_MODE_DISABLED, } DECISION_FORCE_SYNC_REASON = "experiment_assigned" DECISION_CACHEABLE_REASONS = frozenset( { "flag_not_found", "no_active_experiment", "targeting_mismatch", "outside_traffic_allocation", "no_variants", "experiment_assigned", } ) def _hash_subject(subject_id: str, experiment_id: str, salt: str) -> Decimal: hash_input = f"{subject_id}:{experiment_id}:{salt}".encode() hash_bytes = hashlib.sha256(hash_input).digest() hash_int = int.from_bytes(hash_bytes[:8], byteorder="big") return (hash_int % 10000) / Decimal(100) def _select_variant( variants: list[Variant], hash_value: Decimal ) -> Variant | None: cumulative = Decimal(0) for variant in sorted(variants, key=lambda v: v.name): cumulative += variant.weight if hash_value < cumulative: return variant return variants[-1] if variants else None def _decision_result_cache_ttl() -> int: ttl = int( getattr(settings, "DECISION_RESULT_CACHE_TTL_SECONDS", 60), ) return max(ttl, 0) def _decision_write_mode() -> str: mode = str( getattr( settings, "DECISION_WRITE_MODE", DECISION_WRITE_MODE_SYNC, ) ).lower() if mode in DECISION_WRITE_MODE_VALUES: return mode return DECISION_WRITE_MODE_SYNC def _subject_attributes_digest(subject_attributes: dict) -> str: encoded = json.dumps( subject_attributes, sort_keys=True, separators=(",", ":"), default=str, ).encode() return hashlib.sha256(encoded).hexdigest() def _decision_cache_key( *, flag_key: str, subject_id: str, subject_attributes_digest: str, flag: FeatureFlag | None, experiment: Experiment | None, ) -> str: flag_revision = flag.updated_at.isoformat() if flag else "missing" experiment_id = str(experiment.pk) if experiment else "none" experiment_status = experiment.status if experiment else "none" experiment_version = str(experiment.version) if experiment else "none" experiment_revision = ( experiment.updated_at.isoformat() if experiment else "none" ) return ( "decide_result:" f"{flag_key}:" f"{subject_id}:" f"{subject_attributes_digest}:" f"{flag_revision}:" f"{experiment_id}:" f"{experiment_status}:" f"{experiment_version}:" f"{experiment_revision}" ) def _decision_template(result: dict) -> dict: return { "flag": result["flag"], "value": result["value"], "experiment_id": result.get("experiment_id"), "variant_id": result.get("variant_id"), "reason": result["reason"], } def _decision_from_template(template: dict) -> dict: return { "flag": template["flag"], "value": template["value"], "decision_id": str(uuid.uuid4()), "experiment_id": template.get("experiment_id"), "variant_id": template.get("variant_id"), "reason": template["reason"], } def _cache_get_decision(cache_key: str) -> dict | None: cached = cache.get(cache_key) if not isinstance(cached, dict): return None required = {"flag", "value", "reason"} if not required.issubset(cached.keys()): return None return _decision_from_template(cached) def _cache_set_decision(cache_key: str, result: dict) -> None: if result["reason"] not in DECISION_CACHEABLE_REASONS: return ttl = _decision_result_cache_ttl() if ttl == 0: return cache.set(cache_key, _decision_template(result), ttl) def _build_result( *, flag_key: str, value: str | int | float | bool | None, reason: str, experiment_id: str | None = None, variant_id: str | None = None, ) -> dict: return { "flag": flag_key, "value": value, "decision_id": str(uuid.uuid4()), "experiment_id": experiment_id, "variant_id": variant_id, "reason": reason, } def _decision_payload(result: dict, subject_id: str) -> dict: return { "decision_id": result["decision_id"], "flag_key": result["flag"], "subject_id": subject_id, "experiment_id": result.get("experiment_id"), "variant_id": result.get("variant_id"), "value": str(result["value"]) if result["value"] is not None else "", "reason": result["reason"], } def _persist_decision(result: dict, subject_id: str) -> None: payload = _decision_payload(result, subject_id) mode = _decision_write_mode() is_force_sync = result["reason"] == DECISION_FORCE_SYNC_REASON if mode == DECISION_WRITE_MODE_DISABLED and not is_force_sync: return if mode == DECISION_WRITE_MODE_ASYNC and not is_force_sync: try: persist_decision_task.delay(**payload) except Exception: logger.exception( "decision_async_persist_failed", extra={"reason": result["reason"]}, ) else: return decision_create(**payload) def _cached_flag_get(flag_key: str) -> FeatureFlag | None: cache_key = f"flag:{flag_key}" cached = cache.get(cache_key) if cached is not None: return cached if cached != "__none__" else None flag = feature_flag_get_by_key(flag_key) cache.set(cache_key, flag or "__none__", FLAG_CACHE_TTL) return flag def _cached_active_experiment(flag_pk): cache_key = f"active_exp:{flag_pk}" cached = cache.get(cache_key) if cached is not None: return cached if cached != "__none__" else None experiment = active_experiment_for_flag(flag_pk) cache.set( cache_key, experiment or "__none__", EXPERIMENT_CACHE_TTL, ) return experiment def _check_targeting( targeting_rules: str, subject_attributes: dict, ) -> bool: if not targeting_rules or not targeting_rules.strip(): return True try: return evaluate(targeting_rules, subject_attributes) except (EvaluationError, LexerError, ParserError): logger.warning( "targeting_rules_evaluation_error", extra={"rules": targeting_rules}, ) return False def _check_participation_limits( subject_id: str, experiment_pk: object, ) -> bool: active_count = ( Decision.objects.filter( subject_id=subject_id, reason="experiment_assigned", experiment_id__isnull=False, experiment_id__in=Experiment.objects.filter( status__in=ACTIVE_STATUSES, ).values("pk"), ) .exclude(experiment_id=experiment_pk) .values("experiment_id") .distinct() .count() ) if active_count >= MAX_CONCURRENT_EXPERIMENTS: return False cutoff = timezone.now() - timedelta(days=COOLDOWN_DAYS) recent_completed = ( Decision.objects.filter( subject_id=subject_id, reason="experiment_assigned", experiment_id__isnull=False, created_at__gte=cutoff, ) .filter( experiment_id__in=Experiment.objects.filter( status__in=( ExperimentStatus.COMPLETED, ExperimentStatus.ARCHIVED, ), ).values("pk"), ) .exclude(experiment_id=experiment_pk) .values("experiment_id") .distinct() .exists() ) return not recent_completed def _check_domain_conflicts( experiment: Experiment, subject_id: str, ) -> bool: memberships = ExperimentConflictDomain.objects.filter( experiment=experiment, ).select_related("conflict_domain") for membership in memberships: if not resolve_domain_conflict( experiment_id=experiment.pk, domain_id=membership.conflict_domain_id, subject_id=subject_id, ): return False return True def _finalize_result( *, result: dict, subject_id: str, cache_key: str, ) -> dict: DECIDE_REQUESTS.labels(reason=result["reason"]).inc() _cache_set_decision(cache_key, result) _persist_decision(result, subject_id) return result def decide_for_flag( flag_key: str, subject_id: str, subject_attributes: dict, ) -> dict: subject_attributes_digest = _subject_attributes_digest( subject_attributes, ) flag = _cached_flag_get(flag_key) experiment = _cached_active_experiment(flag.pk) if flag else None cache_key = _decision_cache_key( flag_key=flag_key, subject_id=subject_id, subject_attributes_digest=subject_attributes_digest, flag=flag, experiment=experiment, ) cached_result = _cache_get_decision(cache_key) if cached_result is not None: return _finalize_result( result=cached_result, subject_id=subject_id, cache_key=cache_key, ) if not flag: result = _build_result( flag_key=flag_key, value=None, reason="flag_not_found", ) return _finalize_result( result=result, subject_id=subject_id, cache_key=cache_key, ) if not experiment or experiment.status != ExperimentStatus.RUNNING: result = _build_result( flag_key=flag_key, value=flag.default_value, reason="no_active_experiment", ) return _finalize_result( result=result, subject_id=subject_id, cache_key=cache_key, ) if not _check_targeting(experiment.targeting_rules, subject_attributes): result = _build_result( flag_key=flag_key, value=flag.default_value, reason="targeting_mismatch", experiment_id=str(experiment.pk), ) return _finalize_result( result=result, subject_id=subject_id, cache_key=cache_key, ) if not _check_participation_limits(subject_id, experiment.pk): result = _build_result( flag_key=flag_key, value=flag.default_value, reason="participation_limit", experiment_id=str(experiment.pk), ) return _finalize_result( result=result, subject_id=subject_id, cache_key=cache_key, ) if not _check_domain_conflicts(experiment, subject_id): result = _build_result( flag_key=flag_key, value=flag.default_value, reason="domain_conflict", experiment_id=str(experiment.pk), ) return _finalize_result( result=result, subject_id=subject_id, cache_key=cache_key, ) allocation_hash = _hash_subject( subject_id, str(experiment.pk), "allocation", ) if allocation_hash >= experiment.traffic_allocation: result = _build_result( flag_key=flag_key, value=flag.default_value, reason="outside_traffic_allocation", experiment_id=str(experiment.pk), ) return _finalize_result( result=result, subject_id=subject_id, cache_key=cache_key, ) variants = list(experiment.variants.all()) if not variants: result = _build_result( flag_key=flag_key, value=flag.default_value, reason="no_variants", experiment_id=str(experiment.pk), ) return _finalize_result( result=result, subject_id=subject_id, cache_key=cache_key, ) variant_hash = _hash_subject( subject_id, str(experiment.pk), "variant", ) total_weight = sum(v.weight for v in variants) normalized_hash = variant_hash * total_weight / Decimal(100) selected = _select_variant(variants, normalized_hash) result = _build_result( flag_key=flag_key, value=selected.value if selected else flag.default_value, reason="experiment_assigned", experiment_id=str(experiment.pk), variant_id=str(selected.pk) if selected else None, ) return _finalize_result( result=result, subject_id=subject_id, cache_key=cache_key, )