Files
RekomenciBackend/src/template_project/application/resume/interactors/get.py
T
2025-11-22 17:30:46 +03:00

187 lines
5.7 KiB
Python

from decimal import Decimal
from template_project.application.common.data_structure import to_data_structure
from template_project.application.common.enums import EducationGrade, ExperienceType
from template_project.application.common.identity_provider import IdentityProvider
from template_project.application.common.interactor import to_interactor
from template_project.application.resume.data_gateway import (
ResumeDataGateway,
ResumeEducationDataGateway,
ResumeExperienceDataGateway,
ResumePredictionDataGateway,
ResumeProjectDataGateway,
)
from template_project.application.resume.entity import ResumeId
from template_project.application.resume.errors import ResumeDoesBelongUserError
@to_data_structure
class ResumePredictionResponse:
from_salary: Decimal
to_salary: Decimal
recommended_skills: list[str]
@to_data_structure
class ExperienceItemResponse:
place: str
description: str
months_duration: int
@to_data_structure
class EducationItemResponse:
place: str
grade: EducationGrade
specialization: str
description: str | None
@to_data_structure
class ProjectItemResponse:
name: str
description: str
@to_data_structure
class GetResumeResponse:
id: ResumeId
position: str
location: str
about_me: str
key_skills: list[str]
experience_type: ExperienceType
experience: list[ExperienceItemResponse]
education: list[EducationItemResponse]
projects: list[ProjectItemResponse]
prediction: ResumePredictionResponse | None
@to_interactor
class GetResumeInteractor:
identity_provider: IdentityProvider
resume_data_gateway: ResumeDataGateway
resume_prediction_data_gateway: ResumePredictionDataGateway
resume_experience_data_gateway: ResumeExperienceDataGateway
resume_education_data_gateway: ResumeEducationDataGateway
resume_project_data_gateway: ResumeProjectDataGateway
async def execute(
self,
resume_id: ResumeId,
) -> GetResumeResponse:
user = await self.identity_provider.get_current_user()
resume = await self.resume_data_gateway.load(resume_id)
if resume.user_id != user.id:
raise ResumeDoesBelongUserError
resume_prediction = await self.resume_prediction_data_gateway.load_by_resume_id(resume.id)
if resume_prediction is not None:
prediction = ResumePredictionResponse(
from_salary=resume_prediction.from_salary,
to_salary=resume_prediction.to_salary,
recommended_skills=resume_prediction.recommended_skills,
)
else:
prediction = None
experiences = await self.resume_experience_data_gateway.load_by_resume_id(resume.id)
educations = await self.resume_education_data_gateway.load_by_resume_id(resume.id)
projects = await self.resume_project_data_gateway.load_by_resume_id(resume.id)
return GetResumeResponse(
id=resume.id,
position=resume.position,
location=resume.location,
about_me=resume.about_me,
key_skills=resume.key_skills,
experience_type=resume.experience_type,
experience=[
ExperienceItemResponse(
place=exp.place,
description=exp.description,
months_duration=exp.months_duration,
)
for exp in experiences
],
education=[
EducationItemResponse(
place=edu.place,
grade=edu.grade,
specialization=edu.specialization,
description=edu.description,
)
for edu in educations
],
projects=[
ProjectItemResponse(
name=proj.name,
description=proj.description,
)
for proj in projects
],
prediction=prediction,
)
@to_data_structure
class ResumeListItemResponse:
id: ResumeId
position: str
location: str
about_me: str
key_skills: list[str]
experience_type: ExperienceType
@to_interactor
class GetResumeListInteractor:
identity_provider: IdentityProvider
resume_data_gateway: ResumeDataGateway
async def execute(self, limit: int, offset: int) -> list[ResumeListItemResponse]:
user = await self.identity_provider.get_current_user()
resumes = await self.resume_data_gateway.list_latest_by_user_id(user.id, limit=limit, offset=offset)
return [
ResumeListItemResponse(
id=r.id,
position=r.position,
location=r.location,
about_me=r.about_me,
key_skills=r.key_skills,
experience_type=r.experience_type,
)
for r in resumes
]
@to_interactor
class GetResumeHistoryInteractor:
identity_provider: IdentityProvider
resume_data_gateway: ResumeDataGateway
async def execute(self, resume_id: ResumeId) -> list[ResumeListItemResponse]:
user = await self.identity_provider.get_current_user()
resume = await self.resume_data_gateway.load(resume_id)
if resume.user_id != user.id:
raise ResumeDoesBelongUserError
history = await self.resume_data_gateway.get_history(resume_id)
return [
ResumeListItemResponse(
id=r.id,
position=r.position,
location=r.location,
about_me=r.about_me,
key_skills=r.key_skills,
experience_type=r.experience_type,
)
for r in history
]