add pipline

This commit is contained in:
ivankirpichnikov
2025-11-23 01:42:46 +03:00
parent 7f91b412b8
commit 96e792b122
20 changed files with 219 additions and 43 deletions
@@ -1,11 +1,12 @@
from collections.abc import Sequence
from typing import override
from sqlalchemy import select
from sqlalchemy import label, select
from sqlalchemy.ext.asyncio import AsyncSession
from template_project.adapters.data_gateways.tables import vacancy_embedding_table, vacancy_table
from template_project.application.vacancy.data_gateway import VacancyDataGateway
from template_project.application.vacancy.data_structure import SuitableVacancy
from template_project.application.vacancy.entity import Vacancy, VacancyEmbedding
@@ -14,11 +15,17 @@ class DefaultVacancyDataGateway(VacancyDataGateway):
self._session = session
@override
async def get_suitable(self, vector: list[float]) -> Sequence[Vacancy]:
async def get_suitable(self, vector: list[float]) -> Sequence[SuitableVacancy]:
statement = (
select(Vacancy)
select(Vacancy, label("resume_similarity", vacancy_embedding_table.c.vector.cosine_distance(vector)))
.join(VacancyEmbedding, vacancy_embedding_table.c.id == vacancy_table.c.id)
.where(vacancy_embedding_table.c.vector.cosine_distance(vector) > 0.5)
)
result = await self._session.execute(statement)
return result.scalars().all()
return [
SuitableVacancy(
vacancy=res[0],
resume_similarity=res[1],
)
for res in result.scalars()
]
@@ -1,6 +1,6 @@
from typing import Final, override
from template_project.adapters.ml_client import MlApiGateway
from template_project.adapters.ml_api_gateway import MlApiGateway
from template_project.application.common.enums import ExperienceType
from template_project.application.resume.vector_generator import ResumeEmbeddingVectorGenerator
@@ -0,0 +1,39 @@
from collections.abc import Sequence
from typing import override
from template_project.adapters.ml_api_gateway import MlApiGateway, SuitableVacancyDs
from template_project.application.resume.entity import Resume, ResumePrediction
from template_project.application.resume.resume_prediction_generator import ResumePredictionGenerator
from template_project.application.vacancy.data_structure import SuitableVacancy
class DefaultResumePredictionGenerator(ResumePredictionGenerator):
def __init__(self, ml_api_gateway: MlApiGateway) -> None:
self._ml_api_gateway = ml_api_gateway
@override
async def generate(
self,
resume: Resume,
suituble_vacancies: Sequence[SuitableVacancy],
) -> ResumePrediction:
response = await self._ml_api_gateway.generate_resume_prediction(
resume_id=resume.id,
key_skills=resume.key_skills,
suituble_vacancies=[
SuitableVacancyDs(
vacancy_id=suituble_vacancy.vacancy.id,
from_salary=suituble_vacancy.vacancy.from_salary,
to_salary=suituble_vacancy.vacancy.to_salary,
key_skills=suituble_vacancy.vacancy.key_skills,
resume_similarity=suituble_vacancy.resume_similarity,
)
for suituble_vacancy in suituble_vacancies
],
)
return ResumePrediction.factory(
resume_id=resume.id,
from_salary=response.salary_from,
to_salary=response.salary_to,
recommended_skills=response.recommended_skills,
)
@@ -0,0 +1,63 @@
from collections.abc import Sequence
from decimal import Decimal
from typing import cast
from httpx import AsyncClient
from template_project.application.common.data_structure import to_data_structure
from template_project.application.resume.entity import ResumeId
@to_data_structure
class SuitableVacancyDs:
vacancy_id: str
from_salary: Decimal
to_salary: Decimal
key_skills: list[str]
resume_similarity: float
@to_data_structure
class GenerateResumePredictionResponse:
salary_from: Decimal
salary_to: Decimal
recommended_skills: list[str]
class MlApiGateway:
def __init__(self, client: AsyncClient) -> None:
self._client = client
async def generate_embedding(self, text: str) -> list[float]:
response = await self._client.post("/get_embedding", json={"text": text})
return cast(list[float], response.json()["embedding"])
async def generate_resume_prediction(
self,
resume_id: ResumeId,
key_skills: list[str],
suituble_vacancies: Sequence[SuitableVacancyDs],
) -> GenerateResumePredictionResponse:
response = await self._client.post(
"/predict_salary",
json={
"resume_id": resume_id,
"key_skills": key_skills,
"vacancies": [
{
"vacancy_id": suituble_vacancy.vacancy_id,
"from_salary": suituble_vacancy.from_salary,
"to_salary": suituble_vacancy.to_salary,
"key_skills": suituble_vacancy.key_skills,
"resume_similarity": suituble_vacancy.resume_similarity,
} for suituble_vacancy in suituble_vacancies
],
},
)
response_json = response.json()
return GenerateResumePredictionResponse(
salary_from=response_json["salary_from"],
salary_to=response_json["salary_to"],
recommended_skills=response_json["recommended_skills"],
)
@@ -1,12 +0,0 @@
from typing import cast
from httpx import AsyncClient
class MlApiGateway:
def __init__(self, client: AsyncClient) -> None:
self._client = client
async def generate_embedding(self, text: str) -> list[float]:
response = await self._client.post("/get_embedding", json={"text": text})
return cast(list[float], response.json()["embedding"])
@@ -1,7 +1,7 @@
from collections.abc import Hashable
from dataclasses import dataclass, replace
from dataclasses import dataclass
from datetime import datetime
from typing import Self, cast, dataclass_transform, override
from typing import cast, dataclass_transform, override
from uuid import UUID
from template_project.application.common.errors import EntityAlreadyDeletedError
@@ -22,9 +22,6 @@ class Entity[EntityId: UUID](Hashable):
if self.deleted_at is not None:
raise EntityAlreadyDeletedError(entity_name=self.__class__.__name__)
def __copy__(self) -> Self:
return replace(self)
@override
def __eq__(self, other: object) -> bool:
if isinstance(other, Entity):
@@ -80,6 +80,23 @@ class ResumePrediction(Entity[ResumePredictionId]):
to_salary: Decimal
recommended_skills: list[str]
@classmethod
def factory(
cls,
resume_id: ResumeId,
from_salary: Decimal,
to_salary: Decimal,
recommended_skills: list[str],
) -> Self:
return cls(
id=ResumePredictionId(uuid7()),
created_at=datetime.now(tz=UTC),
resume_id=resume_id,
from_salary=from_salary,
to_salary=to_salary,
recommended_skills=recommended_skills,
)
@to_entity
class ResumeExperience(Entity[ResumeExperienceId]):
@@ -3,20 +3,21 @@ from collections.abc import Callable
from Levenshtein import ratio
from template_project.application.common.unit_of_work import UnitOfWork
from template_project.application.resume.entity import Resume, ResumeEmbedding, ResumePrediction
from template_project.application.resume.entity import Resume, ResumeEmbedding
from template_project.application.resume.resume_prediction_generator import ResumePredictionGenerator
from template_project.application.resume.vector_generator import ResumeEmbeddingVectorGenerator
from template_project.application.vacancy.data_gateway import VacancyDataGateway
from template_project.application.vacancy.entity import Vacancy
from template_project.application.vacancy.data_structure import SuitableVacancy
def suitable_vacancies_key(
resume: Resume,
) -> Callable[[Vacancy], bool]:
def wrapper(vacancy: Vacancy) -> bool:
) -> Callable[[SuitableVacancy], tuple[bool, bool]]:
def wrapper(suitable_vacancy: SuitableVacancy) -> tuple[bool, bool]:
count_skills = 0
ratio_skill_sum = 0.0
for resum_key_skill in resume.key_skills:
for suitable_resume_key_skill in vacancy.key_skills:
for suitable_resume_key_skill in suitable_vacancy.vacancy.key_skills:
ratio_skill = ratio(resum_key_skill, suitable_resume_key_skill)
if ratio_skill != 0:
count_skills += 1
@@ -27,26 +28,28 @@ def suitable_vacancies_key(
except ZeroDivisionError:
matching_skills = 0
return resume.experience_type == vacancy.experience_type and matching_skills >= 50
return resume.experience_type == suitable_vacancy.vacancy.experience_type, matching_skills >= 50
return wrapper
class ResumeEmbeddingPipeline:
class ResumeEmbeddingInteractor:
def __init__(
self,
unit_of_work: UnitOfWork,
vacancy_data_gateway: VacancyDataGateway,
vector_generator: ResumeEmbeddingVectorGenerator,
resume_prediction_generator: ResumePredictionGenerator,
) -> None:
self.unit_of_work = unit_of_work
self.vacancy_data_gateway = vacancy_data_gateway
self.vector_generator = vector_generator
self.vacancy_data_gateway = vacancy_data_gateway
self.resume_prediction_generator = resume_prediction_generator
async def run(
self,
resume: Resume,
) -> ResumePrediction:
) -> None:
vector = await self.vector_generator.generate(
position=resume.position,
about_me=resume.about_me,
@@ -62,13 +65,12 @@ class ResumeEmbeddingPipeline:
suitable_vacancies_filtered = sorted(
suitable_vacancies,
key=suitable_vacancies_key(resume),
)[:50]
resume_prediction = await self.resume_prediction_generator.generate(
resume=resume,
suituble_vacancies=suitable_vacancies_filtered,
)
suitable_vacancies = suitable_vacancies_filtered[:50]
# TODO: тут надо сделать отправку в ИИ
await self.unit_of_work.add(resume_embedding)
await self.unit_of_work.add(resume_embedding, resume_prediction)
await self.unit_of_work.commit()
raise NotImplementedError
@@ -1 +0,0 @@
# class ResumePredicition
@@ -0,0 +1,16 @@
from abc import abstractmethod
from collections.abc import Sequence
from typing import Protocol
from template_project.application.resume.entity import Resume, ResumePrediction
from template_project.application.vacancy.data_structure import SuitableVacancy
class ResumePredictionGenerator(Protocol):
@abstractmethod
async def generate(
self,
resume: Resume,
suituble_vacancies: Sequence[SuitableVacancy],
) -> ResumePrediction:
raise NotImplementedError
@@ -2,10 +2,10 @@ from abc import abstractmethod
from collections.abc import Sequence
from typing import Protocol
from template_project.application.vacancy.entity import Vacancy
from template_project.application.vacancy.data_structure import SuitableVacancy
class VacancyDataGateway(Protocol):
@abstractmethod
async def get_suitable(self, vector: list[float]) -> Sequence[Vacancy]:
async def get_suitable(self, vector: list[float]) -> Sequence[SuitableVacancy]:
raise NotImplementedError
@@ -0,0 +1,8 @@
from template_project.application.common.data_structure import to_data_structure
from template_project.application.vacancy.entity import Vacancy
@to_data_structure
class SuitableVacancy:
vacancy: Vacancy
resume_similarity: float
@@ -28,6 +28,11 @@ class S3Config:
secret_key: str
@to_configuration
class MlApiConfiguration:
url: str
@to_configuration
class AccessTokenConfiguration:
crypto_key: str
@@ -71,6 +76,7 @@ class Configuration:
access_token: AccessTokenConfiguration
yandex_oauth: YandexOAuthConfiguration
firebase: FirebaseConfiguration
ml_api: MlApiConfiguration
retort = Retort(
@@ -2,10 +2,12 @@ from collections.abc import AsyncIterable
from aioboto3.session import Session
from dishka import Provider, Scope, provide
from httpx import AsyncClient
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from template_project.adapters.ml_api_gateway import MlApiGateway
from template_project.adapters.s3_storage import AioBoto3ClientLike
from template_project.web_api.configuration import DatabaseConfiguration, S3Config
from template_project.web_api.configuration import DatabaseConfiguration, MlApiConfiguration, S3Config
class ConnectionProvider(Provider):
@@ -35,3 +37,8 @@ class ConnectionProvider(Provider):
aws_secret_access_key=config.secret_key,
) as s3_client:
yield s3_client
@provide(scope=Scope.APP)
async def ml_api_gateway(self, config: MlApiConfiguration) -> AsyncIterable[MlApiGateway]:
async with AsyncClient(base_url=config.url) as client:
yield MlApiGateway(client)
+4
View File
@@ -6,6 +6,7 @@ from template_project.web_api.configuration import (
Configuration,
DatabaseConfiguration,
FirebaseConfiguration,
MlApiConfiguration,
S3Config,
ServerConfiguration,
YandexOAuthConfiguration,
@@ -20,6 +21,7 @@ from template_project.web_api.ioc.notifications import (
NotificationServiceProvider,
)
from template_project.web_api.ioc.oauth import OAuthClientProvider
from template_project.web_api.ioc.other import OtherProvider
from template_project.web_api.ioc.storage import StorageProvider
@@ -35,6 +37,7 @@ def make_ioc(configuration: Configuration) -> AsyncContainer:
OAuthClientProvider(),
NotificationServiceProvider(),
StorageProvider(),
OtherProvider(),
validation_settings=STRICT_VALIDATION,
context={
ServerConfiguration: configuration.server,
@@ -44,5 +47,6 @@ def make_ioc(configuration: Configuration) -> AsyncContainer:
FirebaseConfiguration: configuration.firebase,
Configuration: configuration,
S3Config: configuration.s3,
MlApiConfiguration: configuration.ml_api,
},
)
+13
View File
@@ -0,0 +1,13 @@
from dishka import BaseScope, Provider, Scope, WithParents, provide_all
from template_project.adapters.generators.resume_embedding_vector import DefaultResumeEmbeddingVectorGenerator
from template_project.adapters.generators.resume_prediction import DefaultResumePredictionGenerator
class OtherProvider(Provider):
scope: BaseScope | None = Scope.REQUEST
other_providers = provide_all(
WithParents[DefaultResumePredictionGenerator],
WithParents[DefaultResumeEmbeddingVectorGenerator],
)