diff --git a/src/backend/api/v1/flags/__init__.py b/src/backend/api/v1/flags/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/backend/api/v1/flags/__init__.py @@ -0,0 +1 @@ + diff --git a/src/backend/api/v1/flags/apps.py b/src/backend/api/v1/flags/apps.py new file mode 100644 index 0000000..5e78b97 --- /dev/null +++ b/src/backend/api/v1/flags/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class FlagsApiConfig(AppConfig): + name = "api.v1.flags" + label = "api_v1_flags" diff --git a/src/backend/api/v1/flags/endpoints.py b/src/backend/api/v1/flags/endpoints.py new file mode 100644 index 0000000..89d0aec --- /dev/null +++ b/src/backend/api/v1/flags/endpoints.py @@ -0,0 +1,105 @@ +from http import HTTPStatus +from uuid import UUID + +from django.http import HttpRequest +from django.shortcuts import get_object_or_404 +from ninja import Router + +from api.v1.flags.schemas import ( + FeatureFlagCreateIn, + FeatureFlagListOut, + FeatureFlagOut, + FeatureFlagUpdateIn, +) +from apps.flags.models import FeatureFlag, FeatureFlagType +from apps.flags.selectors import feature_flag_list +from apps.flags.services import ( + feature_flag_create, + feature_flag_update_default, +) +from apps.users.auth.bearer import jwt_bearer, require_admin_or_experimenter + +router = Router(tags=["flags"], auth=jwt_bearer) + + +@router.get( + "", + response={HTTPStatus.OK: FeatureFlagListOut}, + summary="List feature flags", + description="Return a filtered, paginated list of feature flags.", +) +def list_flags( + request: HttpRequest, + value_type: FeatureFlagType | None = None, + search: str | None = None, + limit: int = 50, + offset: int = 0, +) -> tuple[HTTPStatus, FeatureFlagListOut]: + qs = feature_flag_list(value_type=value_type, search=search) + total = qs.count() + items = [ + FeatureFlagOut.model_validate(flag) + for flag in qs[offset : offset + limit] + ] + return HTTPStatus.OK, FeatureFlagListOut(count=total, items=items) + + +@router.post( + "", + response={HTTPStatus.CREATED: FeatureFlagOut}, + summary="Create feature flag", + description=( + "Create a new feature flag with key, type, and default value. " + "Admin or Experimenter only." + ), +) +@require_admin_or_experimenter +def create_flag( + request: HttpRequest, + payload: FeatureFlagCreateIn, +) -> tuple[HTTPStatus, FeatureFlagOut]: + flag = feature_flag_create( + key=payload.key, + name=payload.name, + value_type=payload.value_type, + default_value=payload.default_value, + ) + return HTTPStatus.CREATED, FeatureFlagOut.model_validate(flag) + + +@router.get( + "/{flag_id}", + response={HTTPStatus.OK: FeatureFlagOut}, + summary="Get feature flag", + description="Retrieve a single feature flag by its UUID.", +) +def get_flag( + request: HttpRequest, + flag_id: UUID, +) -> tuple[HTTPStatus, FeatureFlagOut]: + flag = get_object_or_404(FeatureFlag, pk=flag_id) + return HTTPStatus.OK, FeatureFlagOut.model_validate(flag) + + +@router.patch( + "/{flag_id}", + response={HTTPStatus.OK: FeatureFlagOut}, + summary="Update feature flag default value", + description=( + "Update the default value of an existing feature flag. " + "Only the default value can be changed after creation. " + "Admin or Experimenter only." + ), +) +@require_admin_or_experimenter +def update_flag( + request: HttpRequest, + flag_id: UUID, + payload: FeatureFlagUpdateIn, +) -> tuple[HTTPStatus, FeatureFlagOut]: + flag = get_object_or_404(FeatureFlag, pk=flag_id) + updated = feature_flag_update_default( + flag=flag, + default_value=payload.default_value, + ) + return HTTPStatus.OK, FeatureFlagOut.model_validate(updated) diff --git a/src/backend/api/v1/flags/schemas.py b/src/backend/api/v1/flags/schemas.py new file mode 100644 index 0000000..a68ec79 --- /dev/null +++ b/src/backend/api/v1/flags/schemas.py @@ -0,0 +1,49 @@ +from typing import ClassVar + +from ninja import ModelSchema, Schema + +from apps.flags.models import FeatureFlag, FeatureFlagType + + +class FeatureFlagOut(ModelSchema): + value_type: FeatureFlagType + default_value: str | int | bool + + class Meta: + model = FeatureFlag + fields: ClassVar[tuple[str, ...]] = ( + FeatureFlag.id.field.name, + FeatureFlag.key.field.name, + FeatureFlag.name.field.name, + FeatureFlag.value_type.field.name, + FeatureFlag.default_value.field.name, + FeatureFlag.created_at.field.name, + FeatureFlag.updated_at.field.name, + ) + + +class FeatureFlagCreateIn(ModelSchema): + value_type: FeatureFlagType + default_value: str | int | bool + + class Meta: + model = FeatureFlag + fields: ClassVar[tuple[str, ...]] = ( + FeatureFlag.key.field.name, + FeatureFlag.name.field.name, + FeatureFlag.value_type.field.name, + FeatureFlag.default_value.field.name, + ) + + +class FeatureFlagUpdateIn(ModelSchema): + class Meta: + model = FeatureFlag + fields: ClassVar[tuple[str, ...]] = ( + FeatureFlag.default_value.field.name, + ) + + +class FeatureFlagListOut(Schema): + count: int + items: list[FeatureFlagOut] diff --git a/src/backend/api/v1/flags/tests/__init__.py b/src/backend/api/v1/flags/tests/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/backend/api/v1/flags/tests/__init__.py @@ -0,0 +1 @@ + diff --git a/src/backend/api/v1/flags/tests/test_flags_api.py b/src/backend/api/v1/flags/tests/test_flags_api.py new file mode 100644 index 0000000..6cdcdef --- /dev/null +++ b/src/backend/api/v1/flags/tests/test_flags_api.py @@ -0,0 +1,552 @@ +import json +import uuid +from typing import override + +from django.test import Client, TestCase +from django.urls import reverse + +from apps.flags.models import FeatureFlag, FeatureFlagType +from apps.flags.services import feature_flag_create +from apps.reviews.tests.helpers import ( + make_admin, + make_approver, + make_experimenter, + make_viewer, +) +from apps.users.models import User +from apps.users.tests.helpers import auth_header + + +class FeatureFlagCreateAPITest(TestCase): + @override + def setUp(self) -> None: + self.client = Client() + self.admin: User = make_admin("_flags") + self.experimenter: User = make_experimenter("_flags") + self.viewer: User = make_viewer("_flags") + self.approver: User = make_approver("_flags") + self.admin_auth: str = auth_header(self.admin) + self.exp_auth: str = auth_header(self.experimenter) + self.viewer_auth: str = auth_header(self.viewer) + self.approver_auth: str = auth_header(self.approver) + + def _create(self, data: dict, auth: str | None = None) -> object: + return self.client.post( + reverse("api-1:create_flag"), + data=json.dumps(data), + content_type="application/json", + **({"HTTP_AUTHORIZATION": auth} if auth else {}), + ) + + def test_create_string_flag_admin(self) -> None: + resp = self._create( + { + "key": "button_color", + "name": "Button Color", + "value_type": "string", + "default_value": "green", + }, + self.admin_auth, + ) + self.assertEqual(resp.status_code, 201) + data = resp.json() + self.assertEqual(data["key"], "button_color") + self.assertEqual(data["name"], "Button Color") + self.assertEqual(data["value_type"], "string") + self.assertEqual(data["default_value"], "green") + + def test_create_boolean_flag_true(self) -> None: + resp = self._create( + { + "key": "show_banner", + "name": "Show Banner", + "value_type": "boolean", + "default_value": "true", + }, + self.admin_auth, + ) + self.assertEqual(resp.status_code, 201) + data = resp.json() + self.assertEqual(data["default_value"], "true") + + def test_create_boolean_flag_false(self) -> None: + resp = self._create( + { + "key": "new_checkout", + "name": "New Checkout", + "value_type": "boolean", + "default_value": "false", + }, + self.admin_auth, + ) + self.assertEqual(resp.status_code, 201) + + def test_create_boolean_flag_yes_normalised(self) -> None: + resp = self._create( + { + "key": "dark_mode", + "name": "Dark Mode", + "value_type": "boolean", + "default_value": "yes", + }, + self.admin_auth, + ) + self.assertEqual(resp.status_code, 201) + data = resp.json() + self.assertEqual(data["default_value"], "true") + + def test_create_boolean_flag_invalid_value(self) -> None: + resp = self._create( + { + "key": "broken_bool", + "name": "Broken Boolean", + "value_type": "boolean", + "default_value": "maybe", + }, + self.admin_auth, + ) + self.assertEqual(resp.status_code, 422) + + def test_create_integer_flag(self) -> None: + resp = self._create( + { + "key": "max_retries", + "name": "Max Retries", + "value_type": "integer", + "default_value": "5", + }, + self.admin_auth, + ) + self.assertEqual(resp.status_code, 201) + data = resp.json() + self.assertEqual(data["default_value"], "5") + + def test_create_integer_flag_negative(self) -> None: + resp = self._create( + { + "key": "offset_val", + "name": "Offset", + "value_type": "integer", + "default_value": "-10", + }, + self.admin_auth, + ) + self.assertEqual(resp.status_code, 201) + + def test_create_integer_flag_invalid_value(self) -> None: + resp = self._create( + { + "key": "broken_int", + "name": "Broken Int", + "value_type": "integer", + "default_value": "abc", + }, + self.admin_auth, + ) + self.assertEqual(resp.status_code, 422) + + def test_create_integer_flag_float_value(self) -> None: + resp = self._create( + { + "key": "broken_float", + "name": "Broken float", + "value_type": "integer", + "default_value": "3.14", + }, + self.admin_auth, + ) + self.assertEqual(resp.status_code, 422) + + def test_create_invalid_value_type(self) -> None: + resp = self._create( + { + "key": "bad_type", + "name": "Bad Type", + "value_type": "float", + "default_value": "1.0", + }, + self.admin_auth, + ) + self.assertEqual(resp.status_code, 422) + + def test_create_duplicate_key(self) -> None: + self._create( + { + "key": "dup_key", + "name": "First", + "value_type": "string", + "default_value": "a", + }, + self.admin_auth, + ) + resp = self._create( + { + "key": "dup_key", + "name": "Second", + "value_type": "string", + "default_value": "b", + }, + self.admin_auth, + ) + self.assertEqual(resp.status_code, 409) + + def test_create_experimenter_allowed(self) -> None: + resp = self._create( + { + "key": "exp_flag", + "name": "Exp Flag", + "value_type": "string", + "default_value": "val", + }, + self.exp_auth, + ) + self.assertEqual(resp.status_code, 201) + + def test_create_viewer_denied(self) -> None: + resp = self._create( + { + "key": "viewer_flag", + "name": "Viewer Flag", + "value_type": "string", + "default_value": "val", + }, + self.viewer_auth, + ) + self.assertEqual(resp.status_code, 403) + + def test_create_approver_denied(self) -> None: + resp = self._create( + { + "key": "approver_flag", + "name": "Approver Flag", + "value_type": "string", + "default_value": "val", + }, + self.approver_auth, + ) + self.assertEqual(resp.status_code, 403) + + def test_create_unauthenticated(self) -> None: + resp = self._create( + { + "key": "anon_flag", + "name": "Anon Flag", + "value_type": "string", + "default_value": "val", + }, + ) + self.assertEqual(resp.status_code, 401) + + def test_create_key_invalid_pattern(self) -> None: + resp = self._create( + { + "key": "UPPER-CASE", + "name": "Bad key pattern", + "value_type": "string", + "default_value": "v", + }, + self.admin_auth, + ) + self.assertEqual(resp.status_code, 422) + + def test_create_key_empty(self) -> None: + resp = self._create( + { + "key": "", + "name": "Empty key", + "value_type": "string", + "default_value": "v", + }, + self.admin_auth, + ) + self.assertEqual(resp.status_code, 422) + + +class FeatureFlagGetAPITest(TestCase): + @override + def setUp(self) -> None: + self.client = Client() + self.admin: User = make_admin("_get") + self.viewer: User = make_viewer("_get") + self.admin_auth: str = auth_header(self.admin) + self.viewer_auth: str = auth_header(self.viewer) + self.flag = feature_flag_create( + key="get_test_flag", + name="Get Test", + value_type=FeatureFlagType.STRING, + default_value="hello", + ) + + def test_get_flag(self) -> None: + resp = self.client.get( + reverse("api-1:get_flag", args=[str(self.flag.pk)]), + HTTP_AUTHORIZATION=self.admin_auth, + ) + self.assertEqual(resp.status_code, 200) + data = resp.json() + self.assertEqual(data["key"], "get_test_flag") + + def test_get_flag_viewer_allowed(self) -> None: + resp = self.client.get( + reverse("api-1:get_flag", args=[str(self.flag.pk)]), + HTTP_AUTHORIZATION=self.viewer_auth, + ) + self.assertEqual(resp.status_code, 200) + + def test_get_flag_not_found(self) -> None: + resp = self.client.get( + reverse("api-1:get_flag", args=[str(uuid.uuid4())]), + HTTP_AUTHORIZATION=self.admin_auth, + ) + self.assertEqual(resp.status_code, 404) + + def test_get_flag_unauthenticated(self) -> None: + resp = self.client.get( + reverse("api-1:get_flag", args=[str(self.flag.pk)]), + ) + self.assertEqual(resp.status_code, 401) + + +class FeatureFlagListAPITest(TestCase): + @override + def setUp(self) -> None: + self.client = Client() + self.admin: User = make_admin("_list") + self.viewer: User = make_viewer("_list") + self.admin_auth: str = auth_header(self.admin) + self.viewer_auth: str = auth_header(self.viewer) + + def test_list_empty(self) -> None: + resp = self.client.get( + reverse("api-1:list_flags"), + HTTP_AUTHORIZATION=self.admin_auth, + ) + self.assertEqual(resp.status_code, 200) + data = resp.json() + self.assertEqual(data["count"], 0) + self.assertEqual(data["items"], []) + + def test_list_with_flags(self) -> None: + feature_flag_create( + key="list_a", + name="A", + value_type=FeatureFlagType.STRING, + default_value="a", + ) + feature_flag_create( + key="list_b", + name="B", + value_type=FeatureFlagType.BOOLEAN, + default_value="true", + ) + resp = self.client.get( + reverse("api-1:list_flags"), + HTTP_AUTHORIZATION=self.admin_auth, + ) + self.assertEqual(resp.status_code, 200) + data = resp.json() + self.assertEqual(data["count"], 2) + + def test_list_filter_by_type(self) -> None: + feature_flag_create( + key="type_str", + name="S", + value_type=FeatureFlagType.STRING, + default_value="s", + ) + feature_flag_create( + key="type_bool", + name="B", + value_type=FeatureFlagType.BOOLEAN, + default_value="true", + ) + resp = self.client.get( + reverse("api-1:list_flags"), + data={"value_type": "boolean"}, + HTTP_AUTHORIZATION=self.admin_auth, + ) + data = resp.json() + self.assertEqual(data["count"], 1) + + def test_list_search(self) -> None: + feature_flag_create( + key="search_match", + name="S", + value_type=FeatureFlagType.STRING, + default_value="s", + ) + feature_flag_create( + key="other_flag", + name="O", + value_type=FeatureFlagType.STRING, + default_value="o", + ) + resp = self.client.get( + reverse("api-1:list_flags"), + data={"search": "match"}, + HTTP_AUTHORIZATION=self.admin_auth, + ) + data = resp.json() + self.assertEqual(data["count"], 1) + + def test_list_pagination(self) -> None: + for i in range(3): + feature_flag_create( + key=f"page_{i}", + name=f"P{i}", + value_type=FeatureFlagType.STRING, + default_value="v", + ) + resp = self.client.get( + reverse("api-1:list_flags"), + data={"limit": 2, "offset": 0}, + HTTP_AUTHORIZATION=self.admin_auth, + ) + data = resp.json() + self.assertEqual(data["count"], 3) + self.assertEqual(len(data["items"]), 2) + + def test_list_viewer_allowed(self) -> None: + resp = self.client.get( + reverse("api-1:list_flags"), + HTTP_AUTHORIZATION=self.viewer_auth, + ) + self.assertEqual(resp.status_code, 200) + + +class FeatureFlagUpdateAPITest(TestCase): + @override + def setUp(self) -> None: + self.client = Client() + self.admin: User = make_admin("_upd") + self.experimenter: User = make_experimenter("_upd") + self.viewer: User = make_viewer("_upd") + self.admin_auth: str = auth_header(self.admin) + self.exp_auth: str = auth_header(self.experimenter) + self.viewer_auth: str = auth_header(self.viewer) + + def _create_flag( + self, + key: str = "upd_flag", + value_type: str = FeatureFlagType.STRING, + default_value: str = "old", + ) -> FeatureFlag: + return feature_flag_create( + key=key, + name="Update Test", + value_type=value_type, + default_value=default_value, + ) + + def test_update_default_value_string(self) -> None: + flag = self._create_flag() + resp = self.client.patch( + reverse("api-1:update_flag", args=[str(flag.pk)]), + data=json.dumps({"default_value": "new"}), + content_type="application/json", + HTTP_AUTHORIZATION=self.admin_auth, + ) + self.assertEqual(resp.status_code, 200) + data = resp.json() + self.assertEqual(data["default_value"], "new") + + def test_update_default_value_boolean_valid(self) -> None: + flag = self._create_flag( + key="upd_bool", + value_type=FeatureFlagType.BOOLEAN, + default_value="true", + ) + resp = self.client.patch( + reverse("api-1:update_flag", args=[str(flag.pk)]), + data=json.dumps({"default_value": "false"}), + content_type="application/json", + HTTP_AUTHORIZATION=self.admin_auth, + ) + self.assertEqual(resp.status_code, 200) + data = resp.json() + self.assertEqual(data["default_value"], "false") + + def test_update_default_value_boolean_invalid(self) -> None: + flag = self._create_flag( + key="upd_bool_inv", + value_type=FeatureFlagType.BOOLEAN, + default_value="true", + ) + resp = self.client.patch( + reverse("api-1:update_flag", args=[str(flag.pk)]), + data=json.dumps({"default_value": "not_a_bool"}), + content_type="application/json", + HTTP_AUTHORIZATION=self.admin_auth, + ) + self.assertEqual(resp.status_code, 422) + + def test_update_default_value_integer_valid(self) -> None: + flag = self._create_flag( + key="upd_int", + value_type=FeatureFlagType.INTEGER, + default_value="10", + ) + resp = self.client.patch( + reverse("api-1:update_flag", args=[str(flag.pk)]), + data=json.dumps({"default_value": "42"}), + content_type="application/json", + HTTP_AUTHORIZATION=self.admin_auth, + ) + self.assertEqual(resp.status_code, 200) + data = resp.json() + self.assertEqual(data["default_value"], "42") + + def test_update_default_value_integer_invalid(self) -> None: + flag = self._create_flag( + key="upd_int_inv", + value_type=FeatureFlagType.INTEGER, + default_value="10", + ) + resp = self.client.patch( + reverse("api-1:update_flag", args=[str(flag.pk)]), + data=json.dumps({"default_value": "abc"}), + content_type="application/json", + HTTP_AUTHORIZATION=self.admin_auth, + ) + self.assertEqual(resp.status_code, 422) + + def test_update_not_found(self) -> None: + resp = self.client.patch( + reverse("api-1:update_flag", args=[str(uuid.uuid4())]), + data=json.dumps({"default_value": "x"}), + content_type="application/json", + HTTP_AUTHORIZATION=self.admin_auth, + ) + self.assertEqual(resp.status_code, 404) + + def test_update_viewer_denied(self) -> None: + flag = self._create_flag(key="upd_viewer") + resp = self.client.patch( + reverse("api-1:update_flag", args=[str(flag.pk)]), + data=json.dumps({"default_value": "x"}), + content_type="application/json", + HTTP_AUTHORIZATION=self.viewer_auth, + ) + self.assertEqual(resp.status_code, 403) + + def test_update_experimenter_allowed(self) -> None: + flag = self._create_flag(key="upd_exp") + resp = self.client.patch( + reverse("api-1:update_flag", args=[str(flag.pk)]), + data=json.dumps({"default_value": "new_val"}), + content_type="application/json", + HTTP_AUTHORIZATION=self.exp_auth, + ) + self.assertEqual(resp.status_code, 200) + + def test_update_preserves_key_and_type(self) -> None: + flag = self._create_flag(key="upd_preserve") + resp = self.client.patch( + reverse("api-1:update_flag", args=[str(flag.pk)]), + data=json.dumps({"default_value": "changed"}), + content_type="application/json", + HTTP_AUTHORIZATION=self.admin_auth, + ) + data = resp.json() + self.assertEqual(data["key"], "upd_preserve") + self.assertEqual(data["value_type"], "string") diff --git a/src/backend/apps/flags/__init__.py b/src/backend/apps/flags/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/backend/apps/flags/apps.py b/src/backend/apps/flags/apps.py new file mode 100644 index 0000000..d6091b5 --- /dev/null +++ b/src/backend/apps/flags/apps.py @@ -0,0 +1,6 @@ +from django.apps import AppConfig + + +class FlagsConfig(AppConfig): + default_auto_field = "django.db.models.BigAutoField" + name = "apps.flags" diff --git a/src/backend/apps/flags/migrations/0001_initial.py b/src/backend/apps/flags/migrations/0001_initial.py new file mode 100644 index 0000000..d0266be --- /dev/null +++ b/src/backend/apps/flags/migrations/0001_initial.py @@ -0,0 +1,34 @@ +# Generated by Django 5.2.11 on 2026-02-13 07:57 + +import django.core.validators +import uuid +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + ] + + operations = [ + migrations.CreateModel( + name='FeatureFlag', + fields=[ + ('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)), + ('key', models.CharField(help_text='Unique identifier for the feature flag', max_length=100, unique=True, validators=[django.core.validators.RegexValidator(message='Key must start with a lowercase letter and contain only lowercase letters, digits, and underscores.', regex='^[a-z][a-z0-9_]*$')], verbose_name='key')), + ('name', models.CharField(help_text='Human-readable name for the feature flag', max_length=200, verbose_name='name')), + ('value_type', models.CharField(choices=[('string', 'String'), ('boolean', 'Boolean'), ('integer', 'Integer')], default='boolean', help_text='Data type of the feature flag value', max_length=20, verbose_name='flag type')), + ('default_value', models.CharField(help_text='Default value for the feature flag', max_length=200, verbose_name='default value')), + ('description', models.TextField(blank=True, help_text='Optional description of the feature flag', verbose_name='description')), + ('created_at', models.DateTimeField(auto_now_add=True, verbose_name='created at')), + ('updated_at', models.DateTimeField(auto_now=True, verbose_name='updated at')), + ], + options={ + 'verbose_name': 'feature flag', + 'verbose_name_plural': 'feature flags', + 'ordering': ['-created_at'], + }, + ), + ] diff --git a/src/backend/apps/flags/migrations/__init__.py b/src/backend/apps/flags/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/backend/apps/flags/models.py b/src/backend/apps/flags/models.py new file mode 100644 index 0000000..29388f6 --- /dev/null +++ b/src/backend/apps/flags/models.py @@ -0,0 +1,116 @@ +from typing import override + +from django.core.exceptions import ValidationError +from django.core.validators import RegexValidator +from django.db import models +from django.utils.translation import gettext_lazy as _ + +from apps.core.models import BaseModel + +KEY_PATTERN = r"^[a-z][a-z0-9_]*$" + + +class FeatureFlagType(models.TextChoices): + STRING = "string", _("String") + BOOLEAN = "boolean", _("Boolean") + INTEGER = "integer", _("Integer") + + +BOOLEAN_TRUE_VALUES = frozenset({"true", "1", "yes"}) +BOOLEAN_FALSE_VALUES = frozenset({"false", "0", "no"}) +BOOLEAN_ALLOWED_VALUES = BOOLEAN_TRUE_VALUES | BOOLEAN_FALSE_VALUES + + +def validate_value_for_type(value: str, value_type: str) -> str: + if value_type == FeatureFlagType.BOOLEAN: + if value.lower() not in BOOLEAN_ALLOWED_VALUES: + raise ValidationError( + { + "default_value": ( + f"Boolean flag value must be one of: " + f"{', '.join(sorted(BOOLEAN_ALLOWED_VALUES))}. " + f"Got '{value}'." + ) + } + ) + return "true" if value.lower() in BOOLEAN_TRUE_VALUES else "false" + + if value_type == FeatureFlagType.INTEGER: + try: + int(value) + except (ValueError, TypeError): + raise ValidationError( + { + "default_value": ( + f"Integer flag value must be a valid integer. " + f"Got '{value}'." + ) + } + ) from None + return str(int(value)) + + return value + + +class FeatureFlag(BaseModel): + key = models.CharField( + max_length=100, + unique=True, + verbose_name=_("key"), + help_text=_("Unique identifier for the feature flag"), + validators=[ + RegexValidator( + regex=KEY_PATTERN, + message=( + "Key must start with a lowercase letter and contain only " + "lowercase letters, digits, and underscores." + ), + ) + ], + ) + name = models.CharField( + max_length=200, + verbose_name=_("name"), + help_text=_("Human-readable name for the feature flag"), + ) + value_type = models.CharField( + max_length=20, + choices=FeatureFlagType.choices, + default=FeatureFlagType.BOOLEAN, + verbose_name=_("flag type"), + help_text=_("Data type of the feature flag value"), + ) + default_value = models.CharField( + max_length=200, + verbose_name=_("default value"), + help_text=_("Default value for the feature flag"), + ) + description = models.TextField( + blank=True, + verbose_name=_("description"), + help_text=_("Optional description of the feature flag"), + ) + created_at = models.DateTimeField( + auto_now_add=True, + verbose_name=_("created at"), + ) + updated_at = models.DateTimeField( + auto_now=True, + verbose_name=_("updated at"), + ) + + class Meta: + verbose_name = _("feature flag") + verbose_name_plural = _("feature flags") + ordering = ["-created_at"] + + @override + def __str__(self) -> str: + return f"{self.key} ({self.value_type})" + + @override + def clean(self) -> None: + super().clean() + self.default_value = validate_value_for_type( + self.default_value, self.value_type + ) diff --git a/src/backend/apps/flags/selectors.py b/src/backend/apps/flags/selectors.py new file mode 100644 index 0000000..7c47e1b --- /dev/null +++ b/src/backend/apps/flags/selectors.py @@ -0,0 +1,23 @@ +from django.db.models import QuerySet + +from apps.flags.models import FeatureFlag + + +def feature_flag_list( + *, + value_type: str | None = None, + search: str | None = None, +) -> QuerySet[FeatureFlag]: + qs = FeatureFlag.objects.all() + + if value_type: + qs = qs.filter(value_type=value_type) + + if search: + qs = qs.filter(key__icontains=search) + + return qs + + +def feature_flag_get_by_key(key: str) -> FeatureFlag | None: + return FeatureFlag.objects.filter(key=key).first() diff --git a/src/backend/apps/flags/services.py b/src/backend/apps/flags/services.py new file mode 100644 index 0000000..4a86f2f --- /dev/null +++ b/src/backend/apps/flags/services.py @@ -0,0 +1,41 @@ +from django.core.exceptions import ValidationError + +from apps.flags.models import FeatureFlag, FeatureFlagType + + +def feature_flag_create( + *, + key: str, + name: str, + value_type: str, + default_value: str, +) -> FeatureFlag: + valid_types = {choice[0] for choice in FeatureFlagType.choices} + if value_type not in valid_types: + raise ValidationError( + { + "value_type": ( + f"Invalid flag type '{value_type}'. " + f"Must be one of: {', '.join(sorted(valid_types))}" + ) + } + ) + + flag = FeatureFlag( + key=key, + name=name, + value_type=value_type, + default_value=default_value, + ) + flag.save() + return flag + + +def feature_flag_update_default( + *, + flag: FeatureFlag, + default_value: str, +) -> FeatureFlag: + flag.default_value = default_value + flag.save(update_fields=["default_value", "updated_at"]) + return flag diff --git a/src/backend/apps/flags/tests/__init__.py b/src/backend/apps/flags/tests/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/src/backend/apps/flags/tests/__init__.py @@ -0,0 +1 @@ + diff --git a/src/backend/apps/flags/tests/test_flags.py b/src/backend/apps/flags/tests/test_flags.py new file mode 100644 index 0000000..201c69e --- /dev/null +++ b/src/backend/apps/flags/tests/test_flags.py @@ -0,0 +1,213 @@ +from django.core.exceptions import ValidationError +from django.test import TestCase + +from apps.flags.models import ( + FeatureFlagType, + validate_value_for_type, +) +from apps.flags.selectors import feature_flag_get_by_key, feature_flag_list +from apps.flags.services import ( + feature_flag_create, + feature_flag_update_default, +) +from config.errors import ConflictError + + +class ValidateValueForTypeTest(TestCase): + def test_boolean_true_variants(self) -> None: + for val in ("true", "True", "1", "yes", "YES"): + self.assertEqual( + validate_value_for_type(val, FeatureFlagType.BOOLEAN), "true" + ) + + def test_boolean_false_variants(self) -> None: + for val in ("false", "False", "0", "no", "NO"): + self.assertEqual( + validate_value_for_type(val, FeatureFlagType.BOOLEAN), "false" + ) + + def test_boolean_invalid(self) -> None: + with self.assertRaises(ValidationError): + validate_value_for_type("maybe", FeatureFlagType.BOOLEAN) + + def test_integer_valid(self) -> None: + self.assertEqual( + validate_value_for_type("42", FeatureFlagType.INTEGER), "42" + ) + + def test_integer_negative(self) -> None: + self.assertEqual( + validate_value_for_type("-7", FeatureFlagType.INTEGER), "-7" + ) + + def test_integer_invalid(self) -> None: + with self.assertRaises(ValidationError): + validate_value_for_type("abc", FeatureFlagType.INTEGER) + + def test_integer_float(self) -> None: + with self.assertRaises(ValidationError): + validate_value_for_type("3.14", FeatureFlagType.INTEGER) + + def test_string_passthrough(self) -> None: + self.assertEqual( + validate_value_for_type("anything", FeatureFlagType.STRING), + "anything", + ) + + +class FeatureFlagServiceTest(TestCase): + def test_create_string_flag(self) -> None: + flag = feature_flag_create( + key="svc_string", + name="Service String", + value_type=FeatureFlagType.STRING, + default_value="hello", + ) + self.assertEqual(flag.key, "svc_string") + self.assertEqual(flag.value_type, FeatureFlagType.STRING) + self.assertEqual(flag.default_value, "hello") + + def test_create_boolean_normalises(self) -> None: + flag = feature_flag_create( + key="svc_bool", + name="Service Bool", + value_type=FeatureFlagType.BOOLEAN, + default_value="Yes", + ) + self.assertEqual(flag.default_value, "true") + + def test_create_integer_normalises(self) -> None: + flag = feature_flag_create( + key="svc_int", + name="Service Int", + value_type=FeatureFlagType.INTEGER, + default_value="007", + ) + self.assertEqual(flag.default_value, "7") + + def test_create_invalid_type_raises(self) -> None: + with self.assertRaises(ValidationError): + feature_flag_create( + key="svc_bad", + name="Bad", + value_type="float", + default_value="1.0", + ) + + def test_create_bool_invalid_value_raises(self) -> None: + with self.assertRaises(ValidationError): + feature_flag_create( + key="svc_bad_bool", + name="Bad Bool", + value_type=FeatureFlagType.BOOLEAN, + default_value="maybe", + ) + + def test_create_int_invalid_value_raises(self) -> None: + with self.assertRaises(ValidationError): + feature_flag_create( + key="svc_bad_int", + name="Bad Int", + value_type=FeatureFlagType.INTEGER, + default_value="abc", + ) + + def test_create_duplicate_key_raises(self) -> None: + feature_flag_create( + key="svc_dup", + name="First", + value_type=FeatureFlagType.STRING, + default_value="a", + ) + with self.assertRaises(ConflictError): + feature_flag_create( + key="svc_dup", + name="Second", + value_type=FeatureFlagType.STRING, + default_value="b", + ) + + def test_update_default_value(self) -> None: + flag = feature_flag_create( + key="svc_upd", + name="Update", + value_type=FeatureFlagType.STRING, + default_value="old", + ) + updated = feature_flag_update_default(flag=flag, default_value="new") + self.assertEqual(updated.default_value, "new") + flag.refresh_from_db() + self.assertEqual(flag.default_value, "new") + + def test_update_bool_validates(self) -> None: + flag = feature_flag_create( + key="svc_upd_bool", + name="Update Bool", + value_type=FeatureFlagType.BOOLEAN, + default_value="true", + ) + with self.assertRaises(ValidationError): + feature_flag_update_default(flag=flag, default_value="maybe") + + def test_update_int_validates(self) -> None: + flag = feature_flag_create( + key="svc_upd_int", + name="Update Int", + value_type=FeatureFlagType.INTEGER, + default_value="5", + ) + with self.assertRaises(ValidationError): + feature_flag_update_default(flag=flag, default_value="bad") + + +class FeatureFlagSelectorTest(TestCase): + def test_list_all(self) -> None: + feature_flag_create( + key="sel_a", name="A", value_type="string", default_value="a" + ) + feature_flag_create( + key="sel_b", name="B", value_type="boolean", default_value="true" + ) + self.assertEqual(feature_flag_list().count(), 2) + + def test_list_filter_type(self) -> None: + feature_flag_create( + key="sel_ft_s", + name="S", + value_type="string", + default_value="s", + ) + feature_flag_create( + key="sel_ft_b", + name="B", + value_type="boolean", + default_value="true", + ) + qs = feature_flag_list(value_type="boolean") + self.assertEqual(qs.count(), 1) + self.assertEqual(qs.first().key, "sel_ft_b") + + def test_list_search(self) -> None: + feature_flag_create( + key="sel_search_match", + name="M", + value_type="string", + default_value="m", + ) + feature_flag_create( + key="sel_other", name="O", value_type="string", default_value="o" + ) + qs = feature_flag_list(search="match") + self.assertEqual(qs.count(), 1) + + def test_get_by_key(self) -> None: + feature_flag_create( + key="sel_get", name="G", value_type="string", default_value="g" + ) + flag = feature_flag_get_by_key("sel_get") + self.assertIsNotNone(flag) + self.assertEqual(flag.key, "sel_get") + + def test_get_by_key_not_found(self) -> None: + flag = feature_flag_get_by_key("nonexistent") + self.assertIsNone(flag)