feat(conflicts): added conflicts business and presentation logic

This commit is contained in:
ITQ
2026-02-23 10:54:51 +03:00
parent d87671e49a
commit ace35b2585
19 changed files with 1283 additions and 86 deletions
+6
View File
@@ -0,0 +1,6 @@
from django.apps import AppConfig
class ConflictsApiConfig(AppConfig):
name = "api.v1.conflicts"
label = "api_v1_conflicts"
+219
View File
@@ -0,0 +1,219 @@
from http import HTTPStatus
from uuid import UUID
from django.http import Http404, HttpRequest
from django.shortcuts import get_object_or_404
from ninja import Router
from api.v1.conflicts.schemas import (
AddExperimentIn,
ConflictDomainCreateIn,
ConflictDomainListOut,
ConflictDomainOut,
ConflictDomainUpdateIn,
ExperimentDomainOut,
MembershipOut,
UpdatePriorityIn,
)
from apps.conflicts.models import ConflictDomain
from apps.conflicts.selectors import (
conflict_domain_get,
conflict_domain_list,
domain_active_experiments,
experiment_conflict_domains,
)
from apps.conflicts.services import (
conflict_domain_create,
conflict_domain_delete,
conflict_domain_update,
experiment_add_to_domain,
experiment_remove_from_domain,
experiment_update_domain_priority,
)
from apps.experiments.models import Experiment
from apps.users.auth.bearer import jwt_bearer, require_admin
router = Router(tags=["conflicts"], auth=jwt_bearer)
@router.get(
"/domains",
response={HTTPStatus.OK: ConflictDomainListOut},
summary="List conflict domains",
)
def list_domains(
request: HttpRequest,
) -> tuple[HTTPStatus, ConflictDomainListOut]:
qs = conflict_domain_list()
items = [ConflictDomainOut.model_validate(d) for d in qs]
return HTTPStatus.OK, ConflictDomainListOut(count=len(items), items=items)
@router.post(
"/domains",
response={HTTPStatus.CREATED: ConflictDomainOut},
summary="Create conflict domain",
)
@require_admin
def create_domain(
request: HttpRequest,
payload: ConflictDomainCreateIn,
) -> tuple[HTTPStatus, ConflictDomainOut]:
domain = conflict_domain_create(
name=payload.name,
description=payload.description,
policy=payload.policy,
max_concurrent=payload.max_concurrent,
)
return HTTPStatus.CREATED, ConflictDomainOut.model_validate(domain)
@router.get(
"/domains/{domain_id}",
response={HTTPStatus.OK: ConflictDomainOut},
summary="Get conflict domain",
)
def get_domain(
request: HttpRequest,
domain_id: UUID,
) -> tuple[HTTPStatus, ConflictDomainOut]:
domain = conflict_domain_get(domain_id)
if not domain:
raise Http404
return HTTPStatus.OK, ConflictDomainOut.model_validate(domain)
@router.patch(
"/domains/{domain_id}",
response={HTTPStatus.OK: ConflictDomainOut},
summary="Update conflict domain",
)
@require_admin
def update_domain(
request: HttpRequest,
domain_id: UUID,
payload: ConflictDomainUpdateIn,
) -> tuple[HTTPStatus, ConflictDomainOut]:
domain = get_object_or_404(ConflictDomain, pk=domain_id)
domain = conflict_domain_update(
domain=domain,
**payload.model_dump(exclude_none=True),
)
return HTTPStatus.OK, ConflictDomainOut.model_validate(domain)
@router.delete(
"/domains/{domain_id}",
response={HTTPStatus.NO_CONTENT: None},
summary="Delete conflict domain",
)
@require_admin
def delete_domain(
request: HttpRequest,
domain_id: UUID,
) -> tuple[HTTPStatus, None]:
domain = get_object_or_404(ConflictDomain, pk=domain_id)
conflict_domain_delete(domain=domain)
return HTTPStatus.NO_CONTENT, None
@router.get(
"/domains/{domain_id}/experiments",
response={HTTPStatus.OK: list[MembershipOut]},
summary="List experiments in conflict domain",
)
def list_domain_experiments(
request: HttpRequest,
domain_id: UUID,
) -> tuple[HTTPStatus, list[MembershipOut]]:
get_object_or_404(ConflictDomain, pk=domain_id)
memberships = domain_active_experiments(domain_id)
return HTTPStatus.OK, [
MembershipOut.from_membership(m) for m in memberships
]
@router.post(
"/domains/{domain_id}/experiments",
response={HTTPStatus.CREATED: MembershipOut},
summary="Add experiment to conflict domain",
)
@require_admin
def add_experiment_to_domain(
request: HttpRequest,
domain_id: UUID,
payload: AddExperimentIn,
) -> tuple[HTTPStatus, MembershipOut]:
domain = get_object_or_404(ConflictDomain, pk=domain_id)
experiment = get_object_or_404(Experiment, pk=payload.experiment_id)
membership = experiment_add_to_domain(
experiment=experiment,
domain=domain,
priority=payload.priority,
)
membership = (
type(membership)
.objects.select_related("experiment", "conflict_domain")
.get(pk=membership.pk)
)
return HTTPStatus.CREATED, MembershipOut.from_membership(membership)
@router.patch(
"/domains/{domain_id}/experiments/{experiment_id}",
response={HTTPStatus.OK: MembershipOut},
summary="Update experiment priority in domain",
)
@require_admin
def update_experiment_priority(
request: HttpRequest,
domain_id: UUID,
experiment_id: UUID,
payload: UpdatePriorityIn,
) -> tuple[HTTPStatus, MembershipOut]:
domain = get_object_or_404(ConflictDomain, pk=domain_id)
experiment = get_object_or_404(Experiment, pk=experiment_id)
membership = experiment_update_domain_priority(
experiment=experiment,
domain=domain,
priority=payload.priority,
)
membership = (
type(membership)
.objects.select_related("experiment", "conflict_domain")
.get(pk=membership.pk)
)
return HTTPStatus.OK, MembershipOut.from_membership(membership)
@router.delete(
"/domains/{domain_id}/experiments/{experiment_id}",
response={HTTPStatus.NO_CONTENT: None},
summary="Remove experiment from conflict domain",
)
@require_admin
def remove_experiment_from_domain(
request: HttpRequest,
domain_id: UUID,
experiment_id: UUID,
) -> tuple[HTTPStatus, None]:
domain = get_object_or_404(ConflictDomain, pk=domain_id)
experiment = get_object_or_404(Experiment, pk=experiment_id)
experiment_remove_from_domain(experiment=experiment, domain=domain)
return HTTPStatus.NO_CONTENT, None
@router.get(
"/experiments/{experiment_id}/domains",
response={HTTPStatus.OK: list[ExperimentDomainOut]},
summary="List conflict domains for experiment",
)
def list_experiment_domains(
request: HttpRequest,
experiment_id: UUID,
) -> tuple[HTTPStatus, list[ExperimentDomainOut]]:
get_object_or_404(Experiment, pk=experiment_id)
memberships = experiment_conflict_domains(experiment_id)
return HTTPStatus.OK, [
ExperimentDomainOut.from_membership(m) for m in memberships
]
+97
View File
@@ -0,0 +1,97 @@
from datetime import datetime
from typing import ClassVar
from uuid import UUID
from ninja import Field, ModelSchema, Schema
from apps.conflicts.models import ConflictDomain, ConflictPolicy, ExperimentConflictDomain
class ConflictDomainOut(ModelSchema):
class Meta:
model = ConflictDomain
fields: ClassVar[tuple[str, ...]] = (
ConflictDomain.id.field.name,
ConflictDomain.name.field.name,
ConflictDomain.description.field.name,
ConflictDomain.policy.field.name,
ConflictDomain.max_concurrent.field.name,
ConflictDomain.created_at.field.name,
ConflictDomain.updated_at.field.name,
)
class ConflictDomainCreateIn(Schema):
name: str = Field(..., max_length=200)
description: str = ""
policy: ConflictPolicy = ConflictPolicy.MUTUAL_EXCLUSION
max_concurrent: int = Field(1, ge=1)
class ConflictDomainUpdateIn(Schema):
name: str | None = None
description: str | None = None
policy: ConflictPolicy | None = None
max_concurrent: int | None = Field(None, ge=1)
class ConflictDomainListOut(Schema):
count: int
items: list[ConflictDomainOut]
class ExperimentBriefOut(Schema):
id: UUID
name: str
status: str
class MembershipOut(Schema):
id: UUID
experiment: ExperimentBriefOut
priority: int
created_at: datetime
@classmethod
def from_membership(
cls, m: ExperimentConflictDomain,
) -> "MembershipOut":
return cls(
id=m.pk,
experiment=ExperimentBriefOut(
id=m.experiment.pk,
name=m.experiment.name,
status=m.experiment.status,
),
priority=m.priority,
created_at=m.created_at,
)
class ExperimentDomainOut(Schema):
id: UUID
conflict_domain: ConflictDomainOut
priority: int
created_at: datetime
@classmethod
def from_membership(
cls, m: ExperimentConflictDomain,
) -> "ExperimentDomainOut":
return cls(
id=m.pk,
conflict_domain=ConflictDomainOut.model_validate(
m.conflict_domain,
),
priority=m.priority,
created_at=m.created_at,
)
class AddExperimentIn(Schema):
experiment_id: UUID
priority: int = 0
class UpdatePriorityIn(Schema):
priority: int
@@ -0,0 +1,178 @@
import json
from typing import override
from django.test import Client, TestCase
from django.urls import reverse
from apps.conflicts.tests.helpers import make_domain
from apps.experiments.tests.helpers import add_two_variants, make_flag
from apps.experiments.services import experiment_create
from apps.reviews.tests.helpers import make_admin, make_experimenter, make_viewer
from apps.users.tests.helpers import auth_header
class ConflictDomainAPITest(TestCase):
@override
def setUp(self) -> None:
self.client = Client()
self.admin = make_admin("_cda")
self.viewer = make_viewer("_cda")
self.admin_auth = auth_header(self.admin)
self.viewer_auth = auth_header(self.viewer)
def test_create_domain(self) -> None:
resp = self.client.post(
reverse("api-1:create_domain"),
data=json.dumps({
"name": "checkout",
"description": "Checkout zone",
"policy": "mutual_exclusion",
"max_concurrent": 1,
}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 201)
data = resp.json()
self.assertEqual(data["name"], "checkout")
self.assertEqual(data["policy"], "mutual_exclusion")
def test_create_domain_viewer_forbidden(self) -> None:
resp = self.client.post(
reverse("api-1:create_domain"),
data=json.dumps({"name": "forbidden_zone"}),
content_type="application/json",
HTTP_AUTHORIZATION=self.viewer_auth,
)
self.assertIn(resp.status_code, (403, 401))
def test_list_domains(self) -> None:
make_domain(suffix="_list1")
make_domain(suffix="_list2")
resp = self.client.get(
reverse("api-1:list_domains"),
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 200)
data = resp.json()
self.assertGreaterEqual(data["count"], 2)
def test_get_domain(self) -> None:
domain = make_domain(suffix="_get")
resp = self.client.get(
reverse("api-1:get_domain", args=[domain.pk]),
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json()["name"], domain.name)
def test_update_domain(self) -> None:
domain = make_domain(suffix="_upd")
resp = self.client.patch(
reverse("api-1:update_domain", args=[domain.pk]),
data=json.dumps({"description": "Updated desc"}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json()["description"], "Updated desc")
def test_delete_domain(self) -> None:
domain = make_domain(suffix="_del")
resp = self.client.delete(
reverse("api-1:delete_domain", args=[domain.pk]),
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 204)
def test_get_nonexistent_domain(self) -> None:
import uuid
resp = self.client.get(
reverse("api-1:get_domain", args=[uuid.uuid4()]),
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 404)
class DomainExperimentAPITest(TestCase):
@override
def setUp(self) -> None:
self.client = Client()
self.admin = make_admin("_dea")
self.experimenter = make_experimenter("_dea")
self.admin_auth = auth_header(self.admin)
self.domain = make_domain(suffix="_dea")
self.flag = make_flag(suffix="_dea")
self.exp = experiment_create(
flag=self.flag,
name="DeaExp",
owner=self.experimenter,
)
add_two_variants(self.exp)
def test_add_experiment_to_domain(self) -> None:
resp = self.client.post(
reverse("api-1:add_experiment_to_domain", args=[self.domain.pk]),
data=json.dumps({
"experiment_id": str(self.exp.pk),
"priority": 5,
}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 201)
data = resp.json()
self.assertEqual(data["priority"], 5)
self.assertEqual(data["experiment"]["id"], str(self.exp.pk))
def test_list_experiment_domains(self) -> None:
self.client.post(
reverse("api-1:add_experiment_to_domain", args=[self.domain.pk]),
data=json.dumps({"experiment_id": str(self.exp.pk)}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
resp = self.client.get(
reverse(
"api-1:list_experiment_domains", args=[self.exp.pk]
),
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(len(resp.json()), 1)
def test_update_priority(self) -> None:
self.client.post(
reverse("api-1:add_experiment_to_domain", args=[self.domain.pk]),
data=json.dumps({"experiment_id": str(self.exp.pk)}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
resp = self.client.patch(
reverse(
"api-1:update_experiment_priority",
args=[self.domain.pk, self.exp.pk],
),
data=json.dumps({"priority": 42}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json()["priority"], 42)
def test_remove_experiment_from_domain(self) -> None:
self.client.post(
reverse("api-1:add_experiment_to_domain", args=[self.domain.pk]),
data=json.dumps({"experiment_id": str(self.exp.pk)}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
resp = self.client.delete(
reverse(
"api-1:remove_experiment_from_domain",
args=[self.domain.pk, self.exp.pk],
),
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 204)
+5
View File
@@ -0,0 +1,5 @@
from django.apps import AppConfig
class ConflictsConfig(AppConfig):
name = "apps.conflicts"
@@ -0,0 +1,50 @@
# Generated by Django 5.2.11 on 2026-02-22 20:13
import django.db.models.deletion
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('experiments', '0002_initial'),
]
operations = [
migrations.CreateModel(
name='ConflictDomain',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('name', models.CharField(max_length=200, unique=True, verbose_name='name')),
('description', models.TextField(blank=True, verbose_name='description')),
('policy', models.CharField(choices=[('mutual_exclusion', 'Mutual exclusion'), ('priority', 'Priority tiers')], default='mutual_exclusion', max_length=30, verbose_name='conflict policy')),
('max_concurrent', models.PositiveIntegerField(default=1, verbose_name='max concurrent experiments')),
('created_at', models.DateTimeField(auto_now_add=True, verbose_name='created at')),
('updated_at', models.DateTimeField(auto_now=True, verbose_name='updated at')),
],
options={
'verbose_name': 'conflict domain',
'verbose_name_plural': 'conflict domains',
'ordering': ['name'],
},
),
migrations.CreateModel(
name='ExperimentConflictDomain',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('priority', models.IntegerField(default=0, help_text='Higher value wins in priority-based resolution', verbose_name='priority')),
('created_at', models.DateTimeField(auto_now_add=True, verbose_name='created at')),
('conflict_domain', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='experiment_memberships', to='conflicts.conflictdomain', verbose_name='conflict domain')),
('experiment', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='conflict_memberships', to='experiments.experiment', verbose_name='experiment')),
],
options={
'verbose_name': 'experiment conflict domain',
'verbose_name_plural': 'experiment conflict domains',
'ordering': ['-priority'],
'constraints': [models.UniqueConstraint(fields=('experiment', 'conflict_domain'), name='unique_experiment_conflict_domain')],
},
),
]
+92
View File
@@ -0,0 +1,92 @@
from typing import override
from django.db import models
from django.utils.translation import gettext_lazy as _
from apps.core.models import BaseModel
class ConflictPolicy(models.TextChoices):
MUTUAL_EXCLUSION = "mutual_exclusion", _("Mutual exclusion")
PRIORITY = "priority", _("Priority tiers")
class ConflictDomain(BaseModel):
name = models.CharField(
max_length=200,
unique=True,
verbose_name=_("name"),
)
description = models.TextField(
blank=True,
verbose_name=_("description"),
)
policy = models.CharField(
max_length=30,
choices=ConflictPolicy.choices,
default=ConflictPolicy.MUTUAL_EXCLUSION,
verbose_name=_("conflict policy"),
)
max_concurrent = models.PositiveIntegerField(
default=1,
verbose_name=_("max concurrent experiments"),
)
created_at = models.DateTimeField(
auto_now_add=True,
verbose_name=_("created at"),
)
updated_at = models.DateTimeField(
auto_now=True,
verbose_name=_("updated at"),
)
class Meta:
verbose_name = _("conflict domain")
verbose_name_plural = _("conflict domains")
ordering = ["name"]
@override
def __str__(self) -> str:
return f"{self.name} ({self.policy})"
class ExperimentConflictDomain(BaseModel):
experiment = models.ForeignKey(
"experiments.Experiment",
on_delete=models.CASCADE,
related_name="conflict_memberships",
verbose_name=_("experiment"),
)
conflict_domain = models.ForeignKey(
ConflictDomain,
on_delete=models.CASCADE,
related_name="experiment_memberships",
verbose_name=_("conflict domain"),
)
priority = models.IntegerField(
default=0,
verbose_name=_("priority"),
help_text=_("Higher value wins in priority-based resolution"),
)
created_at = models.DateTimeField(
auto_now_add=True,
verbose_name=_("created at"),
)
class Meta:
verbose_name = _("experiment conflict domain")
verbose_name_plural = _("experiment conflict domains")
ordering = ["-priority"]
constraints = [
models.UniqueConstraint(
fields=["experiment", "conflict_domain"],
name="unique_experiment_conflict_domain",
),
]
@override
def __str__(self) -> str:
return (
f"{self.experiment.name} in {self.conflict_domain.name} "
f"(priority={self.priority})"
)
+59
View File
@@ -0,0 +1,59 @@
from uuid import UUID
from django.db.models import QuerySet
from apps.conflicts.models import (
ConflictDomain,
ExperimentConflictDomain,
)
from apps.experiments.models import ACTIVE_STATUSES
def conflict_domain_list() -> QuerySet[ConflictDomain]:
return ConflictDomain.objects.all()
def conflict_domain_get(domain_id: UUID) -> ConflictDomain | None:
return ConflictDomain.objects.filter(pk=domain_id).first()
def conflict_domain_get_by_name(name: str) -> ConflictDomain | None:
return ConflictDomain.objects.filter(name=name).first()
def experiment_conflict_domains(
experiment_id: UUID,
) -> QuerySet[ExperimentConflictDomain]:
return (
ExperimentConflictDomain.objects.filter(
experiment_id=experiment_id,
)
.select_related("conflict_domain")
.order_by("-priority")
)
def domain_active_experiments(
domain_id: UUID,
) -> QuerySet[ExperimentConflictDomain]:
return (
ExperimentConflictDomain.objects.filter(
conflict_domain_id=domain_id,
experiment__status__in=ACTIVE_STATUSES,
)
.select_related("experiment", "conflict_domain")
.order_by("-priority")
)
def subject_domain_winner(
domain_id: UUID,
exclude_experiment_id: UUID | None = None,
) -> ExperimentConflictDomain | None:
qs = ExperimentConflictDomain.objects.filter(
conflict_domain_id=domain_id,
experiment__status__in=ACTIVE_STATUSES,
).select_related("experiment", "conflict_domain")
if exclude_experiment_id:
qs = qs.exclude(experiment_id=exclude_experiment_id)
return qs.order_by("-priority", "experiment__created_at").first()
+191
View File
@@ -0,0 +1,191 @@
from uuid import UUID
from django.core.exceptions import ValidationError
from django.db import transaction
from apps.conflicts.models import (
ConflictDomain,
ConflictPolicy,
ExperimentConflictDomain,
)
from apps.conflicts.selectors import domain_active_experiments
from apps.experiments.models import ACTIVE_STATUSES, Experiment
@transaction.atomic
def conflict_domain_create(
*,
name: str,
description: str = "",
policy: str = ConflictPolicy.MUTUAL_EXCLUSION,
max_concurrent: int = 1,
) -> ConflictDomain:
domain = ConflictDomain(
name=name,
description=description,
policy=policy,
max_concurrent=max_concurrent,
)
domain.save()
return domain
@transaction.atomic
def conflict_domain_update(
*,
domain: ConflictDomain,
name: str | None = None,
description: str | None = None,
policy: str | None = None,
max_concurrent: int | None = None,
) -> ConflictDomain:
if name is not None:
domain.name = name
if description is not None:
domain.description = description
if policy is not None:
domain.policy = policy
if max_concurrent is not None:
domain.max_concurrent = max_concurrent
domain.save()
return domain
@transaction.atomic
def conflict_domain_delete(*, domain: ConflictDomain) -> None:
active = domain.experiment_memberships.filter(
experiment__status__in=ACTIVE_STATUSES,
).exists()
if active:
raise ValidationError(
{
"domain": (
f"Cannot delete domain '{domain.name}': "
f"it has active experiments."
)
}
)
domain.delete()
@transaction.atomic
def experiment_add_to_domain(
*,
experiment: Experiment,
domain: ConflictDomain,
priority: int = 0,
) -> ExperimentConflictDomain:
membership = ExperimentConflictDomain(
experiment=experiment,
conflict_domain=domain,
priority=priority,
)
membership.save()
return membership
@transaction.atomic
def experiment_remove_from_domain(
*,
experiment: Experiment,
domain: ConflictDomain,
) -> None:
deleted_count, _ = ExperimentConflictDomain.objects.filter(
experiment=experiment,
conflict_domain=domain,
).delete()
if not deleted_count:
raise ValidationError(
{"domain": "Experiment is not in this conflict domain."}
)
@transaction.atomic
def experiment_update_domain_priority(
*,
experiment: Experiment,
domain: ConflictDomain,
priority: int,
) -> ExperimentConflictDomain:
membership = ExperimentConflictDomain.objects.filter(
experiment=experiment,
conflict_domain=domain,
).first()
if not membership:
raise ValidationError(
{"domain": "Experiment is not in this conflict domain."}
)
membership.priority = priority
membership.save(update_fields=["priority"])
return membership
def validate_domain_conflicts(experiment: Experiment) -> None:
memberships = ExperimentConflictDomain.objects.filter(
experiment=experiment,
).select_related("conflict_domain")
for membership in memberships:
domain = membership.conflict_domain
active = domain_active_experiments(domain.pk).exclude(
experiment_id=experiment.pk,
)
active_count = active.count()
if active_count >= domain.max_concurrent:
active_names = ", ".join(
m.experiment.name for m in active[:3]
)
raise ValidationError(
{
"conflict_domain": (
f"Domain '{domain.name}' already has "
f"{active_count}/{domain.max_concurrent} "
f"active experiment(s): {active_names}."
)
}
)
def resolve_domain_conflict(
experiment_id: UUID,
domain_id: UUID,
subject_id: str,
) -> bool:
domain = ConflictDomain.objects.filter(pk=domain_id).first()
if not domain:
return True
active_memberships = list(
ExperimentConflictDomain.objects.filter(
conflict_domain_id=domain_id,
experiment__status__in=ACTIVE_STATUSES,
)
.select_related("experiment")
.order_by("-priority", "experiment__created_at")
)
if len(active_memberships) <= 1:
return True
if domain.policy == ConflictPolicy.MUTUAL_EXCLUSION:
winner = active_memberships[0]
return str(winner.experiment_id) == str(experiment_id)
if domain.policy == ConflictPolicy.PRIORITY:
current = next(
(m for m in active_memberships if str(m.experiment_id) == str(experiment_id)),
None,
)
if not current:
return False
top_priority = active_memberships[0].priority
if current.priority < top_priority:
return False
tied = [m for m in active_memberships if m.priority == top_priority]
if len(tied) <= 1:
return True
winner = min(tied, key=lambda m: m.experiment.created_at)
return str(winner.experiment_id) == str(experiment_id)
return True
@@ -0,0 +1,15 @@
from apps.conflicts.models import ConflictPolicy
from apps.conflicts.services import conflict_domain_create
def make_domain(
suffix="",
policy=ConflictPolicy.MUTUAL_EXCLUSION,
max_concurrent=1,
):
return conflict_domain_create(
name=f"domain{suffix}",
description=f"Test domain {suffix}",
policy=policy,
max_concurrent=max_concurrent,
)
@@ -0,0 +1,337 @@
from typing import override
from django.core.exceptions import ValidationError
from django.test import TestCase
from apps.conflicts.models import (
ConflictDomain,
ConflictPolicy,
ExperimentConflictDomain,
)
from apps.conflicts.selectors import (
conflict_domain_get,
conflict_domain_list,
domain_active_experiments,
experiment_conflict_domains,
)
from apps.conflicts.services import (
conflict_domain_create,
conflict_domain_delete,
conflict_domain_update,
experiment_add_to_domain,
experiment_remove_from_domain,
experiment_update_domain_priority,
resolve_domain_conflict,
validate_domain_conflicts,
)
from apps.conflicts.tests.helpers import make_domain
from apps.experiments.models import ExperimentStatus
from apps.experiments.services import (
experiment_approve,
experiment_create,
experiment_start,
experiment_submit_for_review,
)
from apps.experiments.tests.helpers import add_two_variants, make_flag
from apps.reviews.services import review_settings_update
from apps.reviews.tests.helpers import make_approver, make_experimenter
class ConflictDomainCRUDTest(TestCase):
def test_creates_domain(self) -> None:
domain = make_domain(suffix="_c1")
self.assertEqual(domain.name, "domain_c1")
self.assertEqual(domain.policy, ConflictPolicy.MUTUAL_EXCLUSION)
self.assertEqual(domain.max_concurrent, 1)
def test_creates_domain_with_priority_policy(self) -> None:
domain = make_domain(
suffix="_c2",
policy=ConflictPolicy.PRIORITY,
max_concurrent=3,
)
self.assertEqual(domain.policy, ConflictPolicy.PRIORITY)
self.assertEqual(domain.max_concurrent, 3)
def test_duplicate_name_fails(self) -> None:
make_domain(suffix="_dup")
with self.assertRaises(Exception):
make_domain(suffix="_dup")
def test_updates_domain(self) -> None:
domain = make_domain(suffix="_upd")
domain = conflict_domain_update(
domain=domain,
description="Updated",
max_concurrent=5,
)
domain.refresh_from_db()
self.assertEqual(domain.description, "Updated")
self.assertEqual(domain.max_concurrent, 5)
def test_deletes_domain_without_active(self) -> None:
domain = make_domain(suffix="_del")
pk = domain.pk
conflict_domain_delete(domain=domain)
self.assertFalse(ConflictDomain.objects.filter(pk=pk).exists())
def test_list_domains(self) -> None:
make_domain(suffix="_l1")
make_domain(suffix="_l2")
domains = conflict_domain_list()
names = [d.name for d in domains]
self.assertIn("domain_l1", names)
self.assertIn("domain_l2", names)
def test_get_domain(self) -> None:
domain = make_domain(suffix="_get")
fetched = conflict_domain_get(domain.pk)
self.assertEqual(fetched.pk, domain.pk)
class ExperimentDomainMembershipTest(TestCase):
@override
def setUp(self) -> None:
self.experimenter = make_experimenter("_cdm")
self.approver = make_approver("_cdm")
self.flag = make_flag(suffix="_cdm")
review_settings_update(
default_min_approvals=1, allow_any_approver=True
)
self.domain = make_domain(suffix="_cdm")
def _make_ready_experiment(self, suffix=""):
exp = experiment_create(
flag=make_flag(suffix=f"_cdm{suffix}"),
name=f"CdmExp{suffix}",
owner=self.experimenter,
)
add_two_variants(exp)
return exp
def _approve(self, exp):
exp = experiment_submit_for_review(
experiment=exp, user=self.experimenter
)
return experiment_approve(experiment=exp, approver=self.approver)
def test_add_experiment_to_domain(self) -> None:
exp = self._make_ready_experiment("_add")
membership = experiment_add_to_domain(
experiment=exp,
domain=self.domain,
priority=10,
)
self.assertEqual(membership.priority, 10)
self.assertEqual(membership.conflict_domain, self.domain)
def test_duplicate_membership_fails(self) -> None:
exp = self._make_ready_experiment("_dup2")
experiment_add_to_domain(experiment=exp, domain=self.domain)
with self.assertRaises(Exception):
experiment_add_to_domain(experiment=exp, domain=self.domain)
def test_remove_experiment_from_domain(self) -> None:
exp = self._make_ready_experiment("_rm")
experiment_add_to_domain(experiment=exp, domain=self.domain)
experiment_remove_from_domain(experiment=exp, domain=self.domain)
self.assertFalse(
ExperimentConflictDomain.objects.filter(
experiment=exp, conflict_domain=self.domain
).exists()
)
def test_remove_nonexistent_fails(self) -> None:
exp = self._make_ready_experiment("_rmne")
with self.assertRaises(ValidationError):
experiment_remove_from_domain(experiment=exp, domain=self.domain)
def test_update_priority(self) -> None:
exp = self._make_ready_experiment("_pri")
experiment_add_to_domain(
experiment=exp, domain=self.domain, priority=1
)
membership = experiment_update_domain_priority(
experiment=exp, domain=self.domain, priority=99
)
self.assertEqual(membership.priority, 99)
def test_experiment_conflict_domains_selector(self) -> None:
exp = self._make_ready_experiment("_sel")
domain2 = make_domain(suffix="_sel2")
experiment_add_to_domain(experiment=exp, domain=self.domain)
experiment_add_to_domain(experiment=exp, domain=domain2)
memberships = experiment_conflict_domains(exp.pk)
self.assertEqual(memberships.count(), 2)
def test_domain_active_experiments_selector(self) -> None:
exp = self._make_ready_experiment("_ae")
experiment_add_to_domain(experiment=exp, domain=self.domain)
exp = self._approve(exp)
exp = experiment_start(experiment=exp, user=self.experimenter)
active = domain_active_experiments(self.domain.pk)
self.assertEqual(active.count(), 1)
self.assertEqual(active.first().experiment.pk, exp.pk)
def test_delete_domain_with_active_experiment_fails(self) -> None:
exp = self._make_ready_experiment("_daf")
experiment_add_to_domain(experiment=exp, domain=self.domain)
exp = self._approve(exp)
experiment_start(experiment=exp, user=self.experimenter)
with self.assertRaises(ValidationError):
conflict_domain_delete(domain=self.domain)
class DomainConflictValidationTest(TestCase):
@override
def setUp(self) -> None:
self.experimenter = make_experimenter("_dcv")
self.approver = make_approver("_dcv")
review_settings_update(
default_min_approvals=1, allow_any_approver=True
)
self.domain = make_domain(suffix="_dcv", max_concurrent=1)
def _make_and_start(self, suffix):
flag = make_flag(suffix=f"_dcv{suffix}")
exp = experiment_create(
flag=flag, name=f"DcvExp{suffix}", owner=self.experimenter
)
add_two_variants(exp)
experiment_add_to_domain(experiment=exp, domain=self.domain)
exp = experiment_submit_for_review(
experiment=exp, user=self.experimenter
)
exp = experiment_approve(experiment=exp, approver=self.approver)
return experiment_start(experiment=exp, user=self.experimenter)
def test_start_blocked_by_domain_conflict(self) -> None:
self._make_and_start("_1")
flag2 = make_flag(suffix="_dcv_2b")
exp2 = experiment_create(
flag=flag2, name="DcvExp2B", owner=self.experimenter
)
add_two_variants(exp2)
experiment_add_to_domain(experiment=exp2, domain=self.domain)
exp2 = experiment_submit_for_review(
experiment=exp2, user=self.experimenter
)
exp2 = experiment_approve(experiment=exp2, approver=self.approver)
with self.assertRaises(ValidationError) as ctx:
experiment_start(experiment=exp2, user=self.experimenter)
self.assertIn("conflict_domain", str(ctx.exception))
def test_start_allowed_when_domain_has_capacity(self) -> None:
domain = make_domain(suffix="_cap", max_concurrent=2)
flag1 = make_flag(suffix="_cap1")
exp1 = experiment_create(
flag=flag1, name="CapExp1", owner=self.experimenter
)
add_two_variants(exp1)
experiment_add_to_domain(experiment=exp1, domain=domain)
exp1 = experiment_submit_for_review(
experiment=exp1, user=self.experimenter
)
exp1 = experiment_approve(experiment=exp1, approver=self.approver)
experiment_start(experiment=exp1, user=self.experimenter)
flag2 = make_flag(suffix="_cap2")
exp2 = experiment_create(
flag=flag2, name="CapExp2", owner=self.experimenter
)
add_two_variants(exp2)
experiment_add_to_domain(experiment=exp2, domain=domain)
exp2 = experiment_submit_for_review(
experiment=exp2, user=self.experimenter
)
exp2 = experiment_approve(experiment=exp2, approver=self.approver)
exp2 = experiment_start(experiment=exp2, user=self.experimenter)
self.assertEqual(exp2.status, ExperimentStatus.RUNNING)
def test_no_domain_no_conflict(self) -> None:
flag = make_flag(suffix="_nodom")
exp = experiment_create(
flag=flag, name="NoDomExp", owner=self.experimenter
)
add_two_variants(exp)
exp = experiment_submit_for_review(
experiment=exp, user=self.experimenter
)
exp = experiment_approve(experiment=exp, approver=self.approver)
exp = experiment_start(experiment=exp, user=self.experimenter)
self.assertEqual(exp.status, ExperimentStatus.RUNNING)
class ResolveDomainConflictTest(TestCase):
@override
def setUp(self) -> None:
self.experimenter = make_experimenter("_rdc")
self.approver = make_approver("_rdc")
review_settings_update(
default_min_approvals=1, allow_any_approver=True
)
def _make_and_start(self, suffix, domain, priority=0):
flag = make_flag(suffix=f"_rdc{suffix}")
exp = experiment_create(
flag=flag, name=f"RdcExp{suffix}", owner=self.experimenter
)
add_two_variants(exp)
experiment_add_to_domain(
experiment=exp, domain=domain, priority=priority
)
exp = experiment_submit_for_review(
experiment=exp, user=self.experimenter
)
exp = experiment_approve(experiment=exp, approver=self.approver)
return experiment_start(experiment=exp, user=self.experimenter)
def test_mutual_exclusion_winner_is_first(self) -> None:
domain = make_domain(
suffix="_me",
policy=ConflictPolicy.MUTUAL_EXCLUSION,
max_concurrent=3,
)
exp1 = self._make_and_start("_me1", domain)
exp2 = self._make_and_start("_me2", domain)
winner = resolve_domain_conflict(exp1.pk, domain.pk, "u1")
self.assertTrue(winner)
loser = resolve_domain_conflict(exp2.pk, domain.pk, "u1")
self.assertFalse(loser)
def test_priority_higher_wins(self) -> None:
domain = make_domain(
suffix="_pr",
policy=ConflictPolicy.PRIORITY,
max_concurrent=3,
)
exp_low = self._make_and_start("_pr1", domain, priority=1)
exp_high = self._make_and_start("_pr2", domain, priority=10)
self.assertTrue(
resolve_domain_conflict(exp_high.pk, domain.pk, "u1")
)
self.assertFalse(
resolve_domain_conflict(exp_low.pk, domain.pk, "u1")
)
def test_priority_tie_first_created_wins(self) -> None:
domain = make_domain(
suffix="_tie",
policy=ConflictPolicy.PRIORITY,
max_concurrent=3,
)
exp1 = self._make_and_start("_tie1", domain, priority=5)
exp2 = self._make_and_start("_tie2", domain, priority=5)
self.assertTrue(
resolve_domain_conflict(exp1.pk, domain.pk, "u1")
)
self.assertFalse(
resolve_domain_conflict(exp2.pk, domain.pk, "u1")
)
def test_single_experiment_always_wins(self) -> None:
domain = make_domain(suffix="_single")
exp = self._make_and_start("_s", domain)
self.assertTrue(
resolve_domain_conflict(exp.pk, domain.pk, "u1")
)
+31
View File
@@ -8,6 +8,7 @@ from django.core.cache import cache
from django.utils import timezone
from prometheus_client import Counter
from apps.conflicts.services import resolve_domain_conflict
from apps.events.models import Decision
from apps.events.services import decision_create
from apps.experiments.models import Experiment, ExperimentStatus, Variant
@@ -143,6 +144,23 @@ def _check_participation_limits(
return not recent_completed
def _check_domain_conflicts(experiment: Experiment) -> bool:
from apps.conflicts.models import ExperimentConflictDomain
memberships = ExperimentConflictDomain.objects.filter(
experiment=experiment,
).select_related("conflict_domain")
for membership in memberships:
if not resolve_domain_conflict(
experiment_id=experiment.pk,
domain_id=membership.conflict_domain_id,
subject_id="",
):
return False
return True
def decide_for_flag(
flag_key: str,
subject_id: str,
@@ -202,6 +220,19 @@ def decide_for_flag(
_persist_decision(result, subject_id)
return result
if not _check_domain_conflicts(experiment):
DECIDE_REQUESTS.labels(reason="domain_conflict").inc()
result = {
"flag": flag_key,
"value": flag.default_value,
"decision_id": str(uuid.uuid4()),
"experiment_id": str(experiment.pk),
"variant_id": None,
"reason": "domain_conflict",
}
_persist_decision(result, subject_id)
return result
allocation_hash = _hash_subject(
subject_id,
str(experiment.pk),
-86
View File
@@ -67,11 +67,6 @@ EDITABLE_IN_DRAFT: tuple[str, ...] = (
)
class ConflictPolicy(models.TextChoices):
MUTUAL_EXCLUSION = "mutual_exclusion", _("Mutual exclusion")
PRIORITY = "priority", _("Priority tiers")
class OutcomeType(models.TextChoices):
ROLLOUT = "rollout", _("Rollout winner")
ROLLBACK = "rollback", _("Rollback")
@@ -431,87 +426,6 @@ class Approval(BaseModel):
return f"Approval by {self.approver} for {self.experiment}"
class ConflictDomain(BaseModel):
name = models.CharField(
max_length=200,
unique=True,
verbose_name=_("name"),
)
description = models.TextField(
blank=True,
verbose_name=_("description"),
)
policy = models.CharField(
max_length=30,
choices=ConflictPolicy.choices,
default=ConflictPolicy.MUTUAL_EXCLUSION,
verbose_name=_("conflict policy"),
)
max_concurrent = models.PositiveIntegerField(
default=1,
verbose_name=_("max concurrent experiments"),
)
created_at = models.DateTimeField(
auto_now_add=True,
verbose_name=_("created at"),
)
updated_at = models.DateTimeField(
auto_now=True,
verbose_name=_("updated at"),
)
class Meta:
verbose_name = _("conflict domain")
verbose_name_plural = _("conflict domains")
ordering = ["name"]
@override
def __str__(self) -> str:
return f"{self.name} ({self.policy})"
class ExperimentConflictDomain(BaseModel):
experiment = models.ForeignKey(
Experiment,
on_delete=models.CASCADE,
related_name="conflict_memberships",
verbose_name=_("experiment"),
)
conflict_domain = models.ForeignKey(
ConflictDomain,
on_delete=models.CASCADE,
related_name="experiment_memberships",
verbose_name=_("conflict domain"),
)
priority = models.IntegerField(
default=0,
verbose_name=_("priority"),
help_text=_("Higher value wins in priority-based resolution"),
)
created_at = models.DateTimeField(
auto_now_add=True,
verbose_name=_("created at"),
)
class Meta:
verbose_name = _("experiment conflict domain")
verbose_name_plural = _("experiment conflict domains")
ordering = ["-priority"]
constraints = [
models.UniqueConstraint(
fields=["experiment", "conflict_domain"],
name="unique_experiment_conflict_domain",
),
]
@override
def __str__(self) -> str:
return (
f"{self.experiment.name} in {self.conflict_domain.name} "
f"(priority={self.priority})"
)
class ExperimentOutcome(BaseModel):
experiment = models.OneToOneField(
Experiment,
+3
View File
@@ -16,6 +16,7 @@ from apps.experiments.models import (
OutcomeType,
Variant,
)
from apps.conflicts.services import validate_domain_conflicts
from apps.flags.models import FeatureFlag, validate_value_for_type
from apps.notifications.services import (
NotificationPayload,
@@ -373,6 +374,7 @@ def experiment_request_changes(
def experiment_start(*, experiment: Experiment, user: User) -> Experiment:
ensure_owner_or_admin(experiment, user)
_validate_no_active_flag_conflict(experiment)
validate_domain_conflicts(experiment)
experiment = _transition(
experiment,
ExperimentStatus.RUNNING,
@@ -401,6 +403,7 @@ def experiment_pause(
def experiment_resume(*, experiment: Experiment, user: User) -> Experiment:
ensure_owner_or_admin(experiment, user)
_validate_no_active_flag_conflict(experiment)
validate_domain_conflicts(experiment)
return _transition(
experiment,
ExperimentStatus.RUNNING,