fast init

This commit is contained in:
ivankirpichnikov
2025-10-16 23:03:50 +03:00
parent b84e0370d6
commit 652da07d12
50 changed files with 1012 additions and 0 deletions
+6
View File
@@ -0,0 +1,6 @@
config.toml
.venv
.idea
__pycache__
docker-compose.yml
*.egg-info
+13
View File
@@ -0,0 +1,13 @@
FROM python:3.13-alpine
WORKDIR /app
RUN pip install uv
RUN mkdir -p ./src/
COPY pyproject.toml /app/pyproject.toml
RUN uv pip install -e . --system --no-cache
COPY ./src/template_project /app/src/template_project
+11
View File
@@ -0,0 +1,11 @@
[server]
host = "0.0.0.0"
port = 8080
access_log = true
[database]
url = "postgresql+psycopg://username:password@host:post/database"
[access_token]
crypto_key = "..."
expires_in = 86400
+14
View File
@@ -0,0 +1,14 @@
[project]
name = "template_project"
requires-python = ">=3.13"
description = "template project"
version = "1.0.0"
dependencies = [
"uuid-utils==0.11.1",
"adaptix==3.0.0b11",
"fastapi==0.119.0",
"uvicorn==0.37.0",
"dishka==1.7.2",
"argon2-cffi==23.1.0",
"cryptography==46.0.3",
]
View File
@@ -0,0 +1,29 @@
from typing import override
from uuid import UUID
from cryptography.fernet import Fernet
from template_project.application.access_token.cryptographer import AccessTokenCryptographer
from template_project.application.access_token.entity import AccessTokenId
type RawAccessToken = str
class FernetAccessTokenCryptographer(AccessTokenCryptographer):
def __init__(self, fernet: Fernet) -> None:
self._fernet = fernet
@override
def crypto(self, access_token_id: AccessTokenId) -> RawAccessToken:
return self._fernet.encrypt(
str(access_token_id).encode("utf-8"),
).decode("utf-8")
@override
def decrypto(self, raw_access_token: RawAccessToken) -> AccessTokenId:
return AccessTokenId(
UUID(
self._fernet.decrypt(raw_access_token).decode("utf-8"),
),
)
@@ -0,0 +1,17 @@
from typing import override
from template_project.application.access_token.entity import AccessToken
from template_project.application.access_token.entity_factory import AccessTokenFactory
from template_project.application.user.entity import UserId
from template_project.web_api.configuration import AccessTokenConfiguration
class DefaultAccessTokenFactory(AccessTokenFactory):
def __init__(self, configuration: AccessTokenConfiguration) -> None:
self._configuration = configuration
@override
def execute(self, user_id: UserId) -> AccessToken:
return AccessToken.factory(
user_id=user_id,
expires_in=self._configuration.expires_in,
)
@@ -0,0 +1,20 @@
from typing import override
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from template_project.adapters.data_gateways.tables import access_token_table
from template_project.application.access_token.data_gateway import AccessTokenDataGateway
from template_project.application.access_token.entity import AccessToken, AccessTokenId
class DefaultAccessTokenDataGateway(AccessTokenDataGateway):
def __init__(self, session: AsyncSession) -> None:
self._session = session
@override
async def load_with_id(self, access_token_id: AccessTokenId) -> AccessToken | None:
statement = select(AccessToken).where(
access_token_table.c.id==access_token_id,
)
result = await self._session.execute(statement)
return result.scalar_one_or_none()
@@ -0,0 +1,40 @@
__all__ = (
"meta_data",
"user_table",
"access_token_table",
)
from sqlalchemy import UUID, Boolean, Column, DateTime, ForeignKey, MetaData, String, Table
from sqlalchemy.orm import registry
from template_project.application.access_token.entity import AccessToken
from template_project.application.user.entity import User
meta_data = MetaData()
user_table = Table(
"users",
meta_data,
Column("id", UUID, primary_key=True),
Column("email", String, unique=True, nullable=False),
Column("hashed_password", String, nullable=False),
Column("deleted_at", DateTime(timezone=True)),
Column("created_at", DateTime(timezone=True), nullable=False),
)
access_token_table = Table(
"access_token",
meta_data,
Column("id", UUID, primary_key=True),
Column("user_id", UUID, ForeignKey("users.id", ondelete="CASCADE"), nullable=False),
Column("revoked", Boolean, nullable=False),
Column("expires_in", DateTime(timezone=True), nullable=False),
Column("deleted_at", DateTime(timezone=True)),
Column("created_at", DateTime(timezone=True), nullable=False),
)
mapper_registry = registry()
mapper_registry.map_imperatively(User, user_table)
mapper_registry.map_imperatively(AccessToken, access_token_table)
@@ -0,0 +1,27 @@
from typing import override
from sqlalchemy import exists, select
from sqlalchemy.ext.asyncio import AsyncSession
from template_project.adapters.data_gateways.tables import user_table
from template_project.application.user.data_gateway import UserDataGateway
from template_project.application.user.entity import User, UserId
class DefaultUserDataGateway(UserDataGateway):
def __init__(self, session: AsyncSession) -> None:
self._session = session
@override
async def load_with_id(self, id_: UserId) -> User | None:
statement = select(User).where(user_table.c.id==id_)
result = await self._session.execute(statement)
return result.scalar_one_or_none()
@override
async def exists_by_email(self, email: str) -> bool:
statement = select(exists(select(user_table).where(user_table.c.email == email)))
result = await self._session.execute(statement)
result_fetchone = result.fetchone()
if result_fetchone is None:
return False
return result_fetchone[0]
@@ -0,0 +1,40 @@
from abc import abstractmethod
from typing import override
import argon2
from argon2.exceptions import Argon2Error
from template_project.application.common.containers import SecretString
from template_project.application.user.password_utils import PasswordHasher, PasswordVerifying
class ArgonPasswordHasher(PasswordHasher):
def __init__(
self,
password_hasher: argon2.PasswordHasher,
) -> None:
self._password_hasher = password_hasher
@override
def hash(self, password: SecretString) -> str:
return self._password_hasher.hash(password.get_value())
class ArgonPasswordVerifying(PasswordVerifying):
def __init__(
self,
password_hasher: argon2.PasswordHasher,
) -> None:
self._password_hasher = password_hasher
@abstractmethod
def verify(
self,
verifiable_password: SecretString,
hashed_password: str,
) -> bool:
try:
return self._password_hasher.verify(hashed_password, verifiable_password.get_value())
except Argon2Error as e:
return False
@@ -0,0 +1,18 @@
from typing import Any, override
from sqlalchemy.ext.asyncio import AsyncSession
from template_project.application.common.unit_of_work import UnitOfWork
class DefaultUnitOfWork(UnitOfWork):
def __init__(self, session: AsyncSession) -> None:
self._session = session
@override
def add(self, *entities: Any) -> None:
self._session.add_all(entities)
@override
async def commit(self) -> None:
await self._session.commit()
@@ -0,0 +1,17 @@
from abc import abstractmethod
from typing import Protocol
from template_project.application.access_token.entity import AccessTokenId
type RawAccessToken = str
class AccessTokenCryptographer(Protocol):
@abstractmethod
def crypto(self, access_token_id: AccessTokenId) -> RawAccessToken:
raise NotImplementedError
@abstractmethod
def decrypto(self, raw_access_token: RawAccessToken) -> AccessTokenId:
raise NotImplementedError
@@ -0,0 +1,10 @@
from abc import abstractmethod
from typing import Protocol
from template_project.application.access_token.entity import AccessToken, AccessTokenId
class AccessTokenDataGateway(Protocol):
@abstractmethod
async def load_with_id(self, access_token_id: AccessTokenId) -> AccessToken | None:
raise NotImplementedError
@@ -0,0 +1,49 @@
from datetime import UTC, datetime, timedelta
from typing import NewType, Self
from uuid import UUID
from uuid_utils.compat import uuid7
from template_project.application.access_token.errors import AccessTokenExpiredError
from template_project.application.common.entity import Entity, to_entity
from template_project.application.user.entity import UserId
AccessTokenId = NewType("AccessTokenId", UUID)
@to_entity
class AccessToken(Entity[AccessTokenId]):
user_id: UserId
revoked: bool
expires_in: datetime
@classmethod
def factory(
cls,
user_id: UserId,
expires_in: timedelta,
) -> Self:
current_date_time = datetime.now(tz=UTC)
return cls(
id=AccessTokenId(uuid7()),
created_at=current_date_time,
user_id=user_id,
expires_in=current_date_time + expires_in,
revoked=False,
)
def ensure_expired(self) -> None:
if self.expired_predicate():
raise AccessTokenExpiredError(id_=self.id)
def expired_predicate(self) -> bool:
return (
(self.expires_in < datetime.now(tz=UTC))
or self.revoked
or self.deleted_at is not None
)
def revoke(self) -> None:
self.revoked = True
self.deleted_at = datetime.now(tz=UTC)
@@ -0,0 +1,11 @@
from abc import abstractmethod
from typing import Protocol
from template_project.application.access_token.entity import AccessToken
from template_project.application.user.entity import UserId
class AccessTokenFactory(Protocol):
@abstractmethod
def execute(self, user_id: UserId) -> AccessToken:
raise NotImplementedError
@@ -0,0 +1,13 @@
from typing import override
from template_project.application.access_token.entity import AccessTokenId
from template_project.application.common.errors import ApplicationError, to_error
@to_error
class AccessTokenExpiredError(ApplicationError):
id_: AccessTokenId
@override
def __str__(self) -> str:
return f"Access token id={self.id_!r} expried"
@@ -0,0 +1,42 @@
from collections.abc import Container, Hashable, Sized
from typing import Any, Final, override
_SECRET_VALUE: Final = "********" # noqa: S105
class SecretString(Container[bool], Hashable, Sized):
__slots__ = ("_value",)
def __init__(self, value: str) -> None:
self._value = value
@override
def __hash__(self) -> int:
return hash(self._value)
@override
def __len__(self) -> int:
return len(self._value)
@override
def __eq__(self, value: object) -> bool:
if isinstance(value, str):
return self._value == value
return NotImplemented
@override
def __contains__(self, value: object) -> Any:
if isinstance(value, str):
return value in self._value
return NotImplemented
@override
def __str__(self) -> str:
return _SECRET_VALUE
@override
def __repr__(self) -> str:
return f"<{self.__class__.__name__}(value={_SECRET_VALUE!r})>"
def get_value(self) -> str:
return self._value
@@ -0,0 +1,17 @@
from dataclasses import dataclass
from typing import dataclass_transform
@dataclass_transform(
frozen_default=True,
eq_default=False,
kw_only_default=True,
)
def to_data_structure[_InteractorClsT](interactor_cls: type[_InteractorClsT]) -> type[_InteractorClsT]:
return dataclass(
kw_only=True,
eq=False,
match_args=False,
frozen=True,
slots=True,
)(interactor_cls)
@@ -0,0 +1,33 @@
from collections.abc import Hashable
from dataclasses import dataclass
from datetime import datetime
from typing import dataclass_transform, override
from uuid import UUID
from template_project.application.common.errors import EntityAlreadyDeletedError
@dataclass_transform(kw_only_default=True)
def to_entity[_EntityCLsT](entity_cls: type[_EntityCLsT]) -> type[_EntityCLsT]:
return dataclass(kw_only=True)(entity_cls)
@to_entity
class Entity[_EntityId: UUID](Hashable):
id: _EntityId
created_at: datetime
deleted_at: datetime | None = None
def ensure_not_deleted(self) -> None:
if self.deleted_at is not None:
raise EntityAlreadyDeletedError(entity_name=self.__class__.__name__)
@override
def __eq__(self, other: object) -> bool:
if isinstance(other, Entity):
return self.id == other.id
return NotImplemented
@override
def __hash__(self) -> int:
return hash(self.id)
@@ -0,0 +1,29 @@
from dataclasses import dataclass
from typing import dataclass_transform, override
@dataclass_transform(
kw_only_default=True,
eq_default=False,
)
def to_error[T](cls: type[T]) -> type[T]:
return dataclass(
kw_only=True,
eq=False,
repr=False,
match_args=False,
)(cls)
@to_error
class ApplicationError(Exception):
pass
@to_error
class EntityAlreadyDeletedError(ApplicationError):
entity_name: str
@override
def __str__(self) -> str:
return f"Entity {self.entity_name!r} already deleted"
@@ -0,0 +1,10 @@
from abc import abstractmethod
from typing import Protocol
from template_project.application.user.entity import User
class IdentityProvider(Protocol):
@abstractmethod
async def get_current_user(self) -> User:
raise NotImplementedError
@@ -0,0 +1,18 @@
from dataclasses import dataclass
from typing import dataclass_transform
@dataclass_transform(
frozen_default=True,
eq_default=False,
kw_only_default=True,
)
def to_interactor[_InteractorClsT](interactor_cls: type[_InteractorClsT]) -> type[_InteractorClsT]:
return dataclass(
kw_only=True,
eq=False,
repr=False,
frozen=True,
match_args=False,
slots=True,
)(interactor_cls)
@@ -0,0 +1,12 @@
from abc import abstractmethod
from typing import Any, Protocol
class UnitOfWork(Protocol):
@abstractmethod
def add(self, *entities: Any) -> None:
raise NotImplementedError
@abstractmethod
async def commit(self) -> None:
raise NotImplementedError
@@ -0,0 +1,17 @@
from abc import abstractmethod
from typing import Protocol
from template_project.application.user.entity import User, UserId
class UserDataGateway(Protocol):
@abstractmethod
async def load_with_id(self, id_: UserId) -> User | None:
raise NotImplementedError
@abstractmethod
async def exists_by_email(
self,
email: str
) -> bool:
raise NotImplementedError
@@ -0,0 +1,27 @@
from datetime import UTC, datetime
from typing import NewType, Self
from uuid import UUID
from uuid_utils.compat import uuid7
from template_project.application.common.entity import Entity, to_entity
UserId = NewType("UserId", UUID)
@to_entity
class User(Entity[UserId]):
email: str
hashed_password: str
@classmethod
def factory(
cls,
email: str,
hashed_password: str,
) -> Self:
return cls(
id=UserId(uuid7()),
email=email,
hashed_password=hashed_password,
created_at=datetime.now(tz=UTC),
)
@@ -0,0 +1,15 @@
from typing import override
from template_project.application.common.errors import ApplicationError, to_error
@to_error
class UserWithEmailAlreadyExistsError(ApplicationError):
email: str
@override
def __str__(self) -> str:
return f"User with the email={self.email!r} already exists"
@to_error
class UserUnauthorizedError(ApplicationError):
pass
@@ -0,0 +1,23 @@
from adaptix.conversion import get_converter
from template_project.application.common.data_structure import to_data_structure
from template_project.application.common.identity_provider import IdentityProvider
from template_project.application.common.interactor import to_interactor
from template_project.application.user.entity import User, UserId
@to_data_structure
class GetMeResponse:
id: UserId
email: str
response_converter = get_converter(User, GetMeResponse)
@to_interactor
class GetMeInteractor:
identity_provider: IdentityProvider
async def execute(self) -> GetMeResponse:
current_user = await self.identity_provider.get_current_user()
return response_converter(current_user)
@@ -0,0 +1,49 @@
from template_project.application.access_token.cryptographer import AccessTokenCryptographer
from template_project.application.access_token.entity_factory import AccessTokenFactory
from template_project.application.common.containers import SecretString
from template_project.application.common.data_structure import to_data_structure
from template_project.application.common.interactor import to_interactor
from template_project.application.common.unit_of_work import UnitOfWork
from template_project.application.user.data_gateway import UserDataGateway
from template_project.application.user.entity import User
from template_project.application.user.errors import UserWithEmailAlreadyExistsError
from template_project.application.user.password_utils import PasswordHasher
@to_data_structure
class UserSignUpResponse:
access_token: str
@to_interactor
class UserSignUpInteractor:
unit_of_work: UnitOfWork
password_hasher: PasswordHasher
user_data_gateway: UserDataGateway
access_token_factory: AccessTokenFactory
access_token_cryptographer: AccessTokenCryptographer
async def execute(
self,
email: str,
password: SecretString,
) -> UserSignUpResponse:
exists_by_email = await self.user_data_gateway.exists_by_email(email)
if exists_by_email:
raise UserWithEmailAlreadyExistsError(email=email)
hashed_password = self.password_hasher.hash(password)
user = User.factory(
email=email,
hashed_password=hashed_password,
)
access_token = self.access_token_factory.execute(user.id)
crypted_access_token = self.access_token_cryptographer.crypto(access_token)
response = UserSignUpResponse(access_token=crypted_access_token)
self.unit_of_work.add(user, access_token)
await self.unit_of_work.commit()
return response
@@ -0,0 +1,20 @@
from abc import abstractmethod
from typing import Protocol
from template_project.application.common.containers import SecretString
class PasswordHasher(Protocol):
@abstractmethod
def hash(self, password: SecretString) -> str:
raise NotImplementedError
class PasswordVerifying(Protocol):
@abstractmethod
def verify(
self,
verifiable_password: SecretString,
hashed_password: str,
) -> bool:
raise NotImplementedError
@@ -0,0 +1,53 @@
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from tomllib import loads
from typing import dataclass_transform
from adaptix import P, Retort, loader
from template_project.application.common.containers import SecretString
@dataclass_transform(frozen_default=True)
def to_configuration[ClsT](cls: type[ClsT]) -> type[ClsT]:
return dataclass(frozen=True, slots=True, repr=False)(cls)
@to_configuration
class DatabaseConfiguration:
url: SecretString
@to_configuration
class AccessTokenConfiguration:
crypto_key: str
expires_in: timedelta
@to_configuration
class ServerConfiguration:
host: str
port: int
access_log: bool
@to_configuration
class Configuration:
server: ServerConfiguration
database: DatabaseConfiguration
access_token: AccessTokenConfiguration
retort = Retort(
recipe=[
loader(SecretString, SecretString),
loader(P[AccessTokenConfiguration].expires_in, lambda value: timedelta(seconds=value)),
],
)
def load_configuration(path: Path) -> Configuration:
with path.open("r", encoding="utf-8") as config:
data = loads(config.read())
return retort.load(data, Configuration)
@@ -0,0 +1,99 @@
import argparse
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from pathlib import Path
import sys
from typing import Final
from dishka import AsyncContainer
from dishka.integrations.fastapi import setup_dishka
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
from template_project.web_api.configuration import load_configuration
from template_project.web_api.ioc.make import make_ioc
LOG_CONFIG: Final = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"default": {
"format": "%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "default",
},
},
"root": {
"level": "DEBUG",
"handlers": ["console"],
},
}
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
yield
await app.state.dishka_container.close()
def make_asgi_application(
ioc: AsyncContainer,
) -> FastAPI:
app = FastAPI(
lifespan=lifespan,
docs_url="/docs",
title="Template project",
description="Template project API",
version="1.0.0",
openapi_url="/openapi.json",
)
origins = ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
setup_dishka(container=ioc, app=app)
return app
def _main(
configuration_path: Path,
) -> None:
configuration = load_configuration(configuration_path)
ioc = make_ioc(configuration)
asgi_application = make_asgi_application(ioc)
uvicorn.run(
asgi_application,
port=configuration.server.port,
host=configuration.server.host,
log_config=LOG_CONFIG,
access_log=configuration.server.access_log,
)
def main() -> None:
if sys.platform == "win32":
from asyncio import WindowsSelectorEventLoopPolicy
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())
arg_parser = argparse.ArgumentParser()
subparsers = arg_parser.add_subparsers()
web_api_parser = subparsers.add_parser("web_api")
web_api_parser.add_argument("configuration", dest="configuration", type=Path)
args = arg_parser.parse_args()
_main(args.configuration)
if __name__ == "__main__":
main()
@@ -0,0 +1,73 @@
from abc import abstractmethod
from fastapi import Request
from template_project.application.access_token.cryptographer import AccessTokenCryptographer
from template_project.application.access_token.data_gateway import AccessTokenDataGateway
from template_project.application.access_token.entity import AccessTokenId
from template_project.application.access_token.errors import AccessTokenExpiredError
from template_project.application.common.identity_provider import IdentityProvider
from template_project.application.user.data_gateway import UserDataGateway
from template_project.application.user.entity import User
from template_project.application.user.errors import UserUnauthorizedError
TOKEN_TYPE = "Bearer"
BEARER_SECTIONS = 2
AUTH_HEADER = "Authorization"
class WebApiIdentityProvider(IdentityProvider):
def __init__(
self,
request: Request,
user_data_gateway: UserDataGateway,
access_token_data_gateway: AccessTokenDataGateway,
access_token_cryptographer: AccessTokenCryptographer,
) -> None:
self._request = request
self._user_data_gateway = user_data_gateway
self._access_token_data_gateway = access_token_data_gateway
self._access_token_cryptographer = access_token_cryptographer
@abstractmethod
async def get_current_user(self) -> User:
auth_tokn = self._request.headers[AUTH_HEADER]
access_token_id = self._parse_token(auth_tokn)
if access_token_id is None:
raise UserUnauthorizedError
access_token = await self._access_token_data_gateway.load_with_id(access_token_id)
if access_token is None:
raise UserUnauthorizedError
try:
access_token.ensure_expired()
except AccessTokenExpiredError as error:
raise UserUnauthorizedError from error
user = await self._user_data_gateway.load_with_id(access_token.user_id)
if user is None:
raise UserUnauthorizedError
return user
def _parse_token(self, token: str) -> AccessTokenId | None:
authorization_header = self._request.headers.get(AUTH_HEADER)
if authorization_header is None:
return None
sections = authorization_header.split(" ")
if len(sections) != BEARER_SECTIONS:
return None
token_type, token = sections
if token_type != TOKEN_TYPE:
return None
return self._access_token_cryptographer.decrypto(token)
@@ -0,0 +1,22 @@
from typing import AsyncIterable
from dishka import Provider, Scope, provide
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from template_project.web_api.configuration import DatabaseConfiguration
class ConnectionProvider(Provider):
@provide(scope=Scope.APP)
async def make_engine(self, configuration: DatabaseConfiguration) -> AsyncIterable[AsyncEngine]:
engine = create_async_engine(configuration.url.get_value())
yield engine
await engine.dispose()
@provide()
async def make_connection(self, engine: AsyncEngine) -> AsyncIterable[AsyncSession]:
session = AsyncSession(
bind=engine,
expire_on_commit=True,
)
async with session:
yield session
@@ -0,0 +1,25 @@
import argon2
from cryptography.fernet import Fernet
from dishka import Provider, Scope, WithParents, provide, provide_all
from template_project.adapters.access_token.cryptographer import FernetAccessTokenCryptographer
from template_project.adapters.password_utils import ArgonPasswordHasher, ArgonPasswordVerifying
from template_project.web_api.configuration import AccessTokenConfiguration
class CryptographerProvider(Provider):
scope = Scope.APP
@provide
def argon_password_hasher(self) -> argon2.PasswordHasher:
return argon2.PasswordHasher()
@provide
def fernet(self, configuration: AccessTokenConfiguration) -> Fernet:
return Fernet(configuration.crypto_key)
access_token_cryptographer = provide(WithParents[FernetAccessTokenCryptographer])
password_utils = provide_all(
WithParents[ArgonPasswordHasher],
WithParents[ArgonPasswordVerifying],
)
@@ -0,0 +1,15 @@
from dishka import Provider, Scope, WithParents, provide, provide_all
from template_project.adapters.data_gateways.access_token import DefaultAccessTokenDataGateway
from template_project.adapters.data_gateways.user import DefaultUserDataGateway
from template_project.adapters.unit_of_work import DefaultUnitOfWork
class DataGatewayProvider(Provider):
scope = Scope.REQUEST
unit_of_work = provide(WithParents[DefaultUnitOfWork])
data_gateways = provide_all(
WithParents[DefaultUserDataGateway],
WithParents[DefaultAccessTokenDataGateway],
)
@@ -0,0 +1,11 @@
from dishka import Provider, Scope, provide_all
from template_project.adapters.access_token.factory import DefaultAccessTokenFactory
class FactoryProvider(Provider):
scope = Scope.APP
factories = provide_all(
DefaultAccessTokenFactory,
)
@@ -0,0 +1,11 @@
from dishka import Provider, Scope, provide_all
from template_project.application.user.interactors.sign_up import UserSignUpInteractor
class InteractorProvider(Provider):
scope = Scope.REQUEST
interactors = provide_all(
UserSignUpInteractor,
)
+23
View File
@@ -0,0 +1,23 @@
from dishka import AsyncContainer, make_async_container
from dishka.integrations.fastapi import FastapiProvider
from template_project.web_api.configuration import AccessTokenConfiguration, Configuration, DatabaseConfiguration, ServerConfiguration
from template_project.web_api.ioc.data_gateway import DataGatewayProvider
from template_project.web_api.ioc.factory import FactoryProvider
from template_project.web_api.ioc.interactor import InteractorProvider
from template_project.web_api.ioc.cryptographer import CryptographerProvider
def make_ioc(configuration: Configuration) -> AsyncContainer:
return make_async_container(
FactoryProvider(),
FastapiProvider(),
InteractorProvider(),
DataGatewayProvider(),
CryptographerProvider(),
context={
ServerConfiguration: configuration.server,
DatabaseConfiguration: configuration.database,
AccessTokenConfiguration: configuration.access_token,
},
)
@@ -0,0 +1,33 @@
from dishka import FromDishka
from dishka.integrations.fastapi import DishkaRoute
from fastapi import APIRouter
from pydantic import BaseModel, SecretStr
from template_project.application.common.containers import SecretString
from template_project.application.user.interactors.sign_up import UserSignUpInteractor
router = APIRouter(route_class=DishkaRoute)
class UserSignUpRequest(BaseModel):
email: str
password: SecretStr
class UserSignUpResponse(BaseModel):
access_token: str
@router.post("/user/sign_up")
async def sign_up(
request: UserSignUpRequest,
interactor: FromDishka[UserSignUpInteractor],
) -> UserSignUpResponse:
response_interactor = await interactor.execute(
email=request.email,
password=SecretString(request.password.get_secret_value())
)
return UserSignUpResponse(
access_token=response_interactor.access_token,
)