290 lines
8.9 KiB
Python
290 lines
8.9 KiB
Python
import hashlib
|
|
import logging
|
|
import uuid
|
|
from datetime import timedelta
|
|
from decimal import Decimal
|
|
|
|
from django.core.cache import cache
|
|
from django.utils import timezone
|
|
from prometheus_client import Counter
|
|
|
|
from apps.conflicts.models import ExperimentConflictDomain
|
|
from apps.conflicts.services import resolve_domain_conflict
|
|
from apps.events.models import Decision
|
|
from apps.events.services import decision_create
|
|
from apps.experiments.models import Experiment, ExperimentStatus, Variant
|
|
from apps.experiments.selectors import active_experiment_for_flag
|
|
from apps.flags.models import FeatureFlag
|
|
from apps.flags.selectors import feature_flag_get_by_key
|
|
from libs.dsl import evaluate
|
|
from libs.dsl.exceptions import EvaluationError, LexerError, ParserError
|
|
|
|
logger = logging.getLogger("lotty")
|
|
|
|
DECIDE_REQUESTS = Counter(
|
|
"lotty_decide_requests_total",
|
|
"Total number of flag decision requests",
|
|
["reason"],
|
|
)
|
|
|
|
FLAG_CACHE_TTL = 300
|
|
EXPERIMENT_CACHE_TTL = 60
|
|
MAX_CONCURRENT_EXPERIMENTS = 3
|
|
COOLDOWN_DAYS = 7
|
|
|
|
|
|
def _hash_subject(subject_id: str, experiment_id: str, salt: str) -> float:
|
|
hash_input = f"{subject_id}:{experiment_id}:{salt}".encode()
|
|
hash_bytes = hashlib.sha256(hash_input).digest()
|
|
hash_int = int.from_bytes(hash_bytes[:8], byteorder="big")
|
|
return (hash_int % 10000) / Decimal(100)
|
|
|
|
|
|
def _select_variant(
|
|
variants: list[Variant], hash_value: float
|
|
) -> Variant | None:
|
|
cumulative = Decimal(0)
|
|
for variant in sorted(variants, key=lambda v: v.name):
|
|
cumulative += variant.weight
|
|
if hash_value < cumulative:
|
|
return variant
|
|
return variants[-1] if variants else None
|
|
|
|
|
|
def _persist_decision(result: dict, subject_id: str) -> None:
|
|
decision_create(
|
|
decision_id=result["decision_id"],
|
|
flag_key=result["flag"],
|
|
subject_id=subject_id,
|
|
experiment_id=result.get("experiment_id"),
|
|
variant_id=result.get("variant_id"),
|
|
value=str(result["value"]) if result["value"] is not None else "",
|
|
reason=result["reason"],
|
|
)
|
|
|
|
|
|
def _cached_flag_get(flag_key: str) -> FeatureFlag | None:
|
|
cache_key = f"flag:{flag_key}"
|
|
cached = cache.get(cache_key)
|
|
if cached is not None:
|
|
return cached if cached != "__none__" else None
|
|
flag = feature_flag_get_by_key(flag_key)
|
|
cache.set(cache_key, flag or "__none__", FLAG_CACHE_TTL)
|
|
return flag
|
|
|
|
|
|
def _cached_active_experiment(flag_pk):
|
|
cache_key = f"active_exp:{flag_pk}"
|
|
cached = cache.get(cache_key)
|
|
if cached is not None:
|
|
return cached if cached != "__none__" else None
|
|
experiment = active_experiment_for_flag(flag_pk)
|
|
cache.set(
|
|
cache_key,
|
|
experiment or "__none__",
|
|
EXPERIMENT_CACHE_TTL,
|
|
)
|
|
return experiment
|
|
|
|
|
|
def _check_targeting(
|
|
targeting_rules: str,
|
|
subject_attributes: dict,
|
|
) -> bool:
|
|
if not targeting_rules or not targeting_rules.strip():
|
|
return True
|
|
try:
|
|
return evaluate(targeting_rules, subject_attributes)
|
|
except (EvaluationError, LexerError, ParserError):
|
|
logger.warning(
|
|
"targeting_rules_evaluation_error",
|
|
extra={"rules": targeting_rules},
|
|
)
|
|
return False
|
|
|
|
|
|
def _check_participation_limits(
|
|
subject_id: str,
|
|
experiment_pk: object,
|
|
) -> bool:
|
|
active_count = (
|
|
Decision.objects.filter(
|
|
subject_id=subject_id,
|
|
reason="experiment_assigned",
|
|
experiment_id__isnull=False,
|
|
)
|
|
.exclude(experiment_id=experiment_pk)
|
|
.values("experiment_id")
|
|
.distinct()
|
|
.count()
|
|
)
|
|
if active_count >= MAX_CONCURRENT_EXPERIMENTS:
|
|
return False
|
|
|
|
cutoff = timezone.now() - timedelta(days=COOLDOWN_DAYS)
|
|
recent_completed = (
|
|
Decision.objects.filter(
|
|
subject_id=subject_id,
|
|
reason="experiment_assigned",
|
|
experiment_id__isnull=False,
|
|
created_at__gte=cutoff,
|
|
)
|
|
.filter(
|
|
experiment_id__in=Experiment.objects.filter(
|
|
status__in=(
|
|
ExperimentStatus.COMPLETED,
|
|
ExperimentStatus.ARCHIVED,
|
|
),
|
|
).values("pk"),
|
|
)
|
|
.exclude(experiment_id=experiment_pk)
|
|
.values("experiment_id")
|
|
.distinct()
|
|
.exists()
|
|
)
|
|
return not recent_completed
|
|
|
|
|
|
def _check_domain_conflicts(
|
|
experiment: Experiment,
|
|
subject_id: str,
|
|
) -> bool:
|
|
memberships = ExperimentConflictDomain.objects.filter(
|
|
experiment=experiment,
|
|
).select_related("conflict_domain")
|
|
|
|
for membership in memberships:
|
|
if not resolve_domain_conflict(
|
|
experiment_id=experiment.pk,
|
|
domain_id=membership.conflict_domain_id,
|
|
subject_id=subject_id,
|
|
):
|
|
return False
|
|
return True
|
|
|
|
|
|
def decide_for_flag(
|
|
flag_key: str,
|
|
subject_id: str,
|
|
subject_attributes: dict,
|
|
) -> dict:
|
|
flag = _cached_flag_get(flag_key)
|
|
if not flag:
|
|
DECIDE_REQUESTS.labels(reason="flag_not_found").inc()
|
|
result = {
|
|
"flag": flag_key,
|
|
"value": None,
|
|
"decision_id": str(uuid.uuid4()),
|
|
"experiment_id": None,
|
|
"variant_id": None,
|
|
"reason": "flag_not_found",
|
|
}
|
|
_persist_decision(result, subject_id)
|
|
return result
|
|
|
|
experiment = _cached_active_experiment(flag.pk)
|
|
if not experiment or experiment.status != ExperimentStatus.RUNNING:
|
|
DECIDE_REQUESTS.labels(reason="no_active_experiment").inc()
|
|
result = {
|
|
"flag": flag_key,
|
|
"value": flag.default_value,
|
|
"decision_id": str(uuid.uuid4()),
|
|
"experiment_id": None,
|
|
"variant_id": None,
|
|
"reason": "no_active_experiment",
|
|
}
|
|
_persist_decision(result, subject_id)
|
|
return result
|
|
|
|
if not _check_targeting(experiment.targeting_rules, subject_attributes):
|
|
DECIDE_REQUESTS.labels(reason="targeting_mismatch").inc()
|
|
result = {
|
|
"flag": flag_key,
|
|
"value": flag.default_value,
|
|
"decision_id": str(uuid.uuid4()),
|
|
"experiment_id": str(experiment.pk),
|
|
"variant_id": None,
|
|
"reason": "targeting_mismatch",
|
|
}
|
|
_persist_decision(result, subject_id)
|
|
return result
|
|
|
|
if not _check_participation_limits(subject_id, experiment.pk):
|
|
DECIDE_REQUESTS.labels(reason="participation_limit").inc()
|
|
result = {
|
|
"flag": flag_key,
|
|
"value": flag.default_value,
|
|
"decision_id": str(uuid.uuid4()),
|
|
"experiment_id": str(experiment.pk),
|
|
"variant_id": None,
|
|
"reason": "participation_limit",
|
|
}
|
|
_persist_decision(result, subject_id)
|
|
return result
|
|
|
|
if not _check_domain_conflicts(experiment, subject_id):
|
|
DECIDE_REQUESTS.labels(reason="domain_conflict").inc()
|
|
result = {
|
|
"flag": flag_key,
|
|
"value": flag.default_value,
|
|
"decision_id": str(uuid.uuid4()),
|
|
"experiment_id": str(experiment.pk),
|
|
"variant_id": None,
|
|
"reason": "domain_conflict",
|
|
}
|
|
_persist_decision(result, subject_id)
|
|
return result
|
|
|
|
allocation_hash = _hash_subject(
|
|
subject_id,
|
|
str(experiment.pk),
|
|
"allocation",
|
|
)
|
|
if allocation_hash >= experiment.traffic_allocation:
|
|
DECIDE_REQUESTS.labels(reason="outside_traffic_allocation").inc()
|
|
result = {
|
|
"flag": flag_key,
|
|
"value": flag.default_value,
|
|
"decision_id": str(uuid.uuid4()),
|
|
"experiment_id": str(experiment.pk),
|
|
"variant_id": None,
|
|
"reason": "outside_traffic_allocation",
|
|
}
|
|
_persist_decision(result, subject_id)
|
|
return result
|
|
|
|
variants = list(experiment.variants.all())
|
|
if not variants:
|
|
DECIDE_REQUESTS.labels(reason="no_variants").inc()
|
|
result = {
|
|
"flag": flag_key,
|
|
"value": flag.default_value,
|
|
"decision_id": str(uuid.uuid4()),
|
|
"experiment_id": str(experiment.pk),
|
|
"variant_id": None,
|
|
"reason": "no_variants",
|
|
}
|
|
_persist_decision(result, subject_id)
|
|
return result
|
|
|
|
variant_hash = _hash_subject(
|
|
subject_id,
|
|
str(experiment.pk),
|
|
"variant",
|
|
)
|
|
total_weight = sum(v.weight for v in variants)
|
|
normalized_hash = variant_hash * total_weight / Decimal(100)
|
|
selected = _select_variant(variants, normalized_hash)
|
|
|
|
DECIDE_REQUESTS.labels(reason="experiment_assigned").inc()
|
|
result = {
|
|
"flag": flag_key,
|
|
"value": selected.value if selected else flag.default_value,
|
|
"decision_id": str(uuid.uuid4()),
|
|
"experiment_id": str(experiment.pk),
|
|
"variant_id": str(selected.pk) if selected else None,
|
|
"reason": "experiment_assigned",
|
|
}
|
|
_persist_decision(result, subject_id)
|
|
return result
|