Merge remote-tracking branch 'origin/main'

This commit is contained in:
gitgernit
2025-11-21 13:46:04 +03:00
10 changed files with 228 additions and 138 deletions
+1 -1
View File
@@ -16,7 +16,7 @@ build:
[group("Docker")]
[doc("Compose start")]
up:
docker compose --profile migrations --profile observability up -d --remove-orphans --quiet-pull --force-recreate
docker compose --profile migrations --profile observability up -d --remove-orphans --quiet-pull --force-recreate --build
# =========
# > Tests
+1
View File
@@ -39,6 +39,7 @@ tests = [
"pytest==8.4.0",
"coverage==7.11.0",
"pytest_asyncio==1.2.0",
"dirty-equals>=0.11",
]
dev = [
{ include-group = "tests" },
+6
View File
@@ -5,6 +5,7 @@ import pytest
from dishka import AsyncContainer
from template_project.web_api.configuration import load_configuration
from tests.web_api.helpers import get_unique_email
from tests.web_api.ioc import make_ioc
@@ -15,3 +16,8 @@ async def dishka_container() -> AsyncIterable[AsyncContainer]:
ioc = make_ioc(configuration)
yield ioc
await ioc.close()
@pytest.fixture
def unique_email() -> str:
return get_unique_email()
+61 -56
View File
@@ -1,107 +1,112 @@
from http import HTTPStatus as status
from uuid import uuid4
from typing import Final
from dirty_equals import IsDict, IsPartialDict, IsStr
from dishka import FromDishka
from httpx import AsyncClient, Response
from tests.web_api.helpers import (
is_conflict_response,
is_not_found_response,
is_success_response,
is_unauthorized_response,
)
from tests.web_api.ioc import DatabaseClearer, inject
from tests.web_api.test_api_gateway import TestApiGateway
DEFAULT_PASSWORD = "Sup3rSecret" # noqa: S105
async def _sign_up_email(client: AsyncClient, email: str, password: str = DEFAULT_PASSWORD) -> None:
response = await client.post(
"/auth/sign_up/email",
json={"email": email, "password": password},
)
assert response.status_code == status.OK, response.text
async def _sign_in_email(client: AsyncClient, email: str, password: str = DEFAULT_PASSWORD) -> Response:
return await client.post(
"/auth/sign_in/email",
json={"email": email, "password": password},
)
def _unique_email() -> str:
return f"user-{uuid4().hex}@example.com"
DEFAULT_PASSWORD: Final = "Sup3rPuperS3cret" # noqa: S105
@inject
async def test_email_sign_up_creates_user(
http_client: FromDishka[AsyncClient],
unique_email: str,
test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer],
) -> None:
await database_clearer.clear()
email = _unique_email()
response = await http_client.post(
"/auth/sign_up/email",
json={"email": email, "password": DEFAULT_PASSWORD},
response = await test_api_gateway.sign_up_email(
email=unique_email,
password=DEFAULT_PASSWORD,
)
assert is_success_response(response)
assert response.json() == IsPartialDict(
access_token=IsStr()
)
body = response.json()
assert response.status_code == status.OK
assert isinstance(body["access_token"], str)
assert body["access_token"]
@inject
async def test_email_sign_up_existing_user_conflict(
http_client: FromDishka[AsyncClient],
unique_email: str,
test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer],
) -> None:
await database_clearer.clear()
email = _unique_email()
await _sign_up_email(http_client, email)
response = await http_client.post(
"/auth/sign_up/email",
json={"email": email, "password": DEFAULT_PASSWORD},
await test_api_gateway.sign_up_email(
email=unique_email,
password=DEFAULT_PASSWORD,
)
assert response.status_code == status.CONFLICT
response = await test_api_gateway.sign_up_email(
email=unique_email,
password=DEFAULT_PASSWORD,
)
assert is_conflict_response(response)
@inject
async def test_email_sign_in_returns_token(
http_client: FromDishka[AsyncClient],
unique_email: str,
test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer],
) -> None:
await database_clearer.clear()
email = _unique_email()
await _sign_up_email(http_client, email)
response = await _sign_in_email(http_client, email)
body = response.json()
await test_api_gateway.sign_up_email(
email=unique_email,
password=DEFAULT_PASSWORD,
)
assert response.status_code == status.OK
assert isinstance(body["access_token"], str)
assert body["access_token"]
response = await test_api_gateway.sign_in_email(
email=unique_email,
password=DEFAULT_PASSWORD,
)
assert is_success_response(response)
assert response.json() == IsDict(
access_token=IsStr,
)
@inject
async def test_email_sign_in_invalid_password(
http_client: FromDishka[AsyncClient],
unique_email: str,
test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer],
) -> None:
await database_clearer.clear()
email = _unique_email()
await _sign_up_email(http_client, email)
response = await _sign_in_email(http_client, email, password="wrong-password")
await test_api_gateway.sign_up_email(
email=unique_email,
password=DEFAULT_PASSWORD,
)
assert response.status_code == status.UNAUTHORIZED
response = await test_api_gateway.sign_in_email(
email=unique_email,
password="wrong-password",
)
assert is_unauthorized_response(response)
@inject
async def test_email_sign_in_user_not_found(
http_client: FromDishka[AsyncClient],
unique_email: str,
test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer],
) -> None:
await database_clearer.clear()
response = await _sign_in_email(http_client, email=_unique_email())
response = await test_api_gateway.sign_in_email(
email=unique_email,
password=DEFAULT_PASSWORD,
)
assert response.status_code == status.NOT_FOUND
assert is_not_found_response(response)
+8 -6
View File
@@ -1,8 +1,8 @@
from http import HTTPStatus as status
from dirty_equals import IsDict
from dishka import FromDishka
from httpx import AsyncClient
from tests.web_api.helpers import is_success_response
from tests.web_api.ioc import inject
@@ -11,7 +11,9 @@ async def test_healthcheck(
http_client: FromDishka[AsyncClient],
) -> None:
response = await http_client.get("/healthcheck")
response_json = response.json()
assert response.status_code == status.OK
assert response_json["ok"]
assert is_success_response(response)
assert response.json() == IsDict(
{
"ok": True,
}
)
+54 -75
View File
@@ -1,108 +1,87 @@
from collections.abc import Mapping
from http import HTTPStatus as status
from uuid import uuid4
from typing import Final
from dirty_equals import IsPartialDict
from dishka import FromDishka
from httpx import AsyncClient, Response
from tests.web_api.helpers import is_forbidden_response, is_success_response
from tests.web_api.ioc import DatabaseClearer, inject
from tests.web_api.test_api_gateway import TestApiGateway
DEFAULT_PASSWORD = "Sup3rSecret" # noqa: S105
def _unique_email() -> str:
return f"user-{uuid4().hex}@example.com"
def _auth_headers(token: str) -> Mapping[str, str]:
return {"Authorization": f"Bearer {token}"}
async def _sign_up_and_get_token(client: AsyncClient, email: str) -> str:
response = await client.post(
"/auth/sign_up/email",
json={"email": email, "password": DEFAULT_PASSWORD},
)
assert response.status_code == status.OK, response.text
body = response.json()
return str(body["access_token"])
async def _get_profile(client: AsyncClient, token: str) -> Response:
return await client.get("/profile", headers=_auth_headers(token))
async def _patch_profile(
client: AsyncClient,
token: str,
payload: Mapping[str, str | None],
) -> Response:
return await client.patch("/profile", headers=_auth_headers(token), json=payload)
DEFAULT_PASSWORD: Final = "Sup3rSecret" # noqa: S105
@inject
async def test_get_profile_returns_current_user(
http_client: FromDishka[AsyncClient],
unique_email: str,
test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer],
) -> None:
await database_clearer.clear()
email = _unique_email()
token = await _sign_up_and_get_token(http_client, email)
response = await _get_profile(http_client, token)
body = response.json()
response_sign_up = await test_api_gateway.sign_up_email(unique_email, DEFAULT_PASSWORD)
access_token = response_sign_up.json()["access_token"]
assert response.status_code == status.OK
assert body["email"] == email
assert body["display_name"] is None
response = await test_api_gateway.get_profile(access_token)
assert is_success_response(response)
assert response.json() == IsPartialDict(
email=unique_email,
display_name=None,
)
@inject
async def test_patch_profile_updates_profile_data(
http_client: FromDishka[AsyncClient],
unique_email: str,
test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer],
) -> None:
await database_clearer.clear()
email = _unique_email()
token = await _sign_up_and_get_token(http_client, email)
payload = {
"display_name": "Alice",
"first_name": "Alice",
"last_name": "Smith",
"avatar_url": "https://example.com/avatar.png",
"phone": "+1234567890",
}
patch_response = await _patch_profile(http_client, token, payload)
patch_body = patch_response.json()
response = await test_api_gateway.sign_up_email(
email=unique_email,
password=DEFAULT_PASSWORD,
)
access_token = response.json()["access_token"]
assert patch_response.status_code == status.OK
assert patch_body["display_name"] == payload["display_name"]
assert patch_body["first_name"] == payload["first_name"]
assert patch_body["last_name"] == payload["last_name"]
assert patch_body["avatar_url"] == payload["avatar_url"]
assert patch_body["phone"] == payload["phone"]
assert patch_body["email"] == email
patch_response = await test_api_gateway.patch_profile(
access_token=access_token,
display_name="Alice",
first_name="Alice",
last_name="Smith",
avatar_url="https://example.com/avatar.png",
phone="+1234567890",
)
assert is_success_response(patch_response)
assert patch_response.json() == IsPartialDict(
display_name="Alice",
first_name="Alice",
last_name="Smith",
avatar_url="https://example.com/avatar.png",
phone="+1234567890",
email=unique_email,
)
get_response = await _get_profile(http_client, token)
get_body = get_response.json()
assert get_body["display_name"] == payload["display_name"]
assert get_body["first_name"] == payload["first_name"]
assert get_body["last_name"] == payload["last_name"]
assert get_body["avatar_url"] == payload["avatar_url"]
assert get_body["phone"] == payload["phone"]
get_response = await test_api_gateway.get_profile(access_token)
assert is_success_response(get_response)
assert get_response.json() == IsPartialDict(
display_name="Alice",
first_name="Alice",
last_name="Smith",
avatar_url="https://example.com/avatar.png",
phone="+1234567890",
email=unique_email,
)
@inject
async def test_profile_routes_require_authentication(
http_client: FromDishka[AsyncClient],
test_api_gateway: FromDishka[TestApiGateway],
database_clearer: FromDishka[DatabaseClearer],
) -> None:
await database_clearer.clear()
response_get = await http_client.get("/profile")
response_patch = await http_client.patch("/profile", json={})
response_get = await test_api_gateway.get_profile(None)
assert is_forbidden_response(response_get)
assert response_get.status_code == status.FORBIDDEN
assert response_patch.status_code == status.FORBIDDEN
response_patch = await test_api_gateway.patch_profile()
assert is_forbidden_response(response_patch)
+28
View File
@@ -0,0 +1,28 @@
from http import HTTPStatus
from uuid import uuid4
from httpx import Response
def is_success_response(response: Response) -> bool:
return response.status_code == HTTPStatus.OK
def is_unauthorized_response(response: Response) -> bool:
return response.status_code == HTTPStatus.UNAUTHORIZED
def is_not_found_response(response: Response) -> bool:
return response.status_code == HTTPStatus.NOT_FOUND
def is_forbidden_response(response: Response) -> bool:
return response.status_code == HTTPStatus.FORBIDDEN
def is_conflict_response(response: Response) -> bool:
return response.status_code == HTTPStatus.CONFLICT
def get_unique_email() -> str:
return f"user-{uuid4().hex}@example.com"
+3
View File
@@ -35,6 +35,7 @@ from template_project.web_api.ioc.interactor import InteractorProvider
from template_project.web_api.ioc.notifications import NotificationServiceProvider
from template_project.web_api.ioc.oauth import OAuthClientProvider
from template_project.web_api.ioc.storage import StorageProvider
from tests.web_api.test_api_gateway import TestApiGateway
class DatabaseClearer:
@@ -66,6 +67,7 @@ class DatabaseClearer:
class TestProvider(Provider):
scope: BaseScope | None = Scope.REQUEST
api_gateway = provide(TestApiGateway)
database_clearer = provide(DatabaseClearer)
@provide
@@ -85,6 +87,7 @@ def make_ioc(configuration: Configuration) -> AsyncContainer:
OAuthClientProvider(),
NotificationServiceProvider(),
StorageProvider(),
TestProvider(),
validation_settings=STRICT_VALIDATION,
context={
ServerConfiguration: configuration.server,
+53
View File
@@ -0,0 +1,53 @@
from httpx import AsyncClient, Response
class TestApiGateway:
def __init__(self, client: AsyncClient) -> None:
self._client = client
async def sign_up_email(self, email: str, password: str) -> Response:
return await self._client.post(
"/auth/sign_up/email",
json={"email": email, "password": password},
)
async def sign_in_email(self, email: str, password: str) -> Response:
return await self._client.post(
"/auth/sign_in/email",
json={"email": email, "password": password},
)
async def get_profile(self, access_token: str | None) -> Response:
headers = {} if access_token is None else {"Authorization": f"Bearer {access_token}"}
return await self._client.get(
"/profile",
headers=headers,
)
async def patch_profile(
self,
access_token: str | None = None,
display_name: str | None = None,
first_name: str | None = None,
last_name: str | None = None,
avatar_url: str | None = None,
phone: str | None = None,
) -> Response:
headers = {} if access_token is None else {"Authorization": f"Bearer {access_token}"}
return await self._client.patch(
"/profile",
json={
key: value
for key, value in {
"display_name": display_name,
"first_name": first_name,
"last_name": last_name,
"avatar_url": avatar_url,
"phone": phone,
}.items()
if value is not None
},
headers=headers,
)
Generated
+13
View File
@@ -614,6 +614,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e8/cb/2da4cc83f5edb9c3257d09e1e7ab7b23f049c7962cae8d842bbef0a9cec9/cryptography-46.0.3-cp38-abi3-win_arm64.whl", hash = "sha256:d89c3468de4cdc4f08a57e214384d0471911a3830fcdaf7a8cc587e42a866372", size = 2918740, upload-time = "2025-10-15T23:18:12.277Z" },
]
[[package]]
name = "dirty-equals"
version = "0.11"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/30/1d/c5913ac9d6615515a00f4bdc71356d302437cb74ff2e9aaccd3c14493b78/dirty_equals-0.11.tar.gz", hash = "sha256:f4ac74ee88f2d11e2fa0f65eb30ee4f07105c5f86f4dc92b09eb1138775027c3", size = 128067, upload-time = "2025-11-17T01:51:24.451Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/bb/8d/dbff05239043271dbeace563a7686212a3dd517864a35623fe4d4a64ca19/dirty_equals-0.11-py3-none-any.whl", hash = "sha256:b1d7093273fc2f9be12f443a8ead954ef6daaf6746fd42ef3a5616433ee85286", size = 28051, upload-time = "2025-11-17T01:51:22.849Z" },
]
[[package]]
name = "dishka"
version = "1.7.2"
@@ -1992,6 +2001,7 @@ dev = [
{ name = "bandit" },
{ name = "codespell" },
{ name = "coverage" },
{ name = "dirty-equals" },
{ name = "mypy" },
{ name = "pytest" },
{ name = "pytest-asyncio" },
@@ -2010,6 +2020,7 @@ migrations = [
]
tests = [
{ name = "coverage" },
{ name = "dirty-equals" },
{ name = "pytest" },
{ name = "pytest-asyncio" },
]
@@ -2042,6 +2053,7 @@ dev = [
{ name = "bandit", specifier = "==1.8.6" },
{ name = "codespell", specifier = "==2.4.1" },
{ name = "coverage", specifier = "==7.11.0" },
{ name = "dirty-equals", specifier = ">=0.11" },
{ name = "mypy", specifier = "==1.18.1" },
{ name = "pytest", specifier = "==8.4.0" },
{ name = "pytest-asyncio", specifier = "==1.2.0" },
@@ -2058,6 +2070,7 @@ linters = [
migrations = [{ name = "alembic", specifier = "==1.17.0" }]
tests = [
{ name = "coverage", specifier = "==7.11.0" },
{ name = "dirty-equals", specifier = ">=0.11" },
{ name = "pytest", specifier = "==8.4.0" },
{ name = "pytest-asyncio", specifier = "==1.2.0" },
]