Files
RekomenciBackend/src/template_project/adapters/data_gateways/resume.py
T
ivankirpichnikov effbcfbc2d не помнб
2025-11-22 18:15:29 +03:00

72 lines
2.6 KiB
Python

from collections.abc import Sequence
from typing import override
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from template_project.adapters.data_gateways.tables import resume_prediction_table, resume_table
from template_project.application.resume.data_gateway import ResumeDataGateway, ResumePredictionDataGateway
from template_project.application.resume.entity import Resume, ResumeId, ResumePrediction
from template_project.application.resume.errors import ResumeNotFoundError
from template_project.application.user.entity import UserId
class DefaultResumeDataGateway(ResumeDataGateway):
    """SQLAlchemy implementation of ``ResumeDataGateway``.

    Every query is issued through the ``AsyncSession`` given at construction.
    """

    def __init__(self, session: AsyncSession) -> None:
        """Store the session used for all queries made by this gateway."""
        self._session = session

    @override
    async def load(self, resume_id: ResumeId) -> Resume:
        """Return the resume identified by ``resume_id``.

        Raises:
            ResumeNotFoundError: If the session finds no resume with this id.
        """
        resume = await self._session.get(Resume, resume_id)
        if resume is None:
            raise ResumeNotFoundError(resume_id=resume_id)
        return resume

    @override
    async def list_by_user_id(self, user_id: UserId, limit: int, offset: int) -> Sequence[Resume]:
        """Return one page of resumes whose ``user_id`` column equals ``user_id``.

        At most ``limit`` rows are returned after skipping ``offset`` rows.

        NOTE(review): the query has no ORDER BY, so which rows land on which
        page depends on the order the database happens to return them in.
        """
        statement = (
            select(Resume)
            .where(resume_table.c.user_id == user_id)
            .limit(limit)
            .offset(offset)
        )
        result = await self._session.execute(statement)
        return result.scalars().all()

    @override
    async def list_latest_by_user_id(self, user_id: UserId, limit: int, offset: int) -> Sequence[Resume]:
        """Return one page of the user's resumes whose ``up_resume_id`` is NULL.

        Presumably a NULL ``up_resume_id`` marks a resume that has not been
        superseded by a newer version -- confirm against the schema.

        At most ``limit`` rows are returned after skipping ``offset`` rows.

        NOTE(review): the query has no ORDER BY, so which rows land on which
        page depends on the order the database happens to return them in.
        """
        statement = (
            select(Resume)
            .where(resume_table.c.user_id == user_id)
            .where(resume_table.c.up_resume_id.is_(None))
            .limit(limit)
            .offset(offset)
        )
        result = await self._session.execute(statement)
        return result.scalars().all()

    @override
    async def get_history(self, resume_id: ResumeId) -> Sequence[Resume]:
        """Return the chain of resumes reachable through ``down_resume_id``.

        The list starts with the resume identified by ``resume_id`` and
        continues with each resume referenced by the previous one's
        ``down_resume_id`` until that attribute is ``None``.

        If a link points back to a resume that is already part of the chain,
        traversal stops there instead of looping forever.

        Raises:
            ResumeNotFoundError: If any resume in the chain cannot be loaded.
        """
        # TODO: N+1 -- every link in the chain costs one separate ``load`` call.
        current_resume = await self.load(resume_id)
        history: list[Resume] = [current_resume]
        # Ids already placed in ``history``; guards against cyclic links.
        visited: set[ResumeId] = {resume_id}
        while (next_id := current_resume.down_resume_id) is not None:
            if next_id in visited:
                # Corrupted data: the chain loops back on itself. Without this
                # check the loop would never terminate.
                break
            visited.add(next_id)
            current_resume = await self.load(next_id)
            history.append(current_resume)
        return history
class DefaultResumePredictionDataGateway(ResumePredictionDataGateway):
    """Gateway that reads ``ResumePrediction`` objects through an ``AsyncSession``."""

    def __init__(self, session: AsyncSession) -> None:
        """Remember the session that will execute this gateway's queries."""
        self._session = session

    @override
    async def load_by_resume_id(self, resume_id: ResumeId) -> ResumePrediction | None:
        """Return a prediction whose ``resume_id`` column equals ``resume_id``.

        Returns ``None`` when the query yields no rows. If several rows match,
        only the first one returned by the database is used.
        """
        matches_resume = resume_prediction_table.c.resume_id == resume_id
        query = select(ResumePrediction).where(matches_resume)
        rows = await self._session.execute(query)
        # ``scalar()`` gives the first column of the first row, or None if empty.
        return rows.scalar()