feat(backend): added auth, reviews, users modules

also provided tests
This commit is contained in:
ITQ
2026-02-12 20:48:29 +03:00
parent cb9692089f
commit 613c99dce2
60 changed files with 5101 additions and 127 deletions
+92
View File
@@ -0,0 +1,92 @@
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin as BaseUserAdmin
from django.utils.translation import gettext_lazy as _
from apps.users.models import User
@admin.register(User)
class UserAdmin(BaseUserAdmin):
list_display = (
User.username.field.name,
User.email.field.name,
User.role.field.name,
User.first_name.field.name,
User.last_name.field.name,
User._meta.get_field("is_active").name,
User.is_staff.field.name,
)
list_filter = (
User.role.field.name,
User._meta.get_field("is_active").name,
User.is_staff.field.name,
User.is_superuser.field.name,
)
search_fields = (
User.username.field.name,
User.email.field.name,
User.first_name.field.name,
User.last_name.field.name,
)
ordering = (User.username.field.name,)
fieldsets = (
(None, {"fields": (User.username.field.name, "password")}),
(
_("Personal info"),
{
"fields": (
User.first_name.field.name,
User.last_name.field.name,
User.email.field.name,
)
},
),
(
_("Platform role"),
{
"fields": (User.role.field.name,),
"description": _(
"Platform role that defines user permissions: "
"admin, experimenter, approver, or viewer."
),
},
),
(
_("Permissions"),
{
"fields": (
User._meta.get_field("is_active").name,
User.is_staff.field.name,
User.is_superuser.field.name,
User.groups.field.name,
User.user_permissions.field.name,
),
},
),
(
_("Important dates"),
{
"fields": (
User.last_login.field.name,
User.date_joined.field.name,
)
},
),
)
add_fieldsets = (
(
None,
{
"classes": ("wide",),
"fields": (
User.username.field.name,
User.email.field.name,
"password1",
"password2",
User.role.field.name,
),
},
),
)
+85
View File
@@ -0,0 +1,85 @@
import logging
from collections.abc import Callable
from functools import wraps
from typing import Any
from django.http import HttpRequest
from ninja.security import HttpBearer
from apps.users.auth.jwt import TokenError, decode_access_token
from apps.users.models import User
from config.errors import ForbiddenError
logger: logging.Logger = logging.getLogger("lotty")
class JWTBearer(HttpBearer):
def authenticate(
self,
request: HttpRequest,
token: str,
) -> User | None:
try:
payload: dict[str, Any] = decode_access_token(token)
except TokenError:
logger.debug("JWT authentication failed: invalid token")
return None
user_id: str | None = payload.get("sub")
if not user_id:
logger.debug("JWT authentication failed: missing 'sub' claim")
return None
try:
user: User = User.objects.get(pk=user_id, is_active=True)
except User.DoesNotExist:
logger.debug(
"JWT authentication failed: user %s not found or inactive",
user_id,
)
return None
return user
# Singleton is not the best way, yep
jwt_bearer = JWTBearer()
def require_roles(*allowed_roles: str) -> Callable:
def checker(request: HttpRequest) -> User:
user: User | None = getattr(request, "auth", None)
if user is None:
raise ForbiddenError("Authentication required")
if user.role not in allowed_roles:
raise ForbiddenError(
f"Role '{user.role}' is not permitted for this action. "
f"Required one of: {', '.join(allowed_roles)}"
)
return user
def guard(arg: HttpRequest | Callable) -> Callable | User:
if isinstance(arg, HttpRequest):
return checker(arg)
if callable(arg):
@wraps(arg)
def wrapped(request: HttpRequest, *args, **kwargs) -> Callable:
checker(request)
return arg(request, *args, **kwargs)
return wrapped
raise TypeError("Role guard expects a request or a callable")
return guard
require_admin = require_roles("admin")
require_experimenter = require_roles("experimenter")
require_approver = require_roles("approver")
require_admin_or_experimenter = require_roles("admin", "experimenter")
require_admin_or_approver = require_roles("admin", "approver")
+104
View File
@@ -0,0 +1,104 @@
from datetime import UTC, datetime, timedelta
from typing import Any
from uuid import UUID
import jwt
from django.conf import settings
_ALGORITHM = "HS256"
_ACCESS_TOKEN_LIFETIME = timedelta(hours=24)
_REFRESH_TOKEN_LIFETIME = timedelta(days=7)
def _get_secret() -> str:
return settings.SECRET_KEY
def create_access_token(
user_id: UUID | str,
role: str,
*,
extra_claims: dict[str, Any] | None = None,
lifetime: timedelta | None = None,
) -> str:
now: datetime = datetime.now(tz=UTC)
exp: datetime = now + (lifetime or _ACCESS_TOKEN_LIFETIME)
payload: dict[str, Any] = {
"sub": str(user_id),
"role": role,
"type": "access",
"iat": now,
"exp": exp,
}
if extra_claims:
payload.update(extra_claims)
return jwt.encode(payload, _get_secret(), algorithm=_ALGORITHM)
def create_refresh_token(
user_id: UUID | str,
*,
lifetime: timedelta | None = None,
) -> str:
now: datetime = datetime.now(tz=UTC)
exp: datetime = now + (lifetime or _REFRESH_TOKEN_LIFETIME)
payload: dict[str, Any] = {
"sub": str(user_id),
"type": "refresh",
"iat": now,
"exp": exp,
}
return jwt.encode(payload, _get_secret(), algorithm=_ALGORITHM)
def create_token_pair(
user_id: UUID | str,
role: str,
) -> dict[str, str]:
return {
"access": create_access_token(user_id, role),
"refresh": create_refresh_token(user_id),
}
class TokenError(Exception):
def __init__(self, detail: str = "Invalid token") -> None:
self.detail: str = detail
super().__init__(detail)
def decode_token(
token: str,
*,
expected_type: str | None = None,
) -> dict[str, Any]:
try:
payload: dict[str, Any] = jwt.decode(
token,
_get_secret(),
algorithms=[_ALGORITHM],
)
except jwt.ExpiredSignatureError:
raise TokenError("Token has expired") from None
except jwt.InvalidTokenError as exc:
raise TokenError(f"Invalid token: {exc}") from None
if expected_type and payload.get("type") != expected_type:
raise TokenError(
f"Expected token type '{expected_type}', "
f"got '{payload.get('type')}'"
)
return payload
def decode_access_token(token: str) -> dict[str, Any]:
return decode_token(token, expected_type="access")
def decode_refresh_token(token: str) -> dict[str, Any]:
return decode_token(token, expected_type="refresh")
@@ -0,0 +1,152 @@
from django.core.management.base import BaseCommand, CommandParser
from apps.users.models import User, UserRole
SEED_USERS = [
{
"username": "admin",
"email": "admin@lotty.local",
"role": UserRole.ADMIN,
"first_name": "Admin",
"last_name": "User",
"is_staff": True,
"is_superuser": True,
},
{
"username": "experimenter",
"email": "experimenter@lotty.local",
"role": UserRole.EXPERIMENTER,
"first_name": "Experimenter",
"last_name": "User",
"is_staff": False,
"is_superuser": False,
},
{
"username": "approver",
"email": "approver@lotty.local",
"role": UserRole.APPROVER,
"first_name": "Approver",
"last_name": "User",
"is_staff": False,
"is_superuser": False,
},
{
"username": "approver2",
"email": "approver2@lotty.local",
"role": UserRole.APPROVER,
"first_name": "Approver Two",
"last_name": "User",
"is_staff": False,
"is_superuser": False,
},
{
"username": "viewer",
"email": "viewer@lotty.local",
"role": UserRole.VIEWER,
"first_name": "Viewer",
"last_name": "User",
"is_staff": False,
"is_superuser": False,
},
]
DEFAULT_PASSWORD = "password123" # noqa: S105
class Command(BaseCommand):
help = (
"Seed the database with demo users for each platform role "
"(admin, experimenter, approver, viewer)."
)
def add_arguments(self, parser: CommandParser) -> None:
parser.add_argument(
"--password",
type=str,
default=DEFAULT_PASSWORD,
help=(
f"Password to set for all seeded users "
f"(default: {DEFAULT_PASSWORD})."
),
)
parser.add_argument(
"--force",
action="store_true",
default=False,
help=(
"Overwrite existing users with the same username "
"(resets their password and fields)."
),
)
def handle(self, *args, **options) -> None:
password: str = options["password"]
force: bool = options["force"]
created_count = 0
updated_count = 0
skipped_count = 0
for user_data in SEED_USERS:
username = user_data["username"]
existing = User.objects.filter(username=username).first()
if existing and not force:
self.stdout.write(
self.style.WARNING(
f" SKIP {username} (already exists, "
f"use --force to overwrite)"
)
)
skipped_count += 1
continue
if existing and force:
existing.email = user_data["email"]
existing.role = user_data["role"]
existing.first_name = user_data["first_name"]
existing.last_name = user_data["last_name"]
existing.is_staff = user_data["is_staff"]
existing.is_superuser = user_data["is_superuser"]
existing.is_active = True
existing.set_password(password)
existing.save()
self.stdout.write(
self.style.WARNING(
f" UPDATE {username} (role={user_data['role']})"
)
)
updated_count += 1
continue
user = User(
username=username,
email=user_data["email"],
role=user_data["role"],
first_name=user_data["first_name"],
last_name=user_data["last_name"],
is_staff=user_data["is_staff"],
is_superuser=user_data["is_superuser"],
is_active=True,
)
user.set_password(password)
user.save()
self.stdout.write(
self.style.SUCCESS(
f" CREATE {username} (role={user_data['role']})"
)
)
created_count += 1
self.stdout.write("")
self.stdout.write(
self.style.SUCCESS(
f"Done: {created_count} created, "
f"{updated_count} updated, "
f"{skipped_count} skipped."
)
)
self.stdout.write(
self.style.NOTICE(f"All seeded users have password: {password}")
)
+21 -108
View File
@@ -1,4 +1,4 @@
# Generated by Django 5.2.11 on 2026-02-10 20:37
# Generated by Django 5.2.11 on 2026-02-12 13:05
import django.contrib.auth.models
import django.contrib.auth.validators
@@ -12,122 +12,35 @@ class Migration(migrations.Migration):
initial = True
dependencies = [
("auth", "0012_alter_user_first_name_max_length"),
('auth', '0012_alter_user_first_name_max_length'),
]
operations = [
migrations.CreateModel(
name="User",
name='User',
fields=[
("password", models.CharField(max_length=128, verbose_name="password")),
(
"last_login",
models.DateTimeField(
blank=True, null=True, verbose_name="last login"
),
),
(
"is_superuser",
models.BooleanField(
default=False,
help_text="Designates that this user has all permissions without explicitly assigning them.",
verbose_name="superuser status",
),
),
(
"username",
models.CharField(
error_messages={
"unique": "A user with that username already exists."
},
help_text="Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.",
max_length=150,
unique=True,
validators=[
django.contrib.auth.validators.UnicodeUsernameValidator()
],
verbose_name="username",
),
),
(
"first_name",
models.CharField(
blank=True, max_length=150, verbose_name="first name"
),
),
(
"last_name",
models.CharField(
blank=True, max_length=150, verbose_name="last name"
),
),
(
"email",
models.EmailField(
blank=True, max_length=254, verbose_name="email address"
),
),
(
"is_staff",
models.BooleanField(
default=False,
help_text="Designates whether the user can log into this admin site.",
verbose_name="staff status",
),
),
(
"is_active",
models.BooleanField(
default=True,
help_text="Designates whether this user should be treated as active. Unselect this instead of deleting accounts.",
verbose_name="active",
),
),
(
"date_joined",
models.DateTimeField(
default=django.utils.timezone.now, verbose_name="date joined"
),
),
(
"id",
models.UUIDField(
default=uuid.uuid4,
editable=False,
primary_key=True,
serialize=False,
),
),
(
"groups",
models.ManyToManyField(
blank=True,
help_text="The groups this user belongs to. A user will get all permissions granted to each of their groups.",
related_name="user_set",
related_query_name="user",
to="auth.group",
verbose_name="groups",
),
),
(
"user_permissions",
models.ManyToManyField(
blank=True,
help_text="Specific permissions for this user.",
related_name="user_set",
related_query_name="user",
to="auth.permission",
verbose_name="user permissions",
),
),
('password', models.CharField(max_length=128, verbose_name='password')),
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')),
('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
('email', models.EmailField(blank=True, max_length=254, verbose_name='email address')),
('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('role', models.CharField(choices=[('admin', 'Admin'), ('experimenter', 'Experimenter'), ('approver', 'Approver'), ('viewer', 'Viewer')], db_index=True, default='viewer', help_text='Platform role that defines user permissions.', max_length=20, verbose_name='role')),
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')),
],
options={
"verbose_name": "user",
"verbose_name_plural": "users",
"swappable": "AUTH_USER_MODEL",
'verbose_name': 'user',
'verbose_name_plural': 'users',
'swappable': 'AUTH_USER_MODEL',
},
managers=[
("objects", django.contrib.auth.models.UserManager()),
('objects', django.contrib.auth.models.UserManager()),
],
),
]
+33
View File
@@ -1,11 +1,44 @@
from django.contrib.auth.models import AbstractUser
from django.db import models
from django.utils.translation import gettext_lazy as _
from apps.core.models import BaseModel
class UserRole(models.TextChoices):
ADMIN = "admin", _("Admin")
EXPERIMENTER = "experimenter", _("Experimenter")
APPROVER = "approver", _("Approver")
VIEWER = "viewer", _("Viewer")
class User(AbstractUser, BaseModel):
role = models.CharField(
max_length=20,
choices=UserRole.choices,
default=UserRole.VIEWER,
db_index=True,
verbose_name=_("role"),
help_text=_("Platform role that defines user permissions"),
)
class Meta:
swappable = "AUTH_USER_MODEL"
verbose_name = _("user")
verbose_name_plural = _("users")
@property
def is_admin_role(self) -> bool:
return self.role == UserRole.ADMIN
@property
def is_experimenter(self) -> bool:
return self.role == UserRole.EXPERIMENTER
@property
def is_approver(self) -> bool:
return self.role == UserRole.APPROVER
@property
def is_viewer(self) -> bool:
return self.role == UserRole.VIEWER
+71 -4
View File
@@ -1,8 +1,8 @@
import uuid
from django.db.models import QuerySet
from django.db.models import Q, QuerySet
from apps.users.models import User
from apps.users.models import User, UserRole
def user_get_by_id(user_id: str) -> User | None:
@@ -13,5 +13,72 @@ def user_get_by_id(user_id: str) -> User | None:
return User.objects.filter(id=user_id).first()
def user_list() -> QuerySet[User]:
return User.objects.all()
def user_get_by_username(username: str) -> User | None:
return User.objects.filter(username=username).first()
def user_get_by_email(email: str) -> User | None:
return User.objects.filter(email=email).first()
def user_list(
*,
is_active: bool | None = None,
role: str | None = None,
search: str | None = None,
) -> QuerySet[User]:
qs = User.objects.all()
if is_active is not None:
qs = qs.filter(is_active=is_active)
if role is not None:
qs = qs.filter(role=role)
if search:
qs = qs.filter(
Q(username__icontains=search)
| Q(email__icontains=search)
| Q(first_name__icontains=search)
| Q(last_name__icontains=search)
)
return qs.order_by("username")
def user_list_by_role(role: str) -> QuerySet[User]:
return User.objects.filter(role=role, is_active=True).order_by("username")
def user_list_admins() -> QuerySet[User]:
return user_list_by_role(UserRole.ADMIN)
def user_list_experimenters() -> QuerySet[User]:
return user_list_by_role(UserRole.EXPERIMENTER)
def user_list_approvers() -> QuerySet[User]:
return user_list_by_role(UserRole.APPROVER)
def user_list_viewers() -> QuerySet[User]:
return user_list_by_role(UserRole.VIEWER)
def user_exists_with_username(
username: str, *, exclude_id: str | None = None
) -> bool:
qs = User.objects.filter(username=username)
if exclude_id is not None:
qs = qs.exclude(id=exclude_id)
return qs.exists()
def user_exists_with_email(
email: str, *, exclude_id: str | None = None
) -> bool:
qs = User.objects.filter(email=email)
if exclude_id is not None:
qs = qs.exclude(id=exclude_id)
return qs.exists()
+69 -2
View File
@@ -1,6 +1,9 @@
from typing import Any
from apps.users.models import User
from django.core.exceptions import ValidationError
from django.db import transaction
from apps.users.models import User, UserRole
def user_create(
@@ -8,9 +11,10 @@ def user_create(
username: str,
email: str,
password: str | None = None,
role: str = UserRole.VIEWER,
**extra_fields: Any,
) -> User:
user = User(username=username, email=email, **extra_fields)
user = User(username=username, email=email, role=role, **extra_fields)
if password is not None:
user.set_password(password)
else:
@@ -18,3 +22,66 @@ def user_create(
user.save()
return user
def user_update(
*,
user: User,
username: str | None = None,
email: str | None = None,
password: str | None = None,
role: str | None = None,
is_active: bool | None = None,
first_name: str | None = None,
last_name: str | None = None,
) -> User:
if username is not None:
user.username = username
if email is not None:
user.email = email
if role is not None:
user.role = role
if is_active is not None:
user.is_active = is_active
if first_name is not None:
user.first_name = first_name
if last_name is not None:
user.last_name = last_name
if password is not None:
user.set_password(password)
user.save()
return user
def user_assign_role(*, user: User, role: str) -> User:
valid_roles = {choice[0] for choice in UserRole.choices}
if role not in valid_roles:
raise ValidationError(
{
"role": f"Invalid role '{role}'. "
f"Must be one of: {', '.join(sorted(valid_roles))}"
}
)
user.role = role
user.save()
return user
@transaction.atomic
def user_delete(*, user: User) -> None:
user.delete()
def user_deactivate(*, user: User) -> User:
user.is_active = False
user.save()
return user
def user_activate(*, user: User) -> User:
user.is_active = True
user.save()
return user
+24
View File
@@ -0,0 +1,24 @@
from apps.users.auth.jwt import create_access_token
from apps.users.models import User, UserRole
from apps.users.services import user_create
def _make_user(
username="testuser",
email="test@lotty.local",
password="testpass123", # noqa: S107
role=UserRole.VIEWER,
**kwargs,
) -> User:
return user_create(
username=username,
email=email,
password=password,
role=role,
**kwargs,
)
def _auth_header(user) -> str:
token: str = create_access_token(user.pk, user.role)
return f"Bearer {token}"
+116
View File
@@ -0,0 +1,116 @@
import uuid
from datetime import timedelta
from typing import Any
from django.core.handlers.wsgi import WSGIRequest
from django.test import RequestFactory, TestCase
from apps.users.auth.bearer import JWTBearer
from apps.users.auth.jwt import (
TokenError,
create_access_token,
create_refresh_token,
create_token_pair,
decode_access_token,
decode_refresh_token,
decode_token,
)
from apps.users.models import User, UserRole
from ._helpers import _make_user
class JWTCreateTest(TestCase):
def test_create_access_token(self) -> None:
token: str = create_access_token(uuid.uuid4(), "admin")
self.assertIsInstance(token, str)
self.assertTrue(len(token) > 0)
def test_create_refresh_token(self) -> None:
token: str = create_refresh_token(uuid.uuid4())
self.assertIsInstance(token, str)
def test_create_token_pair(self) -> None:
pair: dict[str, str] = create_token_pair(uuid.uuid4(), "viewer")
self.assertIn("access", pair)
self.assertIn("refresh", pair)
class JWTDecodeTest(TestCase):
def setUp(self) -> None:
self.uid: uuid.UUID = uuid.uuid4()
def test_decode_access_token(self) -> None:
token: str = create_access_token(self.uid, "experimenter")
payload: dict[str, Any] = decode_access_token(token)
self.assertEqual(payload["sub"], str(self.uid))
self.assertEqual(payload["role"], "experimenter")
self.assertEqual(payload["type"], "access")
def test_decode_refresh_token(self) -> None:
token: str = create_refresh_token(self.uid)
payload: dict[str, Any] = decode_refresh_token(token)
self.assertEqual(payload["sub"], str(self.uid))
self.assertEqual(payload["type"], "refresh")
def test_decode_wrong_type_raises(self) -> None:
token: str = create_refresh_token(self.uid)
with self.assertRaises(TokenError):
decode_access_token(token)
def test_decode_expired_token_raises(self) -> None:
token: str = create_access_token(
self.uid, "admin", lifetime=timedelta(seconds=-1)
)
with self.assertRaises(TokenError):
decode_access_token(token)
def test_decode_invalid_token_raises(self) -> None:
with self.assertRaises(TokenError):
decode_token("not.a.jwt")
def test_extra_claims(self) -> None:
token: str = create_access_token(
self.uid, "admin", extra_claims={"org": "lotty"}
)
payload: dict[str, Any] = decode_access_token(token)
self.assertEqual(payload["org"], "lotty")
class JWTBearerTest(TestCase):
def setUp(self) -> None:
self.bearer = JWTBearer()
self.user: User = _make_user(
username="bearer_user",
email="bearer@x.com",
role=UserRole.ADMIN,
)
def test_valid_token_returns_user(self) -> None:
token: str = create_access_token(self.user.pk, self.user.role)
request: WSGIRequest = RequestFactory().get("/")
result: User | None = self.bearer.authenticate(request, token)
self.assertEqual(result, self.user)
def test_invalid_token_returns_none(self) -> None:
request: WSGIRequest = RequestFactory().get("/")
result: User | None = self.bearer.authenticate(request, "garbage")
self.assertIsNone(result)
def test_nonexistent_user_returns_none(self) -> None:
token: str = create_access_token(uuid.uuid4(), "admin")
request: WSGIRequest = RequestFactory().get("/")
result: User | None = self.bearer.authenticate(request, token)
self.assertIsNone(result)
def test_inactive_user_returns_none(self) -> None:
self.user.is_active = False
self.user.save()
token: str = create_access_token(self.user.pk, self.user.role)
request: WSGIRequest = RequestFactory().get("/")
result: User | None = self.bearer.authenticate(request, token)
self.assertIsNone(result)
@@ -0,0 +1,56 @@
import uuid
from django.test import TestCase
from apps.users.models import User, UserRole
from ._helpers import _make_user
class UserRoleChoicesTest(TestCase):
def test_choices_count(self) -> None:
self.assertEqual(len(UserRole.choices), 4)
def test_choice_values(self) -> None:
values = {c[0] for c in UserRole.choices}
self.assertEqual(
values, {"admin", "experimenter", "approver", "viewer"}
)
class UserModelTest(TestCase):
def test_default_role_is_viewer(self) -> None:
user: User = _make_user()
self.assertEqual(user.role, UserRole.VIEWER)
def test_role_properties(self) -> None:
admin: User = _make_user(
username="a", email="a@x.com", role=UserRole.ADMIN
)
self.assertTrue(admin.is_admin_role)
self.assertFalse(admin.is_experimenter)
self.assertFalse(admin.is_approver)
self.assertFalse(admin.is_viewer)
exp: User = _make_user(
username="e", email="e@x.com", role=UserRole.EXPERIMENTER
)
self.assertTrue(exp.is_experimenter)
appr: User = _make_user(
username="ap", email="ap@x.com", role=UserRole.APPROVER
)
self.assertTrue(appr.is_approver)
viewer: User = _make_user(
username="v", email="v@x.com", role=UserRole.VIEWER
)
self.assertTrue(viewer.is_viewer)
def test_uuid_primary_key(self) -> None:
user: User = _make_user()
self.assertIsInstance(user.pk, uuid.UUID)
def test_str_representation(self) -> None:
user: User = _make_user(username="hello")
self.assertEqual(str(user), "hello")
@@ -0,0 +1,44 @@
from django.core.handlers.wsgi import WSGIRequest
from django.test import RequestFactory, TestCase
from apps.users.auth.bearer import require_admin, require_roles
from apps.users.models import User, UserRole
from config.errors import ForbiddenError
from ._helpers import _make_user
class RequireRolesTest(TestCase):
def setUp(self) -> None:
self.admin: User = _make_user(
username="rr_admin", email="rr_admin@x.com", role=UserRole.ADMIN
)
self.viewer: User = _make_user(
username="rr_viewer", email="rr_viewer@x.com", role=UserRole.VIEWER
)
def _make_request(self, user) -> WSGIRequest:
request: WSGIRequest = RequestFactory().get("/")
request.auth = user
return request
def test_require_admin_passes(self) -> None:
request: WSGIRequest = self._make_request(self.admin)
result = require_admin(request)
self.assertEqual(result, self.admin)
def test_require_admin_denies_viewer(self) -> None:
request: WSGIRequest = self._make_request(self.viewer)
with self.assertRaises(ForbiddenError):
require_admin(request)
def test_require_roles_multiple(self) -> None:
checker = require_roles("admin", "viewer")
request: WSGIRequest = self._make_request(self.viewer)
result = checker(request)
self.assertEqual(result, self.viewer)
def test_require_roles_no_auth_raises(self) -> None:
request: WSGIRequest = RequestFactory().get("/")
with self.assertRaises(ForbiddenError):
require_admin(request)
@@ -0,0 +1,123 @@
import uuid
from django.db.models import QuerySet
from django.test import TestCase
from apps.users.models import User, UserRole
from apps.users.selectors import (
user_exists_with_email,
user_exists_with_username,
user_get_by_email,
user_get_by_id,
user_get_by_username,
user_list,
user_list_admins,
user_list_approvers,
user_list_by_role,
user_list_experimenters,
user_list_viewers,
)
from ._helpers import _make_user
class UserSelectorsTest(TestCase):
def setUp(self) -> None:
self.admin: User = _make_user(
username="sel_admin",
email="sel_admin@x.com",
role=UserRole.ADMIN,
)
self.exp: User = _make_user(
username="sel_exp",
email="sel_exp@x.com",
role=UserRole.EXPERIMENTER,
)
self.appr: User = _make_user(
username="sel_appr",
email="sel_appr@x.com",
role=UserRole.APPROVER,
)
self.viewer: User = _make_user(
username="sel_viewer",
email="sel_viewer@x.com",
role=UserRole.VIEWER,
)
def test_get_by_id(self) -> None:
found: User | None = user_get_by_id(str(self.admin.pk))
self.assertEqual(found, self.admin)
def test_get_by_id_invalid_uuid(self) -> None:
self.assertIsNone(user_get_by_id("not-a-uuid"))
def test_get_by_id_nonexistent(self) -> None:
self.assertIsNone(user_get_by_id(str(uuid.uuid4())))
def test_get_by_username(self) -> None:
found: User | None = user_get_by_username("sel_exp")
self.assertEqual(found, self.exp)
def test_get_by_username_nonexistent(self) -> None:
self.assertIsNone(user_get_by_username("ghost"))
def test_get_by_email(self) -> None:
found: User | None = user_get_by_email("sel_appr@x.com")
self.assertEqual(found, self.appr)
def test_list_all(self) -> None:
qs: QuerySet[User] = user_list()
self.assertEqual(qs.count(), 4)
def test_list_filter_by_role(self) -> None:
qs: QuerySet[User] = user_list(role=UserRole.ADMIN)
self.assertEqual(qs.count(), 1)
self.assertEqual(qs.first(), self.admin)
def test_list_filter_by_is_active(self) -> None:
self.viewer.is_active = False
self.viewer.save()
qs: QuerySet[User] = user_list(is_active=True)
self.assertEqual(qs.count(), 3)
def test_list_filter_by_search(self) -> None:
qs: QuerySet[User] = user_list(search="sel_exp")
self.assertEqual(qs.count(), 1)
def test_list_by_role(self) -> None:
qs: QuerySet[User] = user_list_by_role(UserRole.APPROVER)
self.assertEqual(qs.count(), 1)
def test_list_admins(self) -> None:
self.assertEqual(user_list_admins().count(), 1)
def test_list_experimenters(self) -> None:
self.assertEqual(user_list_experimenters().count(), 1)
def test_list_approvers(self) -> None:
self.assertEqual(user_list_approvers().count(), 1)
def test_list_viewers(self) -> None:
self.assertEqual(user_list_viewers().count(), 1)
def test_exists_with_username(self) -> None:
self.assertTrue(user_exists_with_username("sel_admin"))
self.assertFalse(user_exists_with_username("ghost"))
def test_exists_with_username_exclude(self) -> None:
self.assertFalse(
user_exists_with_username(
"sel_admin", exclude_id=str(self.admin.pk)
)
)
def test_exists_with_email(self) -> None:
self.assertTrue(user_exists_with_email("sel_viewer@x.com"))
self.assertFalse(user_exists_with_email("ghost@x.com"))
def test_exists_with_email_exclude(self) -> None:
self.assertFalse(
user_exists_with_email(
"sel_viewer@x.com", exclude_id=str(self.viewer.pk)
)
)
@@ -0,0 +1,128 @@
from django.core.exceptions import ValidationError
from django.test import TestCase
from apps.users.models import User, UserRole
from apps.users.services import (
user_activate,
user_assign_role,
user_create,
user_deactivate,
user_delete,
user_update,
)
from ._helpers import _make_user
class UserCreateServiceTest(TestCase):
def test_create_with_defaults(self) -> None:
user: User = user_create(
username="svc_user",
email="svc@lotty.local",
password="pass1234",
)
self.assertEqual(user.role, UserRole.VIEWER)
self.assertTrue(user.check_password("pass1234"))
self.assertTrue(user.is_active)
def test_create_with_role(self) -> None:
user: User = user_create(
username="admin_svc",
email="admin_svc@lotty.local",
password="pass1234",
role=UserRole.ADMIN,
)
self.assertEqual(user.role, UserRole.ADMIN)
def test_create_without_password(self) -> None:
user: User = user_create(
username="nopw",
email="nopw@lotty.local",
)
self.assertFalse(user.has_usable_password())
def test_create_with_extra_fields(self) -> None:
user: User = user_create(
username="extra",
email="extra@lotty.local",
password="pass1234",
first_name="First",
last_name="Last",
)
self.assertEqual(user.first_name, "First")
self.assertEqual(user.last_name, "Last")
class UserUpdateServiceTest(TestCase):
def setUp(self) -> None:
self.user: User = _make_user()
def test_update_username(self) -> None:
updated: User = user_update(user=self.user, username="newname")
self.assertEqual(updated.username, "newname")
def test_update_email(self) -> None:
updated: User = user_update(user=self.user, email="new@lotty.local")
self.assertEqual(updated.email, "new@lotty.local")
def test_update_role(self) -> None:
updated: User = user_update(user=self.user, role=UserRole.ADMIN)
self.assertEqual(updated.role, UserRole.ADMIN)
def test_update_password(self) -> None:
updated: User = user_update(user=self.user, password="newpass99")
self.assertTrue(updated.check_password("newpass99"))
def test_update_is_active(self) -> None:
updated: User = user_update(user=self.user, is_active=False)
self.assertFalse(updated.is_active)
def test_partial_update_leaves_other_fields(self) -> None:
original_email = self.user.email
updated: User = user_update(user=self.user, username="changed")
self.assertEqual(updated.email, original_email)
def test_update_names(self) -> None:
updated: User = user_update(
user=self.user, first_name="Jane", last_name="Doe"
)
self.assertEqual(updated.first_name, "Jane")
self.assertEqual(updated.last_name, "Doe")
class UserAssignRoleServiceTest(TestCase):
def setUp(self) -> None:
self.user: User = _make_user()
def test_assign_valid_role(self) -> None:
updated: User = user_assign_role(
user=self.user, role=UserRole.EXPERIMENTER
)
self.assertEqual(updated.role, UserRole.EXPERIMENTER)
def test_assign_invalid_role_raises(self) -> None:
with self.assertRaises(ValidationError):
user_assign_role(user=self.user, role="superadmin")
class UserDeleteServiceTest(TestCase):
def test_hard_delete(self) -> None:
user: User = _make_user()
pk = user.pk
user_delete(user=user)
self.assertFalse(User.objects.filter(pk=pk).exists())
class UserActivateDeactivateServiceTest(TestCase):
def setUp(self) -> None:
self.user: User = _make_user()
def test_deactivate(self) -> None:
updated: User = user_deactivate(user=self.user)
self.assertFalse(updated.is_active)
def test_activate(self) -> None:
self.user.is_active = False
self.user.save()
updated: User = user_activate(user=self.user)
self.assertTrue(updated.is_active)