You've already forked RekomenciBackend
feat(): push notifications via firebase admin
This commit is contained in:
@@ -0,0 +1,31 @@
|
||||
from dishka import FromDishka
|
||||
from dishka.integrations.fastapi import DishkaRoute
|
||||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
from fastapi.security import HTTPBearer
|
||||
from pydantic import BaseModel
|
||||
|
||||
from template_project.application.user.notification.interactors.send_notification import (
|
||||
NotificationInteractor,
|
||||
SendNotificationRequest,
|
||||
)
|
||||
from template_project.application.user.notification_device.errors import NotificationDeviceNotFoundError
|
||||
|
||||
security = HTTPBearer()
|
||||
router = APIRouter(route_class=DishkaRoute, tags=["Notifications"], dependencies=[Depends(security)])
|
||||
|
||||
|
||||
class SendNotificationRequestModel(BaseModel):
|
||||
title: str
|
||||
body: str
|
||||
|
||||
|
||||
@router.post("/notifications/send")
|
||||
async def send_notification(
|
||||
request: SendNotificationRequestModel,
|
||||
interactor: FromDishka[NotificationInteractor],
|
||||
) -> None:
|
||||
try:
|
||||
notification_request = SendNotificationRequest(title=request.title, body=request.body)
|
||||
await interactor.send_notification(notification_request)
|
||||
except NotificationDeviceNotFoundError as error:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Notification device not found") from error
|
||||
Reference in New Issue
Block a user