You've already forked RekomenciBackend
108 lines
2.9 KiB
Python
108 lines
2.9 KiB
Python
from http import HTTPStatus as status
|
|
from uuid import uuid4
|
|
|
|
from dishka import FromDishka
|
|
from httpx import AsyncClient, Response
|
|
|
|
from tests.web_api.ioc import DatabaseClearer, inject
|
|
|
|
# Password the helpers and tests in this module send when no other one is given.
# The noqa silences the hardcoded-password lint (S105); this is a test-only value.
DEFAULT_PASSWORD = "Sup3rSecret" # noqa: S105
|
|
|
|
|
|
async def _sign_up_email(client: AsyncClient, email: str, password: str = DEFAULT_PASSWORD) -> None:
    """POST the credentials to the email sign-up endpoint and require a 200 OK.

    Args:
        client: HTTP client used to send the request.
        email: Address to sign up with.
        password: Password to sign up with.

    Raises:
        AssertionError: If the response status is not 200 OK; the response
            text is attached as the failure message.
    """
    credentials = {"email": email, "password": password}
    reply = await client.post("/auth/sign_up/email", json=credentials)
    assert reply.status_code == status.OK, reply.text
|
|
|
|
|
|
async def _sign_in_email(client: AsyncClient, email: str, password: str = DEFAULT_PASSWORD) -> Response:
    """POST the credentials to the email sign-in endpoint and return the response.

    No status check happens here; callers assert on the returned response.

    Args:
        client: HTTP client used to send the request.
        email: Address to sign in with.
        password: Password to sign in with.

    Returns:
        The response to the sign-in request, unmodified.
    """
    credentials = {"email": email, "password": password}
    reply = await client.post("/auth/sign_in/email", json=credentials)
    return reply
|
|
|
|
|
|
def _unique_email() -> str:
|
|
return f"user-{uuid4().hex}@example.com"
|
|
|
|
|
|
@inject
async def test_email_sign_up_creates_user(
    http_client: FromDishka[AsyncClient],
    database_clearer: FromDishka[DatabaseClearer],
) -> None:
    """Email sign-up with a fresh address answers 200 OK with a non-empty string access token.

    Args:
        http_client: Injected HTTP client used to call the API.
        database_clearer: Injected helper; its ``clear()`` is awaited before the request.
    """
    await database_clearer.clear()
    email = _unique_email()

    response = await http_client.post(
        "/auth/sign_up/email",
        json={"email": email, "password": DEFAULT_PASSWORD},
    )

    # Check the status before decoding the body: an error reply may not be JSON
    # (or may lack the token key), and the status mismatch plus the raw text is
    # the more useful failure to report.
    assert response.status_code == status.OK, response.text
    body = response.json()
    assert isinstance(body["access_token"], str)
    assert body["access_token"]
|
|
|
|
|
|
@inject
async def test_email_sign_up_existing_user_conflict(
    http_client: FromDishka[AsyncClient],
    database_clearer: FromDishka[DatabaseClearer],
) -> None:
    """A second email sign-up with an already used address answers 409 Conflict.

    Args:
        http_client: Injected HTTP client used to call the API.
        database_clearer: Injected helper; its ``clear()`` is awaited before the requests.
    """
    await database_clearer.clear()
    taken_email = _unique_email()
    # First sign-up goes through the helper, which itself requires 200 OK.
    await _sign_up_email(http_client, taken_email)

    payload = {"email": taken_email, "password": DEFAULT_PASSWORD}
    second_attempt = await http_client.post("/auth/sign_up/email", json=payload)

    assert second_attempt.status_code == status.CONFLICT
|
|
|
|
|
|
@inject
async def test_email_sign_in_returns_token(
    http_client: FromDishka[AsyncClient],
    database_clearer: FromDishka[DatabaseClearer],
) -> None:
    """Email sign-in after a successful sign-up answers 200 OK with a non-empty string access token.

    Args:
        http_client: Injected HTTP client used to call the API.
        database_clearer: Injected helper; its ``clear()`` is awaited before the requests.
    """
    await database_clearer.clear()
    email = _unique_email()
    await _sign_up_email(http_client, email)

    response = await _sign_in_email(http_client, email)

    # Check the status before decoding the body: an error reply may not be JSON
    # (or may lack the token key), and the status mismatch plus the raw text is
    # the more useful failure to report.
    assert response.status_code == status.OK, response.text
    body = response.json()
    assert isinstance(body["access_token"], str)
    assert body["access_token"]
|
|
|
|
|
|
@inject
async def test_email_sign_in_invalid_password(
    http_client: FromDishka[AsyncClient],
    database_clearer: FromDishka[DatabaseClearer],
) -> None:
    """Email sign-in with a password other than the one signed up with answers 401 Unauthorized.

    Args:
        http_client: Injected HTTP client used to call the API.
        database_clearer: Injected helper; its ``clear()`` is awaited before the requests.
    """
    await database_clearer.clear()
    address = _unique_email()
    # The account is signed up with the helper's default password.
    await _sign_up_email(http_client, address)

    rejected = await _sign_in_email(http_client, address, password="wrong-password")

    assert rejected.status_code == status.UNAUTHORIZED
|
|
|
|
|
|
@inject
async def test_email_sign_in_user_not_found(
    http_client: FromDishka[AsyncClient],
    database_clearer: FromDishka[DatabaseClearer],
) -> None:
    """Email sign-in with an address that this test never signed up answers 404 Not Found.

    Args:
        http_client: Injected HTTP client used to call the API.
        database_clearer: Injected helper; its ``clear()`` is awaited before the request.
    """
    await database_clearer.clear()
    unknown_email = _unique_email()

    outcome = await _sign_in_email(http_client, email=unknown_email)

    assert outcome.status_code == status.NOT_FOUND
|