from datetime import datetime
from decimal import Decimal
from uuid import UUID

from apps.events.models import Event, Exposure
from apps.experiments.models import Experiment
from apps.metrics.models import (
    ExperimentMetric,
    MetricDefinition,
    MetricType,
)


def _apply_time_window(
    qs,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
):
    """Restrict *qs* to ``start_date <= timestamp < end_date``.

    Either bound may be omitted (falsy), in which case that side of the
    window is left open. Returns the (possibly) narrowed queryset.
    """
    if start_date:
        qs = qs.filter(timestamp__gte=start_date)
    if end_date:
        qs = qs.filter(timestamp__lt=end_date)
    return qs


def _exposures_for_variant(
    experiment_id: UUID,
    variant_id: UUID,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
):
    """Return the Exposure queryset for one variant within the time window."""
    qs = Exposure.objects.filter(
        experiment_id=experiment_id,
        variant_id=variant_id,
    )
    return _apply_time_window(qs, start_date, end_date)


def _attributed_events(
    decision_ids: list[str],
    event_type_name: str,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
):
    """Return attributed events of one type for the given decisions and window."""
    qs = Event.objects.filter(
        decision_id__in=decision_ids,
        event_type__name=event_type_name,
        is_attributed=True,
    )
    return _apply_time_window(qs, start_date, end_date)


def _numeric_property_values(
    decision_ids: list[str],
    event_type_name: str,
    property_field: str,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
) -> list[float]:
    """Collect ``properties[property_field]`` as floats from matching events.

    Events whose ``properties`` is not a dict, that lack the field, or whose
    value cannot be converted with ``float()`` are skipped.
    """
    events = _attributed_events(
        decision_ids, event_type_name, start_date, end_date,
    )
    values: list[float] = []
    for props in events.values_list("properties", flat=True):
        if not isinstance(props, dict) or property_field not in props:
            continue
        try:
            values.append(float(props[property_field]))
        except (TypeError, ValueError):
            # Non-numeric value: ignore this event rather than fail the metric.
            continue
    return values


def _exposure_decision_ids(
    experiment_id: UUID,
    variant_id: UUID,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
) -> list[str]:
    """Return the ``decision_id`` of every exposure of the variant in the window."""
    qs = _exposures_for_variant(experiment_id, variant_id, start_date, end_date)
    return list(qs.values_list("decision_id", flat=True))


def _count_events(
    decision_ids: list[str],
    event_type_name: str,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
) -> int:
    """Count attributed events of ``event_type_name`` for the given decisions."""
    return _attributed_events(
        decision_ids, event_type_name, start_date, end_date,
    ).count()


def _average_property(
    decision_ids: list[str],
    event_type_name: str,
    property_field: str,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
) -> Decimal | None:
    """Return the arithmetic mean of a numeric event property.

    Returns ``None`` when no matching event carries a usable numeric value.
    """
    values = _numeric_property_values(
        decision_ids, event_type_name, property_field, start_date, end_date,
    )
    if not values:
        return None
    return Decimal(str(sum(values) / len(values)))


def _percentile_property(
    decision_ids: list[str],
    event_type_name: str,
    property_field: str,
    percentile: int,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
) -> Decimal | None:
    """Return the value at ``percentile`` of a numeric event property.

    The sorted values are indexed at ``int(len(values) * percentile / 100)``,
    clamped to the valid index range, so out-of-range percentiles resolve to
    the smallest or largest observed value.

    Returns ``None`` when no matching event carries a usable numeric value.
    """
    values = _numeric_property_values(
        decision_ids, event_type_name, property_field, start_date, end_date,
    )
    if not values:
        return None
    values.sort()
    idx = int(len(values) * percentile / 100)
    # Clamp on both sides: without the lower bound a negative percentile
    # produces a negative index, which wraps around to the top of the list.
    idx = max(0, min(idx, len(values) - 1))
    return Decimal(str(values[idx]))


def calculate_metric_value(
    metric: MetricDefinition,
    experiment_id: UUID,
    variant_id: UUID,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
) -> Decimal | None:
    """Compute one metric for one experiment variant.

    The metric type is taken from ``calculation_rule["type"]`` when present,
    otherwise from ``metric.metric_type``. Events are restricted to the
    decisions exposed to the variant, and the same time window is applied to
    both exposures and events.

    Rule keys read per type:
      * RATIO: ``numerator_event``, ``denominator_event``
      * COUNT: ``event``
      * AVERAGE: ``event``, ``property``
      * PERCENTILE: ``event``, ``property``, optional ``percentile`` (default 95)

    Required keys are read with subscript access, so a rule that lacks one
    raises instead of returning ``None``.

    Returns:
        The metric value, or ``None`` when the variant has no exposures in
        the window, the type is not one of the above, or (AVERAGE/PERCENTILE)
        no numeric values were found. A RATIO with a zero denominator yields
        ``Decimal(0)``; otherwise the ratio is rounded to 6 decimal places.
    """
    rule = metric.calculation_rule
    decision_ids = _exposure_decision_ids(
        experiment_id, variant_id, start_date, end_date,
    )
    if not decision_ids:
        return None

    metric_type = rule.get("type", metric.metric_type)

    if metric_type == MetricType.RATIO:
        numerator = _count_events(
            decision_ids, rule["numerator_event"], start_date, end_date,
        )
        denominator = _count_events(
            decision_ids, rule["denominator_event"], start_date, end_date,
        )
        if denominator == 0:
            return Decimal(0)
        return Decimal(str(round(numerator / denominator, 6)))

    if metric_type == MetricType.COUNT:
        count = _count_events(
            decision_ids, rule["event"], start_date, end_date,
        )
        return Decimal(str(count))

    if metric_type == MetricType.AVERAGE:
        return _average_property(
            decision_ids, rule["event"], rule["property"],
            start_date, end_date,
        )

    if metric_type == MetricType.PERCENTILE:
        return _percentile_property(
            decision_ids, rule["event"], rule["property"],
            rule.get("percentile", 95),
            start_date, end_date,
        )

    return None


def _exposure_count_for_variant(
    experiment_id: UUID,
    variant_id: UUID,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
) -> int:
    """Count exposure rows of the variant within the time window."""
    return _exposures_for_variant(
        experiment_id, variant_id, start_date, end_date,
    ).count()


def _unique_subjects_for_variant(
    experiment_id: UUID,
    variant_id: UUID,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
) -> int:
    """Count distinct ``subject_id`` values among the variant's exposures."""
    qs = _exposures_for_variant(experiment_id, variant_id, start_date, end_date)
    return qs.values("subject_id").distinct().count()


def build_experiment_report(
    experiment: Experiment,
    start_date: datetime | None = None,
    end_date: datetime | None = None,
) -> dict:
    """Build a per-variant report of exposures and metric values.

    Metrics attached to the experiment are listed primary-first, then by
    metric key. For every variant the report contains its exposure count,
    distinct subject count, and one entry per metric with the value computed
    by ``calculate_metric_value`` over the same time window.

    Returns:
        A dict with ``experiment_id``, ``experiment_name``, ``status``,
        ``period`` (ISO-formatted ``start``/``end`` or ``None``) and
        ``variants`` (one dict per variant).
    """
    experiment_metrics = (
        ExperimentMetric.objects.filter(experiment=experiment)
        .select_related("metric")
        .order_by("-is_primary", "metric__key")
    )
    variants = experiment.variants.all()

    variant_reports = []
    for variant in variants:
        metric_results = []
        for em in experiment_metrics:
            value = calculate_metric_value(
                metric=em.metric,
                experiment_id=experiment.pk,
                variant_id=variant.pk,
                start_date=start_date,
                end_date=end_date,
            )
            metric_results.append(
                {
                    "metric_key": em.metric.key,
                    "metric_name": em.metric.name,
                    "metric_type": em.metric.metric_type,
                    "direction": em.metric.direction,
                    "is_primary": em.is_primary,
                    "value": value,
                }
            )

        variant_reports.append(
            {
                "variant_id": variant.pk,
                "variant_name": variant.name,
                "is_control": variant.is_control,
                "weight": variant.weight,
                "exposures": _exposure_count_for_variant(
                    experiment.pk, variant.pk, start_date, end_date,
                ),
                "unique_subjects": _unique_subjects_for_variant(
                    experiment.pk, variant.pk, start_date, end_date,
                ),
                "metrics": metric_results,
            }
        )

    return {
        "experiment_id": experiment.pk,
        "experiment_name": experiment.name,
        "status": experiment.status,
        "period": {
            "start": start_date.isoformat() if start_date else None,
            "end": end_date.isoformat() if end_date else None,
        },
        "variants": variant_reports,
    }