You've already forked RekomenciBackend
49 lines
1.7 KiB
Python
49 lines
1.7 KiB
Python
from dishka import FromDishka
|
|
from dishka.integrations.fastapi import DishkaRoute
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from fastapi.security import HTTPBearer
|
|
from pydantic import BaseModel
|
|
|
|
from template_project.application.notification_device.errors import NotificationDeviceNotFoundError
|
|
from template_project.application.notification_device.interactors.register_device import (
|
|
RegisterNotificationDeviceInteractor,
|
|
RegisterNotificationDeviceRequest,
|
|
)
|
|
from template_project.application.notification_device.interactors.send_notification import (
|
|
NotificationInteractor,
|
|
SendNotificationRequest,
|
|
)
|
|
|
|
# Bearer-token security scheme, attached to the router below as a dependency.
# NOTE(review): HTTPBearer only extracts the Authorization credentials; any
# actual token validation is not visible in this module -- confirm where it lives.
security = HTTPBearer()

# Shared router for the notification endpoints:
#   * DishkaRoute lets handlers receive FromDishka[...] parameters from the
#     dishka container;
#   * the router-level dependency applies the bearer scheme to every route
#     registered on it.
router = APIRouter(
    route_class=DishkaRoute,
    tags=["Notifications"],
    dependencies=[Depends(security)],
)
|
|
|
|
|
|
# HTTP request body accepted by the POST /notifications/send endpoint.
class SendNotificationRequestModel(BaseModel):
    title: str  # forwarded as SendNotificationRequest.title
    body: str  # forwarded as SendNotificationRequest.body
|
|
|
|
|
|
# HTTP request body accepted by the POST /notifications/register_device endpoint.
class RegisterNotificationDeviceRequestModel(BaseModel):
    device_id: str  # forwarded as RegisterNotificationDeviceRequest.device_id
|
|
|
|
|
|
@router.post(
    "/notifications/send",
    # Declare the 404 raised below so it shows up in the generated OpenAPI schema.
    responses={status.HTTP_404_NOT_FOUND: {"description": "Notification device not found"}},
)
async def send_notification(
    request: SendNotificationRequestModel,
    interactor: FromDishka[NotificationInteractor],
) -> None:
    """Send a notification with the given title and body.

    Responds with 404 when no notification device is found.
    """
    # NOTE: FastAPI publishes the docstring above as this operation's
    # description in the OpenAPI docs, so it is kept short and client-facing.

    # Build the application-layer request outside the ``try`` so that only the
    # interactor call is covered by the error translation below.
    notification_request = SendNotificationRequest(title=request.title, body=request.body)
    try:
        await interactor.send_notification(notification_request)
    except NotificationDeviceNotFoundError as error:
        # Translate the application error into an HTTP 404; ``from error``
        # keeps the original exception attached as the cause.
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Notification device not found",
        ) from error
|
|
|
|
|
|
# Passes the device id from the request body to
# RegisterNotificationDeviceInteractor.execute.
@router.post("/notifications/register_device")
async def register_notification_device(
    request: RegisterNotificationDeviceRequestModel,
    interactor: FromDishka[RegisterNotificationDeviceInteractor],
) -> None:
    # Map the HTTP model onto the application-layer request and hand it to the
    # interactor; nothing is caught here, so any error it raises propagates.
    await interactor.execute(
        RegisterNotificationDeviceRequest(device_id=request.device_id),
    )
|