from typing import Any

from django.core.exceptions import ValidationError
from django.db import transaction

from apps.reviews.models import ApproverGroup, ReviewSettings
from apps.users.models import User, UserRole
from config.errors import ConflictError


def review_settings_update(
    *,
    default_min_approvals: int | None = None,
    allow_any_approver: bool | None = None,
) -> ReviewSettings:
    """Update the review settings returned by ``ReviewSettings.load()``.

    Arguments left as ``None`` are not modified. The settings object is
    saved on every call, even when nothing was changed.

    Raises:
        ValidationError: if ``default_min_approvals`` is below 1.
    """
    settings = ReviewSettings.load()

    if default_min_approvals is not None:
        if default_min_approvals < 1:
            raise ValidationError(
                {
                    "default_min_approvals": (
                        "Minimum approvals must be at least 1."
                    )
                }
            )
        settings.default_min_approvals = default_min_approvals

    if allow_any_approver is not None:
        settings.allow_any_approver = allow_any_approver

    settings.save()
    return settings


def _validate_experimenter(user: User) -> None:
    """Ensure *user* has the experimenter role and is active.

    Raises:
        ValidationError: keyed on ``"experimenter"`` when either check fails.
    """
    if user.role != UserRole.EXPERIMENTER:
        raise ValidationError(
            {
                "experimenter": (
                    f"User '{user.username}' has role '{user.role}'. "
                    f"Only users with the '{UserRole.EXPERIMENTER}' role "
                    f"can be assigned an approver group."
                )
            }
        )
    if not user.is_active:
        raise ValidationError(
            {"experimenter": "The experimenter must be an active user."}
        )


def _validate_approvers(approvers: list[User]) -> None:
    """Ensure every user in *approvers* has the approver role.

    Raises:
        ValidationError: keyed on ``"approvers"``, listing the usernames of
            all users whose role is not ``UserRole.APPROVER``.
    """
    invalid = [u for u in approvers if u.role != UserRole.APPROVER]
    if invalid:
        names = ", ".join(u.username for u in invalid)
        raise ValidationError(
            {
                "approvers": (
                    f"The following users do not have the "
                    f"'{UserRole.APPROVER}' role: {names}"
                )
            }
        )


def _validate_min_approvals(
    min_approvals: int,
    approver_count: int | None = None,
) -> None:
    """Check that *min_approvals* is at least 1 and fits the approver count.

    The upper-bound check is skipped when ``approver_count`` is ``None``.

    Raises:
        ValidationError: keyed on ``"min_approvals"`` when the value is
            below 1 or exceeds ``approver_count``.
    """
    if min_approvals < 1:
        raise ValidationError(
            {"min_approvals": "Minimum approvals must be at least 1."}
        )
    if approver_count is not None and min_approvals > approver_count:
        raise ValidationError(
            {
                "min_approvals": (
                    f"min_approvals ({min_approvals}) cannot exceed the "
                    f"number of assigned approvers ({approver_count})."
                )
            }
        )


def _load_active_approvers(approver_ids: list[Any]) -> list[User]:
    """Fetch the active users for *approver_ids* and check their roles.

    Raises:
        ValidationError: keyed on ``"approvers"`` when an id does not match
            an active user, or when a matched user lacks the approver role.
    """
    approvers = list(
        User.objects.filter(pk__in=approver_ids, is_active=True)
    )
    # Compare as strings so ids passed as str and as native pk types match.
    found_ids = {str(u.pk) for u in approvers}
    missing = [
        str(aid) for aid in approver_ids if str(aid) not in found_ids
    ]
    if missing:
        raise ValidationError(
            {
                "approvers": (
                    f"Users not found or inactive: {', '.join(missing)}"
                )
            }
        )
    _validate_approvers(approvers)
    return approvers


@transaction.atomic
def approver_group_create(
    *,
    experimenter: User,
    approver_ids: list[Any] | None = None,
    min_approvals: int = 1,
) -> ApproverGroup:
    """Create the approver group for *experimenter*.

    When ``approver_ids`` is empty or ``None`` the group is created without
    approvers and ``min_approvals`` is only checked against the lower bound.

    Raises:
        ValidationError: if the experimenter, the approvers or
            ``min_approvals`` fail validation.
        ConflictError: if a group already exists for the experimenter.
    """
    _validate_experimenter(experimenter)

    if ApproverGroup.objects.filter(experimenter=experimenter).exists():
        raise ConflictError(
            ValidationError(
                {
                    "experimenter": (
                        f"An approver group already exists for "
                        f"experimenter '{experimenter.username}'."
                    )
                }
            )
        )

    approvers: list[User] = []
    if approver_ids:
        approvers = _load_active_approvers(approver_ids)

    _validate_min_approvals(
        min_approvals, len(approvers) if approvers else None
    )

    group = ApproverGroup(
        experimenter=experimenter,
        min_approvals=min_approvals,
    )
    group.save()

    if approvers:
        group.approvers.set(approvers)

    return group


@transaction.atomic
def approver_group_update(
    *,
    group: ApproverGroup,
    approver_ids: list[Any] | None = None,
    min_approvals: int | None = None,
) -> ApproverGroup:
    """Update the approvers and/or the approval threshold of *group*.

    ``approver_ids=None`` leaves the approver set untouched; a list
    (including an empty one) replaces it. ``min_approvals=None`` keeps the
    current threshold.

    Raises:
        ValidationError: if an approver id is unknown/inactive, a user lacks
            the approver role, or the resulting threshold is below 1 or
            larger than the resulting number of approvers.
    """
    approvers: list[User] | None = None
    if approver_ids is not None:
        approvers = _load_active_approvers(approver_ids)

    if min_approvals is not None or approvers is not None:
        # Validate the threshold the group will end up with against the
        # approver set it will end up with. Checking only when
        # min_approvals is passed would let an approver-only update shrink
        # the set below the stored threshold, a state that
        # approver_group_remove_approver refuses to create.
        effective_min = (
            min_approvals if min_approvals is not None
            else group.min_approvals
        )
        approver_count = (
            len(approvers) if approvers is not None
            else group.approvers.count()
        )
        _validate_min_approvals(effective_min, approver_count)

    if min_approvals is not None:
        group.min_approvals = min_approvals

    group.save()

    if approvers is not None:
        group.approvers.set(approvers)

    return group


@transaction.atomic
def approver_group_delete(*, group: ApproverGroup) -> None:
    """Delete *group*."""
    group.delete()
@transaction.atomic
def approver_group_add_approver(
    *,
    group: ApproverGroup,
    approver: User,
) -> ApproverGroup:
    """Add a single *approver* to *group*.

    Raises:
        ValidationError: if the user lacks the approver role, is inactive,
            or is already a member of the group.
    """
    _validate_approvers([approver])

    # Group creation and update only accept active users; enforce the same
    # rule here so an inactive approver cannot be added through this path.
    if not approver.is_active:
        raise ValidationError(
            {
                "approver": (
                    f"User '{approver.username}' is inactive and cannot "
                    f"be added to an approver group."
                )
            }
        )

    if group.approvers.filter(pk=approver.pk).exists():
        raise ValidationError(
            {
                "approver": (
                    f"User '{approver.username}' is already in this "
                    f"approver group."
                )
            }
        )

    group.approvers.add(approver)
    return group


@transaction.atomic
def approver_group_remove_approver(
    *,
    group: ApproverGroup,
    approver: User,
) -> ApproverGroup:
    """Remove *approver* from *group*.

    Raises:
        ValidationError: if the user is not a member of the group, or if
            removing them would leave fewer approvers than
            ``group.min_approvals``.
    """
    if not group.approvers.filter(pk=approver.pk).exists():
        raise ValidationError(
            {
                "approver": (
                    f"User '{approver.username}' is not in this "
                    f"approver group."
                )
            }
        )

    # The group must still be able to reach its approval threshold once
    # this approver is gone.
    remaining = group.approvers.count() - 1
    if remaining < group.min_approvals:
        raise ValidationError(
            {
                "approver": (
                    f"Cannot remove approver: would leave {remaining} "
                    f"approver(s), but {group.min_approvals} required."
                )
            }
        )

    group.approvers.remove(approver)
    return group