test(integration): added integration tests

This commit is contained in:
ITQ
2026-02-23 11:48:03 +03:00
parent 3003ac888b
commit 15d80532e0
8 changed files with 1456 additions and 0 deletions
+5
View File
@@ -51,6 +51,11 @@ mypy:
test: test:
@ uv run python manage.py test @ uv run python manage.py test
# run integration tests
[group('test')]
test-integration:
@ uv run python manage.py test tests.integration
# run tests with coverage report # run tests with coverage report
[group('test')] [group('test')]
test-coverage: test-coverage:
View File
@@ -0,0 +1,231 @@
import json
import uuid
from decimal import Decimal
from typing import override
from django.core.cache import cache
from django.test import Client, TestCase
from django.urls import reverse
from django.utils import timezone
from apps.events.tests.helpers import make_event_type, make_exposure_type
from apps.experiments.services import (
experiment_approve,
experiment_start,
experiment_submit_for_review,
)
from apps.experiments.tests.helpers import add_two_variants, make_experiment
from apps.metrics.services import (
experiment_metric_add,
metric_definition_create,
)
from apps.reviews.services import review_settings_update
from apps.reviews.tests.helpers import make_approver, make_experimenter
class APIContractFlowTest(TestCase):
@override
def setUp(self) -> None:
cache.clear()
self.client = Client()
review_settings_update(
default_min_approvals=1,
allow_any_approver=True,
)
owner = make_experimenter("_api")
approver = make_approver("_api")
self.experiment = make_experiment(
owner=owner,
suffix="_api",
traffic_allocation=Decimal("100.00"),
)
add_two_variants(self.experiment)
self.metric = metric_definition_create(
key="ctr_api",
name="CTR",
metric_type="ratio",
direction="higher_is_better",
calculation_rule={
"numerator_event": "api_click",
"denominator_event": "api_exposure",
},
)
experiment_metric_add(
experiment=self.experiment,
metric=self.metric,
is_primary=True,
)
make_exposure_type(name="api_exposure")
make_event_type(
name="api_click",
display_name="Click",
requires_exposure=True,
)
self.experiment = experiment_submit_for_review(
experiment=self.experiment, user=owner
)
self.experiment = experiment_approve(
experiment=self.experiment, approver=approver
)
self.experiment = experiment_start(
experiment=self.experiment, user=owner
)
def test_decide_to_events_to_report_via_http(self) -> None:
decide_resp = self.client.post(
reverse("api-1:decide"),
data=json.dumps(
{
"subject_id": "api_user_1",
"flags": [self.experiment.flag.key],
"subject_attributes": {},
}
),
content_type="application/json",
)
self.assertEqual(decide_resp.status_code, 200)
decide_data = decide_resp.json()
self.assertEqual(len(decide_data["decisions"]), 1)
decision = decide_data["decisions"][0]
self.assertEqual(decision["reason"], "experiment_assigned")
self.assertIsNotNone(decision["decision_id"])
self.assertIsNotNone(decision["variant_id"])
now = timezone.now().isoformat()
events_resp = self.client.post(
reverse("api-1:ingest_events"),
data=json.dumps(
{
"events": [
{
"event_id": "api_exp_1",
"event_type": "api_exposure",
"decision_id": decision["decision_id"],
"subject_id": "api_user_1",
"timestamp": now,
"properties": {},
},
{
"event_id": "api_click_1",
"event_type": "api_click",
"decision_id": decision["decision_id"],
"subject_id": "api_user_1",
"timestamp": now,
"properties": {},
},
]
}
),
content_type="application/json",
)
self.assertEqual(events_resp.status_code, 200)
events_data = events_resp.json()
self.assertEqual(events_data["accepted"], 2)
self.assertEqual(events_data["rejected"], 0)
self.assertEqual(events_data["duplicates"], 0)
report_resp = self.client.get(
reverse(
"api-1:get_experiment_report",
args=[self.experiment.pk],
),
)
self.assertEqual(report_resp.status_code, 200)
report = report_resp.json()
self.assertEqual(report["experiment_id"], str(self.experiment.pk))
total_exposures = sum(v["exposures"] for v in report["variants"])
self.assertEqual(total_exposures, 1)
self.assertEqual(len(report["variants"]), 2)
for variant in report["variants"]:
if variant["exposures"] > 0:
self.assertEqual(len(variant["metrics"]), 1)
self.assertEqual(
variant["metrics"][0]["metric_key"], "ctr_api"
)
def test_decide_returns_all_requested_flags(self) -> None:
resp = self.client.post(
reverse("api-1:decide"),
data=json.dumps(
{
"subject_id": "api_user_2",
"flags": [
self.experiment.flag.key,
"nonexistent_flag",
],
}
),
content_type="application/json",
)
self.assertEqual(resp.status_code, 200)
data = resp.json()
self.assertEqual(len(data["decisions"]), 2)
reasons = {d["flag"]: d["reason"] for d in data["decisions"]}
self.assertEqual(
reasons[self.experiment.flag.key], "experiment_assigned"
)
self.assertEqual(reasons["nonexistent_flag"], "flag_not_found")
def test_events_dedup_via_http(self) -> None:
cache.clear()
decide_resp = self.client.post(
reverse("api-1:decide"),
data=json.dumps(
{
"subject_id": "api_user_3",
"flags": [self.experiment.flag.key],
}
),
content_type="application/json",
)
decision = decide_resp.json()["decisions"][0]
now = timezone.now().isoformat()
event_payload = json.dumps(
{
"events": [
{
"event_id": "api_ddup_1",
"event_type": "api_exposure",
"decision_id": decision["decision_id"],
"subject_id": "api_user_3",
"timestamp": now,
"properties": {},
}
]
}
)
r1 = self.client.post(
reverse("api-1:ingest_events"),
data=event_payload,
content_type="application/json",
)
self.assertEqual(r1.json()["accepted"], 1)
r2 = self.client.post(
reverse("api-1:ingest_events"),
data=event_payload,
content_type="application/json",
)
self.assertEqual(r2.json()["duplicates"], 1)
self.assertEqual(r2.json()["accepted"], 0)
def test_report_404_for_unknown_experiment(self) -> None:
resp = self.client.get(
reverse(
"api-1:get_experiment_report",
args=[uuid.uuid4()],
),
)
self.assertEqual(resp.status_code, 404)
@@ -0,0 +1,323 @@
from decimal import Decimal
from typing import override
from django.core.cache import cache
from django.test import TestCase
from django.utils import timezone
from apps.decision.services import decide_for_flag
from apps.events.models import Exposure, PendingEvent
from apps.events.services import process_events_batch
from apps.events.tests.helpers import make_event_type, make_exposure_type
from apps.experiments.services import (
experiment_approve,
experiment_start,
experiment_submit_for_review,
)
from apps.experiments.tests.helpers import add_two_variants, make_experiment
from apps.metrics.services import (
experiment_metric_add,
metric_definition_create,
)
from apps.reports.services import build_experiment_report
from apps.reviews.services import review_settings_update
from apps.reviews.tests.helpers import make_approver, make_experimenter
class OutOfOrderAttributionTest(TestCase):
@override
def setUp(self) -> None:
cache.clear()
review_settings_update(
default_min_approvals=1,
allow_any_approver=True,
)
self.owner = make_experimenter("_ooa")
self.approver = make_approver("_ooa")
self.experiment = make_experiment(
owner=self.owner,
suffix="_ooa",
traffic_allocation=Decimal("100.00"),
)
add_two_variants(self.experiment)
self.metric = metric_definition_create(
key="conv_ooa",
name="Conversion",
metric_type="count",
direction="higher_is_better",
calculation_rule={"event": "ooa_purchase"},
)
experiment_metric_add(
experiment=self.experiment,
metric=self.metric,
is_primary=True,
)
make_exposure_type(name="ooa_exposure")
make_event_type(
name="ooa_purchase",
display_name="Purchase",
requires_exposure=True,
)
self.experiment = experiment_submit_for_review(
experiment=self.experiment, user=self.owner
)
self.experiment = experiment_approve(
experiment=self.experiment, approver=self.approver
)
self.experiment = experiment_start(
experiment=self.experiment, user=self.owner
)
def test_conversion_before_exposure_goes_to_pending(self) -> None:
cache.clear()
d = decide_for_flag("flag_ooa", "user_ooa_1", {})
now = timezone.now().isoformat()
result = process_events_batch(
[
{
"event_id": "ooa_conv_1",
"event_type": "ooa_purchase",
"decision_id": d["decision_id"],
"subject_id": "user_ooa_1",
"timestamp": now,
"properties": {},
}
]
)
self.assertEqual(result.accepted, 1)
self.assertTrue(
PendingEvent.objects.filter(event_id="ooa_conv_1").exists()
)
def test_pending_event_promoted_on_exposure_arrival(self) -> None:
cache.clear()
d = decide_for_flag("flag_ooa", "user_ooa_2", {})
now = timezone.now().isoformat()
process_events_batch(
[
{
"event_id": "ooa_conv_2",
"event_type": "ooa_purchase",
"decision_id": d["decision_id"],
"subject_id": "user_ooa_2",
"timestamp": now,
"properties": {},
}
]
)
self.assertTrue(
PendingEvent.objects.filter(event_id="ooa_conv_2").exists()
)
result = process_events_batch(
[
{
"event_id": "ooa_exp_2",
"event_type": "ooa_exposure",
"decision_id": d["decision_id"],
"subject_id": "user_ooa_2",
"timestamp": now,
"properties": {},
}
]
)
self.assertEqual(result.accepted, 1)
self.assertFalse(
PendingEvent.objects.filter(event_id="ooa_conv_2").exists()
)
def test_promoted_event_appears_in_report(self) -> None:
cache.clear()
d = decide_for_flag("flag_ooa", "user_ooa_3", {})
now = timezone.now().isoformat()
process_events_batch(
[
{
"event_id": "ooa_conv_3",
"event_type": "ooa_purchase",
"decision_id": d["decision_id"],
"subject_id": "user_ooa_3",
"timestamp": now,
"properties": {},
}
]
)
process_events_batch(
[
{
"event_id": "ooa_exp_3",
"event_type": "ooa_exposure",
"decision_id": d["decision_id"],
"subject_id": "user_ooa_3",
"timestamp": now,
"properties": {},
}
]
)
report = build_experiment_report(self.experiment)
total_exposures = sum(v["exposures"] for v in report["variants"])
self.assertEqual(total_exposures, 1)
class EventDeduplicationTest(TestCase):
@override
def setUp(self) -> None:
cache.clear()
review_settings_update(
default_min_approvals=1,
allow_any_approver=True,
)
self.owner = make_experimenter("_ded")
self.approver = make_approver("_ded")
self.experiment = make_experiment(
owner=self.owner,
suffix="_ded",
traffic_allocation=Decimal("100.00"),
)
add_two_variants(self.experiment)
make_exposure_type(name="ded_exposure")
self.experiment = experiment_submit_for_review(
experiment=self.experiment, user=self.owner
)
self.experiment = experiment_approve(
experiment=self.experiment, approver=self.approver
)
self.experiment = experiment_start(
experiment=self.experiment, user=self.owner
)
def test_duplicate_exposure_not_counted_twice(self) -> None:
cache.clear()
d = decide_for_flag("flag_ded", "user_dup", {})
now = timezone.now().isoformat()
event = {
"event_id": "ded_same_id",
"event_type": "ded_exposure",
"decision_id": d["decision_id"],
"subject_id": "user_dup",
"timestamp": now,
"properties": {},
}
r1 = process_events_batch([event])
self.assertEqual(r1.accepted, 1)
r2 = process_events_batch([event])
self.assertEqual(r2.duplicates, 1)
self.assertEqual(r2.accepted, 0)
exposures = Exposure.objects.filter(
decision_id=d["decision_id"],
)
self.assertEqual(exposures.count(), 1)
def test_deduplication_prevents_metric_inflation(self) -> None:
cache.clear()
d = decide_for_flag("flag_ded", "user_infl", {})
now = timezone.now().isoformat()
event = {
"event_id": "ded_infl_id",
"event_type": "ded_exposure",
"decision_id": d["decision_id"],
"subject_id": "user_infl",
"timestamp": now,
"properties": {},
}
process_events_batch([event])
process_events_batch([event])
process_events_batch([event])
report = build_experiment_report(self.experiment)
total_exposures = sum(v["exposures"] for v in report["variants"])
self.assertEqual(total_exposures, 1)
class ConversionWithoutExposureTest(TestCase):
@override
def setUp(self) -> None:
cache.clear()
review_settings_update(
default_min_approvals=1,
allow_any_approver=True,
)
self.owner = make_experimenter("_cwe")
self.approver = make_approver("_cwe")
self.experiment = make_experiment(
owner=self.owner,
suffix="_cwe",
traffic_allocation=Decimal("100.00"),
)
add_two_variants(self.experiment)
self.metric = metric_definition_create(
key="conv_cwe",
name="Conversion",
metric_type="count",
direction="higher_is_better",
calculation_rule={"event": "cwe_purchase"},
)
experiment_metric_add(
experiment=self.experiment,
metric=self.metric,
is_primary=True,
)
make_exposure_type(name="cwe_exposure")
make_event_type(
name="cwe_purchase",
display_name="Purchase",
requires_exposure=True,
)
self.experiment = experiment_submit_for_review(
experiment=self.experiment, user=self.owner
)
self.experiment = experiment_approve(
experiment=self.experiment, approver=self.approver
)
self.experiment = experiment_start(
experiment=self.experiment, user=self.owner
)
def test_unresolved_pending_event_not_in_report(self) -> None:
cache.clear()
d = decide_for_flag("flag_cwe", "user_no_exp", {})
now = timezone.now().isoformat()
process_events_batch(
[
{
"event_id": "cwe_conv_only",
"event_type": "cwe_purchase",
"decision_id": d["decision_id"],
"subject_id": "user_no_exp",
"timestamp": now,
"properties": {},
}
]
)
self.assertTrue(
PendingEvent.objects.filter(event_id="cwe_conv_only").exists()
)
report = build_experiment_report(self.experiment)
total_exposures = sum(v["exposures"] for v in report["variants"])
self.assertEqual(total_exposures, 0)
@@ -0,0 +1,290 @@
from decimal import Decimal
from typing import override
from django.core.cache import cache
from django.test import TestCase
from django.utils import timezone
from apps.decision.services import decide_for_flag
from apps.events.services import process_events_batch
from apps.events.tests.helpers import make_event_type, make_exposure_type
from apps.experiments.models import ExperimentOutcome, ExperimentStatus
from apps.experiments.services import (
experiment_approve,
experiment_start,
experiment_submit_for_review,
)
from apps.experiments.tests.helpers import add_two_variants, make_experiment
from apps.guardrails.models import GuardrailAction, GuardrailTrigger
from apps.guardrails.services import (
check_all_running_experiments,
check_experiment_guardrails,
guardrail_create,
)
from apps.metrics.services import (
experiment_metric_add,
metric_definition_create,
)
from apps.reviews.services import review_settings_update
from apps.reviews.tests.helpers import make_approver, make_experimenter
def _start_experiment(owner, approver, suffix, traffic=Decimal("100.00")):
experiment = make_experiment(
owner=owner,
suffix=suffix,
traffic_allocation=traffic,
)
add_two_variants(experiment)
experiment = experiment_submit_for_review(
experiment=experiment, user=owner
)
experiment = experiment_approve(experiment=experiment, approver=approver)
return experiment_start(experiment=experiment, user=owner)
class GuardrailPauseIntegrationTest(TestCase):
@override
def setUp(self) -> None:
cache.clear()
review_settings_update(
default_min_approvals=1,
allow_any_approver=True,
)
self.owner = make_experimenter("_gpi")
self.approver = make_approver("_gpi")
self.experiment = _start_experiment(self.owner, self.approver, "_gpi")
self.error_metric = metric_definition_create(
key="error_rate_gpi",
name="Error Rate",
metric_type="ratio",
direction="lower_is_better",
calculation_rule={
"numerator_event": "gpi_error",
"denominator_event": "gpi_exposure",
},
)
experiment_metric_add(
experiment=self.experiment,
metric=self.error_metric,
)
make_exposure_type(name="gpi_exposure")
make_event_type(name="gpi_error", display_name="Error")
guardrail_create(
experiment=self.experiment,
metric=self.error_metric,
threshold=Decimal("0.10"),
observation_window_minutes=60,
action=GuardrailAction.PAUSE,
)
def test_guardrail_pauses_experiment_on_threshold_breach(self) -> None:
now = timezone.now().isoformat()
cache.clear()
d = decide_for_flag("flag_gpi", "user_gp1", {})
self.assertEqual(d["reason"], "experiment_assigned")
process_events_batch(
[
{
"event_id": "gpi_exp_1",
"event_type": "gpi_exposure",
"decision_id": d["decision_id"],
"subject_id": "user_gp1",
"timestamp": now,
"properties": {},
},
{
"event_id": "gpi_err_1",
"event_type": "gpi_error",
"decision_id": d["decision_id"],
"subject_id": "user_gp1",
"timestamp": now,
"properties": {},
},
]
)
triggers = check_experiment_guardrails(self.experiment)
self.assertTrue(len(triggers) > 0)
self.experiment.refresh_from_db()
self.assertEqual(self.experiment.status, ExperimentStatus.PAUSED)
self.assertTrue(
GuardrailTrigger.objects.filter(
experiment=self.experiment
).exists()
)
def test_no_trigger_when_metric_below_threshold(self) -> None:
now = timezone.now().isoformat()
cache.clear()
decisions = []
for i in range(10):
cache.clear()
d = decide_for_flag("flag_gpi", f"user_ok_{i}", {})
decisions.append(d)
events = [
{
"event_id": f"gpi_ok_exp_{i}",
"event_type": "gpi_exposure",
"decision_id": d["decision_id"],
"subject_id": f"user_ok_{i}",
"timestamp": now,
"properties": {},
}
for i, d in enumerate(decisions)
]
process_events_batch(events)
triggers = check_experiment_guardrails(self.experiment)
self.assertEqual(len(triggers), 0)
self.experiment.refresh_from_db()
self.assertEqual(self.experiment.status, ExperimentStatus.RUNNING)
class GuardrailRollbackIntegrationTest(TestCase):
@override
def setUp(self) -> None:
cache.clear()
review_settings_update(
default_min_approvals=1,
allow_any_approver=True,
)
self.owner = make_experimenter("_gri")
self.approver = make_approver("_gri")
self.experiment = _start_experiment(self.owner, self.approver, "_gri")
self.crash_metric = metric_definition_create(
key="crash_rate_gri",
name="Crash Rate",
metric_type="ratio",
direction="lower_is_better",
calculation_rule={
"numerator_event": "gri_crash",
"denominator_event": "gri_exposure",
},
)
experiment_metric_add(
experiment=self.experiment,
metric=self.crash_metric,
)
make_exposure_type(name="gri_exposure")
make_event_type(name="gri_crash", display_name="Crash")
guardrail_create(
experiment=self.experiment,
metric=self.crash_metric,
threshold=Decimal("0.05"),
action=GuardrailAction.ROLLBACK,
)
def test_rollback_completes_experiment_with_control_winner(self) -> None:
now = timezone.now().isoformat()
cache.clear()
d = decide_for_flag("flag_gri", "user_rb1", {})
process_events_batch(
[
{
"event_id": "gri_exp_1",
"event_type": "gri_exposure",
"decision_id": d["decision_id"],
"subject_id": "user_rb1",
"timestamp": now,
"properties": {},
},
{
"event_id": "gri_crash_1",
"event_type": "gri_crash",
"decision_id": d["decision_id"],
"subject_id": "user_rb1",
"timestamp": now,
"properties": {},
},
]
)
triggers = check_experiment_guardrails(self.experiment)
self.assertTrue(len(triggers) > 0)
self.experiment.refresh_from_db()
self.assertEqual(self.experiment.status, ExperimentStatus.COMPLETED)
outcome = ExperimentOutcome.objects.get(experiment=self.experiment)
self.assertEqual(outcome.outcome, "rollback")
class GuardrailCheckAllTest(TestCase):
@override
def setUp(self) -> None:
cache.clear()
review_settings_update(
default_min_approvals=1,
allow_any_approver=True,
)
self.owner = make_experimenter("_gca")
self.approver = make_approver("_gca")
self.exp1 = _start_experiment(self.owner, self.approver, "_gca1")
self.exp2 = _start_experiment(self.owner, self.approver, "_gca2")
self.metric = metric_definition_create(
key="err_gca",
name="Error",
metric_type="ratio",
direction="lower_is_better",
calculation_rule={
"numerator_event": "gca_error",
"denominator_event": "gca_exposure",
},
)
for exp in (self.exp1, self.exp2):
experiment_metric_add(experiment=exp, metric=self.metric)
guardrail_create(
experiment=exp,
metric=self.metric,
threshold=Decimal("0.10"),
action=GuardrailAction.PAUSE,
)
make_exposure_type(name="gca_exposure")
make_event_type(name="gca_error", display_name="Error")
def test_check_all_processes_multiple_experiments(self) -> None:
now = timezone.now().isoformat()
for suffix, _ in [("gca1", self.exp1), ("gca2", self.exp2)]:
cache.clear()
d = decide_for_flag(f"flag_{suffix}", "user_ca", {})
process_events_batch(
[
{
"event_id": f"{suffix}_exp",
"event_type": "gca_exposure",
"decision_id": d["decision_id"],
"subject_id": "user_ca",
"timestamp": now,
"properties": {},
},
{
"event_id": f"{suffix}_err",
"event_type": "gca_error",
"decision_id": d["decision_id"],
"subject_id": "user_ca",
"timestamp": now,
"properties": {},
},
]
)
results = check_all_running_experiments()
self.assertEqual(results["checked"], 2)
self.assertGreaterEqual(results["triggered"], 1)
@@ -0,0 +1,218 @@
from decimal import Decimal
from typing import override
from django.core.cache import cache
from django.test import TestCase
from django.utils import timezone
from apps.decision.services import decide_for_flag
from apps.events.services import process_events_batch
from apps.events.tests.helpers import make_event_type, make_exposure_type
from apps.experiments.models import ExperimentStatus
from apps.experiments.services import (
experiment_approve,
experiment_complete,
experiment_create,
experiment_start,
experiment_submit_for_review,
variant_create,
)
from apps.experiments.tests.helpers import (
add_two_variants,
make_experiment,
make_flag,
)
from apps.metrics.services import (
experiment_metric_add,
metric_definition_create,
)
from apps.reports.services import build_experiment_report
from apps.reviews.services import review_settings_update
from apps.reviews.tests.helpers import make_approver, make_experimenter
class FullHappyPathTest(TestCase):
@override
def setUp(self) -> None:
cache.clear()
review_settings_update(
default_min_approvals=1,
allow_any_approver=True,
)
self.owner = make_experimenter("_hp")
self.approver = make_approver("_hp")
self.experiment = make_experiment(
owner=self.owner,
suffix="_hp",
traffic_allocation=Decimal("100.00"),
)
self.v_control, self.v_treatment = add_two_variants(self.experiment)
self.metric = metric_definition_create(
key="ctr_hp",
name="CTR",
metric_type="ratio",
direction="higher_is_better",
calculation_rule={
"numerator_event": "hp_click",
"denominator_event": "hp_exposure",
},
)
experiment_metric_add(
experiment=self.experiment,
metric=self.metric,
is_primary=True,
)
make_exposure_type(name="hp_exposure")
make_event_type(
name="hp_click",
display_name="Click",
requires_exposure=True,
)
self.experiment = experiment_submit_for_review(
experiment=self.experiment, user=self.owner
)
self.experiment = experiment_approve(
experiment=self.experiment, approver=self.approver
)
self.experiment = experiment_start(
experiment=self.experiment, user=self.owner
)
def test_full_decide_event_report_flow(self) -> None:
decisions = []
for i in range(10):
cache.clear()
d = decide_for_flag("flag_hp", f"user_{i}", {"country": "US"})
self.assertEqual(d["reason"], "experiment_assigned")
self.assertIsNotNone(d["variant_id"])
decisions.append(d)
now = timezone.now().isoformat()
exposure_events = [
{
"event_id": f"hp_exp_{i}",
"event_type": "hp_exposure",
"decision_id": d["decision_id"],
"subject_id": f"user_{i}",
"timestamp": now,
"properties": {},
}
for i, d in enumerate(decisions)
]
result = process_events_batch(exposure_events)
self.assertEqual(result.accepted, 10)
click_events = [
{
"event_id": f"hp_click_{i}",
"event_type": "hp_click",
"decision_id": d["decision_id"],
"subject_id": f"user_{i}",
"timestamp": now,
"properties": {},
}
for i, d in enumerate(decisions[:5])
]
result = process_events_batch(click_events)
self.assertEqual(result.accepted, 5)
report = build_experiment_report(self.experiment)
self.assertEqual(str(report["experiment_id"]), str(self.experiment.pk))
total_exposures = sum(v["exposures"] for v in report["variants"])
self.assertEqual(total_exposures, 10)
def test_lifecycle_with_rollout_outcome(self) -> None:
cache.clear()
d = decide_for_flag("flag_hp", "subject_1", {})
self.assertEqual(d["reason"], "experiment_assigned")
self.experiment = experiment_complete(
experiment=self.experiment,
user=self.owner,
outcome="rollout",
rationale="Treatment wins",
winning_variant_id=str(self.v_treatment.pk),
)
self.assertEqual(self.experiment.status, ExperimentStatus.COMPLETED)
def test_decide_returns_default_after_complete(self) -> None:
self.experiment = experiment_complete(
experiment=self.experiment,
user=self.owner,
outcome="no_effect",
rationale="No significant difference",
)
cache.clear()
d = decide_for_flag("flag_hp", "subject_2", {})
self.assertEqual(d["reason"], "no_active_experiment")
self.assertEqual(d["value"], "a")
def test_targeting_mismatch_returns_default(self) -> None:
owner = make_experimenter("_tm")
approver = make_approver("_tm")
flag = make_flag(suffix="_tm", default="a")
exp = experiment_create(
flag=flag,
name="Targeting Test",
owner=owner,
traffic_allocation=Decimal("100.00"),
targeting_rules='country IN ["DE"]',
)
variant_create(
experiment=exp,
user=owner,
name="control",
value="a",
weight=Decimal("50.00"),
is_control=True,
)
variant_create(
experiment=exp,
user=owner,
name="treatment",
value="b",
weight=Decimal("50.00"),
)
exp = experiment_submit_for_review(experiment=exp, user=owner)
exp = experiment_approve(experiment=exp, approver=approver)
exp = experiment_start(experiment=exp, user=owner)
cache.clear()
d = decide_for_flag("flag_tm", "subject_3", {"country": "US"})
self.assertEqual(d["reason"], "targeting_mismatch")
self.assertEqual(d["value"], "a")
def test_report_with_period_filter(self) -> None:
cache.clear()
d = decide_for_flag("flag_hp", "user_rp", {})
now = timezone.now()
process_events_batch(
[
{
"event_id": "hp_rp_exp",
"event_type": "hp_exposure",
"decision_id": d["decision_id"],
"subject_id": "user_rp",
"timestamp": now.isoformat(),
"properties": {},
}
]
)
future = now + timezone.timedelta(hours=1)
report = build_experiment_report(
self.experiment,
start_date=future,
end_date=future + timezone.timedelta(hours=1),
)
total_exposures = sum(v["exposures"] for v in report["variants"])
self.assertEqual(total_exposures, 0)
@@ -0,0 +1,389 @@
from decimal import Decimal
from typing import override
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.test import TestCase
from django.utils import timezone
from apps.events.services import (
decision_create,
event_type_create,
process_events_batch,
)
from apps.experiments.models import ExperimentStatus
from apps.experiments.services import (
experiment_approve,
experiment_complete,
experiment_create,
experiment_start,
experiment_submit_for_review,
variant_create,
)
from apps.experiments.tests.helpers import add_two_variants, make_flag
from apps.reviews.services import (
approver_group_create,
review_settings_update,
)
from apps.reviews.tests.helpers import (
make_approver,
make_experimenter,
make_viewer,
)
from config.errors import ForbiddenError
class InvalidLifecycleTransitionsTest(TestCase):
@override
def setUp(self) -> None:
cache.clear()
review_settings_update(
default_min_approvals=1,
allow_any_approver=True,
)
self.owner = make_experimenter("_ilt")
self.approver = make_approver("_ilt")
def test_cannot_start_draft_experiment(self) -> None:
flag = make_flag(suffix="_nsd", default="d")
experiment = experiment_create(
flag=flag,
name="Draft Start",
owner=self.owner,
traffic_allocation=Decimal("100.00"),
)
add_two_variants(experiment)
with self.assertRaises(ValidationError) as ctx:
experiment_start(experiment=experiment, user=self.owner)
self.assertIn("status", ctx.exception.message_dict)
def test_cannot_start_without_enough_approvals(self) -> None:
flag = make_flag(suffix="_nsa", default="d")
experiment = experiment_create(
flag=flag,
name="No Approval Start",
owner=self.owner,
traffic_allocation=Decimal("100.00"),
)
add_two_variants(experiment)
review_settings_update(
default_min_approvals=2,
allow_any_approver=True,
)
experiment = experiment_submit_for_review(
experiment=experiment, user=self.owner
)
experiment = experiment_approve(
experiment=experiment, approver=self.approver
)
self.assertEqual(experiment.status, ExperimentStatus.IN_REVIEW)
with self.assertRaises(ValidationError):
experiment_start(experiment=experiment, user=self.owner)
def test_cannot_submit_without_variants(self) -> None:
flag = make_flag(suffix="_nsv", default="d")
experiment = experiment_create(
flag=flag,
name="No Variants",
owner=self.owner,
)
with self.assertRaises(ValidationError) as ctx:
experiment_submit_for_review(
experiment=experiment, user=self.owner
)
self.assertIn("variants", ctx.exception.message_dict)
def test_cannot_complete_without_rationale(self) -> None:
flag = make_flag(suffix="_ncr", default="d")
experiment = experiment_create(
flag=flag,
name="No Rationale",
owner=self.owner,
traffic_allocation=Decimal("100.00"),
)
add_two_variants(experiment)
experiment = experiment_submit_for_review(
experiment=experiment, user=self.owner
)
experiment = experiment_approve(
experiment=experiment, approver=self.approver
)
experiment = experiment_start(experiment=experiment, user=self.owner)
with self.assertRaises(ValidationError) as ctx:
experiment_complete(
experiment=experiment,
user=self.owner,
outcome="no_effect",
rationale="",
)
self.assertIn("rationale", ctx.exception.message_dict)
def test_cannot_run_two_experiments_on_same_flag(self) -> None:
flag = make_flag(suffix="_dup", default="d")
exp1 = experiment_create(
flag=flag,
name="Exp1",
owner=self.owner,
traffic_allocation=Decimal("100.00"),
)
add_two_variants(exp1)
exp1 = experiment_submit_for_review(experiment=exp1, user=self.owner)
exp1 = experiment_approve(experiment=exp1, approver=self.approver)
exp1 = experiment_start(experiment=exp1, user=self.owner)
exp2 = experiment_create(
flag=flag,
name="Exp2",
owner=self.owner,
traffic_allocation=Decimal("100.00"),
)
add_two_variants(exp2)
exp2 = experiment_submit_for_review(experiment=exp2, user=self.owner)
exp2 = experiment_approve(experiment=exp2, approver=self.approver)
with self.assertRaises(ValidationError) as ctx:
experiment_start(experiment=exp2, user=self.owner)
self.assertIn("flag", ctx.exception.message_dict)
class ReviewPolicyEnforcementTest(TestCase):
@override
def setUp(self) -> None:
cache.clear()
def test_approver_group_restricts_who_can_approve(self) -> None:
review_settings_update(
default_min_approvals=1,
allow_any_approver=False,
)
owner = make_experimenter("_rpea")
approved_approver = make_approver("_rpea1")
unauthorized_approver = make_approver("_rpea2")
approver_group_create(
experimenter=owner,
approver_ids=[approved_approver.pk],
min_approvals=1,
)
flag = make_flag(suffix="_rpea", default="d")
experiment = experiment_create(
flag=flag,
name="Restricted Approval",
owner=owner,
traffic_allocation=Decimal("100.00"),
)
add_two_variants(experiment)
experiment = experiment_submit_for_review(
experiment=experiment, user=owner
)
with self.assertRaises(ValidationError):
experiment_approve(
experiment=experiment,
approver=unauthorized_approver,
)
experiment = experiment_approve(
experiment=experiment,
approver=approved_approver,
)
self.assertEqual(experiment.status, ExperimentStatus.APPROVED)
def test_non_owner_cannot_submit_for_review(self) -> None:
viewer = make_viewer("_voe")
flag = make_flag(suffix="_voe", default="d")
experiment = experiment_create(
flag=flag,
name="Viewer Experiment",
owner=viewer,
traffic_allocation=Decimal("100.00"),
)
other_user = make_experimenter("_voe2")
with self.assertRaises(ForbiddenError):
experiment_submit_for_review(
experiment=experiment, user=other_user
)
class EventValidationIntegrationTest(TestCase):
@override
def setUp(self) -> None:
event_type_create(
name="neg_exposure",
display_name="Exposure",
is_exposure=True,
)
event_type_create(
name="neg_click",
display_name="Click",
requires_exposure=True,
required_fields=["screen"],
)
self.now = timezone.now().isoformat()
def test_batch_with_mixed_valid_and_invalid_events(self) -> None:
decision_create(
decision_id="neg_dec_1",
flag_key="test_flag",
subject_id="u1",
value="v",
reason="test",
)
process_events_batch(
[
{
"event_id": "neg_exp_1",
"event_type": "neg_exposure",
"decision_id": "neg_dec_1",
"subject_id": "u1",
"timestamp": self.now,
"properties": {},
}
]
)
result = process_events_batch(
[
{
"event_id": "neg_valid_click",
"event_type": "neg_click",
"decision_id": "neg_dec_1",
"subject_id": "u1",
"timestamp": self.now,
"properties": {"screen": "home"},
},
{
"event_id": "neg_invalid_type",
"event_type": "nonexistent_type",
"decision_id": "neg_dec_1",
"subject_id": "u1",
"timestamp": self.now,
"properties": {},
},
{
"event_id": "neg_missing_field",
"event_type": "neg_click",
"decision_id": "neg_dec_1",
"subject_id": "u1",
"timestamp": self.now,
"properties": {},
},
{
"event_id": "neg_missing_decision",
"event_type": "neg_click",
"decision_id": "",
"subject_id": "u1",
"timestamp": self.now,
"properties": {"screen": "home"},
},
{
"event_id": "neg_bad_ts",
"event_type": "neg_click",
"decision_id": "neg_dec_1",
"subject_id": "u1",
"timestamp": 12345,
"properties": {"screen": "home"},
},
{
"event_id": "neg_exp_1",
"event_type": "neg_exposure",
"decision_id": "neg_dec_1",
"subject_id": "u1",
"timestamp": self.now,
"properties": {},
},
]
)
self.assertEqual(result.accepted, 1)
self.assertEqual(result.duplicates, 1)
self.assertGreaterEqual(result.rejected, 3)
def test_non_string_event_type_rejected(self) -> None:
result = process_events_batch(
[
{
"event_id": "neg_bad_type",
"event_type": 999,
"decision_id": "dec",
"subject_id": "u1",
"timestamp": self.now,
"properties": {},
}
]
)
self.assertEqual(result.rejected, 1)
self.assertEqual(result.accepted, 0)
class VariantWeightValidationTest(TestCase):
def test_weights_matching_allocation_allows_submit(self) -> None:
owner = make_experimenter("_vwv")
flag = make_flag(suffix="_vwv", default="d")
experiment = experiment_create(
flag=flag,
name="Weight Validation",
owner=owner,
traffic_allocation=Decimal("30.00"),
)
variant_create(
experiment=experiment,
user=owner,
name="control",
value="a",
weight=Decimal("15.00"),
is_control=True,
)
variant_create(
experiment=experiment,
user=owner,
name="treatment",
value="b",
weight=Decimal("15.00"),
)
review_settings_update(
default_min_approvals=1,
allow_any_approver=True,
)
experiment = experiment_submit_for_review(
experiment=experiment, user=owner
)
self.assertEqual(experiment.status, ExperimentStatus.IN_REVIEW)
def test_weights_exceeding_allocation_rejected(self) -> None:
owner = make_experimenter("_vwe")
flag = make_flag(suffix="_vwe", default="d")
experiment = experiment_create(
flag=flag,
name="Weight Exceeds",
owner=owner,
traffic_allocation=Decimal("30.00"),
)
variant_create(
experiment=experiment,
user=owner,
name="control",
value="a",
weight=Decimal("15.00"),
is_control=True,
)
with self.assertRaises(ValidationError):
variant_create(
experiment=experiment,
user=owner,
name="treatment",
value="b",
weight=Decimal("20.00"),
)