Merge branch 'refactor/tests' into 'main'

Refactor/tests

See merge request prod-hackathon-moscow/hackaton!2
This commit is contained in:
Ivan Kirpichnikov
2025-11-21 10:17:27 +00:00
10 changed files with 227 additions and 138 deletions
+1 -1
View File
@@ -16,7 +16,7 @@ build:
[group("Docker")] [group("Docker")]
[doc("Compose start")] [doc("Compose start")]
up: up:
docker compose --profile migrations --profile observability up -d --remove-orphans --quiet-pull --force-recreate docker compose --profile migrations --profile observability up -d --remove-orphans --quiet-pull --force-recreate --build
# ========= # =========
# > Tests # > Tests
+1
View File
@@ -38,6 +38,7 @@ tests = [
"pytest==8.4.0", "pytest==8.4.0",
"coverage==7.11.0", "coverage==7.11.0",
"pytest_asyncio==1.2.0", "pytest_asyncio==1.2.0",
"dirty-equals>=0.11",
] ]
dev = [ dev = [
{ include-group = "tests" }, { include-group = "tests" },
+6
View File
@@ -5,6 +5,7 @@ import pytest
from dishka import AsyncContainer from dishka import AsyncContainer
from template_project.web_api.configuration import load_configuration from template_project.web_api.configuration import load_configuration
from tests.web_api.helpers import get_unique_email
from tests.web_api.ioc import make_ioc from tests.web_api.ioc import make_ioc
@@ -15,3 +16,8 @@ async def dishka_container() -> AsyncIterable[AsyncContainer]:
ioc = make_ioc(configuration) ioc = make_ioc(configuration)
yield ioc yield ioc
await ioc.close() await ioc.close()
@pytest.fixture
def unique_email() -> str:
return get_unique_email()
+61 -56
View File
@@ -1,107 +1,112 @@
from http import HTTPStatus as status from typing import Final
from uuid import uuid4
from dirty_equals import IsDict, IsPartialDict, IsStr
from dishka import FromDishka from dishka import FromDishka
from httpx import AsyncClient, Response
from tests.web_api.helpers import (
is_conflict_response,
is_not_found_response,
is_success_response,
is_unauthorized_response,
)
from tests.web_api.ioc import DatabaseClearer, inject from tests.web_api.ioc import DatabaseClearer, inject
from tests.web_api.test_api_gateway import TestApiGateway
DEFAULT_PASSWORD = "Sup3rSecret" # noqa: S105 DEFAULT_PASSWORD: Final = "Sup3rPuperS3cret" # noqa: S105
async def _sign_up_email(client: AsyncClient, email: str, password: str = DEFAULT_PASSWORD) -> None:
response = await client.post(
"/auth/sign_up/email",
json={"email": email, "password": password},
)
assert response.status_code == status.OK, response.text
async def _sign_in_email(client: AsyncClient, email: str, password: str = DEFAULT_PASSWORD) -> Response:
return await client.post(
"/auth/sign_in/email",
json={"email": email, "password": password},
)
def _unique_email() -> str:
return f"user-{uuid4().hex}@example.com"
@inject @inject
async def test_email_sign_up_creates_user( async def test_email_sign_up_creates_user(
http_client: FromDishka[AsyncClient], unique_email: str,
test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer], database_clearer: FromDishka[DatabaseClearer],
) -> None: ) -> None:
await database_clearer.clear() await database_clearer.clear()
email = _unique_email()
response = await http_client.post( response = await test_api_gateway.sign_up_email(
"/auth/sign_up/email", email=unique_email,
json={"email": email, "password": DEFAULT_PASSWORD}, password=DEFAULT_PASSWORD,
)
assert is_success_response(response)
assert response.json() == IsPartialDict(
access_token=IsStr()
) )
body = response.json()
assert response.status_code == status.OK
assert isinstance(body["access_token"], str)
assert body["access_token"]
@inject @inject
async def test_email_sign_up_existing_user_conflict( async def test_email_sign_up_existing_user_conflict(
http_client: FromDishka[AsyncClient], unique_email: str,
test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer], database_clearer: FromDishka[DatabaseClearer],
) -> None: ) -> None:
await database_clearer.clear() await database_clearer.clear()
email = _unique_email()
await _sign_up_email(http_client, email)
response = await http_client.post( await test_api_gateway.sign_up_email(
"/auth/sign_up/email", email=unique_email,
json={"email": email, "password": DEFAULT_PASSWORD}, password=DEFAULT_PASSWORD,
) )
assert response.status_code == status.CONFLICT response = await test_api_gateway.sign_up_email(
email=unique_email,
password=DEFAULT_PASSWORD,
)
assert is_conflict_response(response)
@inject @inject
async def test_email_sign_in_returns_token( async def test_email_sign_in_returns_token(
http_client: FromDishka[AsyncClient], unique_email: str,
test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer], database_clearer: FromDishka[DatabaseClearer],
) -> None: ) -> None:
await database_clearer.clear() await database_clearer.clear()
email = _unique_email()
await _sign_up_email(http_client, email)
response = await _sign_in_email(http_client, email) await test_api_gateway.sign_up_email(
body = response.json() email=unique_email,
password=DEFAULT_PASSWORD,
)
assert response.status_code == status.OK response = await test_api_gateway.sign_in_email(
assert isinstance(body["access_token"], str) email=unique_email,
assert body["access_token"] password=DEFAULT_PASSWORD,
)
assert is_success_response(response)
assert response.json() == IsDict(
access_token=IsStr,
)
@inject @inject
async def test_email_sign_in_invalid_password( async def test_email_sign_in_invalid_password(
http_client: FromDishka[AsyncClient], unique_email: str,
test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer], database_clearer: FromDishka[DatabaseClearer],
) -> None: ) -> None:
await database_clearer.clear() await database_clearer.clear()
email = _unique_email()
await _sign_up_email(http_client, email)
response = await _sign_in_email(http_client, email, password="wrong-password") await test_api_gateway.sign_up_email(
email=unique_email,
password=DEFAULT_PASSWORD,
)
assert response.status_code == status.UNAUTHORIZED response = await test_api_gateway.sign_in_email(
email=unique_email,
password="wrong-password",
)
assert is_unauthorized_response(response)
@inject @inject
async def test_email_sign_in_user_not_found( async def test_email_sign_in_user_not_found(
http_client: FromDishka[AsyncClient], unique_email: str,
test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer], database_clearer: FromDishka[DatabaseClearer],
) -> None: ) -> None:
await database_clearer.clear() await database_clearer.clear()
response = await _sign_in_email(http_client, email=_unique_email()) response = await test_api_gateway.sign_in_email(
email=unique_email,
password=DEFAULT_PASSWORD,
)
assert response.status_code == status.NOT_FOUND assert is_not_found_response(response)
+8 -6
View File
@@ -1,8 +1,8 @@
from http import HTTPStatus as status from dirty_equals import IsDict
from dishka import FromDishka from dishka import FromDishka
from httpx import AsyncClient from httpx import AsyncClient
from tests.web_api.helpers import is_success_response
from tests.web_api.ioc import inject from tests.web_api.ioc import inject
@@ -11,7 +11,9 @@ async def test_healthcheck(
http_client: FromDishka[AsyncClient], http_client: FromDishka[AsyncClient],
) -> None: ) -> None:
response = await http_client.get("/healthcheck") response = await http_client.get("/healthcheck")
response_json = response.json() assert is_success_response(response)
assert response.json() == IsDict(
assert response.status_code == status.OK {
assert response_json["ok"] "ok": True,
}
)
+54 -75
View File
@@ -1,108 +1,87 @@
from collections.abc import Mapping from typing import Final
from http import HTTPStatus as status
from uuid import uuid4
from dirty_equals import IsPartialDict
from dishka import FromDishka from dishka import FromDishka
from httpx import AsyncClient, Response
from tests.web_api.helpers import is_forbidden_response, is_success_response
from tests.web_api.ioc import DatabaseClearer, inject from tests.web_api.ioc import DatabaseClearer, inject
from tests.web_api.test_api_gateway import TestApiGateway
DEFAULT_PASSWORD = "Sup3rSecret" # noqa: S105 DEFAULT_PASSWORD: Final = "Sup3rSecret" # noqa: S105
def _unique_email() -> str:
return f"user-{uuid4().hex}@example.com"
def _auth_headers(token: str) -> Mapping[str, str]:
return {"Authorization": f"Bearer {token}"}
async def _sign_up_and_get_token(client: AsyncClient, email: str) -> str:
response = await client.post(
"/auth/sign_up/email",
json={"email": email, "password": DEFAULT_PASSWORD},
)
assert response.status_code == status.OK, response.text
body = response.json()
return str(body["access_token"])
async def _get_profile(client: AsyncClient, token: str) -> Response:
return await client.get("/profile", headers=_auth_headers(token))
async def _patch_profile(
client: AsyncClient,
token: str,
payload: Mapping[str, str | None],
) -> Response:
return await client.patch("/profile", headers=_auth_headers(token), json=payload)
@inject @inject
async def test_get_profile_returns_current_user( async def test_get_profile_returns_current_user(
http_client: FromDishka[AsyncClient], unique_email: str,
test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer], database_clearer: FromDishka[DatabaseClearer],
) -> None: ) -> None:
await database_clearer.clear() await database_clearer.clear()
email = _unique_email()
token = await _sign_up_and_get_token(http_client, email)
response = await _get_profile(http_client, token) response_sign_up = await test_api_gateway.sign_up_email(unique_email, DEFAULT_PASSWORD)
body = response.json() access_token = response_sign_up.json()["access_token"]
assert response.status_code == status.OK response = await test_api_gateway.get_profile(access_token)
assert body["email"] == email assert is_success_response(response)
assert body["display_name"] is None assert response.json() == IsPartialDict(
email=unique_email,
display_name=None,
)
@inject @inject
async def test_patch_profile_updates_profile_data( async def test_patch_profile_updates_profile_data(
http_client: FromDishka[AsyncClient], unique_email: str,
test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer], database_clearer: FromDishka[DatabaseClearer],
) -> None: ) -> None:
await database_clearer.clear() await database_clearer.clear()
email = _unique_email()
token = await _sign_up_and_get_token(http_client, email)
payload = { response = await test_api_gateway.sign_up_email(
"display_name": "Alice", email=unique_email,
"first_name": "Alice", password=DEFAULT_PASSWORD,
"last_name": "Smith", )
"avatar_url": "https://example.com/avatar.png", access_token = response.json()["access_token"]
"phone": "+1234567890",
}
patch_response = await _patch_profile(http_client, token, payload)
patch_body = patch_response.json()
assert patch_response.status_code == status.OK patch_response = await test_api_gateway.patch_profile(
assert patch_body["display_name"] == payload["display_name"] access_token=access_token,
assert patch_body["first_name"] == payload["first_name"] display_name="Alice",
assert patch_body["last_name"] == payload["last_name"] first_name="Alice",
assert patch_body["avatar_url"] == payload["avatar_url"] last_name="Smith",
assert patch_body["phone"] == payload["phone"] avatar_url="https://example.com/avatar.png",
assert patch_body["email"] == email phone="+1234567890",
)
assert is_success_response(patch_response)
assert patch_response.json() == IsPartialDict(
display_name="Alice",
first_name="Alice",
last_name="Smith",
avatar_url="https://example.com/avatar.png",
phone="+1234567890",
email=unique_email,
)
get_response = await _get_profile(http_client, token) get_response = await test_api_gateway.get_profile(access_token)
get_body = get_response.json() assert is_success_response(get_response)
assert get_response.json() == IsPartialDict(
assert get_body["display_name"] == payload["display_name"] display_name="Alice",
assert get_body["first_name"] == payload["first_name"] first_name="Alice",
assert get_body["last_name"] == payload["last_name"] last_name="Smith",
assert get_body["avatar_url"] == payload["avatar_url"] avatar_url="https://example.com/avatar.png",
assert get_body["phone"] == payload["phone"] phone="+1234567890",
email=unique_email,
)
@inject @inject
async def test_profile_routes_require_authentication( async def test_profile_routes_require_authentication(
http_client: FromDishka[AsyncClient], test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer], database_clearer: FromDishka[DatabaseClearer],
) -> None: ) -> None:
await database_clearer.clear() await database_clearer.clear()
response_get = await http_client.get("/profile") response_get = await test_api_gateway.get_profile(None)
response_patch = await http_client.patch("/profile", json={}) assert is_forbidden_response(response_get)
assert response_get.status_code == status.FORBIDDEN response_patch = await test_api_gateway.patch_profile()
assert response_patch.status_code == status.FORBIDDEN assert is_forbidden_response(response_patch)
+28
View File
@@ -0,0 +1,28 @@
from http import HTTPStatus
from uuid import uuid4
from httpx import Response
def is_success_response(response: Response) -> bool:
return response.status_code == HTTPStatus.OK
def is_unauthorized_response(response: Response) -> bool:
return response.status_code == HTTPStatus.UNAUTHORIZED
def is_not_found_response(response: Response) -> bool:
return response.status_code == HTTPStatus.NOT_FOUND
def is_forbidden_response(response: Response) -> bool:
return response.status_code == HTTPStatus.FORBIDDEN
def is_conflict_response(response: Response) -> bool:
return response.status_code == HTTPStatus.CONFLICT
def get_unique_email() -> str:
return f"user-{uuid4().hex}@example.com"
+2
View File
@@ -35,6 +35,7 @@ from template_project.web_api.ioc.interactor import InteractorProvider
from template_project.web_api.ioc.notifications import NotificationServiceProvider from template_project.web_api.ioc.notifications import NotificationServiceProvider
from template_project.web_api.ioc.oauth import OAuthClientProvider from template_project.web_api.ioc.oauth import OAuthClientProvider
from template_project.web_api.ioc.storage import StorageProvider from template_project.web_api.ioc.storage import StorageProvider
from tests.web_api.test_api_gateway import TestApiGateway
class DatabaseClearer: class DatabaseClearer:
@@ -66,6 +67,7 @@ class DatabaseClearer:
class TestProvider(Provider): class TestProvider(Provider):
scope: BaseScope | None = Scope.REQUEST scope: BaseScope | None = Scope.REQUEST
api_gateway = provide(TestApiGateway)
database_clearer = provide(DatabaseClearer) database_clearer = provide(DatabaseClearer)
@provide @provide
+53
View File
@@ -0,0 +1,53 @@
from httpx import AsyncClient, Response
class TestApiGateway:
def __init__(self, client: AsyncClient) -> None:
self._client = client
async def sign_up_email(self, email: str, password: str) -> Response:
return await self._client.post(
"/auth/sign_up/email",
json={"email": email, "password": password},
)
async def sign_in_email(self, email: str, password: str) -> Response:
return await self._client.post(
"/auth/sign_in/email",
json={"email": email, "password": password},
)
async def get_profile(self, access_token: str | None) -> Response:
headers = {} if access_token is None else {"Authorization": f"Bearer {access_token}"}
return await self._client.get(
"/profile",
headers=headers,
)
async def patch_profile(
self,
access_token: str | None = None,
display_name: str | None = None,
first_name: str | None = None,
last_name: str | None = None,
avatar_url: str | None = None,
phone: str | None = None,
) -> Response:
headers = {} if access_token is None else {"Authorization": f"Bearer {access_token}"}
return await self._client.patch(
"/profile",
json={
key: value
for key, value in {
"display_name": display_name,
"first_name": first_name,
"last_name": last_name,
"avatar_url": avatar_url,
"phone": phone,
}.items()
if value is not None
},
headers=headers,
)
Generated
+13
View File
@@ -614,6 +614,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e8/cb/2da4cc83f5edb9c3257d09e1e7ab7b23f049c7962cae8d842bbef0a9cec9/cryptography-46.0.3-cp38-abi3-win_arm64.whl", hash = "sha256:d89c3468de4cdc4f08a57e214384d0471911a3830fcdaf7a8cc587e42a866372", size = 2918740, upload-time = "2025-10-15T23:18:12.277Z" }, { url = "https://files.pythonhosted.org/packages/e8/cb/2da4cc83f5edb9c3257d09e1e7ab7b23f049c7962cae8d842bbef0a9cec9/cryptography-46.0.3-cp38-abi3-win_arm64.whl", hash = "sha256:d89c3468de4cdc4f08a57e214384d0471911a3830fcdaf7a8cc587e42a866372", size = 2918740, upload-time = "2025-10-15T23:18:12.277Z" },
] ]
[[package]]
name = "dirty-equals"
version = "0.11"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/30/1d/c5913ac9d6615515a00f4bdc71356d302437cb74ff2e9aaccd3c14493b78/dirty_equals-0.11.tar.gz", hash = "sha256:f4ac74ee88f2d11e2fa0f65eb30ee4f07105c5f86f4dc92b09eb1138775027c3", size = 128067, upload-time = "2025-11-17T01:51:24.451Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/bb/8d/dbff05239043271dbeace563a7686212a3dd517864a35623fe4d4a64ca19/dirty_equals-0.11-py3-none-any.whl", hash = "sha256:b1d7093273fc2f9be12f443a8ead954ef6daaf6746fd42ef3a5616433ee85286", size = 28051, upload-time = "2025-11-17T01:51:22.849Z" },
]
[[package]] [[package]]
name = "dishka" name = "dishka"
version = "1.7.2" version = "1.7.2"
@@ -1964,6 +1973,7 @@ dev = [
{ name = "bandit" }, { name = "bandit" },
{ name = "codespell" }, { name = "codespell" },
{ name = "coverage" }, { name = "coverage" },
{ name = "dirty-equals" },
{ name = "mypy" }, { name = "mypy" },
{ name = "pytest" }, { name = "pytest" },
{ name = "pytest-asyncio" }, { name = "pytest-asyncio" },
@@ -1982,6 +1992,7 @@ migrations = [
] ]
tests = [ tests = [
{ name = "coverage" }, { name = "coverage" },
{ name = "dirty-equals" },
{ name = "pytest" }, { name = "pytest" },
{ name = "pytest-asyncio" }, { name = "pytest-asyncio" },
] ]
@@ -2013,6 +2024,7 @@ dev = [
{ name = "bandit", specifier = "==1.8.6" }, { name = "bandit", specifier = "==1.8.6" },
{ name = "codespell", specifier = "==2.4.1" }, { name = "codespell", specifier = "==2.4.1" },
{ name = "coverage", specifier = "==7.11.0" }, { name = "coverage", specifier = "==7.11.0" },
{ name = "dirty-equals", specifier = ">=0.11" },
{ name = "mypy", specifier = "==1.18.1" }, { name = "mypy", specifier = "==1.18.1" },
{ name = "pytest", specifier = "==8.4.0" }, { name = "pytest", specifier = "==8.4.0" },
{ name = "pytest-asyncio", specifier = "==1.2.0" }, { name = "pytest-asyncio", specifier = "==1.2.0" },
@@ -2029,6 +2041,7 @@ linters = [
migrations = [{ name = "alembic", specifier = "==1.17.0" }] migrations = [{ name = "alembic", specifier = "==1.17.0" }]
tests = [ tests = [
{ name = "coverage", specifier = "==7.11.0" }, { name = "coverage", specifier = "==7.11.0" },
{ name = "dirty-equals", specifier = ">=0.11" },
{ name = "pytest", specifier = "==8.4.0" }, { name = "pytest", specifier = "==8.4.0" },
{ name = "pytest-asyncio", specifier = "==1.2.0" }, { name = "pytest-asyncio", specifier = "==1.2.0" },
] ]