chore(): test and validation improvements

This commit is contained in:
ITQ
2026-02-13 10:39:11 +03:00
parent 9c45d9883e
commit 94d99c72e6
12 changed files with 96 additions and 177 deletions
+3 -1
View File
@@ -3,7 +3,7 @@ from typing import ClassVar
from ninja import ModelSchema, Schema
from pydantic import Field
from apps.users.models import User
from apps.users.models import User, UserRole
class LoginIn(Schema):
@@ -51,6 +51,8 @@ class TokenRefreshOut(Schema):
class MeOut(ModelSchema):
role: UserRole
class Meta:
model = User
fields: ClassVar[tuple[str, ...]] = (
+28 -5
View File
@@ -34,6 +34,22 @@ def create_error_response(
return router.create_response(request, error_data, status=http_status)
def _clean_pydantic_loc(loc: list[Any]) -> str:
parts = [str(p) for p in loc]
if parts and parts[0] in {"body", "query", "path", "header", "cookie"}:
if parts[0] == "body" and len(parts) > 2:
parts = parts[2:]
else:
parts = parts[1:]
return ".".join(parts) if parts else "non_field_error"
def _extract_pydantic_rejected_value(error: dict[str, Any]) -> Any:
if error.get("type") == "missing":
return None
return error.get("input")
def handle_validation_error(
request: HttpRequest,
exc: ninja.errors.ValidationError,
@@ -42,12 +58,11 @@ def handle_validation_error(
field_errors_data: list[dict[str, Any]] = []
for error in exc.errors:
loc = error.get("loc", [])
field = ".".join(map(str, loc)) if loc else "non_field_error"
field_errors_data.append(
{
"field": field,
"field": _clean_pydantic_loc(loc),
"issue": error.get("msg", "Unknown error"),
"rejected_value": error.get("input"),
"rejected_value": _extract_pydantic_rejected_value(error),
}
)
@@ -63,6 +78,14 @@ def handle_validation_error(
)
def _extract_django_rejected_value(
error: django.core.exceptions.ValidationError,
) -> Any:
if error.params and "value" in error.params:
return error.params["value"]
return None
def handle_django_validation_error(
request: HttpRequest,
exc: django.core.exceptions.ValidationError,
@@ -77,7 +100,7 @@ def handle_django_validation_error(
{
"field": field,
"issue": str(error.message),
"rejected_value": None,
"rejected_value": _extract_django_rejected_value(error),
}
for error in errors
)
@@ -86,7 +109,7 @@ def handle_django_validation_error(
{
"field": "non_field_error",
"issue": str(error.message),
"rejected_value": None,
"rejected_value": _extract_django_rejected_value(error),
}
for error in exc.error_list
)
+2
View File
@@ -27,6 +27,8 @@ class ExperimenterOut(ModelSchema):
User.id.field.name,
User.username.field.name,
User.email.field.name,
User.first_name.field.name,
User.last_name.field.name,
)
@@ -162,7 +162,7 @@ class ApproverGroupAPITest(TestCase):
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertIn(resp.status_code, [422, 400])
self.assertEqual(resp.status_code, 422)
def test_create_group_duplicate_raises(self) -> None:
self.client.post(
@@ -187,7 +187,7 @@ class ApproverGroupAPITest(TestCase):
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertIn(resp.status_code, [422, 400, 409])
self.assertEqual(resp.status_code, 409)
def test_create_group_approver_wrong_role(self) -> None:
resp = self.client.post(
@@ -202,7 +202,7 @@ class ApproverGroupAPITest(TestCase):
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertIn(resp.status_code, [422, 400])
self.assertEqual(resp.status_code, 422)
def testget_group_admin(self) -> None:
group: ApproverGroup = approver_group_create(
@@ -416,7 +416,7 @@ class ApproverGroupAPITest(TestCase):
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertIn(resp.status_code, [422, 400])
self.assertEqual(resp.status_code, 422)
def test_add_approver_duplicate(self) -> None:
group: ApproverGroup = approver_group_create(
@@ -432,7 +432,7 @@ class ApproverGroupAPITest(TestCase):
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertIn(resp.status_code, [422, 400])
self.assertEqual(resp.status_code, 422)
def test_add_approver_viewer_denied(self) -> None:
group: ApproverGroup = approver_group_create(
@@ -485,7 +485,7 @@ class ApproverGroupAPITest(TestCase):
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertIn(resp.status_code, [422, 400])
self.assertEqual(resp.status_code, 422)
def test_remove_approver_not_in_group(self) -> None:
group: ApproverGroup = approver_group_create(
@@ -501,7 +501,7 @@ class ApproverGroupAPITest(TestCase):
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertIn(resp.status_code, [422, 400])
self.assertEqual(resp.status_code, 422)
def test_remove_approver_viewer_denied(self) -> None:
group: ApproverGroup = approver_group_create(
@@ -672,7 +672,7 @@ class ReviewSettingsAPITest(TestCase):
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertIn(resp.status_code, [422, 400])
self.assertEqual(resp.status_code, 422)
class EffectivePolicyAPITest(TestCase):
@@ -749,7 +749,7 @@ class EffectivePolicyAPITest(TestCase):
),
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertIn(resp.status_code, [422, 400])
self.assertEqual(resp.status_code, 422)
def test_effective_policy_nonexistent_user(self) -> None:
resp = self.client.get(
+5 -44
View File
@@ -37,7 +37,7 @@ router = Router(tags=["users"])
@require_admin
def list_users(
request: HttpRequest,
role: str | None = None,
role: UserRole | None = None,
is_active: bool | None = None, # noqa: FBT001
search: str | None = None,
limit: int = 50,
@@ -54,7 +54,7 @@ def list_users(
@router.post(
"",
response={201: UserOut},
response={HTTPStatus.CREATED: UserOut},
auth=jwt_bearer,
summary="Create user",
description=(
@@ -66,25 +66,7 @@ def create_user(
request: HttpRequest,
payload: UserCreateIn,
) -> tuple[HTTPStatus, UserOut]:
valid_roles = {choice[0] for choice in UserRole.choices}
if payload.role not in valid_roles:
raise ValidationError(
{
"role": (
f"Invalid role '{payload.role}'. "
f"Must be one of: {', '.join(sorted(valid_roles))}"
)
}
)
user = user_create(
username=payload.username,
email=payload.email,
password=payload.password,
role=payload.role,
first_name=payload.first_name,
last_name=payload.last_name,
)
user = user_create(**payload.model_dump())
return HTTPStatus.CREATED, UserOut.model_validate(user)
@@ -123,34 +105,13 @@ def update_user(
) -> tuple[HTTPStatus, UserOut]:
user = get_object_or_404(User, pk=user_id)
if payload.role is not None:
valid_roles = {choice[0] for choice in UserRole.choices}
if payload.role not in valid_roles:
raise ValidationError(
{
"role": (
f"Invalid role '{payload.role}'. "
f"Must be one of: {', '.join(sorted(valid_roles))}"
)
}
)
updated_user = user_update(
user=user,
username=payload.username,
email=payload.email,
password=payload.password,
role=payload.role,
is_active=payload.is_active,
first_name=payload.first_name,
last_name=payload.last_name,
)
updated_user = user_update(user=user, **payload.model_dump())
return HTTPStatus.OK, UserOut.model_validate(updated_user)
@router.delete(
"/{user_id}",
response={204: None},
response={HTTPStatus.NO_CONTENT: None},
auth=jwt_bearer,
summary="Delete user",
description="Permanently delete a user. Admin only.",
+28 -87
View File
@@ -1,14 +1,11 @@
from typing import ClassVar
from ninja import ModelSchema, Schema
from pydantic import Field
from apps.users.models import User
from apps.users.models import User, UserRole
class UserOut(ModelSchema):
is_active: bool
class Meta:
model = User
fields: ClassVar[tuple[str, ...]] = (
@@ -18,97 +15,41 @@ class UserOut(ModelSchema):
User.role.field.name,
User.first_name.field.name,
User.last_name.field.name,
User._meta.get_field("is_active").name,
User.is_active.field.name,
)
class UserCreateIn(Schema):
username: str = Field(
...,
min_length=1,
max_length=150,
description="Unique username for the new account.",
)
email: str = Field(
...,
min_length=1,
max_length=254,
description="Email address.",
)
password: str = Field(
...,
min_length=8,
max_length=128,
description="Account password (min 8 characters).",
)
role: str = Field(
"viewer",
description=(
"Platform role to assign. "
"One of: admin, experimenter, approver, viewer."
),
)
first_name: str = Field(
"",
max_length=150,
description="First name.",
)
last_name: str = Field(
"",
max_length=150,
description="Last name.",
)
class UserCreateIn(ModelSchema):
role: UserRole = UserRole.VIEWER
class Meta:
model = User
fields: ClassVar[tuple[str, ...]] = (
User.username.field.name,
User.email.field.name,
User.password.field.name,
User.role.field.name,
)
class UserUpdateIn(Schema):
username: str | None = Field(
None,
min_length=1,
max_length=150,
description="New username.",
)
email: str | None = Field(
None,
min_length=1,
max_length=254,
description="New email address.",
)
password: str | None = Field(
None,
min_length=8,
max_length=128,
description="New password (min 8 characters).",
)
role: str | None = Field(
None,
description=(
"New platform role. One of: admin, experimenter, approver, viewer."
),
)
first_name: str | None = Field(
None,
max_length=150,
description="New first name.",
)
last_name: str | None = Field(
None,
max_length=150,
description="New last name.",
)
is_active: bool | None = Field(
None,
description="Set active/inactive status.",
)
class UserUpdateIn(ModelSchema):
username: str | None = None
email: str | None = None
first_name: str | None = None
last_name: str | None = None
class Meta:
model = User
fields: ClassVar[tuple[str, ...]] = (
User.username.field.name,
User.email.field.name,
User.first_name.field.name,
User.last_name.field.name,
)
class UserRoleAssignIn(Schema):
role: str = Field(
...,
description=(
"Platform role to assign. "
"One of: admin, experimenter, approver, viewer."
),
)
role: UserRole
class UserListOut(Schema):
@@ -30,7 +30,7 @@ class UsersAPIDeleteAssignRoleTest(BaseUsersAPITest):
),
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertIn(resp.status_code, [422, 400])
self.assertEqual(resp.status_code, 422)
def test_delete_user_viewer_denied(self) -> None:
resp = self.client.delete(
@@ -71,7 +71,7 @@ class UsersAPIDeleteAssignRoleTest(BaseUsersAPITest):
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertIn(resp.status_code, [422, 400])
self.assertEqual(resp.status_code, 422)
def test_assign_role_viewer_denied(self) -> None:
resp = self.client.post(
@@ -92,7 +92,7 @@ class UsersAPIListCreateTest(BaseUsersAPITest):
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertIn(resp.status_code, [422, 400])
self.assertEqual(resp.status_code, 422)
def test_create_user_viewer_denied(self) -> None:
resp = self.client.post(
@@ -122,4 +122,4 @@ class UsersAPIListCreateTest(BaseUsersAPITest):
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertIn(resp.status_code, [409, 422, 400, 500])
self.assertEqual(resp.status_code, 409)
@@ -34,16 +34,13 @@ class UsersAPIReadUpdateTest(BaseUsersAPITest):
reverse(
"api-1:update_user", kwargs={"user_id": str(self.viewer.pk)}
),
data=json.dumps(
{"username": "renamed_viewer", "role": "approver"}
),
data=json.dumps({"username": "renamed_viewer"}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertEqual(resp.status_code, 200)
data = resp.json()
self.assertEqual(data["username"], "renamed_viewer")
self.assertEqual(data["role"], "approver")
def test_update_user_partial(self) -> None:
original_role = self.viewer.role
@@ -59,17 +56,6 @@ class UsersAPIReadUpdateTest(BaseUsersAPITest):
data = resp.json()
self.assertEqual(data["role"], original_role)
def test_update_user_invalid_role(self) -> None:
resp = self.client.patch(
reverse(
"api-1:update_user", kwargs={"user_id": str(self.viewer.pk)}
),
data=json.dumps({"role": "superadmin"}),
content_type="application/json",
HTTP_AUTHORIZATION=self.admin_auth,
)
self.assertIn(resp.status_code, [422, 400])
def test_update_user_viewer_denied(self) -> None:
resp = self.client.patch(
reverse(
+10 -7
View File
@@ -1,5 +1,6 @@
from typing import Any
from config.errors import ConflictError
from django.core.exceptions import ValidationError
from django.db import transaction
@@ -91,13 +92,15 @@ def approver_group_create(
) -> ApproverGroup:
_validate_experimenter(experimenter)
if ApproverGroup.objects.filter(experimenter=experimenter).exists():
raise ValidationError(
{
"experimenter": (
f"An approver group already exists for "
f"experimenter '{experimenter.username}'."
)
}
raise ConflictError(
ValidationError(
{
"experimenter": (
f"An approver group already exists for "
f"experimenter '{experimenter.username}'."
)
}
)
)
approvers: list[User] = []
if approver_ids:
@@ -14,6 +14,7 @@ from apps.reviews.selectors import (
approver_group_list_for_approver,
)
from apps.reviews.services import (
ConflictError,
approver_group_add_approver,
approver_group_create,
approver_group_delete,
@@ -73,7 +74,7 @@ class ApproverGroupModelTest(TestCase):
def test_one_to_one_constraint(self) -> None:
approver_group_create(experimenter=self.experimenter)
with self.assertRaises(ValidationError):
with self.assertRaises(ConflictError):
approver_group_create(
experimenter=self.experimenter,
min_approvals=1,
@@ -148,7 +149,7 @@ class ApproverGroupCreateServiceTest(TestCase):
def test_create_rejects_duplicate_group(self) -> None:
approver_group_create(experimenter=self.experimenter)
with self.assertRaises(ValidationError):
with self.assertRaises(ConflictError):
approver_group_create(experimenter=self.experimenter)
def test_create_min_approvals_zero_raises(self) -> None:
+3 -3
View File
@@ -13,12 +13,12 @@ class UserAdmin(BaseUserAdmin):
User.role.field.name,
User.first_name.field.name,
User.last_name.field.name,
User._meta.get_field("is_active").name,
User.is_active.field.name,
User.is_staff.field.name,
)
list_filter = (
User.role.field.name,
User._meta.get_field("is_active").name,
User.is_active.field.name,
User.is_staff.field.name,
User.is_superuser.field.name,
)
@@ -56,7 +56,7 @@ class UserAdmin(BaseUserAdmin):
_("Permissions"),
{
"fields": (
User._meta.get_field("is_active").name,
User.is_active.field.name,
User.is_staff.field.name,
User.is_superuser.field.name,
User.groups.field.name,