chore: restructured project

This commit is contained in:
ITQ
2025-03-07 19:32:09 +03:00
parent bfb7ad901a
commit 0a35951c62
178 changed files with 304 additions and 376 deletions
+185
View File
@@ -0,0 +1,185 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
# PyPI configuration file
.pypirc
# Ruff files
.ruff_cache
# Docker files
Dockerfile
Dockerfile.staticfiles
.dockerignore
# Git files
.git
.gitignore
# Template env file
.env.template
# Collected static files
static
+33
View File
@@ -0,0 +1,33 @@
# Change all vars before going to production and remove all comments (!)
# Below all environment variables and default values
DJANGO_SECRET_KEY=very_insecure_key
DJANGO_DEBUG=False
DJANGO_ALLOWED_HOSTS=localhost,127.0.0.1
DJANGO_CSRF_TRUSTED_ORIGINS=http://localhost,http://127.0.0.1
DJANGO_CORS_ALLOWED_ORIGINS=*
DJANGO_INTERNAL_IPS=127.0.0.1
DJANGO_LANGUAGE_CODE=en-us
DJANGO_STATIC_URL=static/
REDIS_URI=redis://localhost:6379
DJANGO_DB_URI=sqlite:///db.sqlite3
YANDEX_CLOUD_FOLDER_ID=
YANDEX_CLOUD_API_KEY=
# Storages
MINIO_ENDPOINT=
MINIO_CUSTOM_ENDPOINT_URL=
MINIO_ACCESS_KEY=
MINIO_SECRET_KEY=
MINIO_USE_HTTPS=False
MINIO_MEDIA_BUCKET_NAME=adnova-media
# Applyable if you installing using docker compose
DJANGO_CREATE_SUPERUSER=False
DJANGO_SUPERUSER_USERNAME=
DJANGO_SUPERUSER_EMAIL=
DJANGO_SUPERUSER_PASSWORD=
+173
View File
@@ -0,0 +1,173 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
.python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
.pdm-python
.pdm-build/
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
# PyPI configuration file
.pypirc
# Ruff files
.ruff_cache
# Collected static files
static
+42
View File
@@ -0,0 +1,42 @@
# Stage 1: Install dependencies
FROM docker.io/python:3.11-alpine3.20 AS builder
COPY --from=ghcr.io/astral-sh/uv:0.4.30 /uv /uvx /bin/
WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PYTHONOPTIMIZE=2 \
UV_COMPILE_BYTECODE=1 \
UV_PROJECT_ENVIRONMENT=/opt/venv
COPY pyproject.toml .
RUN uv sync --no-dev --no-install-project --no-cache
# Stage 2: Start the application
FROM docker.io/python:3.11-alpine3.20
WORKDIR /app
COPY --from=builder /opt/venv /opt/venv
COPY . .
RUN adduser -D -g '' app && chown -R app:app ./
USER app
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PYTHONOPTIMIZE=2 \
PATH="/opt/venv/bin:$PATH"
EXPOSE 8080
HEALTHCHECK --interval=30s --timeout=5s --start-period=5s --start-interval=2s --retries=3 \
CMD wget --no-verbose --tries=1 --spider http://127.0.0.1:8080/health?format=json || exit 1
CMD gunicorn config.wsgi --workers=8 -b 0.0.0.0:8080 --access-logfile - --error-logfile -
+27
View File
@@ -0,0 +1,27 @@
# Stage 1: Install dependencies and compile staticfiles
FROM docker.io/python:3.11-alpine3.20 AS builder
COPY --from=ghcr.io/astral-sh/uv:0.4.30 /uv /uvx /bin/
WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PYTHONOPTIMIZE=2 \
UV_COMPILE_BYTECODE=1 \
UV_PROJECT_ENVIRONMENT=/opt/venv
COPY pyproject.toml .
RUN uv sync --no-dev --no-install-project --no-cache
COPY . .
RUN uv run python manage.py collectstatic --noinput
# Stage 2: Start nginx and serve staticfiles
FROM docker.io/nginx:latest
COPY --from=builder /app/static /usr/share/nginx/html
CMD ["nginx", "-g", "daemon off;"]
+147
View File
@@ -0,0 +1,147 @@
# AdNova Backend
## Prerequisites
Ensure you have the following installed on your system:
- [Python](https://www.python.org/) (>=3.10,<3.12)
- [uv](https://docs.astral.sh/uv/)
- [Docker](https://www.docker.com/) (for containerized setup)
## Basic setup
### Installation
#### Clone the project
```bash
git clone https://gitlab.prodcontest.ru/2025-final-projects-back/devitq.git
```
#### Go to the project directory
```bash
cd devitq/solution/services/backend
```
#### Customize environment
```bash
cp .env.template .env
```
And setup env vars according to your needs.
#### Install dependencies
##### For dev environment
```bash
uv sync --all-extras
```
##### For prod environment
```bash
uv sync --no-dev
```
#### Running
##### Apply migrations
```bash
uv run python manage.py migrate
```
##### Start celery worker
```bash
celery -A config worker -l INFO
```
##### Start server
In dev mode:
```bash
uv run python manage.py runserver
```
In prod mode:
```bash
uv run gunicorn config.wsgi
```
## Containerized setup
### Clone the project
```bash
git clone https://gitlab.prodcontest.ru/2025-final-projects-back/devitq.git
```
### Go to the project directory
```bash
cd devitq/solution/services/backend
```
### Build docker image
```bash
docker build -t adnova-backend .
```
### Customize environment
Customize environment with `docker run` command (or bind .env file to container), for all environment vars and default values see [.env.template](./.env.template).
### Run docker image
#### Backend
```bash
docker run -p 8080:8080 --name adnova-backend adnova-backend
```
#### Celery worker
```bash
docker run --name adnova-celery-worker adnova-backend celery -A config worker -l INFO
```
Backend will be available on [127.0.0.1:8080](http://127.0.0.1:8080).
## Testing
### Clone the project
```bash
git clone https://gitlab.prodcontest.ru/2025-final-projects-back/devitq.git
```
### Go to the project directory
```bash
cd devitq/solution/services/backend
```
### Install dependencies
```bash
uv sync --all-extras
```
### Run tests
```bash
uv run coverage run --source="." manage.py test
```
### Check coverage
```bash
uv run coverage report
```
View File
+10
View File
@@ -0,0 +1,10 @@
from django.urls import path
from health_check.views import MainView
from api.v1.router import router as api_v1_router
urlpatterns = [
path("", api_v1_router.urls),
# Health endpoint
path("health", MainView.as_view(), name="health_check_home"),
]
View File
+23
View File
@@ -0,0 +1,23 @@
from typing import ClassVar
from uuid import UUID
from ninja import ModelSchema, Schema
from apps.campaign.models import Campaign
class Advertisment(ModelSchema):
advertiser_id: UUID
ad_id: UUID
class Meta:
model = Campaign
fields: ClassVar[tuple[str]] = (
Campaign.ad_title.field.name,
Campaign.ad_text.field.name,
Campaign.ad_image.field.name,
)
class ClickIn(Schema):
client_id: UUID
+58
View File
@@ -0,0 +1,58 @@
from http import HTTPStatus as status
from uuid import UUID
from django.http import Http404, HttpRequest
from django.shortcuts import get_object_or_404
from ninja import Router
from api.v1 import schemas as global_schemas
from api.v1.ads import schemas
from apps.campaign.models import Campaign
from apps.client.models import Client
router = Router(tags=["ads"])
@router.get(
"",
response={
status.OK: schemas.Advertisment,
status.BAD_REQUEST: global_schemas.BadRequestError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
)
def get_advertisment(
request: HttpRequest, client_id: UUID
) -> tuple[status, Campaign]:
client = get_object_or_404(Client, id=client_id)
campaign = Campaign.suggest(client)
if not campaign:
raise Http404
campaign.view(client)
return status.OK, campaign
@router.post(
"/{advertisment_id}/click",
response={
status.NO_CONTENT: None,
status.BAD_REQUEST: global_schemas.BadRequestError,
status.FORBIDDEN: global_schemas.ForbiddenError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
)
def click_on_advertisment(
request: HttpRequest, advertisment_id: UUID, client: schemas.ClickIn
) -> tuple[status, None]:
campaign_instance: Campaign = get_object_or_404(
Campaign, id=advertisment_id
)
client_instance: Client = get_object_or_404(Client, id=client.client_id)
campaign_instance.click(client_instance)
return status.NO_CONTENT, None
@@ -0,0 +1,21 @@
from typing import ClassVar
from uuid import UUID
from ninja import ModelSchema
from apps.advertiser.models import Advertiser as AdvertiserModel
from apps.mlscore.models import Mlscore as MlscoreModel
class Advertiser(ModelSchema):
advertiser_id: UUID
class Meta:
model = AdvertiserModel
exclude: ClassVar[tuple[str]] = (AdvertiserModel.id.field.name,)
class Mlscore(ModelSchema):
class Meta:
model = MlscoreModel
exclude: ClassVar[tuple[str]] = (MlscoreModel.id.field.name,)
@@ -0,0 +1,215 @@
import json
import uuid
from http import HTTPStatus as status
from django.test import TestCase, Client, override_settings
from apps.advertiser.models import Advertiser
from apps.client.models import Client as ClientModel
from apps.mlscore.models import Mlscore
class TestMlscoreEndpoint(TestCase):
def setUp(self):
self.client = Client()
self.advertiser = Advertiser.objects.create(name="Test Advertiser")
self.client_obj = ClientModel.objects.create(
login="test_client",
age=14,
location="test_location",
gender=ClientModel.GenderChoices.FEMALE,
)
self.url = "/ml-scores"
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_create_mlscore_success(self):
data = {
"advertiser_id": str(self.advertiser.id),
"client_id": str(self.client_obj.id),
"score": 90,
}
response = self.client.post(
self.url, json.dumps(data), content_type="application/json"
)
mlscore = Mlscore.objects.first()
self.assertEqual(response.status_code, status.OK)
self.assertEqual(mlscore.score, 90)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_update_mlscore_success(self):
mlscore = Mlscore.objects.create(
advertiser=self.advertiser,
client=self.client_obj,
score=80,
)
data = {
"advertiser_id": str(self.advertiser.id),
"client_id": str(self.client_obj.id),
"score": 85,
}
response = self.client.post(
self.url, json.dumps(data), content_type="application/json"
)
mlscore.refresh_from_db()
self.assertEqual(response.status_code, status.OK)
self.assertEqual(mlscore.score, 85)
def test_missing_required_field(self):
data = {"score": 90}
response = self.client.post(
self.url, json.dumps(data), content_type="application/json"
)
self.assertEqual(response.status_code, status.BAD_REQUEST)
def test_invalid_uuid_format(self):
data = {
"advertiser_id": "invalid-uuid",
"client_id": str(self.client_obj.id),
"score": 90,
}
response = self.client.post(
self.url, json.dumps(data), content_type="application/json"
)
self.assertEqual(response.status_code, status.BAD_REQUEST)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_non_existing_client(self):
data = {
"advertiser_id": str(self.advertiser.id),
"client_id": str(uuid.uuid4()),
"score": 90,
}
response = self.client.post(
self.url, json.dumps(data), content_type="application/json"
)
self.assertEqual(response.status_code, status.BAD_REQUEST)
def test_non_existing_advertiser(self):
data = {
"advertiser_id": str(uuid.uuid4()),
"client_id": str(self.client_obj.id),
"score": 90,
}
response = self.client.post(
self.url, json.dumps(data), content_type="application/json"
)
self.assertEqual(response.status_code, status.BAD_REQUEST)
class TestBulkAdvertisersEndpoint(TestCase):
def setUp(self):
self.client = Client()
self.url = "/advertisers/bulk"
self.advertiser = Advertiser.objects.create(name="Advertiser 1")
def test_bulk_create_success(self):
uuid1 = self.advertiser.id
uuid2 = uuid.uuid4()
data = [
{"advertiser_id": str(uuid1), "name": "Advertiser 4"},
{"advertiser_id": str(uuid2), "name": "Advertiser 1"},
{"advertiser_id": str(uuid2), "name": "Advertiser 5"},
{"advertiser_id": str(uuid2), "name": "Advertiser 2"},
{"advertiser_id": str(uuid1), "name": "Advertiser 2"},
]
response = self.client.post(
self.url, json.dumps(data), content_type="application/json"
)
self.advertiser.refresh_from_db()
self.assertEqual(response.status_code, status.CREATED)
self.assertEqual(self.advertiser.name, "Advertiser 2")
self.assertEqual(Advertiser.objects.count(), 2)
def test_bulk_update_success(self):
advertiser = Advertiser.objects.create(name="Old Name")
data = [{"advertiser_id": str(advertiser.id), "name": "New Name"}]
response = self.client.post(
self.url, json.dumps(data), content_type="application/json"
)
advertiser.refresh_from_db()
self.assertEqual(response.status_code, status.CREATED)
self.assertEqual(advertiser.name, "New Name")
def test_duplicate_advertiser_ids(self):
adv_id = uuid.uuid4()
data = [
{"advertiser_id": str(adv_id), "name": "First"},
{"advertiser_id": str(adv_id), "name": "Last"},
]
response = self.client.post(
self.url, json.dumps(data), content_type="application/json"
)
advertiser = Advertiser.objects.get(id=adv_id)
self.assertEqual(response.status_code, status.CREATED)
self.assertEqual(advertiser.name, "Last")
def test_invalid_advertiser_id_format(self):
data = [{"advertiser_id": "invalid", "name": "Invalid"}]
response = self.client.post(
self.url, json.dumps(data), content_type="application/json"
)
self.assertEqual(response.status_code, status.BAD_REQUEST)
def test_empty_bulk_request(self):
response = self.client.post(
self.url, json.dumps([]), content_type="application/json"
)
self.assertEqual(response.status_code, status.CREATED)
self.assertEqual(len(response.json()), 0)
class TestGetAdvertiserEndpoint(TestCase):
def setUp(self):
self.client = Client()
self.advertiser = Advertiser.objects.create(name="Test Advertiser")
self.url = "/advertisers"
self.valid_url = f"{self.url}/{self.advertiser.id}"
def test_get_advertiser_success(self):
response = self.client.get(self.valid_url)
self.assertEqual(response.status_code, status.OK)
self.assertEqual(
response.json()["advertiser_id"], str(self.advertiser.id)
)
self.assertEqual(response.json()["name"], self.advertiser.name)
def test_non_existent_advertiser(self):
non_existent_url = f"{self.url}/{uuid.uuid4()}"
response = self.client.get(non_existent_url)
self.assertEqual(response.status_code, status.NOT_FOUND)
def test_invalid_uuid_format(self):
response = self.client.get(f"{self.url}/invalid-uuid")
self.assertEqual(response.status_code, status.BAD_REQUEST)
@@ -0,0 +1,87 @@
from collections import defaultdict
from http import HTTPStatus as status
from uuid import UUID
from django.db import transaction
from django.http import HttpRequest
from django.shortcuts import get_object_or_404
from ninja import Router
from api.v1 import schemas as global_schemas
from api.v1.advertisers import schemas
from apps.advertiser.models import Advertiser
from apps.mlscore.models import Mlscore
router = Router(tags=["advertisers"])
@router.post(
"/ml-scores",
response={
status.OK: schemas.Mlscore,
status.BAD_REQUEST: global_schemas.BadRequestError,
},
)
def create_or_update_mlscore(
request: HttpRequest, mlscore: schemas.Mlscore
) -> tuple[status, schemas.Mlscore]:
mlscore_instance, _ = Mlscore.objects.update_or_create(
advertiser_id=mlscore.advertiser,
client_id=mlscore.client,
defaults={"score": mlscore.score},
)
return status.OK, mlscore_instance
@router.post(
"/advertisers/bulk",
response={
status.CREATED: list[schemas.Advertiser],
status.BAD_REQUEST: global_schemas.BadRequestError,
},
)
def bulk_create_or_update(
request: HttpRequest, data: list[schemas.Advertiser]
) -> tuple[status, list[Advertiser]]:
latest_advertisers: dict[UUID, schemas.Advertiser] = defaultdict(
lambda: None
)
for item in reversed(data):
if latest_advertisers[item.advertiser_id] is None:
Advertiser(
id=item.advertiser_id, **item.dict(exclude={"client_id"})
).validate(
validate_unique=False,
validate_constraints=False,
)
latest_advertisers[item.advertiser_id] = item
unique_advertisers = reversed(list(latest_advertisers.values()))
result = []
with transaction.atomic():
for advertiser in unique_advertisers:
advertiser_instance, _ = Advertiser.objects.update_or_create(
id=advertiser.advertiser_id,
defaults={**dict(advertiser)},
)
result.append(advertiser_instance)
return status.CREATED, result
@router.get(
"/advertisers/{advertiser_id}",
response={
status.OK: schemas.Advertiser,
status.BAD_REQUEST: global_schemas.BadRequestError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
)
def get_advertiser(
request: HttpRequest, advertiser_id: UUID
) -> tuple[status, Advertiser]:
return status.OK, get_object_or_404(Advertiser, id=advertiser_id)
@@ -0,0 +1,109 @@
from typing import Any, ClassVar
from uuid import UUID
from ninja import ModelSchema, Schema
from pydantic import field_validator
from pydantic.types import NonNegativeInt, PositiveInt
from apps.campaign.models import Campaign
class CampaignTargeting(ModelSchema):
class Meta:
model = Campaign
fields: ClassVar[tuple[str]] = (
Campaign.gender.field.name,
Campaign.age_from.field.name,
Campaign.age_to.field.name,
Campaign.location.field.name,
)
fields_optional = "__all__"
class CampaignOut(ModelSchema):
campaign_id: UUID
advertiser_id: UUID
targeting: CampaignTargeting = None
class Meta:
model = Campaign
fields: ClassVar[tuple[str]] = (
Campaign.ad_title.field.name,
Campaign.ad_text.field.name,
Campaign.ad_image.field.name,
Campaign.impressions_limit.field.name,
Campaign.clicks_limit.field.name,
Campaign.cost_per_impression.field.name,
Campaign.cost_per_click.field.name,
Campaign.start_date.field.name,
Campaign.end_date.field.name,
)
class CampaignCreateIn(ModelSchema):
targeting: CampaignTargeting = None
class Meta:
model = Campaign
fields: ClassVar[tuple[str]] = (
Campaign.ad_title.field.name,
Campaign.ad_text.field.name,
Campaign.impressions_limit.field.name,
Campaign.clicks_limit.field.name,
Campaign.cost_per_impression.field.name,
Campaign.cost_per_click.field.name,
Campaign.start_date.field.name,
Campaign.end_date.field.name,
)
@field_validator("targeting", mode="before")
@classmethod
def validate_targeting(cls, value: Any) -> Any:
if (
not isinstance(value, dict)
and not isinstance(
value,
CampaignTargeting,
)
and value
):
err = "The 'targeting' field must be a valid object or null."
raise ValueError(err)
return value
class CampaignUpdateIn(ModelSchema):
targeting: CampaignTargeting = None
class Meta:
model = Campaign
fields: ClassVar[tuple[str]] = (
Campaign.impressions_limit.field.name,
Campaign.clicks_limit.field.name,
Campaign.ad_title.field.name,
Campaign.ad_text.field.name,
Campaign.cost_per_impression.field.name,
Campaign.cost_per_click.field.name,
Campaign.start_date.field.name,
Campaign.end_date.field.name,
)
@field_validator("targeting", mode="before")
@classmethod
def validate_targeting(cls, value: Any) -> Any:
if (
not isinstance(value, dict)
and not isinstance(
value,
CampaignTargeting,
)
and value
):
err = "The 'targeting' field must be a valid object or null."
raise ValueError(err)
return value
class CampaignListFilters(Schema):
page: PositiveInt = 1
size: NonNegativeInt = 100
@@ -0,0 +1,7 @@
from api.v1.campaigns import schemas
from apps.campaign.models import Campaign
def normalize_campaign(campaign: Campaign) -> schemas.CampaignOut:
campaign.targeting = schemas.CampaignTargeting.from_orm(campaign)
return schemas.CampaignOut.from_orm(campaign)
+214
View File
@@ -0,0 +1,214 @@
from http import HTTPStatus as status
from uuid import UUID
from django.http import HttpRequest
from django.shortcuts import get_object_or_404
from ninja import File, Query, Router
from ninja.errors import HttpError
from ninja.files import UploadedFile
from PIL import Image
from api.v1 import schemas as global_schemas
from api.v1.campaigns import schemas, utils
from apps.advertiser.models import Advertiser
from apps.campaign.models import Campaign
from config.errors import ForbiddenError
router = Router(tags=["campaigns"])
@router.post(
"/{advertiser_id}/campaigns",
response={
status.CREATED: schemas.CampaignOut,
status.BAD_REQUEST: global_schemas.BadRequestError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
)
def create_campaign(
request: HttpRequest, advertiser_id: UUID, data: schemas.CampaignCreateIn
) -> tuple[status, schemas.CampaignOut]:
advertiser = get_object_or_404(Advertiser, id=advertiser_id)
campaign = Campaign.objects.create(
advertiser_id=advertiser.id,
**data.dict(exclude={"targeting"}),
**data.targeting.dict() if data.targeting else {},
)
return status.CREATED, utils.normalize_campaign(campaign)
@router.get(
"/{advertiser_id}/campaigns",
response={
status.OK: list[schemas.CampaignOut],
status.BAD_REQUEST: global_schemas.BadRequestError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
)
def list_campaigns(
request: HttpRequest,
advertiser_id: UUID,
filters: Query[schemas.CampaignListFilters],
) -> tuple[status, list[schemas.CampaignOut]]:
advertaiser = get_object_or_404(Advertiser, id=advertiser_id)
campaigns = Campaign.objects.filter(advertiser=advertaiser).order_by(
"-end_date"
)
paginated_campaigns = campaigns[
(filters.page - 1) * filters.size : filters.page * filters.size
]
return status.OK, [
utils.normalize_campaign(campaign) for campaign in paginated_campaigns
]
@router.get(
"/{advertiser_id}/campaigns/{campaign_id}",
response={
status.OK: schemas.CampaignOut,
status.BAD_REQUEST: global_schemas.BadRequestError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
)
def get_campaign(
request: HttpRequest, advertiser_id: UUID, campaign_id: UUID
) -> tuple[status, schemas.CampaignOut]:
return status.OK, utils.normalize_campaign(
get_object_or_404(
Campaign,
id=campaign_id,
advertiser_id=advertiser_id,
)
)
@router.put(
"/{advertiser_id}/campaigns/{campaign_id}",
response={
status.OK: schemas.CampaignOut,
status.BAD_REQUEST: global_schemas.BadRequestError,
status.FORBIDDEN: global_schemas.ForbiddenError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
)
def update_campaign(
request: HttpRequest,
advertiser_id: UUID,
campaign_id: UUID,
data: schemas.CampaignUpdateIn,
) -> tuple[status, schemas.CampaignOut]:
campaign = get_object_or_404(
Campaign,
id=campaign_id,
advertiser_id=advertiser_id,
)
for attr, value in data.dict().items():
if attr == "targeting":
for t_attr, t_value in value.items():
setattr(campaign, t_attr, t_value)
elif not (
attr in Campaign.READONLY_AFTER_START_FIELDS
and campaign.started
and getattr(campaign, attr) != value
):
setattr(campaign, attr, value)
else:
raise ForbiddenError
campaign.save()
return status.OK, utils.normalize_campaign(campaign)
@router.delete(
"/{advertiser_id}/campaigns/{campaign_id}",
response={
status.NO_CONTENT: None,
status.BAD_REQUEST: global_schemas.BadRequestError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
)
def delete_campaign(
request: HttpRequest, advertiser_id: UUID, campaign_id: UUID
) -> tuple[status, None]:
campaign = get_object_or_404(
Campaign,
id=campaign_id,
advertiser_id=advertiser_id,
)
if campaign.ad_image:
campaign.ad_image.delete()
campaign.delete()
return status.NO_CONTENT, None
@router.post(
"/{advertiser_id}/campaigns/{campaign_id}/ad_image",
response={
status.OK: schemas.CampaignOut,
status.BAD_REQUEST: global_schemas.BadRequestError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
description=(
"Uploads image to ad_image field. "
"If image already exists then image will be overridden."
),
)
def upload_ad_image(
request: HttpRequest,
advertiser_id: UUID,
campaign_id: UUID,
ad_image: UploadedFile = File(...), # noqa: B008
) -> tuple[status, Campaign]:
campaign = get_object_or_404(
Campaign,
id=campaign_id,
advertiser_id=advertiser_id,
)
if ad_image.size >= 10 * 1024 * 1024:
raise HttpError(status.BAD_REQUEST, "File can't be bigger than 10MB.")
try:
Image.open(ad_image).verify()
except (OSError, SyntaxError):
raise HttpError(
status.BAD_REQUEST, "File must be a valid image."
) from None
if campaign.ad_image:
campaign.ad_image.delete(save=True)
campaign.ad_image = ad_image
campaign.save()
return status.OK, campaign
@router.delete(
"/{advertiser_id}/campaigns/{campaign_id}/ad_image",
response={
status.NO_CONTENT: None,
status.BAD_REQUEST: global_schemas.BadRequestError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
description=(
"Deletes image from ad_image field. "
"If no image exists still returns 204."
),
)
def delete_ad_image(
request: HttpRequest, advertiser_id: UUID, campaign_id: UUID
) -> tuple[status, None]:
campaign = get_object_or_404(
Campaign,
id=campaign_id,
advertiser_id=advertiser_id,
)
if campaign.ad_image:
campaign.ad_image.delete(save=True)
return status.NO_CONTENT, None
@@ -0,0 +1,14 @@
from typing import ClassVar
from uuid import UUID
from ninja import ModelSchema
from apps.client.models import Client as ClientModel
class Client(ModelSchema):
client_id: UUID
class Meta:
model = ClientModel
exclude: ClassVar[tuple[str]] = (ClientModel.id.field.name,)
+155
View File
@@ -0,0 +1,155 @@
from http import HTTPStatus as status
from django.test import TestCase
import json
from uuid import uuid4
from apps.client.models import Client
class ClientTests(TestCase):
def setUp(self):
self.client_1 = Client.objects.create(
login="testuser1", age=25, location="City1", gender="MALE"
)
self.client_2 = Client.objects.create(
login="testuser2", age=30, location="City2", gender="FEMALE"
)
self.bulk_url = "/clients/bulk"
self.get_url = "/clients"
def test_bulk_create_or_update(self):
client_3_id = str(uuid4())
client_data = [
{
"client_id": client_3_id,
"login": "newusers",
"age": 21,
"location": "City1",
"gender": "FEMALE",
},
{
"client_id": str(self.client_1.id),
"login": "updateduser",
"age": 26,
"location": "City1",
"gender": "MALE",
},
{
"client_id": client_3_id,
"login": "newusersa",
"age": 25,
"location": "City1",
"gender": "FEMALE",
},
{
"client_id": client_3_id,
"login": "newuser",
"age": 22,
"location": "City3",
"gender": "MALE",
},
]
response = self.client.post(
self.bulk_url,
data=json.dumps(client_data),
content_type="application/json",
)
client_3 = Client.objects.get(id=client_3_id)
self.client_1.refresh_from_db()
self.assertEqual(response.status_code, status.CREATED)
self.assertEqual(Client.objects.count(), 3)
self.assertEqual(self.client_1.login, "updateduser")
self.assertEqual(client_3.location, "City3")
self.assertEqual(client_3.login, "newuser")
self.assertEqual(len(response.json()), 2)
self.assertEqual(response.json()[1]["client_id"], str(client_3.id))
self.assertEqual(response.json()[1]["login"], client_3.login)
def test_bulk_create_invalid_data(self):
client_data = [
{
"client_id": "invalid_uuid",
"login": "baduser",
"age": 150,
"location": "City4",
"gender": "UNKNOWN",
}
]
response = self.client.post(
self.bulk_url,
data=json.dumps(client_data),
content_type="application/json",
)
self.assertEqual(response.status_code, status.BAD_REQUEST)
def test_duplicate_advertiser_ids(self):
adv_id = uuid4()
data = [
{
"client_id": str(adv_id),
"login": "baduser",
"age": 10,
"location": "City4",
"gender": "FEMALE",
},
{
"client_id": str(adv_id),
"login": "Last",
"age": 14,
"location": "City4",
"gender": "MALE",
},
]
response = self.client.post(
self.bulk_url, json.dumps(data), content_type="application/json"
)
client = Client.objects.get(id=adv_id)
self.assertEqual(response.status_code, status.CREATED)
self.assertEqual(client.login, "Last")
self.assertEqual(client.age, 14)
self.assertEqual(client.gender, "MALE")
def test_invalid_client_id_format(self):
client_data = [
{
"client_id": "invalid_uuid",
"login": "baduser",
"age": 150,
"location": "City4",
"gender": "UNKNOWN",
}
]
response = self.client.post(
self.bulk_url,
json.dumps(client_data),
content_type="application/json",
)
self.assertEqual(response.status_code, status.BAD_REQUEST)
def test_empty_bulk_request(self):
response = self.client.post(
self.bulk_url, json.dumps([]), content_type="application/json"
)
self.assertEqual(response.status_code, status.CREATED)
self.assertEqual(len(response.json()), 0)
def test_get_client_success(self):
response = self.client.get(f"{self.get_url}/{self.client_1.id}")
self.assertEqual(response.status_code, status.OK)
self.assertEqual(response.json()["login"], self.client_1.login)
def test_get_client_not_found(self):
response = self.client.get(f"{self.get_url}/{uuid4()}")
self.assertEqual(response.status_code, status.NOT_FOUND)
def test_get_client_invalid_uuid(self):
response = self.client.get(f"{self.get_url}/invalid_uuid")
self.assertEqual(response.status_code, status.BAD_REQUEST)
+63
View File
@@ -0,0 +1,63 @@
from collections import defaultdict
from http import HTTPStatus as status
from uuid import UUID
from django.db import transaction
from django.http import HttpRequest
from django.shortcuts import get_object_or_404
from ninja import Router
from api.v1 import schemas as global_schemas
from api.v1.clients import schemas
from apps.client.models import Client
router = Router(tags=["clients"])
@router.post(
"/bulk",
response={
status.CREATED: list[schemas.Client],
status.BAD_REQUEST: global_schemas.BadRequestError,
},
)
def bulk_create_or_update(
request: HttpRequest, data: list[schemas.Client]
) -> tuple[status, list[Client]]:
latest_clients = defaultdict(lambda: None)
for item in reversed(data):
Client(id=item.client_id, **item.dict(exclude={"client_id"})).validate(
validate_unique=False,
validate_constraints=False,
)
if latest_clients[item.client_id] is None:
latest_clients[item.client_id] = item
unique_clients = reversed(list(latest_clients.values()))
result = []
with transaction.atomic():
for client in unique_clients:
client_instance, _ = Client.objects.update_or_create(
id=client.client_id,
defaults={**dict(client)},
)
result.append(client_instance)
return status.CREATED, result
@router.get(
"/{client_id}",
response={
status.OK: schemas.Client,
status.BAD_REQUEST: global_schemas.BadRequestError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
)
def get_client(
request: HttpRequest, client_id: UUID
) -> tuple[status, schemas.Client]:
return status.OK, get_object_or_404(Client, id=client_id)
@@ -0,0 +1,23 @@
from typing import Any, Literal
from uuid import UUID
from ninja import Schema
class GenerateAdTextIn(Schema):
advertiser_name: str
ad_title: str
class Promise(Schema):
task_id: UUID
status: Literal[
"PENDING",
"RECEIVED",
"STARTED",
"SUCCESS",
"FAILURE",
"RETRY",
"REVOKED",
]
result: Any
+56
View File
@@ -0,0 +1,56 @@
from http import HTTPStatus as status
from uuid import UUID
import celery.states
from celery.result import AsyncResult
from django.http import Http404, HttpRequest
from ninja import Router
from api.v1 import schemas as global_schemas
from api.v1.generate import schemas
from apps.campaign.tasks import generate_ad_text_task
router = Router(tags=["generate"])
@router.post(
"/ad_text",
response={
status.OK: schemas.Promise,
status.BAD_REQUEST: global_schemas.BadRequestError,
},
)
def generate_ad_text(
request: HttpRequest, prompt: schemas.GenerateAdTextIn
) -> tuple[status, schemas.Promise]:
task = generate_ad_text_task.delay(prompt.advertiser_name, prompt.ad_title)
task_result = AsyncResult(task.id)
return status.OK, schemas.Promise(
task_id=task.id,
status=task_result.status,
result=task_result.result,
)
@router.get(
"/ad_text/{task_id}/result",
response={
status.OK: schemas.Promise,
status.BAD_REQUEST: global_schemas.BadRequestError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
)
def get_generate_ad_text_result(
request: HttpRequest, task_id: UUID
) -> tuple[status, schemas.Promise]:
task_result = AsyncResult(str(task_id))
if task_result.status == celery.states.PENDING:
raise Http404
return status.OK, schemas.Promise(
task_id=task_result.task_id,
status=task_result.status,
result=task_result.result,
)
+121
View File
@@ -0,0 +1,121 @@
import logging
from collections.abc import Callable
from http import HTTPStatus as status
from typing import Any
import django.core.exceptions
import django.http
import ninja.errors
from django.http import HttpRequest, HttpResponse
from ninja import NinjaAPI
from config.errors import ConflictError, ForbiddenError
logger = logging.getLogger("django")
def handle_validation_error(
request: HttpRequest,
exc: ninja.errors.ValidationError,
router: NinjaAPI,
) -> HttpResponse:
return router.create_response(
request,
{"detail": exc.errors},
status=status.BAD_REQUEST,
)
def handle_django_validation_error(
request: HttpRequest,
exc: django.core.exceptions.ValidationError,
router: NinjaAPI,
) -> HttpResponse:
detail = list(exc)
if hasattr(exc, "error_dict"):
detail = dict(exc)
return router.create_response(
request,
{"detail": detail},
status=status.BAD_REQUEST,
)
def handle_authentication_error(
request: HttpRequest,
exc: ninja.errors.AuthenticationError,
router: NinjaAPI,
) -> HttpResponse:
return router.create_response(
request,
{"detail": status.UNAUTHORIZED.phrase},
status=status.UNAUTHORIZED,
)
def handle_forbidden_error(
request: HttpRequest,
exc: ForbiddenError,
router: NinjaAPI,
) -> HttpResponse:
return router.create_response(
request,
{"detail": exc.message},
status=status.FORBIDDEN,
)
def handle_not_found_error(
request: HttpRequest,
exc: Exception,
router: NinjaAPI,
) -> HttpResponse:
return router.create_response(
request,
{"detail": status.NOT_FOUND.phrase},
status=status.NOT_FOUND,
)
def handle_conflict_error(
request: HttpRequest,
exc: ConflictError,
router: NinjaAPI,
) -> HttpResponse:
detail = list(exc.validation_error)
if hasattr(exc, "error_dict"):
detail = dict(exc.validation_error)
return router.create_response(
request,
{"detail": detail},
status=status.CONFLICT,
)
def handle_unknown_exception(
request: HttpRequest,
exc: Exception,
router: NinjaAPI,
) -> HttpResponse:
logger.exception(exc)
return router.create_response(
request,
{"detail": status.INTERNAL_SERVER_ERROR.phrase},
status=status.INTERNAL_SERVER_ERROR,
)
exception_handlers: list[tuple[Any, Callable]] = [
(ninja.errors.ValidationError, handle_validation_error),
(django.core.exceptions.ValidationError, handle_django_validation_error),
(ninja.errors.AuthenticationError, handle_authentication_error),
(ForbiddenError, handle_forbidden_error),
(django.http.Http404, handle_not_found_error),
(ConflictError, handle_conflict_error),
(Exception, handle_unknown_exception),
]
+18
View File
@@ -0,0 +1,18 @@
from typing import ClassVar
from uuid import UUID
from ninja import ModelSchema, Schema
from apps.campaign.models import CampaignReport
class SubmitReportIn(ModelSchema):
client_id: UUID
class Meta:
model = CampaignReport
fields: ClassVar[tuple[str]] = (CampaignReport.message.field.name,)
class SubmitReportOut(Schema):
status: str = "ok"
+48
View File
@@ -0,0 +1,48 @@
from http import HTTPStatus as status
from uuid import UUID
from django.http import HttpRequest
from django.shortcuts import get_object_or_404
from ninja import Router
from api.v1 import schemas as global_schemas
from api.v1.report import schemas
from apps.campaign.models import Campaign, CampaignImpression, CampaignReport
from apps.campaign.tasks import moderate_campaign_task
from apps.client.models import Client
from config.errors import ForbiddenError
router = Router(tags=["report"])
@router.post(
"/{campaign_id}",
response={
status.OK: schemas.SubmitReportOut,
status.BAD_REQUEST: global_schemas.BadRequestError,
status.FORBIDDEN: global_schemas.ForbiddenError,
status.NOT_FOUND: global_schemas.NotFoundError,
status.CONFLICT: global_schemas.ConflictError,
},
)
def submit_report(
request: HttpRequest, campaign_id: UUID, report: schemas.SubmitReportIn
) -> tuple[status, schemas.SubmitReportOut]:
campaign = get_object_or_404(Campaign, id=campaign_id)
client = get_object_or_404(Client, id=report.client_id)
try:
CampaignImpression.objects.get(campaign=campaign, client=client)
except CampaignImpression.DoesNotExist:
raise ForbiddenError from None
report_instance = CampaignReport.objects.create(
campaign=campaign,
client=client,
message=report.message,
)
moderate_campaign_task.delay(
report_instance.id, campaign.ad_title, campaign.ad_text
)
return status.OK, schemas.SubmitReportOut()
+58
View File
@@ -0,0 +1,58 @@
from functools import partial
from ninja import NinjaAPI
from api.v1 import handlers
from api.v1.ads.views import router as ads_router
from api.v1.advertisers.views import router as advertisers_router
from api.v1.campaigns.views import router as compaigns_router
from api.v1.clients.views import router as clients_router
from api.v1.generate.views import router as generate_router
from api.v1.report.views import router as report_router
from api.v1.stats.views import router as stats_router
from api.v1.time.views import router as time_router
router = NinjaAPI(
title="AdNova API",
version="1",
description="API docs for AdNova",
openapi_url="/docs/openapi.json",
)
router.add_router(
"clients",
clients_router,
)
router.add_router(
"",
advertisers_router,
)
router.add_router(
"advertisers",
compaigns_router,
)
router.add_router(
"ads",
ads_router,
)
router.add_router(
"stats",
stats_router,
)
router.add_router(
"generate",
generate_router,
)
router.add_router(
"report",
report_router,
)
router.add_router(
"time",
time_router,
)
for exception, handler in handlers.exception_handlers:
router.add_exception_handler(exception, partial(handler, router=router))
+24
View File
@@ -0,0 +1,24 @@
from http import HTTPStatus as status
from typing import Any
from ninja import Schema
class BadRequestError(Schema):
detail: Any
class UnauthorizedError(Schema):
detail: str = status.UNAUTHORIZED.phrase
class ForbiddenError(Schema):
detail: str = status.FORBIDDEN.phrase
class NotFoundError(Schema):
detail: str = status.NOT_FOUND.phrase
class ConflictError(Schema):
detail: Any
+20
View File
@@ -0,0 +1,20 @@
from ninja import Schema
class Stat(Schema):
impressions_count: int
clicks_count: int
conversion: float
spent_impressions: float
spent_clicks: float
spent_total: float
class DailyStat(Schema):
impressions_count: int
clicks_count: int
conversion: float
spent_impressions: float
spent_clicks: float
spent_total: float
date: int
+133
View File
@@ -0,0 +1,133 @@
import uuid
from django.test import TestCase, Client, override_settings
from http import HTTPStatus as status
from apps.campaign.models import Advertiser, Campaign
class AdvertiserCampaignTestCase(TestCase):
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def setUp(self):
self.client = Client()
self.advertiser = Advertiser.objects.create(name="Test Advertiser")
self.campaign = Campaign.objects.create(
advertiser=self.advertiser,
impressions_limit=0,
clicks_limit=0,
cost_per_impression=0,
cost_per_click=0,
ad_title="title",
ad_text="text",
start_date=0,
end_date=0,
)
self.campaigns_prefix = "/stats/campaigns"
self.advertisers_prefix = "/stats/advertisers"
def test_get_campaign_statistics_invalid_uuid(self):
response = self.client.get(f"{self.campaigns_prefix}/invalid-uuid")
self.assertEqual(response.status_code, status.BAD_REQUEST)
def test_get_campaign_statistics_campaign_not_found(self):
non_existent_campaign_id = uuid.uuid4()
response = self.client.get(
f"{self.campaigns_prefix}/{non_existent_campaign_id}"
)
self.assertEqual(response.status_code, status.NOT_FOUND)
def test_get_campaign_statistics_success(self):
response = self.client.get(
f"{self.campaigns_prefix}/{self.campaign.id}"
)
self.assertEqual(response.status_code, status.OK)
self.assertIsInstance(response.json(), dict)
def test_get_daily_campaign_statistics_invalid_uuid(self):
response = self.client.get(
f"{self.campaigns_prefix}/invalid-uuid/daily"
)
self.assertEqual(response.status_code, status.BAD_REQUEST)
def test_get_daily_campaign_statistics_campaign_not_found(self):
non_existent_campaign_id = uuid.uuid4()
response = self.client.get(
f"{self.campaigns_prefix}/{non_existent_campaign_id}/daily"
)
self.assertEqual(response.status_code, status.NOT_FOUND)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_get_daily_campaign_statistics_success(self):
response = self.client.get(
f"{self.campaigns_prefix}/{self.campaign.id}/daily"
)
self.assertEqual(response.status_code, status.OK)
self.assertIsInstance(response.json(), list)
def test_get_advertiser_statistics_invalid_uuid(self):
response = self.client.get(f"{self.advertisers_prefix}/invalid-uuid")
self.assertEqual(response.status_code, status.BAD_REQUEST)
def test_get_advertiser_statistics_not_found(self):
non_existent_advertiser_id = uuid.uuid4()
response = self.client.get(
f"{self.advertisers_prefix}/{non_existent_advertiser_id}"
)
self.assertEqual(response.status_code, status.NOT_FOUND)
def test_get_advertiser_statistics_success(self):
response = self.client.get(
f"{self.advertisers_prefix}/{self.advertiser.id}"
)
self.assertEqual(response.status_code, status.OK)
self.assertIsInstance(response.json(), dict)
def test_get_daily_advertiser_statistics_invalid_uuid(self):
response = self.client.get(
f"{self.advertisers_prefix}/invalid-uuid/campaigns/daily"
)
self.assertEqual(response.status_code, status.BAD_REQUEST)
def test_get_daily_advertiser_statistics_advertiser_not_found(self):
non_existent_advertiser_id = uuid.uuid4()
response = self.client.get(
f"{self.advertisers_prefix}/{non_existent_advertiser_id}/campaigns/daily"
)
self.assertEqual(response.status_code, status.NOT_FOUND)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_get_daily_advertiser_statistics_success(self):
response = self.client.get(
f"{self.advertisers_prefix}/{self.advertiser.id}/campaigns/daily"
)
self.assertEqual(response.status_code, status.OK)
self.assertIsInstance(response.json(), list)
+77
View File
@@ -0,0 +1,77 @@
from http import HTTPStatus as status
from typing import Any
from uuid import UUID
from django.http import HttpRequest
from django.shortcuts import get_object_or_404
from ninja import Router
from api.v1 import schemas as global_schemas
from api.v1.stats import schemas
from apps.campaign.models import Advertiser, Campaign
router = Router(tags=["stats"])
@router.get(
"/campaigns/{campaign_id}",
response={
status.OK: schemas.Stat,
status.BAD_REQUEST: global_schemas.BadRequestError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
)
def get_campaign_statistics(
request: HttpRequest, campaign_id: UUID
) -> tuple[status, dict[str, Any]]:
campaign = get_object_or_404(Campaign, id=campaign_id)
return status.OK, campaign.get_statistics()
@router.get(
"/campaigns/{campaign_id}/daily",
response={
status.OK: list[schemas.DailyStat],
status.BAD_REQUEST: global_schemas.BadRequestError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
)
def get_daily_campaign_statistics(
request: HttpRequest, campaign_id: UUID
) -> tuple[status, list[dict[str, Any]]]:
campaign = get_object_or_404(Campaign, id=campaign_id)
return status.OK, campaign.get_daily_statistics()
@router.get(
"/advertisers/{advertiser_id}",
response={
status.OK: schemas.Stat,
status.BAD_REQUEST: global_schemas.BadRequestError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
)
def get_advertiser_statistics(
request: HttpRequest, advertiser_id: UUID
) -> tuple[status, dict[str, Any]]:
advertiser = get_object_or_404(Advertiser, id=advertiser_id)
return status.OK, advertiser.get_statistics()
@router.get(
"/advertisers/{advertiser_id}/campaigns/daily",
response={
status.OK: list[schemas.DailyStat],
status.BAD_REQUEST: global_schemas.BadRequestError,
status.NOT_FOUND: global_schemas.NotFoundError,
},
)
def get_daily_advertiser_statistics(
request: HttpRequest, advertiser_id: UUID
) -> tuple[status, dict[str, Any]]:
advertiser = get_object_or_404(Advertiser, id=advertiser_id)
return status.OK, advertiser.get_daily_statistics()
+1
View File
@@ -0,0 +1 @@
# noqa: A005
+21
View File
@@ -0,0 +1,21 @@
from django.core.cache import cache
from ninja import Schema
from pydantic import field_validator
from pydantic.types import NonNegativeInt
class CurrentDate(Schema):
current_date: NonNegativeInt
@field_validator("current_date", mode="after")
@classmethod
def check_bigger_than_setted_date(cls, value: int) -> int:
current_date = cache.get("current_date", default=0)
if value < current_date:
err = (
"current_date can't be less than setted "
f"date ({current_date})."
)
raise ValueError(err)
return value
+120
View File
@@ -0,0 +1,120 @@
from http import HTTPStatus as status
from django.test import TestCase, override_settings
from django.core.cache import cache
import json
class AdvanceTimeTests(TestCase):
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def setUp(self):
cache.clear()
cache.set("current_date", 10)
self.url = "/time/advance"
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_advance_time_success(self):
self.assertEqual(cache.get("current_date"), 10)
response = self.client.post(
self.url,
data=json.dumps({"current_date": 15}),
content_type="application/json",
)
self.assertEqual(response.status_code, status.OK)
self.assertEqual(response.json()["current_date"], 15)
self.assertEqual(cache.get("current_date"), 15)
# unittest & django pobeda so i can't use override_settings and parametrized at the same time, sorry
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_advance_time_failure_invalid_value1(self):
response = self.client.post(
self.url,
data=json.dumps({"current_date": list()}),
content_type="application/json",
)
self.assertEqual(response.status_code, status.BAD_REQUEST)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_advance_time_failure_invalid_value2(self):
response = self.client.post(
self.url,
data=json.dumps({"current_date": -1241}),
content_type="application/json",
)
self.assertEqual(response.status_code, status.BAD_REQUEST)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_advance_time_failure_invalid_value3(self):
response = self.client.post(
self.url,
data=json.dumps({"current_date": "lol"}),
content_type="application/json",
)
self.assertEqual(response.status_code, status.BAD_REQUEST)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_advance_time_failure_invalid_value4(self):
response = self.client.post(
self.url,
data=json.dumps({"current_date": dict()}),
content_type="application/json",
)
self.assertEqual(response.status_code, status.BAD_REQUEST)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_advance_time_failure_less_than_actual(self):
response = self.client.post(
self.url,
data=json.dumps({"current_date": 5}),
content_type="application/json",
)
self.assertEqual(response.status_code, status.BAD_REQUEST)
+25
View File
@@ -0,0 +1,25 @@
from http import HTTPStatus as status
from django.core.cache import cache
from django.http import HttpRequest
from ninja import Router
from api.v1 import schemas as global_schemas
from api.v1.time import schemas
router = Router(tags=["time"])
@router.post(
"/advance",
response={
status.OK: schemas.CurrentDate,
status.BAD_REQUEST: global_schemas.BadRequestError,
},
)
def advance_time(
request: HttpRequest, new_date: schemas.CurrentDate
) -> tuple[status, schemas.CurrentDate]:
cache.set("current_date", new_date.current_date)
return status.OK, new_date
View File
+14
View File
@@ -0,0 +1,14 @@
from django.contrib import admin
from apps.advertiser.models import Advertiser
class AdvertiserAdmin(admin.ModelAdmin):
readonly_fields = (Advertiser.id.field.name,)
fields = (
Advertiser.id.field.name,
Advertiser.name.field.name,
)
admin.site.register(Advertiser, AdvertiserAdmin)
+6
View File
@@ -0,0 +1,6 @@
from django.apps import AppConfig
class AdvertiserConfig(AppConfig):
name = "apps.advertiser"
label = "advertiser"
@@ -0,0 +1,25 @@
# Generated by Django 5.1.6 on 2025-02-13 21:41
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Advertiser',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('name', models.TextField()),
],
options={
'abstract': False,
},
),
]
+151
View File
@@ -0,0 +1,151 @@
from decimal import ROUND_HALF_UP, Decimal
from uuid import UUID
from django.core.cache import cache
from django.db import models
from apps.core.models import BaseModel
class Advertiser(BaseModel):
name = models.TextField()
def __str__(self) -> str:
return self.name
@property
def advertiser_id(self) -> UUID:
return self.id
@advertiser_id.setter
def advertiser_id(self, value: UUID) -> None:
self.id = value
def get_statistics(self) -> dict[str, int | float]:
campaigns = self.campaigns.all()
total_impressions = 0
total_clicks = 0
total_spent_impressions = Decimal("0.0")
total_spent_clicks = Decimal("0.0")
for campaign in campaigns:
stats = campaign.get_statistics()
total_impressions += stats["impressions_count"]
total_clicks += stats["clicks_count"]
total_spent_impressions += Decimal(str(stats["spent_impressions"]))
total_spent_clicks += Decimal(str(stats["spent_clicks"]))
total_spent = total_spent_impressions + total_spent_clicks
conversion = (
(
Decimal(str(total_clicks))
/ Decimal(str(total_impressions))
* Decimal("100")
)
if total_impressions > 0
else Decimal("0")
)
return {
"impressions_count": total_impressions,
"clicks_count": total_clicks,
"conversion": float(
conversion.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
),
"spent_impressions": float(
total_spent_impressions.quantize(
Decimal("0.01"), rounding=ROUND_HALF_UP
)
),
"spent_clicks": float(
total_spent_clicks.quantize(
Decimal("0.01"), rounding=ROUND_HALF_UP
)
),
"spent_total": float(
total_spent.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
),
}
def get_daily_statistics(self) -> list[dict[str, int | float]]:
campaigns = self.campaigns.all()
daily_stats_map = {}
for campaign in campaigns:
daily_stats = campaign.get_daily_statistics()
for stat in daily_stats:
date = stat["date"]
if date not in daily_stats_map:
daily_stats_map[date] = {
"impressions_count": 0,
"clicks_count": 0,
"spent_impressions": Decimal("0.0"),
"spent_clicks": Decimal("0.0"),
}
daily_stats_map[date]["impressions_count"] += stat[
"impressions_count"
]
daily_stats_map[date]["clicks_count"] += stat["clicks_count"]
daily_stats_map[date]["spent_impressions"] += Decimal(
str(stat["spent_impressions"])
)
daily_stats_map[date]["spent_clicks"] += Decimal(
str(stat["spent_clicks"])
)
days_range = range(cache.get("current_date", 0) + 1)
for day in days_range:
if day not in daily_stats_map:
daily_stats_map[day] = {
"impressions_count": 0,
"clicks_count": 0,
"spent_impressions": Decimal("0.0"),
"spent_clicks": Decimal("0.0"),
}
daily_stats = []
for date, metrics in daily_stats_map.items():
total_spent = (
metrics["spent_impressions"] + metrics["spent_clicks"]
)
conversion = (
Decimal(str(metrics["clicks_count"]))
/ Decimal(str(metrics["impressions_count"]))
* Decimal("100")
if metrics["impressions_count"] > 0
else Decimal("0")
)
daily_stats.append(
{
"date": date,
"impressions_count": metrics["impressions_count"],
"clicks_count": metrics["clicks_count"],
"conversion": float(
conversion.quantize(
Decimal("0.01"), rounding=ROUND_HALF_UP
)
),
"spent_impressions": float(
metrics["spent_impressions"].quantize(
Decimal("0.0000000001"), rounding=ROUND_HALF_UP
)
),
"spent_clicks": float(
metrics["spent_clicks"].quantize(
Decimal("0.0000000001"), rounding=ROUND_HALF_UP
)
),
"spent_total": float(
total_spent.quantize(
Decimal("0.0000000001"), rounding=ROUND_HALF_UP
)
),
}
)
return sorted(daily_stats, key=lambda item: item["date"])
@@ -0,0 +1,48 @@
from uuid import uuid4
from django.test import TestCase, override_settings
from apps.advertiser.models import Advertiser
from apps.campaign.models import Campaign
class AdvertiserModelTest(TestCase):
def setUp(self) -> None:
self.advertiser = Advertiser.objects.create(name="Test Advertiser")
def test_advertiser_creation(self) -> None:
self.assertIsInstance(self.advertiser, Advertiser)
self.assertEqual(self.advertiser.name, "Test Advertiser")
def test_advertiser_str_method(self) -> None:
self.assertEqual(str(self.advertiser), "Test Advertiser")
def test_advertiser_id_property(self) -> None:
self.assertEqual(self.advertiser.advertiser_id, self.advertiser.id)
new_id = uuid4()
self.advertiser.advertiser_id = new_id
self.assertEqual(self.advertiser.id, new_id)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_advertiser_campaigns_relationship(self) -> None:
campaign = Campaign.objects.create(
advertiser=self.advertiser,
impressions_limit=0,
clicks_limit=0,
cost_per_impression=0,
cost_per_click=0,
ad_title="title",
ad_text="text",
start_date=15,
end_date=16,
)
self.assertIn(campaign, self.advertiser.campaigns.all())
@@ -0,0 +1,190 @@
from django.core.cache import cache
from django.test import TestCase, override_settings
from apps.advertiser.models import Advertiser
from apps.campaign.models import Campaign, CampaignClick, CampaignImpression
from apps.client.models import Client
class AdvertiserStatisticsTest(TestCase):
@classmethod
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def setUpTestData(cls) -> None:
cache.set("current_date", 1)
cls.advertiser = Advertiser.objects.create(name="Test Advertiser")
cls.campaign1 = Campaign.objects.create(
advertiser=cls.advertiser,
impressions_limit=1000,
clicks_limit=500,
cost_per_impression=0.05,
cost_per_click=0.10,
ad_title="Campaign 1",
ad_text="This is the first test campaign.",
start_date=1,
end_date=10,
)
cls.campaign2 = Campaign.objects.create(
advertiser=cls.advertiser,
impressions_limit=2000,
clicks_limit=1000,
cost_per_impression=0.04,
cost_per_click=0.08,
ad_title="Campaign 2",
ad_text="This is the second test campaign.",
start_date=2,
end_date=12,
)
cls.client_instance = Client.objects.create(
login="test_client",
age=30,
gender="MALE",
location="Test Location",
)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def setUp(self) -> None:
cache.clear()
cache.set("current_date", 5)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_get_statistics_no_data(self) -> None:
stats = self.advertiser.get_statistics()
expected_stats = {
"impressions_count": 0,
"clicks_count": 0,
"conversion": 0,
"spent_impressions": 0.0,
"spent_clicks": 0.0,
"spent_total": 0.0,
}
self.assertEqual(stats, expected_stats)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_get_statistics_with_data(self) -> None:
CampaignImpression.objects.create(
campaign=self.campaign1,
client=self.client_instance,
price=self.campaign1.cost_per_impression,
date=3,
)
CampaignClick.objects.create(
campaign=self.campaign1,
client=self.client_instance,
price=self.campaign1.cost_per_click,
date=3,
)
CampaignImpression.objects.create(
campaign=self.campaign2,
client=self.client_instance,
price=self.campaign2.cost_per_impression,
date=4,
)
stats = self.advertiser.get_statistics()
expected_stats = {
"impressions_count": 2,
"clicks_count": 1,
"conversion": 50.0,
"spent_impressions": 0.09,
"spent_clicks": 0.10,
"spent_total": 0.19,
}
self.assertEqual(stats, expected_stats)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_get_daily_statistics_no_data(self) -> None:
daily_stats = self.advertiser.get_daily_statistics()
expected_stats = [
{
"impressions_count": 0,
"clicks_count": 0,
"conversion": 0,
"spent_impressions": 0.0,
"spent_clicks": 0.0,
"spent_total": 0.0,
"date": day,
}
for day in range(cache.get("current_date", 0) + 1)
]
self.assertEqual(daily_stats, expected_stats)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_get_daily_statistics_with_data(self) -> None:
CampaignImpression.objects.create(
campaign=self.campaign1,
client=self.client_instance,
price=self.campaign1.cost_per_impression,
date=3,
)
CampaignClick.objects.create(
campaign=self.campaign1,
client=self.client_instance,
price=self.campaign1.cost_per_click,
date=3,
)
CampaignImpression.objects.create(
campaign=self.campaign2,
client=self.client_instance,
price=self.campaign2.cost_per_impression,
date=4,
)
daily_stats = self.advertiser.get_daily_statistics()
expected_stats = [
{
"impressions_count": 1 if day == 3 else 1 if day == 4 else 0,
"clicks_count": 1 if day == 3 else 0,
"conversion": 100.0 if day == 3 else 0.0,
"spent_impressions": 0.05
if day == 3
else 0.04
if day == 4
else 0.0,
"spent_clicks": 0.10 if day == 3 else 0.0,
"spent_total": 0.15 if day == 3 else 0.04 if day == 4 else 0.0,
"date": day,
}
for day in range(cache.get("current_date") + 1)
]
self.assertEqual(daily_stats, expected_stats)
+120
View File
@@ -0,0 +1,120 @@
from django.contrib import admin
from django.http import HttpRequest
from apps.campaign.forms import CampaignForm, CampaignReportForm
from apps.campaign.models import (
Campaign,
CampaignClick,
CampaignImpression,
CampaignReport,
)
class CampaignAdmin(admin.ModelAdmin):
form = CampaignForm
readonly_fields = (
Campaign.id.field.name,
Campaign.advertiser.field.name,
)
fields = (
Campaign.id.field.name,
Campaign.advertiser.field.name,
Campaign.impressions_limit.field.name,
Campaign.clicks_limit.field.name,
Campaign.cost_per_impression.field.name,
Campaign.cost_per_click.field.name,
Campaign.ad_title.field.name,
Campaign.ad_text.field.name,
Campaign.ad_image.field.name,
Campaign.start_date.field.name,
Campaign.end_date.field.name,
Campaign.gender.field.name,
Campaign.age_from.field.name,
Campaign.age_to.field.name,
Campaign.location.field.name,
)
def has_add_permission(
self, request: HttpRequest, obj: Campaign = None
) -> bool:
return False
class CampaignImpressionAdmin(admin.ModelAdmin):
readonly_fields = (
CampaignImpression.id.field.name,
CampaignImpression.campaign.field.name,
CampaignImpression.client.field.name,
CampaignImpression.date.field.name,
)
fields = (
CampaignImpression.id.field.name,
CampaignImpression.campaign.field.name,
CampaignImpression.client.field.name,
CampaignImpression.date.field.name,
CampaignImpression.price.field.name,
)
def has_add_permission(
self, request: HttpRequest, obj: CampaignImpression = None
) -> bool:
return False
class CampaignClickAdmin(admin.ModelAdmin):
readonly_fields = (
CampaignClick.id.field.name,
CampaignClick.campaign.field.name,
CampaignClick.client.field.name,
CampaignClick.date.field.name,
)
fields = (
CampaignClick.id.field.name,
CampaignClick.campaign.field.name,
CampaignClick.client.field.name,
CampaignClick.date.field.name,
CampaignClick.price.field.name,
)
def has_add_permission(
self, request: HttpRequest, obj: CampaignClick = None
) -> bool:
return False
class CampaignReportAdmin(admin.ModelAdmin):
form = CampaignReportForm
readonly_fields = (
CampaignReport.id.field.name,
CampaignReport.campaign.field.name,
CampaignReport.client.field.name,
CampaignReport.message.field.name,
CampaignReport.flagged_by_llm.field.name,
)
fields = (
CampaignReport.id.field.name,
CampaignReport.campaign.field.name,
CampaignReport.client.field.name,
CampaignReport.state.field.name,
CampaignReport.message.field.name,
CampaignReport.flagged_by_llm.field.name,
)
list_filter = (
CampaignReport.state.field.name,
CampaignReport.flagged_by_llm.field.name,
)
list_display = (
"__str__",
CampaignReport.flagged_by_llm.field.name,
)
def has_add_permission(
self, request: HttpRequest, obj: CampaignReport = None
) -> bool:
return False
admin.site.register(Campaign, CampaignAdmin)
admin.site.register(CampaignImpression, CampaignImpressionAdmin)
admin.site.register(CampaignClick, CampaignClickAdmin)
admin.site.register(CampaignReport, CampaignReportAdmin)
+6
View File
@@ -0,0 +1,6 @@
from django.apps import AppConfig
class CampaignConfig(AppConfig):
name = "apps.campaign"
label = "campaign"
+35
View File
@@ -0,0 +1,35 @@
from typing import Any
from django import forms
from apps.campaign.models import Campaign, CampaignReport
class CampaignForm(forms.ModelForm):
class Meta:
model = Campaign
fields = "__all__"
def clean(self) -> dict[str, Any]:
cleaned_data = super().clean()
location = cleaned_data.get("location")
if location == "":
cleaned_data["location"] = None
return cleaned_data
class CampaignReportForm(forms.ModelForm):
class Meta:
model = CampaignReport
fields = "__all__"
def clean(self) -> dict[str, Any]:
cleaned_data = super().clean()
message = cleaned_data.get("message")
if message == "":
cleaned_data["message"] = None
return cleaned_data
@@ -0,0 +1,35 @@
from typing import Any
from django.core.management.base import BaseCommand
from apps.campaign.models import Campaign
from apps.mlscore.models import Mlscore
class Command(BaseCommand):
help = (
"Initialize cache with current counts of "
"impressions, clicks, and ML scores."
)
def handle(self, *args: Any, **kwargs: Any) -> None:
for campaign in Campaign.objects.all():
campaign.setup_cache()
self.stdout.write(
self.style.SUCCESS(
f"Initialized cache for Campaign {campaign.id}: "
f"{campaign.impressions_count} impressions, "
f"{campaign.clicks_count} clicks."
)
)
for mlscore in Mlscore.objects.all():
mlscore.setup_cache()
self.stdout.write(
self.style.SUCCESS(
f"Initialized cache for MLscore: "
f"Client {mlscore.client_id}, "
f"Advertiser {mlscore.advertiser_id}, "
f"Score {mlscore.score}."
)
)
@@ -0,0 +1,84 @@
# Generated by Django 5.1.6 on 2025-02-21 03:50
import apps.campaign.models
import django.core.validators
import django.db.models.deletion
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('advertiser', '0001_initial'),
('client', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='Campaign',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('impressions_limit', models.PositiveBigIntegerField()),
('clicks_limit', models.PositiveBigIntegerField()),
('cost_per_impression', models.FloatField(validators=[django.core.validators.MinValueValidator(0)])),
('cost_per_click', models.FloatField(validators=[django.core.validators.MinValueValidator(0)])),
('ad_title', models.TextField()),
('ad_text', models.TextField()),
('ad_image', models.ImageField(blank=True, max_length=256, null=True, upload_to=apps.campaign.models.Campaign.ad_image_directory_path)),
('start_date', models.PositiveIntegerField(db_index=True)),
('end_date', models.PositiveIntegerField(db_index=True)),
('gender', models.CharField(blank=True, choices=[('MALE', 'MALE'), ('FEMALE', 'FEMALE'), ('ALL', 'ALL')], db_index=True, max_length=6, null=True)),
('age_from', models.PositiveSmallIntegerField(blank=True, db_index=True, null=True, validators=[django.core.validators.MaxValueValidator(100)])),
('age_to', models.PositiveSmallIntegerField(blank=True, db_index=True, null=True, validators=[django.core.validators.MaxValueValidator(100)])),
('location', models.TextField(blank=True, db_index=True, null=True, validators=[django.core.validators.MinLengthValidator(1)])),
('advertiser', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='campaigns', to='advertiser.advertiser')),
],
options={
'abstract': False,
},
),
migrations.CreateModel(
name='CampaignClick',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('price', models.FloatField()),
('date', models.PositiveIntegerField(db_index=True)),
('campaign', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='clicks', to='campaign.campaign')),
('client', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='clicks', to='client.client')),
],
options={
'unique_together': {('campaign', 'client')},
},
),
migrations.CreateModel(
name='CampaignImpression',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('price', models.FloatField()),
('date', models.PositiveIntegerField(db_index=True)),
('campaign', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='impressions', to='campaign.campaign')),
('client', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='impressions', to='client.client')),
],
options={
'unique_together': {('campaign', 'client')},
},
),
migrations.CreateModel(
name='CampaignReport',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('state', models.CharField(choices=[('s', 'Sent'), ('r', 'Under review'), ('t', 'Took action'), ('f', 'Skipped')], default='s', max_length=1)),
('message', models.TextField(blank=True, null=True)),
('flagged_by_llm', models.BooleanField(blank=True, null=True)),
('timestamp', models.DateTimeField(auto_now_add=True)),
('campaign', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='reports', to='campaign.campaign')),
('client', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='reports', to='client.client')),
],
options={
'unique_together': {('campaign', 'client')},
},
),
]
+531
View File
@@ -0,0 +1,531 @@
import random
from decimal import ROUND_HALF_UP, Decimal
from logging import Logger
from typing import Any, Self
from uuid import UUID
from django.conf import settings
from django.core.cache import cache
from django.core.validators import (
MaxValueValidator,
MinLengthValidator,
MinValueValidator,
)
from django.db import models
from apps.advertiser.models import Advertiser
from apps.campaign.validators import (
CampaignAgeValidator,
CampaignDurationValidator,
CampaignLimitsValidator,
CampaignReportMessageValidator,
CampaignStartDateValidator,
CampaignTargetingGenderValidator,
CampaignTargetingLocationValidator,
)
from apps.client.models import Client
from apps.core.models import BaseModel
from config.errors import ConflictError, ForbiddenError
logger: Logger = settings.LOGGER
class Campaign(BaseModel):
class GenderChoices(models.TextChoices):
MALE = "MALE", "MALE"
FEMALE = "FEMALE", "FEMALE"
ALL = "ALL", "ALL"
def ad_image_directory_path(instance, filename: str) -> str: # noqa: N805
return f"campaigns/{instance.id}/{filename}"
advertiser = models.ForeignKey(
Advertiser,
on_delete=models.CASCADE,
related_name="campaigns",
)
impressions_limit = models.PositiveBigIntegerField()
clicks_limit = models.PositiveBigIntegerField()
cost_per_impression = models.FloatField(validators=[MinValueValidator(0)])
cost_per_click = models.FloatField(validators=[MinValueValidator(0)])
ad_title = models.TextField()
ad_text = models.TextField()
ad_image = models.ImageField(
max_length=256,
blank=True,
null=True,
upload_to=ad_image_directory_path,
)
start_date = models.PositiveIntegerField(db_index=True)
end_date = models.PositiveIntegerField(db_index=True)
gender = models.CharField(
max_length=6,
blank=True,
null=True,
db_index=True,
choices=GenderChoices,
)
age_from = models.PositiveSmallIntegerField(
blank=True,
null=True,
db_index=True,
validators=[MaxValueValidator(100)],
)
age_to = models.PositiveSmallIntegerField(
blank=True,
null=True,
db_index=True,
validators=[MaxValueValidator(100)],
)
location = models.TextField(
blank=True,
null=True,
db_index=True,
validators=[MinLengthValidator(1)],
)
READONLY_AFTER_START_FIELDS = (
"impressions_limit",
"clicks_limit",
"start_date",
"end_date",
)
def __str__(self) -> str:
return self.ad_title
def clean(self) -> None:
CampaignTargetingGenderValidator()(self)
CampaignTargetingLocationValidator()(self)
CampaignAgeValidator()(self)
CampaignDurationValidator()(self)
CampaignLimitsValidator()(self)
CampaignStartDateValidator()(self)
def save(self, *args: Any, **kwargs: Any) -> None:
created = self.pk is None
super().save(*args, **kwargs)
if created:
self.setup_cache()
def setup_cache(self) -> None:
cache.add(
f"campaign_{self.id}_impressions_count", self.impressions.count()
)
cache.add(f"campaign_{self.id}_clicks_count", self.clicks.count())
cache.set(
f"campaign_{self.id}_impressions_count", self.impressions.count()
)
cache.set(f"campaign_{self.id}_clicks_count", self.clicks.count())
def inc_views(self) -> None:
try:
cache.incr(f"campaign_{self.id}_impressions_count", 1)
except ValueError:
self.setup_cache()
logger.warning("Seems that %s missing caches", self.campaign_id)
def inc_clicks(self) -> None:
try:
cache.incr(f"campaign_{self.id}_clicks_count", 1)
except ValueError:
self.setup_cache()
logger.warning("Seems that %s missing caches", self.campaign_id)
@property
def ad_id(self) -> UUID:
return self.id
@property
def campaign_id(self) -> UUID:
return self.id
@campaign_id.setter
def campaign_id(self, value: UUID) -> None:
self.id = value
@property
def started(self) -> bool:
return isinstance(
self.start_date, int
) and self.start_date <= cache.get("current_date", default=0)
@property
def active(self) -> bool:
return (
self.started
and cache.get("current_date", default=0) <= self.end_date
)
@property
def impressions_count(self) -> int:
return cache.get(f"campaign_{self.id}_impressions_count", 0)
@property
def clicks_count(self) -> int:
return cache.get(f"campaign_{self.id}_clicks_count", 0)
def view(self, client: Client) -> None:
try:
CampaignImpression.objects.create(
campaign_id=self.id,
client_id=client.id,
price=self.cost_per_impression,
date=cache.get("current_date", default=0),
)
self.inc_views()
except ConflictError:
pass
def click(self, client: Client) -> None:
try:
CampaignImpression.objects.get(campaign=self, client=client)
except CampaignImpression.DoesNotExist:
raise ForbiddenError from None
try:
CampaignClick.objects.create(
campaign_id=self.id,
client_id=client.id,
price=self.cost_per_click,
date=cache.get("current_date", default=0),
)
self.inc_clicks()
except ConflictError:
pass
def get_statistics(self) -> dict[str, Any]:
impressions = self.impressions.aggregate(
total=models.Count("id"), spent=models.Sum("price")
)
clicks = self.clicks.aggregate(
total=models.Count("id"), spent=models.Sum("price")
)
return self._calculate_metrics(impressions, clicks)
def get_daily_statistics(self) -> list[dict[str, Any]]:
last_click_date = self.clicks.aggregate(last_date=models.Max("date"))[
"last_date"
]
if not last_click_date:
last_click_date = self.end_date
current_day = cache.get("current_date", 0)
start_day = self.start_date
end_day = min(last_click_date, current_day)
days_range = list(range(start_day, end_day + 1))
impressions = self.impressions.values("date").annotate(
total=models.Count("id"),
spent=models.Sum("price", default=0.0),
)
clicks = self.clicks.values("date").annotate(
total=models.Count("id"),
spent=models.Sum("price", default=0.0),
)
imp_map = {imp["date"]: imp for imp in impressions}
clk_map = {clk["date"]: clk for clk in clicks}
daily_stats = []
for day in days_range:
imp = imp_map.get(day, {"total": 0, "spent": 0})
clk = clk_map.get(day, {"total": 0, "spent": 0})
metrics = self._calculate_metrics(imp, clk)
metrics["date"] = day
daily_stats.append(metrics)
daily_stats.sort(key=lambda x: x["date"])
return daily_stats
@staticmethod
def _calculate_metrics(
impressions: dict[str, Any], clicks: dict[str, Any]
) -> dict[str, Any]:
impressions_count = impressions.get("total", 0) or 0
clicks_count = clicks.get("total", 0) or 0
conversion = (
(
Decimal(str(clicks_count))
/ Decimal(str(impressions_count))
* Decimal("100")
)
if impressions_count > 0
else Decimal("0")
)
spent_impressions = Decimal(str(impressions.get("spent", 0) or 0))
spent_clicks = Decimal(str(clicks.get("spent", 0) or 0))
spent_total = spent_impressions + spent_clicks
return {
"impressions_count": impressions_count,
"clicks_count": clicks_count,
"conversion": float(
conversion.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
),
"spent_impressions": float(
spent_impressions.quantize(
Decimal("0.000000001"), rounding=ROUND_HALF_UP
)
),
"spent_clicks": float(
spent_clicks.quantize(
Decimal("0.000000001"), rounding=ROUND_HALF_UP
)
),
"spent_total": float(
spent_total.quantize(
Decimal("0.000000001"), rounding=ROUND_HALF_UP
)
),
}
@classmethod
def get_available_campaigns(
cls, client: Client
) -> models.manager.BaseManager[Self]:
current_date = cache.get("current_date", default=0)
date_filter = models.Q(start_date__lte=current_date) & models.Q(
end_date__gte=current_date
)
location_filter = models.Q(location__isnull=True) | models.Q(
location=client.location
)
gender_filter = (
models.Q(gender__isnull=True)
| models.Q(gender=cls.GenderChoices.ALL)
| models.Q(gender=client.gender)
)
age_filter = (
models.Q(age_from__lte=client.age)
| models.Q(age_from__isnull=True)
) & (models.Q(age_to__gte=client.age) | models.Q(age_to__isnull=True))
return cls.objects.filter(
date_filter,
location_filter,
gender_filter,
age_filter,
).only(
Campaign.id.field.name,
Campaign.advertiser_id.field.name,
Campaign.impressions_limit.field.name,
Campaign.clicks_limit.field.name,
Campaign.cost_per_impression.field.name,
Campaign.cost_per_click.field.name,
)
@classmethod
def suggest(cls, client: Client) -> Self:
campaigns = cls.get_available_campaigns(client)
if not campaigns or campaigns == []:
return None
campaign_ids = [c.id for c in campaigns]
client_impressions = CampaignImpression.objects.filter(
client=client, campaign_id__in=campaign_ids
).values_list("campaign_id", flat=True)
client_clicks = CampaignClick.objects.filter(
client=client, campaign_id__in=campaign_ids
).values_list("campaign_id", flat=True)
prioritized = []
ml_values = []
profit_values = []
exceed_impressions_chance = ( # oh, can i just skip commenting this?
*(0 for i in range(3)),
*(1 for i in range(1)),
)
for campaign in campaigns:
has_impression = campaign.id in client_impressions
has_click = campaign.id in client_clicks
campaign_impressions_count = campaign.impressions_count
if not has_impression:
allow_exceed_impressions = random.choice(
exceed_impressions_chance
)
impressions_limit = round(
campaign.impressions_limit
+ campaign.impressions_limit
* 0.1
* allow_exceed_impressions
)
if campaign_impressions_count >= impressions_limit:
continue
ml_score = cache.get(
f"mlscore_{client.id}_{campaign.advertiser_id}", 0
)
ml_values.append(ml_score)
if has_impression:
profit = campaign.cost_per_click if not has_click else 0
else:
profit = campaign.cost_per_impression + campaign.cost_per_click
profit_values.append(profit)
remaining_imp = (
campaign.impressions_limit - campaign_impressions_count
)
capacity_ratio = (
remaining_imp / campaign.impressions_limit
if campaign.impressions_limit > 0
else 1
)
prioritized.append(
(
campaign,
{
"profit": profit,
"ml": ml_score,
"capacity": 1 - capacity_ratio,
},
)
)
if not ml_values or not profit_values:
return None
max_ml = max(ml_values)
max_profit = max(profit_values)
min_profit = min(profit_values)
profit_range = (
max_profit - min_profit if max_profit != min_profit else 1
)
final_list = []
for campaign, metrics in prioritized:
norm_profit = (metrics["profit"] - min_profit) / profit_range
norm_ml = metrics["ml"] / max_ml if max_ml > 0 else 0
priority = (
0.8 * norm_profit + 0.4 * norm_ml + 0.05 * metrics["capacity"]
)
final_list.append((campaign, priority))
final_list.sort(key=lambda x: -x[1])
if len(final_list) != 0:
campaign = final_list[0][0]
return Campaign.objects.only(
Campaign.id.field.name,
Campaign.advertiser_id.field.name,
Campaign.ad_title.field.name,
Campaign.ad_text.field.name,
Campaign.ad_image.field.name,
Campaign.cost_per_impression.field.name,
Campaign.cost_per_click.field.name,
).get(id=campaign.id)
return None
class CampaignImpression(BaseModel):
campaign = models.ForeignKey(
Campaign,
on_delete=models.CASCADE,
related_name="impressions",
)
client = models.ForeignKey(
Client,
on_delete=models.CASCADE,
related_name="impressions",
)
price = models.FloatField()
date = models.PositiveIntegerField(db_index=True)
def __str__(self) -> str:
return f"{self.client.login} > {self.campaign.ad_title}"
class Meta:
unique_together = (
"campaign",
"client",
)
class CampaignClick(BaseModel):
campaign = models.ForeignKey(
Campaign,
on_delete=models.CASCADE,
related_name="clicks",
)
client = models.ForeignKey(
Client,
on_delete=models.CASCADE,
related_name="clicks",
)
price = models.FloatField()
date = models.PositiveIntegerField(db_index=True)
def __str__(self) -> str:
return f"{self.client.login} > {self.campaign.ad_title}"
class Meta:
unique_together = (
"campaign",
"client",
)
class CampaignReport(BaseModel):
class CampaignReportState(models.TextChoices):
SENT = "s", "Sent"
UNDER_REVIEW = "r", "Under review"
TOOK_ACTION = "t", "Took action"
SKIPPED = "f", "Skipped"
campaign = models.ForeignKey(
Campaign,
on_delete=models.SET_NULL,
related_name="reports",
blank=True,
null=True,
)
client = models.ForeignKey(
Client,
on_delete=models.SET_NULL,
related_name="reports",
blank=True,
null=True,
)
state = models.CharField(
max_length=1,
choices=CampaignReportState,
default=CampaignReportState.SENT,
)
message = models.TextField(null=True, blank=True)
flagged_by_llm = models.BooleanField(null=True, blank=True)
timestamp = models.DateTimeField(auto_now_add=True)
class Meta:
unique_together = (
"campaign",
"client",
)
def __str__(self) -> str:
login = self.client.login if self.client else "(client deleted)"
ad_title = (
self.campaign.ad_title if self.campaign else "(campaign deleted)"
)
return f"{login} > {ad_title}"
def clean(self) -> None:
CampaignReportMessageValidator()(self)
+38
View File
@@ -0,0 +1,38 @@
import contextlib
from concurrent.futures import ThreadPoolExecutor
from celery import shared_task
from apps.campaign.models import CampaignReport
from integrations.yandexai.generators.ad_text import YandexAIAdTextGenerator
from integrations.yandexai.moderation import YandexAIModerator
@shared_task
def generate_ad_text_task(advertiser_name: str, ad_title: str) -> str | None:
return YandexAIAdTextGenerator().generate_ad_text(
advertiser_name, ad_title
)
@shared_task(ignore_result=True)
def moderate_campaign_task(
report_id: int, ad_title: str, ad_text: str
) -> None:
with ThreadPoolExecutor(max_workers=2) as executor:
future_text = executor.submit(
YandexAIModerator().get_moderation_verdict, ad_text
)
future_title = executor.submit(
YandexAIModerator().get_moderation_verdict, ad_title
)
ad_text_verdict = future_text.result()
ad_title_verdict = future_title.result()
overall_verdict = ad_title_verdict or ad_text_verdict
with contextlib.suppress(CampaignReport.DoesNotExist):
report = CampaignReport.objects.get(id=report_id)
report.flagged_by_llm = overall_verdict
report.save()
@@ -0,0 +1,55 @@
from django.core.cache import cache
from django.test import TestCase, override_settings
from apps.advertiser.models import Advertiser
from apps.campaign.models import Campaign, CampaignClick
from apps.client.models import Client
from config.errors import ConflictError
class CampaignClickModelTest(TestCase):
@classmethod
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def setUpTestData(cls) -> None:
cache.set("current_date", 1)
cls.advertiser = Advertiser.objects.create(name="Test Advertiser")
cls.campaign = Campaign.objects.create(
advertiser=cls.advertiser,
impressions_limit=1000,
clicks_limit=500,
cost_per_impression=0.05,
cost_per_click=0.10,
ad_title="Test Campaign",
ad_text="This is a test campaign.",
start_date=1,
end_date=10,
)
cls.client_instance = Client.objects.create(
login="test_client", age=15, location="Moscow", gender="FEMALE"
)
cls.click = CampaignClick.objects.create(
campaign=cls.campaign,
client=cls.client_instance,
price=0.10,
date=1,
)
def test_campaign_click_creation(self) -> None:
self.assertIsInstance(self.click, CampaignClick)
self.assertEqual(self.click.price, 0.10)
def test_unique_together_constraint(self) -> None:
with self.assertRaises(ConflictError):
CampaignClick.objects.create(
campaign=self.campaign,
client=self.client_instance,
price=0.10,
date=1,
)
@@ -0,0 +1,52 @@
from django.test import TestCase, override_settings
from apps.advertiser.models import Advertiser
from apps.campaign.models import Campaign, CampaignImpression
from apps.client.models import Client
from config.errors import ConflictError
class CampaignImpressionModelTest(TestCase):
@classmethod
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def setUpTestData(cls) -> None:
cls.advertiser = Advertiser.objects.create(name="Test Advertiser")
cls.campaign = Campaign.objects.create(
advertiser=cls.advertiser,
impressions_limit=1000,
clicks_limit=500,
cost_per_impression=0.05,
cost_per_click=0.10,
ad_title="Test Campaign",
ad_text="This is a test campaign.",
start_date=1,
end_date=10,
)
cls.client_instance = Client.objects.create(
login="test_client", age=15, location="Moscow", gender="FEMALE"
)
cls.impression = CampaignImpression.objects.create(
campaign=cls.campaign,
client=cls.client_instance,
price=0.05,
date=1,
)
def test_campaign_impression_creation(self) -> None:
self.assertIsInstance(self.impression, CampaignImpression)
self.assertEqual(self.impression.price, 0.05)
def test_unique_together_constraint(self) -> None:
with self.assertRaises(ConflictError):
CampaignImpression.objects.create(
campaign=self.campaign,
client=self.client_instance,
price=0.05,
date=1,
)
@@ -0,0 +1,120 @@
from uuid import uuid4
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.test import TestCase, override_settings
from apps.advertiser.models import Advertiser
from apps.campaign.models import Campaign
from apps.client.models import Client
class CampaignModelTest(TestCase):
@classmethod
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def setUpTestData(cls) -> None:
cls.advertiser = Advertiser.objects.create(name="Test Advertiser")
cls.campaign = Campaign.objects.create(
advertiser=cls.advertiser,
impressions_limit=1000,
clicks_limit=500,
cost_per_impression=0.05,
cost_per_click=0.10,
ad_title="Test Campaign",
ad_text="This is a test campaign.",
start_date=1,
end_date=10,
)
def test_campaign_creation(self) -> None:
self.assertIsInstance(self.campaign, Campaign)
self.assertEqual(self.campaign.ad_title, "Test Campaign")
def test_campaign_str_method(self) -> None:
self.assertEqual(str(self.campaign), "Test Campaign")
def test_campaign_id_property(self) -> None:
self.assertEqual(self.campaign.campaign_id, self.campaign.id)
new_id = uuid4()
self.campaign.campaign_id = new_id
self.assertEqual(self.campaign.id, new_id)
def test_ad_id_property(self) -> None:
self.assertEqual(self.campaign.ad_id, self.campaign.id)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_started_property(self) -> None:
cache.set("current_date", 5)
self.assertTrue(self.campaign.started)
cache.set("current_date", 0)
self.assertFalse(self.campaign.started)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_active_property(self) -> None:
cache.set("current_date", 5)
self.assertTrue(self.campaign.active)
cache.set("current_date", 11)
self.assertFalse(self.campaign.active)
cache.set("current_date", 5)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_clean_method(self) -> None:
self.campaign.start_date = -1
with self.assertRaises(ValidationError):
self.campaign.clean()
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_view_method(self) -> None:
client = Client.objects.create(
login="test_client", age=15, location="Moscow", gender="FEMALE"
)
self.campaign.view(client)
self.assertEqual(self.campaign.impressions.count(), 1)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_click_method(self) -> None:
client = Client.objects.create(
login="test_client", age=15, location="Moscow", gender="FEMALE"
)
self.campaign.view(client)
self.campaign.click(client)
self.assertEqual(self.campaign.clicks.count(), 1)
@@ -0,0 +1,66 @@
from django.test import TestCase, override_settings
from apps.advertiser.models import Advertiser
from apps.campaign.models import Campaign, CampaignReport
from apps.client.models import Client
from config.errors import ConflictError
class CampaignReportModelTest(TestCase):
@classmethod
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def setUpTestData(cls) -> None:
cls.advertiser = Advertiser.objects.create(name="Test Advertiser")
cls.campaign = Campaign.objects.create(
advertiser=cls.advertiser,
impressions_limit=1000,
clicks_limit=500,
cost_per_impression=0.05,
cost_per_click=0.10,
ad_title="Test Campaign",
ad_text="This is a test campaign.",
start_date=1,
end_date=10,
)
cls.client_instance = Client.objects.create(
login="test_client",
age=30,
gender="MALE",
location="Test Location",
)
def test_campaign_report_creation(self) -> None:
report = CampaignReport.objects.create(
campaign=self.campaign,
client=self.client_instance,
state=CampaignReport.CampaignReportState.SENT,
message="Inappropriate content",
flagged_by_llm=True,
)
self.assertIsInstance(report, CampaignReport)
self.assertEqual(report.campaign, self.campaign)
self.assertEqual(report.client, self.client_instance)
self.assertEqual(report.state, CampaignReport.CampaignReportState.SENT)
self.assertEqual(report.message, "Inappropriate content")
self.assertTrue(report.flagged_by_llm)
def test_campaign_report_unique_together_constraint(self) -> None:
CampaignReport.objects.create(
campaign=self.campaign,
client=self.client_instance,
state=CampaignReport.CampaignReportState.SENT,
)
with self.assertRaises(ConflictError):
CampaignReport.objects.create(
campaign=self.campaign,
client=self.client_instance,
state=CampaignReport.CampaignReportState.UNDER_REVIEW,
)
@@ -0,0 +1,149 @@
from django.core.cache import cache
from django.test import TestCase, override_settings
from apps.advertiser.models import Advertiser
from apps.campaign.models import Campaign, CampaignClick, CampaignImpression
from apps.client.models import Client
class CampaignStatisticsTest(TestCase):
@classmethod
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def setUpTestData(cls) -> None:
cls.advertiser = Advertiser.objects.create(name="Test Advertiser")
cls.campaign = Campaign.objects.create(
advertiser=cls.advertiser,
impressions_limit=1000,
clicks_limit=500,
cost_per_impression=0.05,
cost_per_click=0.10,
ad_title="Test Campaign",
ad_text="This is a test campaign.",
start_date=1,
end_date=10,
)
cls.client_instance = Client.objects.create(
login="test_client",
age=30,
gender="MALE",
location="Test Location",
)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def setUp(self) -> None:
cache.clear()
cache.set("current_date", 5)
def test_get_statistics_no_data(self) -> None:
stats = self.campaign.get_statistics()
expected_stats = {
"impressions_count": 0,
"clicks_count": 0,
"conversion": 0,
"spent_impressions": 0,
"spent_clicks": 0,
"spent_total": 0,
}
self.assertEqual(stats, expected_stats)
def test_get_statistics_with_data(self) -> None:
CampaignImpression.objects.create(
campaign=self.campaign,
client=self.client_instance,
price=self.campaign.cost_per_impression,
date=5,
)
CampaignClick.objects.create(
campaign=self.campaign,
client=self.client_instance,
price=self.campaign.cost_per_click,
date=5,
)
stats = self.campaign.get_statistics()
expected_stats = {
"impressions_count": 1,
"clicks_count": 1,
"conversion": 100.0,
"spent_impressions": 0.05,
"spent_clicks": 0.10,
"spent_total": 0.15,
}
self.assertEqual(stats, expected_stats)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_get_daily_statistics_no_data(self) -> None:
daily_stats = self.campaign.get_daily_statistics()
expected_stats = [
{
"date": day,
"impressions_count": 0,
"clicks_count": 0,
"conversion": 0,
"spent_impressions": 0,
"spent_clicks": 0,
"spent_total": 0,
}
for day in range(
self.campaign.start_date, cache.get("current_date") + 1
)
]
self.assertEqual(daily_stats, expected_stats)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_get_daily_statistics_with_data(self) -> None:
CampaignImpression.objects.create(
campaign=self.campaign,
client=self.client_instance,
price=self.campaign.cost_per_impression,
date=5,
)
CampaignClick.objects.create(
campaign=self.campaign,
client=self.client_instance,
price=self.campaign.cost_per_click,
date=5,
)
daily_stats = self.campaign.get_daily_statistics()
expected_stats = [
{
"date": day,
"impressions_count": 1 if day == 5 else 0,
"clicks_count": 1 if day == 5 else 0,
"conversion": 100.0 if day == 5 else 0,
"spent_impressions": 0.05 if day == 5 else 0,
"spent_clicks": 0.10 if day == 5 else 0,
"spent_total": 0.15 if day == 5 else 0,
}
for day in range(
self.campaign.start_date, cache.get("current_date") + 1
)
]
self.assertEqual(daily_stats, expected_stats)
@@ -0,0 +1,77 @@
from typing import TYPE_CHECKING
from django.core.cache import cache
from django.core.exceptions import ValidationError
if TYPE_CHECKING:
from apps.campaign.models import Campaign, CampaignReport
class CampaignTargetingLocationValidator:
def __call__(self, instance: "Campaign") -> None:
if instance.location == "":
err = "targeting.location can't be blank."
raise ValidationError(err)
class CampaignTargetingGenderValidator:
def __call__(self, instance: "Campaign") -> None:
if instance.gender == "":
err = "gender can't be blank."
raise ValidationError(err)
class CampaignAgeValidator:
def __call__(self, instance: "Campaign") -> None:
if (
isinstance(instance.age_from, int)
and isinstance(instance.age_to, int)
and instance.age_from > instance.age_to
):
err = "targeting.age_from can't be greater than targeting.age_to."
raise ValidationError(err)
class CampaignDurationValidator:
def __call__(self, instance: "Campaign") -> None:
if (
isinstance(instance.start_date, int)
and isinstance(instance.end_date, int)
and instance.start_date > instance.end_date
):
err = "start_date can't be greater than end_date."
raise ValidationError(err)
class CampaignLimitsValidator:
def __call__(self, instance: "Campaign") -> None:
if (
isinstance(instance.impressions_limit, int)
and isinstance(instance.clicks_limit, int)
and instance.impressions_limit < instance.clicks_limit
):
err = "clicks_limit can't be greater than impressions_limit."
raise ValidationError(err)
class CampaignStartDateValidator:
def __call__(self, instance: "Campaign") -> None:
current_date = cache.get("current_date", default=0)
err = "start_date must be greater or equal than the current_date."
try:
original = type(instance).objects.get(id=instance.id or "")
if (
original.start_date != instance.start_date
and instance.start_date < current_date
):
raise ValidationError(err)
except type(instance).DoesNotExist:
if instance.start_date < current_date:
raise ValidationError(err) from None
class CampaignReportMessageValidator:
def __call__(self, instance: "CampaignReport") -> None:
if instance.message == "":
err = "message can't be blank."
raise ValidationError(err)
+17
View File
@@ -0,0 +1,17 @@
from django.contrib import admin
from apps.client.models import Client
class ClientAdmin(admin.ModelAdmin):
readonly_fields = (Client.id.field.name,)
fields = (
Client.id.field.name,
Client.login.field.name,
Client.age.field.name,
Client.location.field.name,
Client.gender.field.name,
)
admin.site.register(Client, ClientAdmin)
+6
View File
@@ -0,0 +1,6 @@
from django.apps import AppConfig
class UserConfig(AppConfig):
name = "apps.client"
label = "client"
@@ -0,0 +1,29 @@
# Generated by Django 5.1.6 on 2025-02-13 21:41
import django.core.validators
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Client',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('login', models.TextField()),
('age', models.PositiveSmallIntegerField(validators=[django.core.validators.MaxValueValidator(100)])),
('location', models.TextField()),
('gender', models.CharField(choices=[('MALE', 'MALE'), ('FEMALE', 'FEMALE')], max_length=6)),
],
options={
'abstract': False,
},
),
]
+28
View File
@@ -0,0 +1,28 @@
from uuid import UUID
from django.core.validators import MaxValueValidator
from django.db import models
from apps.core.models import BaseModel
class Client(BaseModel):
class GenderChoices(models.TextChoices):
MALE = "MALE", "MALE"
FEMALE = "FEMALE", "FEMALE"
login = models.TextField()
age = models.PositiveSmallIntegerField(validators=[MaxValueValidator(100)])
location = models.TextField()
gender = models.CharField(max_length=6, choices=GenderChoices)
def __str__(self) -> str:
return self.login
@property
def client_id(self) -> UUID:
return self.id
@client_id.setter
def client_id(self, value: UUID) -> None:
self.id = value
+53
View File
@@ -0,0 +1,53 @@
from django.core.exceptions import ValidationError
from django.test import TestCase
from apps.client.models import Client
class ClientModelTest(TestCase):
def setUp(self):
self.client = Client.objects.create(
login="test_client",
age=25,
location="Test City",
gender=Client.GenderChoices.MALE,
)
def test_client_creation_success(self):
self.assertEqual(self.client.login, "test_client")
self.assertEqual(self.client.age, 25)
self.assertEqual(self.client.location, "Test City")
self.assertEqual(self.client.gender, Client.GenderChoices.MALE)
def test_client_string_representation(self):
self.assertEqual(str(self.client), "test_client")
def test_client_id_property(self):
new_id = self.client.id
self.client.client_id = new_id
self.assertEqual(self.client.client_id, new_id)
def test_age_cannot_exceed_max_value(self):
self.client.age = 120
with self.assertRaises(ValidationError):
self.client.full_clean()
def test_valid_gender_choices(self):
self.client.gender = "MALE"
self.client.full_clean()
self.client.gender = "FEMALE"
self.client.full_clean()
def test_invalid_gender_choice(self):
self.client.gender = "OTHER"
with self.assertRaises(ValidationError):
self.client.full_clean()
def test_blank_login(self):
self.client.login = ""
with self.assertRaises(ValidationError):
self.client.full_clean()
+13
View File
@@ -0,0 +1,13 @@
import contextlib
from django.apps import AppConfig
from django.core.cache import cache
class CoreConfig(AppConfig):
name = "apps.core"
label = "core"
def ready(self) -> None:
with contextlib.suppress(Exception):
cache.add("current_date", 0, timeout=None)
+48
View File
@@ -0,0 +1,48 @@
import uuid
from typing import Any
from django.core.exceptions import ValidationError
from django.db import models
from config.errors import ConflictError
class BaseModel(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
class Meta:
abstract = True
def save(self, *args: Any, **kwargs: Any) -> None:
self.validate()
super().save(*args, **kwargs)
def validate(
self,
validate_unique: bool = True,
validate_constraints: bool = True,
include: list[models.Field] | None = None,
) -> None:
self.full_clean(
validate_unique=False,
validate_constraints=False,
exclude=(
field.name
for field in set(self._meta.get_fields()) - set(include)
)
if include
else None,
)
if validate_unique:
try:
self.validate_unique()
except ValidationError as e:
raise ConflictError(e) from None
if validate_constraints:
try:
self.validate_constraints()
except ValidationError as e:
raise ConflictError(e) from None
+16
View File
@@ -0,0 +1,16 @@
from django.contrib import admin
from apps.mlscore.models import Mlscore
class MlscoreAdmin(admin.ModelAdmin):
readonly_fields = (Mlscore.id.field.name,)
fields = (
Mlscore.id.field.name,
Mlscore.advertiser.field.name,
Mlscore.client.field.name,
Mlscore.score.field.name,
)
admin.site.register(Mlscore, MlscoreAdmin)
+6
View File
@@ -0,0 +1,6 @@
from django.apps import AppConfig
class MlscoreConfig(AppConfig):
name = "apps.mlscore"
label = "mlscore"
@@ -0,0 +1,30 @@
# Generated by Django 5.1.6 on 2025-02-14 16:40
import django.db.models.deletion
import uuid
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('advertiser', '0001_initial'),
('client', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='Mlscore',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('score', models.PositiveIntegerField()),
('advertiser', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='mlscores', to='advertiser.advertiser')),
('client', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='mlscores', to='client.client')),
],
options={
'unique_together': {('advertiser', 'client')},
},
),
]
+40
View File
@@ -0,0 +1,40 @@
from typing import Any
from django.core.cache import cache
from django.db import models
from apps.advertiser.models import Advertiser
from apps.client.models import Client
from apps.core.models import BaseModel
class Mlscore(BaseModel):
advertiser = models.ForeignKey(
Advertiser,
on_delete=models.CASCADE,
related_name="mlscores",
)
client = models.ForeignKey(
Client,
on_delete=models.CASCADE,
related_name="mlscores",
)
score = models.PositiveIntegerField()
def __str__(self) -> str:
return f"{self.advertiser.name} | {self.client.login}"
def save(self, *args: Any, **kwargs: Any) -> None:
super().save(*args, **kwargs)
self.setup_cache()
def setup_cache(self) -> None:
cache.add(f"mlscore_{self.client_id}_{self.advertiser_id}", self.score)
cache.set(f"mlscore_{self.client_id}_{self.advertiser_id}", self.score)
class Meta:
unique_together = (
"advertiser",
"client",
)
+97
View File
@@ -0,0 +1,97 @@
from django.test import TestCase, override_settings
from django.core.exceptions import ValidationError
from config.errors import ConflictError
from apps.advertiser.models import Advertiser
from apps.client.models import Client
from apps.mlscore.models import Mlscore
class MlscoreModelTest(TestCase):
def setUp(self):
self.advertiser = Advertiser.objects.create(name="Test Advertiser")
self.client_obj = Client.objects.create(
login="test_client",
age=25,
location="test_location",
gender=Client.GenderChoices.MALE,
)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_create_mlscore(self):
mlscore = Mlscore.objects.create(
advertiser=self.advertiser,
client=self.client_obj,
score=95,
)
self.assertEqual(mlscore.score, 95)
self.assertEqual(str(mlscore), "Test Advertiser | test_client")
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_mlscore_unique_together_constraint(self):
Mlscore.objects.create(
advertiser=self.advertiser,
client=self.client_obj,
score=80,
)
with self.assertRaises(ConflictError):
Mlscore.objects.create(
advertiser=self.advertiser,
client=self.client_obj,
score=85,
)
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_delete_advertiser_cascades(self):
mlscore = Mlscore.objects.create(
advertiser=self.advertiser,
client=self.client_obj,
score=90,
)
self.advertiser.delete()
self.assertFalse(Mlscore.objects.filter(id=mlscore.id).exists())
@override_settings(
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
)
def test_delete_client_cascades(self):
mlscore = Mlscore.objects.create(
advertiser=self.advertiser,
client=self.client_obj,
score=90,
)
self.client_obj.delete()
self.assertFalse(Mlscore.objects.filter(id=mlscore.id).exists())
def test_score_positive_integer_constraint(self):
with self.assertRaises(ValidationError):
Mlscore.objects.create(
advertiser=self.advertiser,
client=self.client_obj,
score=-5,
)
+3
View File
@@ -0,0 +1,3 @@
from config.celery import app as celery_app
__all__ = ("celery_app",)
+9
View File
@@ -0,0 +1,9 @@
"""ASGI config for AdNova."""
import os
from django.core.asgi import get_asgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
application = get_asgi_application()
+10
View File
@@ -0,0 +1,10 @@
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery("adnova")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
+13
View File
@@ -0,0 +1,13 @@
from http import HTTPStatus as status
from django.core.exceptions import ValidationError
class ConflictError(Exception):
def __init__(self, validation_error: ValidationError) -> None:
self.validation_error = validation_error
class ForbiddenError(Exception):
def __init__(self, message: str = status.FORBIDDEN.phrase) -> None:
self.message = message
+43
View File
@@ -0,0 +1,43 @@
from http import HTTPStatus as status
from django.http import HttpRequest, JsonResponse
def handler400(
request: HttpRequest,
exception: Exception | None = None,
) -> JsonResponse:
return JsonResponse(
status=status.BAD_REQUEST,
data={"detail": status.BAD_REQUEST.phrase},
)
def handler403(
request: HttpRequest,
exception: Exception | None = None,
) -> JsonResponse:
return JsonResponse(
status=status.FORBIDDEN,
data={"detail": status.FORBIDDEN.phrase},
)
def handler404(
request: HttpRequest,
exception: Exception | None = None,
) -> JsonResponse:
return JsonResponse(
status=status.NOT_FOUND,
data={"detail": status.NOT_FOUND.phrase},
)
def handler500(
request: HttpRequest,
exception: Exception | None = None,
) -> JsonResponse:
return JsonResponse(
status=status.INTERNAL_SERVER_ERROR,
data={"detail": status.INTERNAL_SERVER_ERROR.phrase},
)
+596
View File
@@ -0,0 +1,596 @@
"""Django settings for AdNova."""
import contextlib
import logging
from collections.abc import Callable
from pathlib import Path
import django_stubs_ext
import environ
from django.utils.translation import gettext_lazy as _
from health_check.plugins import plugin_dir
from integrations.yandexai.healthcheck import YandexAIHealthCheck
BASE_DIR = Path(__file__).resolve().parent.parent
env = environ.Env()
environ.Env.read_env(BASE_DIR / ".env")
django_stubs_ext.monkeypatch()
# Common settings
DEBUG = env("DJANGO_DEBUG", default=False)
ALLOWED_HOSTS = env(
"DJANGO_ALLOWED_HOSTS",
list,
default=["localhost", "127.0.0.1"],
)
# Integrations
YANDEX_CLOUD_FOLDER_ID = env("YANDEX_CLOUD_FOLDER_ID", default=None)
YANDEX_CLOUD_API_KEY = env("YANDEX_CLOUD_API_KEY", default=None)
YANDEX_CLOUD_INTEGRATION_ENABLED = (
YANDEX_CLOUD_FOLDER_ID and YANDEX_CLOUD_API_KEY
)
# Register healthchecks
plugin_dir.register(YandexAIHealthCheck)
# Caching
REDIS_URI = env("REDIS_URI", default="redis://localhost:6379")
CACHES = {
"default": {
"BACKEND": "django.core.cache.backends.redis.RedisCache",
"LOCATION": REDIS_URI,
"TIMEOUT": None,
"KEY_PREFIX": "backend",
"VERSION": 1,
},
}
# Celery
CELERY_BROKER_URL = REDIS_URI
CELERY_RESULT_BACKEND = REDIS_URI
CELERY_TIMEZONE = "UTC"
CELERY_WORKER_SEND_TASK_EVENTS = True
CELERY_TASK_SEND_SENT_EVENT = True
CELERY_TASK_TRACK_STARTED = True
# Database
DB_URI = env.db_url("DJANGO_DB_URI", default="sqlite:///db.sqlite3")
DATABASES = {"default": {**DB_URI, "CONN_MAX_AGE": 50}}
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
# Password validation
AUTH_PASSWORD_VALIDATORS = [
{
"NAME": (
"django.contrib.auth."
"password_validation.UserAttributeSimilarityValidator"
),
},
{
"NAME": (
"django.contrib.auth.password_validation.MinimumLengthValidator"
),
},
{
"NAME": (
"django.contrib.auth.password_validation.CommonPasswordValidator"
),
},
{
"NAME": (
"django.contrib.auth.password_validation.NumericPasswordValidator"
),
},
]
# Static files (CSS, JavaScript, Images)
STATIC_ROOT = BASE_DIR / "static"
STATIC_URL = env("DJANGO_STATIC_URL", default="static/")
STATICFILES_DIRS: list[str] = []
STATICFILES_STORAGE = "django.contrib.staticfiles.storage.StaticFilesStorage"
STATICFILES_FINDERS = [
"django.contrib.staticfiles.finders.FileSystemFinder",
"django.contrib.staticfiles.finders.AppDirectoriesFinder",
]
# Files
FILE_UPLOAD_MAX_MEMORY_SIZE = 2621440
# Minio
MINIO_STORAGE_ENDPOINT = env("MINIO_ENDPOINT", default=None)
MINIO_STORAGE_ACCESS_KEY = env("MINIO_ACCESS_KEY", default=None)
MINIO_STORAGE_SECRET_KEY = env("MINIO_SECRET_KEY", default=None)
MINIO_STORAGE_USE_HTTPS = env("MINIO_USE_HTTPS", default=False)
MINIO_STORAGE_MEDIA_BUCKET_NAME = env(
"MINIO_MEDIA_BUCKET_NAME", default="adnova-media"
)
MINIO_STORAGE_AUTO_CREATE_MEDIA_BUCKET = True
MINIO_STORAGE_AUTO_CREATE_MEDIA_POLICY = "GET_ONLY"
MINIO_DEFAULT_CUSTOM_ENDPOINT_URL = (
"https://"
if MINIO_STORAGE_USE_HTTPS
else "http://" + str(MINIO_STORAGE_ENDPOINT)
)
MINIO_STORAGE_MEDIA_URL = (
env("MINIO_CUSTOM_ENDPOINT_URL", default=MINIO_DEFAULT_CUSTOM_ENDPOINT_URL)
+ "/"
f"{MINIO_STORAGE_MEDIA_BUCKET_NAME}"
)
MINIO_STORAGE_DEFAULT_ACL = "public-read"
STORAGES = {
"default": {
"BACKEND": "minio_storage.storage.MinioMediaStorage",
},
"staticfiles": {
"BACKEND": "django.contrib.staticfiles.storage.StaticFilesStorage",
},
}
# Cors
CORS_ALLOWED_ORIGINS_FROM_ENV = env("DJANGO_CORS_ALLOWED_ORIGINS", list, ["*"])
if CORS_ALLOWED_ORIGINS_FROM_ENV == ["*"]:
CORS_ALLOW_ALL_ORIGINS = True
else:
CORS_ALLOWED_ORIGINS = CORS_ALLOWED_ORIGINS_FROM_ENV
# Forms
FORM_RENDERER = "django.forms.renderers.DjangoTemplates"
FORMS_URLFIELD_ASSUME_HTTPS = False
# Internationalization
DATE_FORMAT = "N j, Y"
DATE_INPUT_FORMATS = [
"%Y-%m-%d", # '2006-10-25'
"%m/%d/%Y", # '10/25/2006'
"%m/%d/%y", # '10/25/06'
"%b %d %Y", # 'Oct 25 2006'
"%b %d, %Y", # 'Oct 25, 2006'
"%d %b %Y", # '25 Oct 2006'
"%d %b, %Y", # '25 Oct, 2006'
"%B %d %Y", # 'October 25 2006'
"%B %d, %Y", # 'October 25, 2006'
"%d %B %Y", # '25 October 2006'
"%d %B, %Y", # '25 October, 2006'
]
DATETIME_FORMAT = "N j, Y, H:i:s"
DATETIME_INPUT_FORMATS = [
"%Y-%m-%d %H:%M:%S", # '2006-10-25 14:30:59'
"%Y-%m-%d %H:%M:%S.%f", # '2006-10-25 14:30:59.000200'
"%Y-%m-%d %H:%M", # '2006-10-25 14:30'
"%m/%d/%Y %H:%M:%S", # '10/25/2006 14:30:59'
"%m/%d/%Y %H:%M:%S.%f", # '10/25/2006 14:30:59.000200'
"%m/%d/%Y %H:%M", # '10/25/2006 14:30'
"%m/%d/%y %H:%M:%S", # '10/25/06 14:30:59'
"%m/%d/%y %H:%M:%S.%f", # '10/25/06 14:30:59.000200'
"%m/%d/%y %H:%M", # '10/25/06 14:30'
]
DECIMAL_SEPARATOR = "."
FIRST_DAY_OF_WEEK = 1
FORMAT_MODULE_PATH = None
LANGUAGE_CODE = env("DJANGO_LANGUAGE_CODE", default="en-us")
LANGUAGES = [("en", _("English")), ("ru", _("Russian"))]
LOCALE_PATHS: list[str] = []
MONTH_DAY_FORMAT = "F j"
NUMBER_GROUPING = 0
SHORT_DATE_FORMAT = "m/d/Y"
SHORT_DATETIME_FORMAT = "m/d/Y H:i:s"
THOUSAND_SEPARATOR = ","
TIME_FORMAT = "H:i:s"
TIME_INPUT_FORMATS = [
"%H:%M:%S", # '14:30:59'
"%H:%M:%S.%f", # '14:30:59.000200'
"%H:%M", # '14:30'
]
TIME_ZONE = "UTC"
USE_I18N = True
USE_THOUSAND_SEPARATOR = True
USE_TZ = True
YEAR_MONTH_FORMAT = "F Y"
# HTTP
DATA_UPLOAD_MAX_MEMORY_SIZE = None
DATA_UPLOAD_MAX_NUMBER_FIELDS = None
DATA_UPLOAD_MAX_NUMBER_FILES = None
DEFAULT_CHARSET = "utf-8"
FORCE_SCRIPT_NAME = None
INTERNAL_IPS = env(
"DJANGO_INTERNAL_IPS",
list,
default=["127.0.0.1"],
)
MIDDLEWARE = [
"django_guid.middleware.guid_middleware",
"corsheaders.middleware.CorsMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
]
SIGNING_BACKEND = "django.core.signing.TimestampSigner"
USE_X_FORWARDED_HOST = False
USE_X_FORWARDED_PORT = False
WSGI_APPLICATION = "config.wsgi.application"
# Logging
LOGGER_NAME = "adnova"
LOGGER = logging.getLogger(LOGGER_NAME)
LOGGING_FILTERS = {
"require_debug_true": {
"()": "django.utils.log.RequireDebugTrue",
},
"require_debug_false": {
"()": "django.utils.log.RequireDebugFalse",
},
"correlation_id": {
"()": "django_guid.log_filters.CorrelationId",
},
}
LOGGING_FORMATTERS = {
"json": {
"()": "pythonjsonlogger.jsonlogger.JsonFormatter",
"format": (
"{levelname}{correlation_id}{asctime}"
"{name}{pathname}{lineno}{message}"
),
"style": "{",
},
"text": {
"()": "colorlog.ColoredFormatter",
"format": (
"{log_color}[{levelname}]{reset} "
"{light_black}{asctime} {name} | {pathname}:{lineno}{reset}\n"
"{bold_black}{message}{reset}"
),
"log_colors": {
"DEBUG": "bold_green",
"INFO": "bold_cyan",
"WARNING": "bold_yellow",
"ERROR": "bold_red",
"CRITICAL": "bold_purple",
},
"style": "{",
},
}
LOGGING_HANDLERS = {
"console_debug": {
"class": "logging.StreamHandler",
"level": "DEBUG",
"filters": ["require_debug_true"],
"formatter": "text",
},
"console_prod": {
"class": "logging.StreamHandler",
"level": "INFO",
"filters": ["require_debug_false", "correlation_id"],
"formatter": "json",
},
}
LOGGING_LOGGERS = {
"django": {
"handlers": ["console_debug", "console_prod"],
"level": "INFO" if DEBUG else "ERROR",
"propagate": False,
},
"django.request": {
"handlers": ["console_debug", "console_prod"],
"level": "INFO" if DEBUG else "ERROR",
"propagate": False,
},
"django.server": {
"handlers": ["console_debug"],
"level": "INFO",
"filters": ["require_debug_true"],
"propagate": False,
},
"django.template": {"handlers": []},
"django.db.backends.schema": {"handlers": []},
"django.security": {"handlers": [], "propagate": True},
"django.db.backends": {
"handlers": ["console_debug"],
"filters": ["require_debug_true"],
"level": "DEBUG",
"propagate": False,
},
"health-check": {
"handlers": ["console_debug", "console_prod"],
"level": "INFO" if DEBUG else "ERROR",
"propagate": False,
},
LOGGER_NAME: {
"handlers": ["console_debug", "console_prod"],
"level": "DEBUG" if DEBUG else "INFO",
"propagate": False,
},
"root": {
"handlers": ["console_debug", "console_prod"],
"level": "INFO" if DEBUG else "ERROR",
"propagate": False,
},
}
LOGGING = {
"version": 1,
"disable_existing_loggers": False,
"filters": LOGGING_FILTERS,
"formatters": LOGGING_FORMATTERS,
"handlers": LOGGING_HANDLERS,
"loggers": LOGGING_LOGGERS,
}
LOGGING_CONFIG = "logging.config.dictConfig"
# Models
ABSOLUTE_URL_OVERRIDES: dict[str, Callable] = {}
FIXTURE_DIRS: list[str] = []
INSTALLED_APPS = [
# Build-in apps
"django.contrib.admin",
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.messages",
"django.contrib.staticfiles",
# Healthcheck
"health_check",
"health_check.db",
"health_check.cache",
"health_check.storage",
"health_check.contrib.migrations",
"health_check.contrib.celery",
"health_check.contrib.celery_ping",
# Third-party apps
"corsheaders",
"django_extensions",
"django_guid",
"ninja",
"minio_storage",
# Internal apps
"apps.core",
"apps.advertiser",
"apps.client",
"apps.mlscore",
"apps.campaign",
# API v1 apps
"api.v1.clients",
]
# GUID
DJANGO_GUID = {
"GUID_HEADER_NAME": "Correlation-ID",
"VALIDATE_GUID": True,
"RETURN_HEADER": True,
"EXPOSE_HEADER": True,
"INTEGRATIONS": [],
"IGNORE_URLS": [],
"UUID_LENGTH": 32,
}
# Security
LANGUAGE_COOKIE_AGE = 31449600
LANGUAGE_COOKIE_DOMAIN = None
LANGUAGE_COOKIE_HTTPONLY = False
LANGUAGE_COOKIE_NAME = "django_language"
LANGUAGE_COOKIE_PATH = "/"
LANGUAGE_COOKIE_SAMESITE = "Lax"
LANGUAGE_COOKIE_SECURE = False
SECURE_PROXY_SSL_HEADER = None
CSRF_COOKIE_AGE = 31449600
CSRF_COOKIE_DOMAIN = None
CSRF_COOKIE_HTTPONLY = False
CSRF_COOKIE_NAME = "djangocsrftoken"
CSRF_COOKIE_PATH = "/"
CSRF_COOKIE_SAMESITE = "Lax"
CSRF_COOKIE_SECURE = False
CSRF_FAILURE_VIEW = "django.views.csrf.csrf_failure"
CSRF_HEADER_NAME = "HTTP_X_CSRFTOKEN"
CSRF_TRUSTED_ORIGINS = env(
"DJANGO_CSRF_TRUSTED_ORIGINS",
list,
default=["http://localhost", "http://127.0.0.1"],
)
CSRF_USE_SESSIONS = False
SECRET_KEY = env("DJANGO_SECRET_KEY", default="very_insecure_key")
SECRET_KEY_FALLBACKS: list[str] = []
# Sessions
SESSION_CACHE_ALIAS = "default"
SESSION_COOKIE_AGE = 1209600
SESSION_COOKIE_DOMAIN = None
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_NAME = "djangosessionid"
SESSION_COOKIE_PATH = "/"
SESSION_COOKIE_SAMESITE = "Lax"
SESSION_COOKIE_SECURE = False
SESSION_ENGINE = "django.contrib.sessions.backends.db"
SESSION_EXPIRE_AT_BROWSER_CLOSE = False
SESSION_FILE_PATH = None
SESSION_SAVE_EVERY_REQUEST = False
SESSION_SERIALIZER = "django.contrib.sessions.serializers.JSONSerializer"
# Templates
TEMPLATES = [
{
"BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": [],
"APP_DIRS": True,
"OPTIONS": {
"autoescape": True,
"context_processors": [
"django.template.context_processors.debug",
"django.template.context_processors.request",
"django.contrib.auth.context_processors.auth",
"django.contrib.messages.context_processors.messages",
],
"debug": DEBUG,
"string_if_invalid": "",
"file_charset": "utf-8",
},
},
]
# Testing
TEST_NON_SERIALIZED_APPS: list[str] = []
TEST_RUNNER = "django.test.runner.DiscoverRunner"
# URLs
ROOT_URLCONF = "config.urls"
# debug-toolbar
DEBUG_TOOLBAR_ENABLED = False
with contextlib.suppress(Exception):
import debug_toolbar # noqa: F401
DEBUG_TOOLBAR_ENABLED = True
DEBUG_TOOLBAR_CONFIG = {"SHOW_COLLAPSED": True, "UPDATE_ON_FETCH": True}
if DEBUG and DEBUG_TOOLBAR_ENABLED:
INSTALLED_APPS.append("debug_toolbar")
MIDDLEWARE.append("debug_toolbar.middleware.DebugToolbarMiddleware")
+34
View File
@@ -0,0 +1,34 @@
"""URL configuration for AdNova."""
from django.conf import settings
from django.contrib import admin
from django.urls import include, path
from config import handlers
admin.site.site_title = "AdNova"
admin.site.site_header = "AdNova"
admin.site.index_title = "AdNova"
urlpatterns = [
# Admin urls
path("admin/", admin.site.urls),
# API urls
path("", include("api.urls")),
]
if settings.DEBUG and settings.DEBUG_TOOLBAR_ENABLED:
from debug_toolbar.toolbar import debug_toolbar_urls
urlpatterns += debug_toolbar_urls()
handler400 = handlers.handler400
handler403 = handlers.handler403
handler404 = handlers.handler404
handler500 = handlers.handler500
+9
View File
@@ -0,0 +1,9 @@
"""WSGI config for AdNova."""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
application = get_wsgi_application()
@@ -0,0 +1,63 @@
# ruff: noqa: E501, W291
import logging
from django.conf import settings
from yandex_cloud_ml_sdk import YCloudML
from yandex_cloud_ml_sdk.exceptions import YCloudMLError
logger = logging.getLogger(__name__)
AD_PROMPT_TEMPLATE = """
Сгенерируй креативный рекламный текст для рекламодателя: "{advertiser_name}",
который проводит рекламную кампанию с названием: "{ad_title}"
Требования:
1. Текст должен быть максимально привлекательным и продающим
2. Использовать современные маркетинговые приемы
3. Включить призыв к действию
4. Соблюдать структуру: заголовок - основной текст - заключение
5. Длина: 3-6 коротких предложений
6. Ответ должен содержать только текст рекламы без дополнительных комментариев
Пример хорошего текста:
Запустите свой бизнес в космос с {{advertiser_name}}! Кампания "{{ad_title}}" предлагает
уникальные решения для цифрового продвижения. Присоединяйтесь к лидерам рынка - получите
персональную консультацию сегодня!
""".strip()
class YandexAIAdTextGenerator:
def __init__(self) -> None:
self.sdk = YCloudML(
folder_id=settings.YANDEX_CLOUD_FOLDER_ID,
auth=settings.YANDEX_CLOUD_API_KEY,
)
def generate_ad_text(
self, advertiser_name: str, ad_title: str
) -> str | None:
try:
prompt = AD_PROMPT_TEMPLATE.format(
advertiser_name=advertiser_name, ad_title=ad_title
)
promise = (
self.sdk.models.completions(
"yandexgpt-lite", model_version="latest"
)
.configure(max_tokens=400, temperature=1.0)
.run_deferred([{"role": "user", "text": prompt}])
)
result = promise.wait()
logger.debug("Generated ad text: %s", result)
return self._clean_response(result.alternatives[0].text)
except YCloudMLError:
return None
def _clean_response(self, text: str) -> str:
cleaned = text.strip()
cleaned = cleaned.replace('"', "")
return " ".join(cleaned.splitlines())
@@ -0,0 +1,27 @@
from django.conf import settings
from health_check.backends import BaseHealthCheckBackend
from yandex_cloud_ml_sdk import YCloudML
from yandex_cloud_ml_sdk.exceptions import YCloudMLError
class YandexAIHealthCheck(BaseHealthCheckBackend):
critical_service = False
def check_status(self) -> None:
try:
sdk = YCloudML(
folder_id=settings.YANDEX_CLOUD_FOLDER_ID,
auth=settings.YANDEX_CLOUD_API_KEY,
)
result = sdk.models.completions(
"yandexgpt-lite", model_version="latest"
).tokenize("ping")
if not result:
self.add_error("YandexAI API is unaccessible")
except YCloudMLError:
self.add_error("YandexAI API is unaccessible")
def identifier(self) -> str:
return self.__class__.__name__
@@ -0,0 +1,57 @@
# ruff: noqa: E501, W291
import logging
from django.conf import settings
from yandex_cloud_ml_sdk import YCloudML
from yandex_cloud_ml_sdk.exceptions import YCloudMLError
logger = logging.getLogger(__name__)
DEFAULT_INVALID_SIGNAL = (
"В интернете есть много сайтов с информацией на эту тему. "
"[Посмотрите, что нашлось в поиске](https://ya.ru)"
).lower()
MODERATION_PROMPT = """
Ты — строгий AI-модератор контента. Анализируй текст ПО ВСЕМ указанным критериям.
Если ЛЮБОЙ из критериев нарушен — верни true. Только если ВСЕ критерии соблюдены — верни false.
Критерии нарушений (true):
1. Нецензурная лексика: мат, эвфемизмы, оскорбительные выражения
2. Угрозы: прямые/косвенные угрозы жизни, шантаж, буллинг
3. Дискриминация: расизм, сексизм, ксенофобия, гомофобия
""".strip()
class YandexAIModerator:
def __init__(self) -> None:
self.sdk = YCloudML(
folder_id=settings.YANDEX_CLOUD_FOLDER_ID,
auth=settings.YANDEX_CLOUD_API_KEY,
)
def get_moderation_verdict(self, text: str) -> bool:
try:
promise = (
self.sdk.models.completions(
"yandexgpt-lite", model_version="latest"
)
.configure(max_tokens=500, temperature=0.1)
.run_deferred(
[
{"role": "system", "text": MODERATION_PROMPT},
{"role": "user", "text": text},
]
)
)
result = promise.wait()
logger.debug("Moderation API response: %s", result)
return self._normalize_response(result.alternatives[0].text)
except YCloudMLError:
return False
def _normalize_response(self, text: str) -> bool:
clean_verdict = text.strip().lower().split("\n")[0]
return clean_verdict in ("true", DEFAULT_INVALID_SIGNAL)

Some files were not shown because too many files have changed in this diff Show More