From e5a224bc3e62bf9fc0b2a425d2e3bece8a5a78c3 Mon Sep 17 00:00:00 2001 From: ITQ Date: Tue, 24 Feb 2026 17:57:27 +0300 Subject: [PATCH] chore(): refactored reports queries --- src/backend/apps/reports/services.py | 221 ++++++++++++++++++--------- 1 file changed, 145 insertions(+), 76 deletions(-) diff --git a/src/backend/apps/reports/services.py b/src/backend/apps/reports/services.py index e052dcb..abd2b5a 100644 --- a/src/backend/apps/reports/services.py +++ b/src/backend/apps/reports/services.py @@ -2,6 +2,22 @@ from datetime import datetime from decimal import Decimal from uuid import UUID +from django.db import connection +from django.db.models import ( + Aggregate, + Avg, + Case, + Count, + F, + FloatField, + QuerySet, + Subquery, + Value, + When, +) +from django.db.models.fields.json import KeyTextTransform +from django.db.models.functions import Cast + from apps.events.models import Event, Exposure from apps.experiments.models import Experiment from apps.metrics.models import ( @@ -11,12 +27,29 @@ from apps.metrics.models import ( ) -def _exposure_decision_ids( +class PercentileCont(Aggregate): + function = "PERCENTILE_CONT" + template = ( + "%(function)s(%(percentile)s) " + "WITHIN GROUP (ORDER BY %(expressions)s)" + ) + allow_distinct = False + output_field = FloatField() + + def __init__(self, expression, percentile, **extra): + super().__init__( + expression, + percentile=percentile, + **extra, + ) + + +def _exposure_queryset( experiment_id: UUID, variant_id: UUID, start_date: datetime | None = None, end_date: datetime | None = None, -) -> list[str]: +) -> QuerySet[Exposure]: qs = Exposure.objects.filter( experiment_id=experiment_id, variant_id=variant_id, @@ -25,17 +58,24 @@ def _exposure_decision_ids( qs = qs.filter(timestamp__gte=start_date) if end_date: qs = qs.filter(timestamp__lt=end_date) - return list(qs.values_list("decision_id", flat=True)) + return qs -def _count_events( - decision_ids: list[str], +def _exposure_decision_ids_subquery( + exposure_qs: QuerySet[Exposure], +): + return Subquery(exposure_qs.values("decision_id")) + + +def _events_queryset( + *, + exposure_qs: QuerySet[Exposure], event_type_name: str, start_date: datetime | None = None, end_date: datetime | None = None, -) -> int: +) -> QuerySet[Event]: qs = Event.objects.filter( - decision_id__in=decision_ids, + decision_id__in=_exposure_decision_ids_subquery(exposure_qs), event_type__name=event_type_name, is_attributed=True, ) @@ -43,72 +83,101 @@ def _count_events( qs = qs.filter(timestamp__gte=start_date) if end_date: qs = qs.filter(timestamp__lt=end_date) + return qs + + +def _numeric_property_expression(property_field: str): + if connection.vendor == "postgresql": + key_text = KeyTextTransform(property_field, "properties") + pattern = r"^-?(?:\d+(?:\.\d+)?|\.\d+)$" + return Case( + When( + **{ + f"properties__{property_field}__regex": pattern + }, + then=Cast(key_text, FloatField()), + ), + default=Value(None), + output_field=FloatField(), + ) + return Cast(F(f"properties__{property_field}"), FloatField()) + + +def _count_events( + *, + exposure_qs: QuerySet[Exposure], + event_type_name: str, + start_date: datetime | None = None, + end_date: datetime | None = None, +) -> int: + qs = _events_queryset( + exposure_qs=exposure_qs, + event_type_name=event_type_name, + start_date=start_date, + end_date=end_date, + ) return qs.count() def _average_property( - decision_ids: list[str], + *, + exposure_qs: QuerySet[Exposure], event_type_name: str, property_field: str, start_date: datetime | None = None, end_date: datetime | None = None, ) -> Decimal | None: - qs = Event.objects.filter( - decision_id__in=decision_ids, - event_type__name=event_type_name, - is_attributed=True, + qs = _events_queryset( + exposure_qs=exposure_qs, + event_type_name=event_type_name, + start_date=start_date, + end_date=end_date, + ).annotate( + numeric_value=_numeric_property_expression(property_field), ) - if start_date: - qs = qs.filter(timestamp__gte=start_date) - if end_date: - qs = qs.filter(timestamp__lt=end_date) - - values = [] - for props in qs.values_list("properties", flat=True): - if isinstance(props, dict) and property_field in props: - try: - values.append(float(props[property_field])) - except (TypeError, ValueError): - continue - - if not values: + value = qs.aggregate(value=Avg("numeric_value"))["value"] + if value is None: return None - return Decimal(str(sum(values) / len(values))) + return Decimal(str(value)) def _percentile_property( - decision_ids: list[str], + *, + exposure_qs: QuerySet[Exposure], event_type_name: str, property_field: str, percentile: int, start_date: datetime | None = None, end_date: datetime | None = None, ) -> Decimal | None: - qs = Event.objects.filter( - decision_id__in=decision_ids, - event_type__name=event_type_name, - is_attributed=True, - ) - if start_date: - qs = qs.filter(timestamp__gte=start_date) - if end_date: - qs = qs.filter(timestamp__lt=end_date) + qs = _events_queryset( + exposure_qs=exposure_qs, + event_type_name=event_type_name, + start_date=start_date, + end_date=end_date, + ).annotate( + numeric_value=_numeric_property_expression(property_field), + ).exclude(numeric_value__isnull=True) + if connection.vendor == "postgresql": + value = qs.aggregate( + value=PercentileCont( + "numeric_value", + Decimal(percentile) / Decimal(100), + ) + )["value"] + if value is None: + return None + return Decimal(str(value)) - values = [] - for props in qs.values_list("properties", flat=True): - if isinstance(props, dict) and property_field in props: - try: - values.append(float(props[property_field])) - except (TypeError, ValueError): - continue - - if not values: + total = qs.aggregate(total=Count("pk"))["total"] + if not total: return None - - values.sort() - idx = int(len(values) * percentile / 100) - idx = min(idx, len(values) - 1) - return Decimal(str(values[idx])) + idx = min(int(total * percentile / 100), total - 1) + value = qs.order_by("numeric_value").values_list( + "numeric_value", + flat=True, + )[idx] + return Decimal(str(value)) def calculate_metric_value( @@ -121,14 +190,14 @@ def calculate_metric_value( event_end_date: datetime | None = None, ) -> Decimal | None: rule = metric.calculation_rule - decision_ids = _exposure_decision_ids( + exposure_qs = _exposure_queryset( experiment_id, variant_id, start_date, end_date, ) - if not decision_ids: + if not exposure_qs.exists(): return None ev_start = event_start_date or start_date @@ -138,16 +207,16 @@ def calculate_metric_value( if metric_type == MetricType.RATIO: numerator = _count_events( - decision_ids, - rule["numerator_event"], - ev_start, - ev_end, + exposure_qs=exposure_qs, + event_type_name=rule["numerator_event"], + start_date=ev_start, + end_date=ev_end, ) denominator = _count_events( - decision_ids, - rule["denominator_event"], - ev_start, - ev_end, + exposure_qs=exposure_qs, + event_type_name=rule["denominator_event"], + start_date=ev_start, + end_date=ev_end, ) if denominator == 0: return None @@ -155,30 +224,30 @@ def calculate_metric_value( if metric_type == MetricType.COUNT: count = _count_events( - decision_ids, - rule["event"], - ev_start, - ev_end, + exposure_qs=exposure_qs, + event_type_name=rule["event"], + start_date=ev_start, + end_date=ev_end, ) return Decimal(str(count)) if metric_type == MetricType.AVERAGE: return _average_property( - decision_ids, - rule["event"], - rule["property"], - ev_start, - ev_end, + exposure_qs=exposure_qs, + event_type_name=rule["event"], + property_field=rule["property"], + start_date=ev_start, + end_date=ev_end, ) if metric_type == MetricType.PERCENTILE: return _percentile_property( - decision_ids, - rule["event"], - rule["property"], - rule.get("percentile", 95), - ev_start, - ev_end, + exposure_qs=exposure_qs, + event_type_name=rule["event"], + property_field=rule["property"], + percentile=rule.get("percentile", 95), + start_date=ev_start, + end_date=ev_end, ) return None