feat(flags): added feature flags business and presentation logic

This commit is contained in:
ITQ
2026-02-14 10:59:04 +03:00
parent d4a3876147
commit 10c0ba01db
15 changed files with 1148 additions and 0 deletions
+1
View File
@@ -0,0 +1 @@
+6
View File
@@ -0,0 +1,6 @@
from django.apps import AppConfig
class FlagsApiConfig(AppConfig):
name = "api.v1.flags"
label = "api_v1_flags"
+105
View File
@@ -0,0 +1,105 @@
from http import HTTPStatus
from uuid import UUID
from django.http import HttpRequest
from django.shortcuts import get_object_or_404
from ninja import Router
from api.v1.flags.schemas import (
FeatureFlagCreateIn,
FeatureFlagListOut,
FeatureFlagOut,
FeatureFlagUpdateIn,
)
from apps.flags.models import FeatureFlag, FeatureFlagType
from apps.flags.selectors import feature_flag_list
from apps.flags.services import (
feature_flag_create,
feature_flag_update_default,
)
from apps.users.auth.bearer import jwt_bearer, require_admin_or_experimenter
router = Router(tags=["flags"], auth=jwt_bearer)
@router.get(
"",
response={HTTPStatus.OK: FeatureFlagListOut},
summary="List feature flags",
description="Return a filtered, paginated list of feature flags.",
)
def list_flags(
request: HttpRequest,
value_type: FeatureFlagType | None = None,
search: str | None = None,
limit: int = 50,
offset: int = 0,
) -> tuple[HTTPStatus, FeatureFlagListOut]:
qs = feature_flag_list(value_type=value_type, search=search)
total = qs.count()
items = [
FeatureFlagOut.model_validate(flag)
for flag in qs[offset : offset + limit]
]
return HTTPStatus.OK, FeatureFlagListOut(count=total, items=items)
@router.post(
"",
response={HTTPStatus.CREATED: FeatureFlagOut},
summary="Create feature flag",
description=(
"Create a new feature flag with key, type, and default value. "
"Admin or Experimenter only."
),
)
@require_admin_or_experimenter
def create_flag(
request: HttpRequest,
payload: FeatureFlagCreateIn,
) -> tuple[HTTPStatus, FeatureFlagOut]:
flag = feature_flag_create(
key=payload.key,
name=payload.name,
value_type=payload.value_type,
default_value=payload.default_value,
)
return HTTPStatus.CREATED, FeatureFlagOut.model_validate(flag)
@router.get(
"/{flag_id}",
response={HTTPStatus.OK: FeatureFlagOut},
summary="Get feature flag",
description="Retrieve a single feature flag by its UUID.",
)
def get_flag(
request: HttpRequest,
flag_id: UUID,
) -> tuple[HTTPStatus, FeatureFlagOut]:
flag = get_object_or_404(FeatureFlag, pk=flag_id)
return HTTPStatus.OK, FeatureFlagOut.model_validate(flag)
@router.patch(
"/{flag_id}",
response={HTTPStatus.OK: FeatureFlagOut},
summary="Update feature flag default value",
description=(
"Update the default value of an existing feature flag. "
"Only the default value can be changed after creation. "
"Admin or Experimenter only."
),
)
@require_admin_or_experimenter
def update_flag(
request: HttpRequest,
flag_id: UUID,
payload: FeatureFlagUpdateIn,
) -> tuple[HTTPStatus, FeatureFlagOut]:
flag = get_object_or_404(FeatureFlag, pk=flag_id)
updated = feature_flag_update_default(
flag=flag,
default_value=payload.default_value,
)
return HTTPStatus.OK, FeatureFlagOut.model_validate(updated)
+49
View File
@@ -0,0 +1,49 @@
from typing import ClassVar
from ninja import ModelSchema, Schema
from apps.flags.models import FeatureFlag, FeatureFlagType
class FeatureFlagOut(ModelSchema):
value_type: FeatureFlagType
default_value: str | int | bool
class Meta:
model = FeatureFlag
fields: ClassVar[tuple[str, ...]] = (
FeatureFlag.id.field.name,
FeatureFlag.key.field.name,
FeatureFlag.name.field.name,
FeatureFlag.value_type.field.name,
FeatureFlag.default_value.field.name,
FeatureFlag.created_at.field.name,
FeatureFlag.updated_at.field.name,
)
class FeatureFlagCreateIn(ModelSchema):
value_type: FeatureFlagType
default_value: str | int | bool
class Meta:
model = FeatureFlag
fields: ClassVar[tuple[str, ...]] = (
FeatureFlag.key.field.name,
FeatureFlag.name.field.name,
FeatureFlag.value_type.field.name,
FeatureFlag.default_value.field.name,
)
class FeatureFlagUpdateIn(ModelSchema):
class Meta:
model = FeatureFlag
fields: ClassVar[tuple[str, ...]] = (
FeatureFlag.default_value.field.name,
)
class FeatureFlagListOut(Schema):
count: int
items: list[FeatureFlagOut]
@@ -0,0 +1 @@
@@ -0,0 +1,552 @@
import json
import uuid
from typing import override
from django.test import Client, TestCase
from django.urls import reverse
from apps.flags.models import FeatureFlag, FeatureFlagType
from apps.flags.services import feature_flag_create
from apps.reviews.tests.helpers import (
make_admin,
make_approver,
make_experimenter,
make_viewer,
)
from apps.users.models import User
from apps.users.tests.helpers import auth_header
class FeatureFlagCreateAPITest(TestCase):
@override
def setUp(self) -> None:
self.client = Client()
self.admin: User = make_admin("_flags")
self.experimenter: User = make_experimenter("_flags")
self.viewer: User = make_viewer("_flags")
self.approver: User = make_approver("_flags")
self.admin_auth: str = auth_header(self.admin)
self.exp_auth: str = auth_header(self.experimenter)
self.viewer_auth: str = auth_header(self.viewer)
self.approver_auth: str = auth_header(self.approver)
def _create(self, data: dict, auth: str | None = None) -> object:
return self.client.post(
reverse("api-1:create_flag"),
data=json.dumps(data),
content_type="application/json",
**({"HTTP_AUTHORIZATION": auth} if auth else {}),
)
def test_create_string_flag_admin(self) -> None:
resp = self._create(
{
"key": "button_color",
"name": "Button Color",
"value_type": "string",
"default_value": "green",
},
self.admin_auth,
)
self.assertEqual(resp.status_code, 201)
data = resp.json()
self.assertEqual(data["key"], "button_color")
self.assertEqual(data["name"], "Button Color")
self.assertEqual(data["value_type"], "string")
self.assertEqual(data["default_value"], "green")
def test_create_boolean_flag_true(self) -> None:
resp = self._create(
{
"key": "show_banner",
"name": "Show Banner",
"value_type": "boolean",
"default_value": "true",
},
self.admin_auth,
)
self.assertEqual(resp.status_code, 201)
data = resp.json()
self.assertEqual(data["default_value"], "true")
def test_create_boolean_flag_false(self) -> None:
resp = self._create(
{
"key": "new_checkout",
"name": "New Checkout",
"value_type": "boolean",
"default_value": "false",
},
self.admin_auth,
)
self.assertEqual(resp.status_code, 201)
def test_create_boolean_flag_yes_normalised(self) -> None:
resp = self._create(
{
"key": "dark_mode",
"name": "Dark Mode",
"value_type": "boolean",
"default_value": "yes",
},
self.admin_auth,
)
self.assertEqual(resp.status_code, 201)
data = resp.json()
self.assertEqual(data["default_value"], "true")
def test_create_boolean_flag_invalid_value(self) -> None:
resp = self._create(
{
"key": "broken_bool",
"name": "Broken Boolean",
"value_type": "boolean",
"default_value": "maybe",
},
self.admin_auth,
)
self.assertEqual(resp.status_code, 422)
def test_create_integer_flag(self) -> None:
resp = self._create(
{
"key": "max_retries",
"name": "Max Retries",
"value_type": "integer",
"default_value": "5",
},
self.admin_auth,
)
self.assertEqual(resp.status_code, 201)
data = resp.json()
self.assertEqual(data["default_value"], "5")
def test_create_integer_flag_negative(self) -> None:
resp = self._create(
{
"key": "offset_val",
"name": "Offset",
"value_type": "integer",
"default_value": "-10",
},
self.admin_auth,
)
self.assertEqual(resp.status_code, 201)
def test_create_integer_flag_invalid_value(self) -> None:
resp = self._create(
{
"key": "broken_int",
"name": "Broken Int",
"value_type": "integer",
"default_value": "abc",
},
self.admin_auth,
)
self.assertEqual(resp.status_code, 422)
def test_create_integer_flag_float_value(self) -> None:
resp = self._create(
{
"key": "broken_float",
"name": "Broken float",
"value_type": "integer",
"default_value": "3.14",
},
self.admin_auth,
)
self.assertEqual(resp.status_code, 422)
def test_create_invalid_value_type(self) -> None:
resp = self._create(
{
"key": "bad_type",
"name": "Bad Type",
"value_type": "float",
"default_value": "1.0",
},
self.admin_auth,
)
self.assertEqual(resp.status_code, 422)
def test_create_duplicate_key(self) -> None:
self._create(
{
"key": "dup_key",
"name": "First",
"value_type": "string",
"default_value": "a",
},
self.admin_auth,
)
resp = self._create(
{
"key": "dup_key",
"name": "Second",
"value_type": "string",
"default_value": "b",
},
self.admin_auth,
)
self.assertEqual(resp.status_code, 409)
def test_create_experimenter_allowed(self) -> None:
resp = self._create(
{
"key": "exp_flag",
"name": "Exp Flag",
"value_type": "string",
"default_value": "val",
},
self.exp_auth,
)
self.assertEqual(resp.status_code, 201)
def test_create_viewer_denied(self) -> None:
resp = self._create(
{
"key": "viewer_flag",
"name": "Viewer Flag",
"value_type": "string",
"default_value": "val",
},
self.viewer_auth,
)
self.assertEqual(resp.status_code, 403)
def test_create_approver_denied(self) -> None:
resp = self._create(
{
"key": "approver_flag",
"name": "Approver Flag",
"value_type": "string",
"default_value": "val",
},
self.approver_auth,
)
self.assertEqual(resp.status_code, 403)
def test_create_unauthenticated(self) -> None:
resp = self._create(
{
"key": "anon_flag",
"name": "Anon Flag",
"value_type": "string",
"default_value": "val",
},
)
self.assertEqual(resp.status_code, 401)
def test_create_key_invalid_pattern(self) -> None:
resp = self._create(
{
"key": "UPPER-CASE",
"name": "Bad key pattern",
"value_type": "string",
"default_value": "v",
},
self.admin_auth,
)
self.assertEqual(resp.status_code, 422)
def test_create_key_empty(self) -> None:
resp = self._create(
{
"key": "",
"name": "Empty key",
"value_type": "string",
"default_value": "v",
},
self.admin_auth,
)
self.assertEqual(resp.status_code, 422)
class FeatureFlagGetAPITest(TestCase):
@override
def setUp(self) -> None:
self.client = Client()
self.admin: User = make_admin("_get")
self.viewer: User = make_viewer("_get")
self.admin_auth: str = auth_header(self.admin)
self.viewer_auth: str = auth_header(self.viewer)
self.flag = feature_flag_create(
key="get_test_flag",
name="Get Test",
value_type=FeatureFlagType.STRING,
default_value="hello",
)
def test_get_flag(self) -> None:
resp = self.client.get(
reverse("api-1:get_flag", args=[str(self.flag.pk)]),
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 200)
data = resp.json()
self.assertEqual(data["key"], "get_test_flag")
def test_get_flag_viewer_allowed(self) -> None:
resp = self.client.get(
reverse("api-1:get_flag", args=[str(self.flag.pk)]),
HTTP_AUTHORIZATION=self.viewer_auth,
)
self.assertEqual(resp.status_code, 200)
def test_get_flag_not_found(self) -> None:
resp = self.client.get(
reverse("api-1:get_flag", args=[str(uuid.uuid4())]),
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 404)
def test_get_flag_unauthenticated(self) -> None:
resp = self.client.get(
reverse("api-1:get_flag", args=[str(self.flag.pk)]),
)
self.assertEqual(resp.status_code, 401)
class FeatureFlagListAPITest(TestCase):
@override
def setUp(self) -> None:
self.client = Client()
self.admin: User = make_admin("_list")
self.viewer: User = make_viewer("_list")
self.admin_auth: str = auth_header(self.admin)
self.viewer_auth: str = auth_header(self.viewer)
def test_list_empty(self) -> None:
resp = self.client.get(
reverse("api-1:list_flags"),
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 200)
data = resp.json()
self.assertEqual(data["count"], 0)
self.assertEqual(data["items"], [])
def test_list_with_flags(self) -> None:
feature_flag_create(
key="list_a",
name="A",
value_type=FeatureFlagType.STRING,
default_value="a",
)
feature_flag_create(
key="list_b",
name="B",
value_type=FeatureFlagType.BOOLEAN,
default_value="true",
)
resp = self.client.get(
reverse("api-1:list_flags"),
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 200)
data = resp.json()
self.assertEqual(data["count"], 2)
def test_list_filter_by_type(self) -> None:
feature_flag_create(
key="type_str",
name="S",
value_type=FeatureFlagType.STRING,
default_value="s",
)
feature_flag_create(
key="type_bool",
name="B",
value_type=FeatureFlagType.BOOLEAN,
default_value="true",
)
resp = self.client.get(
reverse("api-1:list_flags"),
data={"value_type": "boolean"},
HTTP_AUTHORIZATION=self.admin_auth,
)
data = resp.json()
self.assertEqual(data["count"], 1)
def test_list_search(self) -> None:
feature_flag_create(
key="search_match",
name="S",
value_type=FeatureFlagType.STRING,
default_value="s",
)
feature_flag_create(
key="other_flag",
name="O",
value_type=FeatureFlagType.STRING,
default_value="o",
)
resp = self.client.get(
reverse("api-1:list_flags"),
data={"search": "match"},
HTTP_AUTHORIZATION=self.admin_auth,
)
data = resp.json()
self.assertEqual(data["count"], 1)
def test_list_pagination(self) -> None:
for i in range(3):
feature_flag_create(
key=f"page_{i}",
name=f"P{i}",
value_type=FeatureFlagType.STRING,
default_value="v",
)
resp = self.client.get(
reverse("api-1:list_flags"),
data={"limit": 2, "offset": 0},
HTTP_AUTHORIZATION=self.admin_auth,
)
data = resp.json()
self.assertEqual(data["count"], 3)
self.assertEqual(len(data["items"]), 2)
def test_list_viewer_allowed(self) -> None:
resp = self.client.get(
reverse("api-1:list_flags"),
HTTP_AUTHORIZATION=self.viewer_auth,
)
self.assertEqual(resp.status_code, 200)
class FeatureFlagUpdateAPITest(TestCase):
@override
def setUp(self) -> None:
self.client = Client()
self.admin: User = make_admin("_upd")
self.experimenter: User = make_experimenter("_upd")
self.viewer: User = make_viewer("_upd")
self.admin_auth: str = auth_header(self.admin)
self.exp_auth: str = auth_header(self.experimenter)
self.viewer_auth: str = auth_header(self.viewer)
def _create_flag(
self,
key: str = "upd_flag",
value_type: str = FeatureFlagType.STRING,
default_value: str = "old",
) -> FeatureFlag:
return feature_flag_create(
key=key,
name="Update Test",
value_type=value_type,
default_value=default_value,
)
def test_update_default_value_string(self) -> None:
flag = self._create_flag()
resp = self.client.patch(
reverse("api-1:update_flag", args=[str(flag.pk)]),
data=json.dumps({"default_value": "new"}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 200)
data = resp.json()
self.assertEqual(data["default_value"], "new")
def test_update_default_value_boolean_valid(self) -> None:
flag = self._create_flag(
key="upd_bool",
value_type=FeatureFlagType.BOOLEAN,
default_value="true",
)
resp = self.client.patch(
reverse("api-1:update_flag", args=[str(flag.pk)]),
data=json.dumps({"default_value": "false"}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 200)
data = resp.json()
self.assertEqual(data["default_value"], "false")
def test_update_default_value_boolean_invalid(self) -> None:
flag = self._create_flag(
key="upd_bool_inv",
value_type=FeatureFlagType.BOOLEAN,
default_value="true",
)
resp = self.client.patch(
reverse("api-1:update_flag", args=[str(flag.pk)]),
data=json.dumps({"default_value": "not_a_bool"}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 422)
def test_update_default_value_integer_valid(self) -> None:
flag = self._create_flag(
key="upd_int",
value_type=FeatureFlagType.INTEGER,
default_value="10",
)
resp = self.client.patch(
reverse("api-1:update_flag", args=[str(flag.pk)]),
data=json.dumps({"default_value": "42"}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 200)
data = resp.json()
self.assertEqual(data["default_value"], "42")
def test_update_default_value_integer_invalid(self) -> None:
flag = self._create_flag(
key="upd_int_inv",
value_type=FeatureFlagType.INTEGER,
default_value="10",
)
resp = self.client.patch(
reverse("api-1:update_flag", args=[str(flag.pk)]),
data=json.dumps({"default_value": "abc"}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 422)
def test_update_not_found(self) -> None:
resp = self.client.patch(
reverse("api-1:update_flag", args=[str(uuid.uuid4())]),
data=json.dumps({"default_value": "x"}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 404)
def test_update_viewer_denied(self) -> None:
flag = self._create_flag(key="upd_viewer")
resp = self.client.patch(
reverse("api-1:update_flag", args=[str(flag.pk)]),
data=json.dumps({"default_value": "x"}),
content_type="application/json",
HTTP_AUTHORIZATION=self.viewer_auth,
)
self.assertEqual(resp.status_code, 403)
def test_update_experimenter_allowed(self) -> None:
flag = self._create_flag(key="upd_exp")
resp = self.client.patch(
reverse("api-1:update_flag", args=[str(flag.pk)]),
data=json.dumps({"default_value": "new_val"}),
content_type="application/json",
HTTP_AUTHORIZATION=self.exp_auth,
)
self.assertEqual(resp.status_code, 200)
def test_update_preserves_key_and_type(self) -> None:
flag = self._create_flag(key="upd_preserve")
resp = self.client.patch(
reverse("api-1:update_flag", args=[str(flag.pk)]),
data=json.dumps({"default_value": "changed"}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
data = resp.json()
self.assertEqual(data["key"], "upd_preserve")
self.assertEqual(data["value_type"], "string")
View File
+6
View File
@@ -0,0 +1,6 @@
from django.apps import AppConfig
class FlagsConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "apps.flags"
@@ -0,0 +1,34 @@
# Generated by Django 5.2.11 on 2026-02-13 07:57
import django.core.validators
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='FeatureFlag',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('key', models.CharField(help_text='Unique identifier for the feature flag', max_length=100, unique=True, validators=[django.core.validators.RegexValidator(message='Key must start with a lowercase letter and contain only lowercase letters, digits, and underscores.', regex='^[a-z][a-z0-9_]*$')], verbose_name='key')),
('name', models.CharField(help_text='Human-readable name for the feature flag', max_length=200, verbose_name='name')),
('value_type', models.CharField(choices=[('string', 'String'), ('boolean', 'Boolean'), ('integer', 'Integer')], default='boolean', help_text='Data type of the feature flag value', max_length=20, verbose_name='flag type')),
('default_value', models.CharField(help_text='Default value for the feature flag', max_length=200, verbose_name='default value')),
('description', models.TextField(blank=True, help_text='Optional description of the feature flag', verbose_name='description')),
('created_at', models.DateTimeField(auto_now_add=True, verbose_name='created at')),
('updated_at', models.DateTimeField(auto_now=True, verbose_name='updated at')),
],
options={
'verbose_name': 'feature flag',
'verbose_name_plural': 'feature flags',
'ordering': ['-created_at'],
},
),
]
+116
View File
@@ -0,0 +1,116 @@
from typing import override
from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator
from django.db import models
from django.utils.translation import gettext_lazy as _
from apps.core.models import BaseModel
KEY_PATTERN = r"^[a-z][a-z0-9_]*$"
class FeatureFlagType(models.TextChoices):
STRING = "string", _("String")
BOOLEAN = "boolean", _("Boolean")
INTEGER = "integer", _("Integer")
BOOLEAN_TRUE_VALUES = frozenset({"true", "1", "yes"})
BOOLEAN_FALSE_VALUES = frozenset({"false", "0", "no"})
BOOLEAN_ALLOWED_VALUES = BOOLEAN_TRUE_VALUES | BOOLEAN_FALSE_VALUES
def validate_value_for_type(value: str, value_type: str) -> str:
if value_type == FeatureFlagType.BOOLEAN:
if value.lower() not in BOOLEAN_ALLOWED_VALUES:
raise ValidationError(
{
"default_value": (
f"Boolean flag value must be one of: "
f"{', '.join(sorted(BOOLEAN_ALLOWED_VALUES))}. "
f"Got '{value}'."
)
}
)
return "true" if value.lower() in BOOLEAN_TRUE_VALUES else "false"
if value_type == FeatureFlagType.INTEGER:
try:
int(value)
except (ValueError, TypeError):
raise ValidationError(
{
"default_value": (
f"Integer flag value must be a valid integer. "
f"Got '{value}'."
)
}
) from None
return str(int(value))
return value
class FeatureFlag(BaseModel):
key = models.CharField(
max_length=100,
unique=True,
verbose_name=_("key"),
help_text=_("Unique identifier for the feature flag"),
validators=[
RegexValidator(
regex=KEY_PATTERN,
message=(
"Key must start with a lowercase letter and contain only "
"lowercase letters, digits, and underscores."
),
)
],
)
name = models.CharField(
max_length=200,
verbose_name=_("name"),
help_text=_("Human-readable name for the feature flag"),
)
value_type = models.CharField(
max_length=20,
choices=FeatureFlagType.choices,
default=FeatureFlagType.BOOLEAN,
verbose_name=_("flag type"),
help_text=_("Data type of the feature flag value"),
)
default_value = models.CharField(
max_length=200,
verbose_name=_("default value"),
help_text=_("Default value for the feature flag"),
)
description = models.TextField(
blank=True,
verbose_name=_("description"),
help_text=_("Optional description of the feature flag"),
)
created_at = models.DateTimeField(
auto_now_add=True,
verbose_name=_("created at"),
)
updated_at = models.DateTimeField(
auto_now=True,
verbose_name=_("updated at"),
)
class Meta:
verbose_name = _("feature flag")
verbose_name_plural = _("feature flags")
ordering = ["-created_at"]
@override
def __str__(self) -> str:
return f"{self.key} ({self.value_type})"
@override
def clean(self) -> None:
super().clean()
self.default_value = validate_value_for_type(
self.default_value, self.value_type
)
+23
View File
@@ -0,0 +1,23 @@
from django.db.models import QuerySet
from apps.flags.models import FeatureFlag
def feature_flag_list(
*,
value_type: str | None = None,
search: str | None = None,
) -> QuerySet[FeatureFlag]:
qs = FeatureFlag.objects.all()
if value_type:
qs = qs.filter(value_type=value_type)
if search:
qs = qs.filter(key__icontains=search)
return qs
def feature_flag_get_by_key(key: str) -> FeatureFlag | None:
return FeatureFlag.objects.filter(key=key).first()
+41
View File
@@ -0,0 +1,41 @@
from django.core.exceptions import ValidationError
from apps.flags.models import FeatureFlag, FeatureFlagType
def feature_flag_create(
*,
key: str,
name: str,
value_type: str,
default_value: str,
) -> FeatureFlag:
valid_types = {choice[0] for choice in FeatureFlagType.choices}
if value_type not in valid_types:
raise ValidationError(
{
"value_type": (
f"Invalid flag type '{value_type}'. "
f"Must be one of: {', '.join(sorted(valid_types))}"
)
}
)
flag = FeatureFlag(
key=key,
name=name,
value_type=value_type,
default_value=default_value,
)
flag.save()
return flag
def feature_flag_update_default(
*,
flag: FeatureFlag,
default_value: str,
) -> FeatureFlag:
flag.default_value = default_value
flag.save(update_fields=["default_value", "updated_at"])
return flag
+1
View File
@@ -0,0 +1 @@
+213
View File
@@ -0,0 +1,213 @@
from django.core.exceptions import ValidationError
from django.test import TestCase
from apps.flags.models import (
FeatureFlagType,
validate_value_for_type,
)
from apps.flags.selectors import feature_flag_get_by_key, feature_flag_list
from apps.flags.services import (
feature_flag_create,
feature_flag_update_default,
)
from config.errors import ConflictError
class ValidateValueForTypeTest(TestCase):
def test_boolean_true_variants(self) -> None:
for val in ("true", "True", "1", "yes", "YES"):
self.assertEqual(
validate_value_for_type(val, FeatureFlagType.BOOLEAN), "true"
)
def test_boolean_false_variants(self) -> None:
for val in ("false", "False", "0", "no", "NO"):
self.assertEqual(
validate_value_for_type(val, FeatureFlagType.BOOLEAN), "false"
)
def test_boolean_invalid(self) -> None:
with self.assertRaises(ValidationError):
validate_value_for_type("maybe", FeatureFlagType.BOOLEAN)
def test_integer_valid(self) -> None:
self.assertEqual(
validate_value_for_type("42", FeatureFlagType.INTEGER), "42"
)
def test_integer_negative(self) -> None:
self.assertEqual(
validate_value_for_type("-7", FeatureFlagType.INTEGER), "-7"
)
def test_integer_invalid(self) -> None:
with self.assertRaises(ValidationError):
validate_value_for_type("abc", FeatureFlagType.INTEGER)
def test_integer_float(self) -> None:
with self.assertRaises(ValidationError):
validate_value_for_type("3.14", FeatureFlagType.INTEGER)
def test_string_passthrough(self) -> None:
self.assertEqual(
validate_value_for_type("anything", FeatureFlagType.STRING),
"anything",
)
class FeatureFlagServiceTest(TestCase):
def test_create_string_flag(self) -> None:
flag = feature_flag_create(
key="svc_string",
name="Service String",
value_type=FeatureFlagType.STRING,
default_value="hello",
)
self.assertEqual(flag.key, "svc_string")
self.assertEqual(flag.value_type, FeatureFlagType.STRING)
self.assertEqual(flag.default_value, "hello")
def test_create_boolean_normalises(self) -> None:
flag = feature_flag_create(
key="svc_bool",
name="Service Bool",
value_type=FeatureFlagType.BOOLEAN,
default_value="Yes",
)
self.assertEqual(flag.default_value, "true")
def test_create_integer_normalises(self) -> None:
flag = feature_flag_create(
key="svc_int",
name="Service Int",
value_type=FeatureFlagType.INTEGER,
default_value="007",
)
self.assertEqual(flag.default_value, "7")
def test_create_invalid_type_raises(self) -> None:
with self.assertRaises(ValidationError):
feature_flag_create(
key="svc_bad",
name="Bad",
value_type="float",
default_value="1.0",
)
def test_create_bool_invalid_value_raises(self) -> None:
with self.assertRaises(ValidationError):
feature_flag_create(
key="svc_bad_bool",
name="Bad Bool",
value_type=FeatureFlagType.BOOLEAN,
default_value="maybe",
)
def test_create_int_invalid_value_raises(self) -> None:
with self.assertRaises(ValidationError):
feature_flag_create(
key="svc_bad_int",
name="Bad Int",
value_type=FeatureFlagType.INTEGER,
default_value="abc",
)
def test_create_duplicate_key_raises(self) -> None:
feature_flag_create(
key="svc_dup",
name="First",
value_type=FeatureFlagType.STRING,
default_value="a",
)
with self.assertRaises(ConflictError):
feature_flag_create(
key="svc_dup",
name="Second",
value_type=FeatureFlagType.STRING,
default_value="b",
)
def test_update_default_value(self) -> None:
flag = feature_flag_create(
key="svc_upd",
name="Update",
value_type=FeatureFlagType.STRING,
default_value="old",
)
updated = feature_flag_update_default(flag=flag, default_value="new")
self.assertEqual(updated.default_value, "new")
flag.refresh_from_db()
self.assertEqual(flag.default_value, "new")
def test_update_bool_validates(self) -> None:
flag = feature_flag_create(
key="svc_upd_bool",
name="Update Bool",
value_type=FeatureFlagType.BOOLEAN,
default_value="true",
)
with self.assertRaises(ValidationError):
feature_flag_update_default(flag=flag, default_value="maybe")
def test_update_int_validates(self) -> None:
flag = feature_flag_create(
key="svc_upd_int",
name="Update Int",
value_type=FeatureFlagType.INTEGER,
default_value="5",
)
with self.assertRaises(ValidationError):
feature_flag_update_default(flag=flag, default_value="bad")
class FeatureFlagSelectorTest(TestCase):
def test_list_all(self) -> None:
feature_flag_create(
key="sel_a", name="A", value_type="string", default_value="a"
)
feature_flag_create(
key="sel_b", name="B", value_type="boolean", default_value="true"
)
self.assertEqual(feature_flag_list().count(), 2)
def test_list_filter_type(self) -> None:
feature_flag_create(
key="sel_ft_s",
name="S",
value_type="string",
default_value="s",
)
feature_flag_create(
key="sel_ft_b",
name="B",
value_type="boolean",
default_value="true",
)
qs = feature_flag_list(value_type="boolean")
self.assertEqual(qs.count(), 1)
self.assertEqual(qs.first().key, "sel_ft_b")
def test_list_search(self) -> None:
feature_flag_create(
key="sel_search_match",
name="M",
value_type="string",
default_value="m",
)
feature_flag_create(
key="sel_other", name="O", value_type="string", default_value="o"
)
qs = feature_flag_list(search="match")
self.assertEqual(qs.count(), 1)
def test_get_by_key(self) -> None:
feature_flag_create(
key="sel_get", name="G", value_type="string", default_value="g"
)
flag = feature_flag_get_by_key("sel_get")
self.assertIsNotNone(flag)
self.assertEqual(flag.key, "sel_get")
def test_get_by_key_not_found(self) -> None:
flag = feature_flag_get_by_key("nonexistent")
self.assertIsNone(flag)