chore(): refactored reports queries
This commit is contained in:
@@ -2,6 +2,22 @@ from datetime import datetime
|
||||
from decimal import Decimal
|
||||
from uuid import UUID
|
||||
|
||||
from django.db import connection
|
||||
from django.db.models import (
|
||||
Aggregate,
|
||||
Avg,
|
||||
Case,
|
||||
Count,
|
||||
F,
|
||||
FloatField,
|
||||
QuerySet,
|
||||
Subquery,
|
||||
Value,
|
||||
When,
|
||||
)
|
||||
from django.db.models.fields.json import KeyTextTransform
|
||||
from django.db.models.functions import Cast
|
||||
|
||||
from apps.events.models import Event, Exposure
|
||||
from apps.experiments.models import Experiment
|
||||
from apps.metrics.models import (
|
||||
@@ -11,12 +27,29 @@ from apps.metrics.models import (
|
||||
)
|
||||
|
||||
|
||||
def _exposure_decision_ids(
|
||||
class PercentileCont(Aggregate):
|
||||
function = "PERCENTILE_CONT"
|
||||
template = (
|
||||
"%(function)s(%(percentile)s) "
|
||||
"WITHIN GROUP (ORDER BY %(expressions)s)"
|
||||
)
|
||||
allow_distinct = False
|
||||
output_field = FloatField()
|
||||
|
||||
def __init__(self, expression, percentile, **extra):
|
||||
super().__init__(
|
||||
expression,
|
||||
percentile=percentile,
|
||||
**extra,
|
||||
)
|
||||
|
||||
|
||||
def _exposure_queryset(
|
||||
experiment_id: UUID,
|
||||
variant_id: UUID,
|
||||
start_date: datetime | None = None,
|
||||
end_date: datetime | None = None,
|
||||
) -> list[str]:
|
||||
) -> QuerySet[Exposure]:
|
||||
qs = Exposure.objects.filter(
|
||||
experiment_id=experiment_id,
|
||||
variant_id=variant_id,
|
||||
@@ -25,17 +58,24 @@ def _exposure_decision_ids(
|
||||
qs = qs.filter(timestamp__gte=start_date)
|
||||
if end_date:
|
||||
qs = qs.filter(timestamp__lt=end_date)
|
||||
return list(qs.values_list("decision_id", flat=True))
|
||||
return qs
|
||||
|
||||
|
||||
def _count_events(
|
||||
decision_ids: list[str],
|
||||
def _exposure_decision_ids_subquery(
|
||||
exposure_qs: QuerySet[Exposure],
|
||||
):
|
||||
return Subquery(exposure_qs.values("decision_id"))
|
||||
|
||||
|
||||
def _events_queryset(
|
||||
*,
|
||||
exposure_qs: QuerySet[Exposure],
|
||||
event_type_name: str,
|
||||
start_date: datetime | None = None,
|
||||
end_date: datetime | None = None,
|
||||
) -> int:
|
||||
) -> QuerySet[Event]:
|
||||
qs = Event.objects.filter(
|
||||
decision_id__in=decision_ids,
|
||||
decision_id__in=_exposure_decision_ids_subquery(exposure_qs),
|
||||
event_type__name=event_type_name,
|
||||
is_attributed=True,
|
||||
)
|
||||
@@ -43,72 +83,101 @@ def _count_events(
|
||||
qs = qs.filter(timestamp__gte=start_date)
|
||||
if end_date:
|
||||
qs = qs.filter(timestamp__lt=end_date)
|
||||
return qs
|
||||
|
||||
|
||||
def _numeric_property_expression(property_field: str):
|
||||
if connection.vendor == "postgresql":
|
||||
key_text = KeyTextTransform(property_field, "properties")
|
||||
pattern = r"^-?(?:\d+(?:\.\d+)?|\.\d+)$"
|
||||
return Case(
|
||||
When(
|
||||
**{
|
||||
f"properties__{property_field}__regex": pattern
|
||||
},
|
||||
then=Cast(key_text, FloatField()),
|
||||
),
|
||||
default=Value(None),
|
||||
output_field=FloatField(),
|
||||
)
|
||||
return Cast(F(f"properties__{property_field}"), FloatField())
|
||||
|
||||
|
||||
def _count_events(
|
||||
*,
|
||||
exposure_qs: QuerySet[Exposure],
|
||||
event_type_name: str,
|
||||
start_date: datetime | None = None,
|
||||
end_date: datetime | None = None,
|
||||
) -> int:
|
||||
qs = _events_queryset(
|
||||
exposure_qs=exposure_qs,
|
||||
event_type_name=event_type_name,
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
)
|
||||
return qs.count()
|
||||
|
||||
|
||||
def _average_property(
|
||||
decision_ids: list[str],
|
||||
*,
|
||||
exposure_qs: QuerySet[Exposure],
|
||||
event_type_name: str,
|
||||
property_field: str,
|
||||
start_date: datetime | None = None,
|
||||
end_date: datetime | None = None,
|
||||
) -> Decimal | None:
|
||||
qs = Event.objects.filter(
|
||||
decision_id__in=decision_ids,
|
||||
event_type__name=event_type_name,
|
||||
is_attributed=True,
|
||||
qs = _events_queryset(
|
||||
exposure_qs=exposure_qs,
|
||||
event_type_name=event_type_name,
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
).annotate(
|
||||
numeric_value=_numeric_property_expression(property_field),
|
||||
)
|
||||
if start_date:
|
||||
qs = qs.filter(timestamp__gte=start_date)
|
||||
if end_date:
|
||||
qs = qs.filter(timestamp__lt=end_date)
|
||||
|
||||
values = []
|
||||
for props in qs.values_list("properties", flat=True):
|
||||
if isinstance(props, dict) and property_field in props:
|
||||
try:
|
||||
values.append(float(props[property_field]))
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
|
||||
if not values:
|
||||
value = qs.aggregate(value=Avg("numeric_value"))["value"]
|
||||
if value is None:
|
||||
return None
|
||||
return Decimal(str(sum(values) / len(values)))
|
||||
return Decimal(str(value))
|
||||
|
||||
|
||||
def _percentile_property(
|
||||
decision_ids: list[str],
|
||||
*,
|
||||
exposure_qs: QuerySet[Exposure],
|
||||
event_type_name: str,
|
||||
property_field: str,
|
||||
percentile: int,
|
||||
start_date: datetime | None = None,
|
||||
end_date: datetime | None = None,
|
||||
) -> Decimal | None:
|
||||
qs = Event.objects.filter(
|
||||
decision_id__in=decision_ids,
|
||||
event_type__name=event_type_name,
|
||||
is_attributed=True,
|
||||
)
|
||||
if start_date:
|
||||
qs = qs.filter(timestamp__gte=start_date)
|
||||
if end_date:
|
||||
qs = qs.filter(timestamp__lt=end_date)
|
||||
qs = _events_queryset(
|
||||
exposure_qs=exposure_qs,
|
||||
event_type_name=event_type_name,
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
).annotate(
|
||||
numeric_value=_numeric_property_expression(property_field),
|
||||
).exclude(numeric_value__isnull=True)
|
||||
if connection.vendor == "postgresql":
|
||||
value = qs.aggregate(
|
||||
value=PercentileCont(
|
||||
"numeric_value",
|
||||
Decimal(percentile) / Decimal(100),
|
||||
)
|
||||
)["value"]
|
||||
if value is None:
|
||||
return None
|
||||
return Decimal(str(value))
|
||||
|
||||
values = []
|
||||
for props in qs.values_list("properties", flat=True):
|
||||
if isinstance(props, dict) and property_field in props:
|
||||
try:
|
||||
values.append(float(props[property_field]))
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
|
||||
if not values:
|
||||
total = qs.aggregate(total=Count("pk"))["total"]
|
||||
if not total:
|
||||
return None
|
||||
|
||||
values.sort()
|
||||
idx = int(len(values) * percentile / 100)
|
||||
idx = min(idx, len(values) - 1)
|
||||
return Decimal(str(values[idx]))
|
||||
idx = min(int(total * percentile / 100), total - 1)
|
||||
value = qs.order_by("numeric_value").values_list(
|
||||
"numeric_value",
|
||||
flat=True,
|
||||
)[idx]
|
||||
return Decimal(str(value))
|
||||
|
||||
|
||||
def calculate_metric_value(
|
||||
@@ -121,14 +190,14 @@ def calculate_metric_value(
|
||||
event_end_date: datetime | None = None,
|
||||
) -> Decimal | None:
|
||||
rule = metric.calculation_rule
|
||||
decision_ids = _exposure_decision_ids(
|
||||
exposure_qs = _exposure_queryset(
|
||||
experiment_id,
|
||||
variant_id,
|
||||
start_date,
|
||||
end_date,
|
||||
)
|
||||
|
||||
if not decision_ids:
|
||||
if not exposure_qs.exists():
|
||||
return None
|
||||
|
||||
ev_start = event_start_date or start_date
|
||||
@@ -138,16 +207,16 @@ def calculate_metric_value(
|
||||
|
||||
if metric_type == MetricType.RATIO:
|
||||
numerator = _count_events(
|
||||
decision_ids,
|
||||
rule["numerator_event"],
|
||||
ev_start,
|
||||
ev_end,
|
||||
exposure_qs=exposure_qs,
|
||||
event_type_name=rule["numerator_event"],
|
||||
start_date=ev_start,
|
||||
end_date=ev_end,
|
||||
)
|
||||
denominator = _count_events(
|
||||
decision_ids,
|
||||
rule["denominator_event"],
|
||||
ev_start,
|
||||
ev_end,
|
||||
exposure_qs=exposure_qs,
|
||||
event_type_name=rule["denominator_event"],
|
||||
start_date=ev_start,
|
||||
end_date=ev_end,
|
||||
)
|
||||
if denominator == 0:
|
||||
return None
|
||||
@@ -155,30 +224,30 @@ def calculate_metric_value(
|
||||
|
||||
if metric_type == MetricType.COUNT:
|
||||
count = _count_events(
|
||||
decision_ids,
|
||||
rule["event"],
|
||||
ev_start,
|
||||
ev_end,
|
||||
exposure_qs=exposure_qs,
|
||||
event_type_name=rule["event"],
|
||||
start_date=ev_start,
|
||||
end_date=ev_end,
|
||||
)
|
||||
return Decimal(str(count))
|
||||
|
||||
if metric_type == MetricType.AVERAGE:
|
||||
return _average_property(
|
||||
decision_ids,
|
||||
rule["event"],
|
||||
rule["property"],
|
||||
ev_start,
|
||||
ev_end,
|
||||
exposure_qs=exposure_qs,
|
||||
event_type_name=rule["event"],
|
||||
property_field=rule["property"],
|
||||
start_date=ev_start,
|
||||
end_date=ev_end,
|
||||
)
|
||||
|
||||
if metric_type == MetricType.PERCENTILE:
|
||||
return _percentile_property(
|
||||
decision_ids,
|
||||
rule["event"],
|
||||
rule["property"],
|
||||
rule.get("percentile", 95),
|
||||
ev_start,
|
||||
ev_end,
|
||||
exposure_qs=exposure_qs,
|
||||
event_type_name=rule["event"],
|
||||
property_field=rule["property"],
|
||||
percentile=rule.get("percentile", 95),
|
||||
start_date=ev_start,
|
||||
end_date=ev_end,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
Reference in New Issue
Block a user