<type>(scope): <description>

[body]

[footer(s)]
This commit is contained in:
ITQ
2025-03-01 13:06:30 +03:00
parent 688862ca78
commit 07dc5210a0
4 changed files with 207 additions and 18 deletions
+18 -5
View File
@@ -1,4 +1,3 @@
from random import choice
from uuid import uuid4
from django.db import models
@@ -7,7 +6,6 @@ from apps.task.validators import ContestTaskCriteriesValidator
from apps.competition.models import Competition
from apps.core.models import BaseModel
from apps.user.models import User
from apps.task.models import CompetitionTask
class CompetitionTask(BaseModel):
@@ -52,14 +50,29 @@ class CompetetionTaskSumbission(BaseModel):
def submission_stdout_upload_to(instance, filename) -> str:
return f"/submissions/{instance.id}/stdout"
status = models.CharField(
choices=StatusChoices.choices, default=StatusChoices.SENT, max_length=2
)
user = models.ForeignKey(User, on_delete=models.CASCADE)
task = models.ForeignKey(CompetitionTask, on_delete=models.CASCADE)
status = models.CharField(
choices=StatusChoices.choices,
default=StatusChoices.SENT,
max_length=8,
)
# code or text or file
content = models.FileField(upload_to=submission_content_upload_to)
# only if task type is checker
stdout = models.FileField(
upload_to=submission_stdout_upload_to, null=True, blank=True
)
# depends on task type:
# - input: {"correct": boolean}
# - file: {"total_points": integer, "by_criteria": ["criteria_name": integer]}
# - code: {"correct": boolean}
result = models.JSONField(default=None, null=True, blank=True)
# just more readable result representation, maybe will be calcuated somehow more complex depends on criteria
earned_points = models.IntegerField()
timestamp = models.DateTimeField(auto_now_add=True)
+128
View File
@@ -0,0 +1,128 @@
import tempfile
import os
import sys
import ast
from io import StringIO
import hashlib
from config.celery import app
ALLOWED_MODULES = {
"pandas",
"numpy",
"matplotlib",
"seaborn",
"scipy",
"sklearn",
"datetime",
"json",
"csv",
"math",
"statistics",
}
class SecurityException(Exception):
pass
def validate_code(code_str):
try:
tree = ast.parse(code_str)
except SyntaxError as e:
raise SecurityException(f"Syntax error: {str(e)}")
class ImportVisitor(ast.NodeVisitor):
def visit_Import(self, node):
for alias in node.names:
module = alias.name.split(".")[0]
if module not in ALLOWED_MODULES:
raise SecurityException(f"Disallowed import: {module}")
def visit_ImportFrom(self, node):
if node.module:
module = node.module.split(".")[0]
if module not in ALLOWED_MODULES:
raise SecurityException(
f"Disallowed import from: {module}"
)
class SecurityVisitor(ast.NodeVisitor):
def generic_visit(self, node):
if isinstance(node, (ast.Call, ast.Attribute)):
if "system" in getattr(node, "attr", ""):
raise SecurityException("Dangerous system call detected")
super().generic_visit(node)
try:
ImportVisitor().visit(tree)
SecurityVisitor().visit(tree)
except SecurityException as e:
raise
except Exception as e:
raise SecurityException(f"Security check failed: {str(e)}")
def secure_exec(code_str, result_path):
original_dir = os.getcwd()
original_stdout = sys.stdout
sys.stdout = captured_stdout = StringIO()
result_content = None
with tempfile.TemporaryDirectory() as temp_dir:
try:
os.chdir(temp_dir)
restricted_globals = {
"__builtins__": {
"open": lambda f, *a, **kw: open(f, *a, **kw),
"print": print,
"str": str,
"int": int,
"float": float,
"bool": bool,
"list": list,
"dict": dict,
"tuple": tuple,
"set": set,
}
}
exec(code_str, restricted_globals)
if result_path == "stdout":
result_content = captured_stdout.getvalue().encode("utf-8")
else:
with open(result_path, "rb") as f:
result_content = f.read()
except Exception as e:
raise RuntimeError(f"Execution error: {str(e)}")
finally:
os.chdir(original_dir)
sys.stdout = original_stdout
return result_content
@app.task(bind=True)
def analyze_data_task(self, code_str, result_path, expected_bytes):
try:
validate_code(code_str)
result_content = secure_exec(code_str, result_path)
result_hash = hashlib.sha256(result_content).hexdigest()
expected_hash = hashlib.sha256(expected_bytes).hexdigest()
return {
"success": True,
"match": result_hash == expected_hash,
"result_hash": result_hash,
"expected_hash": expected_hash,
}
except SecurityException as e:
return {"success": False, "error": f"Security violation: {str(e)}"}
except RuntimeError as e:
return {"success": False, "error": f"Execution error: {str(e)}"}
except Exception as e:
return {"success": False, "error": f"Unexpected error: {str(e)}"}