From d08e40a683497e2ebd3e0b6206a868f151d641b1 Mon Sep 17 00:00:00 2001 From: ITQ Date: Fri, 13 Feb 2026 16:10:48 +0300 Subject: [PATCH] feat(dsl): added dsl for user targeting --- src/backend/libs/__init__.py | 0 src/backend/libs/dsl/README.md | 54 ++++ src/backend/libs/dsl/__init__.py | 3 + src/backend/libs/dsl/evaluator.py | 146 ++++++++++ src/backend/libs/dsl/exceptions.py | 18 ++ src/backend/libs/dsl/helpers.py | 20 ++ src/backend/libs/dsl/lexer.py | 162 +++++++++++ src/backend/libs/dsl/nodes.py | 41 +++ src/backend/libs/dsl/parser.py | 162 +++++++++++ src/backend/libs/dsl/tests/__init__.py | 0 src/backend/libs/dsl/tests/test_evaluator.py | 262 ++++++++++++++++++ .../libs/dsl/tests/test_integration.py | 155 +++++++++++ src/backend/libs/dsl/tests/test_lexer.py | 196 +++++++++++++ src/backend/libs/dsl/tests/test_parser.py | 173 ++++++++++++ src/backend/libs/dsl/tokens.py | 37 +++ 15 files changed, 1429 insertions(+) create mode 100644 src/backend/libs/__init__.py create mode 100644 src/backend/libs/dsl/README.md create mode 100644 src/backend/libs/dsl/__init__.py create mode 100644 src/backend/libs/dsl/evaluator.py create mode 100644 src/backend/libs/dsl/exceptions.py create mode 100644 src/backend/libs/dsl/helpers.py create mode 100644 src/backend/libs/dsl/lexer.py create mode 100644 src/backend/libs/dsl/nodes.py create mode 100644 src/backend/libs/dsl/parser.py create mode 100644 src/backend/libs/dsl/tests/__init__.py create mode 100644 src/backend/libs/dsl/tests/test_evaluator.py create mode 100644 src/backend/libs/dsl/tests/test_integration.py create mode 100644 src/backend/libs/dsl/tests/test_lexer.py create mode 100644 src/backend/libs/dsl/tests/test_parser.py create mode 100644 src/backend/libs/dsl/tokens.py diff --git a/src/backend/libs/__init__.py b/src/backend/libs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/backend/libs/dsl/README.md b/src/backend/libs/dsl/README.md new file mode 100644 index 0000000..f8b248f --- /dev/null +++ b/src/backend/libs/dsl/README.md @@ -0,0 +1,54 @@ +# DSL + +# EBNF Notation + +``` + +expression = or_expr ; + +or_expr = and_expr { "OR" and_expr } ; + +and_expr = not_expr { "AND" not_expr } ; + +not_expr = "NOT" not_expr + | comparison ; + +comparison = primary comp_op primary + | primary "IN" list_literal + | primary "NOT" "IN" list_literal + | primary ; + +comp_op = "==" | "!=" | ">" | ">=" | "<" | "<=" ; + +primary = "(" or_expr ")" + | literal + | identifier ; + +list_literal = "[" [ literal { "," literal } ] "]" ; + +literal = string | number | bool ; + +string = ( '"' { char | escape } '"' ) + | ( "'" { char | escape } "'" ) ; + +escape = "\" ? any character ? ; + +number = [ "-" ] digit { digit } [ "." digit { digit } ] ; + +bool = "true" | "false" ; (* case-insensitive *) + +identifier = ( letter | "_" ) { letter | digit | "_" | "." } ; + +(* Terminal definitions *) + +letter = "A" | ... | "Z" | "a" | ... | "z" ; +digit = "0" | ... | "9" ; +char = ? any character except quote and backslash ? ; + +(* Notes *) +(* Keywords AND, OR, NOT, IN, true, false are case-insensitive. *) +(* Whitespace between tokens is ignored. *) +(* Operator precedence (low to high): OR, AND, NOT, comparison. *) +(* Dotted identifiers (e.g. user.country) resolve via nested map. *) +(* Date values are strings matching "YYYY-MM-DD"; coerced at evaluation time, not at parse time. *) +``` diff --git a/src/backend/libs/dsl/__init__.py b/src/backend/libs/dsl/__init__.py new file mode 100644 index 0000000..89d3c32 --- /dev/null +++ b/src/backend/libs/dsl/__init__.py @@ -0,0 +1,3 @@ +__all__ = ["evaluate", "parse"] + +from .helpers import evaluate, parse diff --git a/src/backend/libs/dsl/evaluator.py b/src/backend/libs/dsl/evaluator.py new file mode 100644 index 0000000..f47fc91 --- /dev/null +++ b/src/backend/libs/dsl/evaluator.py @@ -0,0 +1,146 @@ +import re +from datetime import date +from typing import Any + +from .exceptions import EvaluationError +from .nodes import ( + BinaryOp, + Comparison, + Expression, + Identifier, + ListLiteral, + Literal, + UnaryOp, +) + +_MISSING = object() + +_DATE_RE = re.compile(r"^\d{4}-\d{2}-\d{2}$") + + +def _try_date(value: Any) -> date | None: + if isinstance(value, date): + return value + if isinstance(value, str) and _DATE_RE.match(value): + try: + return date.fromisoformat(value) + except ValueError: + return None + return None + + +def _coerce(left: Any, right: Any) -> tuple[Any, Any]: + if type(left) is type(right): + return left, right + + left_date = _try_date(left) + right_date = _try_date(right) + if left_date is not None and right_date is not None: + return left_date, right_date + + return left, right + + +def _values_equal(left: Any, right: Any) -> bool: + coerced_left, coerced_right = _coerce(left, right) + try: + return coerced_left == coerced_right + except TypeError: + return False + + +class Evaluator: + def __init__(self, variables: dict[str, Any]) -> None: + self._variables = variables + + def evaluate(self, node: Expression) -> bool: + return self._as_bool(self._eval(node)) + + def _as_bool(self, value: Any) -> bool: + if value is _MISSING: + return False + return bool(value) + + def _eval(self, node: Expression) -> Any: + match node: + case Literal(value=value): + return value + case Identifier(name=name): + return self._resolve(name) + case UnaryOp(operator="NOT", operand=operand): + value = self._eval(operand) + if value is _MISSING: + return False + return not self._as_bool(value) + case BinaryOp(operator="AND", left=left, right=right): + if not self._as_bool(self._eval(left)): + return False + return self._as_bool(self._eval(right)) + case BinaryOp(operator="OR", left=left, right=right): + if self._as_bool(self._eval(left)): + return True + return self._as_bool(self._eval(right)) + case Comparison() as comp: + return self._compare(comp) + case _: + msg = f"Unknown node: {type(node).__name__}" + raise EvaluationError(msg) + + def _resolve(self, name: str) -> Any: + parts = name.split(".") + current: Any = self._variables + for part in parts: + if not isinstance(current, dict): + return _MISSING + if part not in current: + return _MISSING + current = current[part] + return current + + def _compare(self, node: Comparison) -> bool: + left = self._eval(node.left) + if left is _MISSING: + return False + + if node.operator in {"IN", "NOT IN"}: + return self._membership(left, node) + + right = self._eval(node.right) + if right is _MISSING: + return False + + left, right = _coerce(left, right) + return self._apply_operator(node.operator, left, right) + + def _membership(self, left: Any, node: Comparison) -> bool: + if not isinstance(node.right, ListLiteral): + msg = "IN/NOT IN requires a list" + raise EvaluationError(msg) + + items = [self._eval(item) for item in node.right.items] + found = any(_values_equal(left, item) for item in items) + + if node.operator == "IN": + return found + return not found + + def _apply_operator(self, operator: str, left: Any, right: Any) -> bool: + try: + match operator: + case "==": + return left == right + case "!=": + return left != right + case ">": + return left > right + case ">=": + return left >= right + case "<": + return left < right + case "<=": + return left <= right + case _: + msg = f"Unknown operator: {operator}" + raise EvaluationError(msg) + except TypeError: + return False diff --git a/src/backend/libs/dsl/exceptions.py b/src/backend/libs/dsl/exceptions.py new file mode 100644 index 0000000..cdd0875 --- /dev/null +++ b/src/backend/libs/dsl/exceptions.py @@ -0,0 +1,18 @@ +class DSLError(Exception): + def __init__(self, message: str, position: int | None = None) -> None: + self.position = position + if position is not None: + message = f"{message} (position {position})" + super().__init__(message) + + +class LexerError(DSLError): + pass + + +class ParserError(DSLError): + pass + + +class EvaluationError(DSLError): + pass diff --git a/src/backend/libs/dsl/helpers.py b/src/backend/libs/dsl/helpers.py new file mode 100644 index 0000000..42b7537 --- /dev/null +++ b/src/backend/libs/dsl/helpers.py @@ -0,0 +1,20 @@ +from typing import Any + +from .evaluator import Evaluator +from .lexer import Lexer +from .nodes import Expression +from .parser import Parser + + +def evaluate(expression: str, variables: dict[str, Any]) -> bool: + print("Expression:", expression) + tokens = Lexer(expression).tokenize() + print("Tokens:", tokens) + ast = Parser(tokens).parse() + print("AST:", ast) + return Evaluator(variables).evaluate(ast) + + +def parse(expression: str) -> Expression: + tokens = Lexer(expression).tokenize() + return Parser(tokens).parse() diff --git a/src/backend/libs/dsl/lexer.py b/src/backend/libs/dsl/lexer.py new file mode 100644 index 0000000..959e42e --- /dev/null +++ b/src/backend/libs/dsl/lexer.py @@ -0,0 +1,162 @@ +from .exceptions import LexerError +from .tokens import Token, TokenType + +_KEYWORDS: dict[str, TokenType] = { + "AND": TokenType.AND, + "OR": TokenType.OR, + "NOT": TokenType.NOT, + "IN": TokenType.IN, +} + +_SINGLE_CHAR_TOKENS: dict[str, TokenType] = { + "(": TokenType.LPAREN, + ")": TokenType.RPAREN, + "[": TokenType.LBRACKET, + "]": TokenType.RBRACKET, + ",": TokenType.COMMA, +} + + +class Lexer: + def __init__(self, expression: str) -> None: + self._expression = expression + self._pos = 0 + self._tokens: list[Token] = [] + + def tokenize(self) -> list[Token]: + while self._pos < len(self._expression): + self._skip_whitespace() + if self._pos >= len(self._expression): + break + + char = self._expression[self._pos] + + if char in {'"', "'"}: + self._read_string() + elif char.isdigit() or (char == "-" and self._next_is_digit()): + self._read_number() + elif char.isalpha() or char == "_": + self._read_word() + elif char in _SINGLE_CHAR_TOKENS: + self._tokens.append( + Token(_SINGLE_CHAR_TOKENS[char], char, self._pos) + ) + self._pos += 1 + else: + self._read_operator() + + self._tokens.append(Token(TokenType.EOF, None, self._pos)) + return self._tokens + + def _skip_whitespace(self) -> None: + while ( + self._pos < len(self._expression) + and self._expression[self._pos].isspace() + ): + self._pos += 1 + + def _peek(self, offset: int = 1) -> str | None: + target = self._pos + offset + if target < len(self._expression): + return self._expression[target] + return None + + def _next_is_digit(self) -> bool: + nxt = self._peek() + return nxt is not None and nxt.isdigit() + + def _read_string(self) -> None: + quote = self._expression[self._pos] + start = self._pos + self._pos += 1 + parts: list[str] = [] + + while self._pos < len(self._expression): + char = self._expression[self._pos] + if char == "\\": + self._pos += 1 + if self._pos >= len(self._expression): + raise LexerError("Unexpected end of string", start) + parts.append(self._expression[self._pos]) + self._pos += 1 + elif char == quote: + self._pos += 1 + self._tokens.append( + Token(TokenType.STRING, "".join(parts), start) + ) + return + else: + parts.append(char) + self._pos += 1 + + raise LexerError("Unterminated string", start) + + def _read_number(self) -> None: + start = self._pos + has_dot = False + + if self._expression[self._pos] == "-": + self._pos += 1 + + while self._pos < len(self._expression): + char = self._expression[self._pos] + if char.isdigit(): + self._pos += 1 + elif char == "." and not has_dot: + has_dot = True + self._pos += 1 + else: + break + + raw = self._expression[start : self._pos] + value: int | float = float(raw) if has_dot else int(raw) + self._tokens.append(Token(TokenType.NUMBER, value, start)) + + def _read_word(self) -> None: + start = self._pos + while self._pos < len(self._expression) and ( + self._expression[self._pos].isalnum() + or self._expression[self._pos] in {"_", "."} + ): + self._pos += 1 + + word = self._expression[start : self._pos] + upper = word.upper() + + if upper in {"TRUE", "FALSE"}: + self._tokens.append(Token(TokenType.BOOL, upper == "TRUE", start)) + elif upper in _KEYWORDS: + self._tokens.append(Token(_KEYWORDS[upper], word, start)) + else: + self._tokens.append(Token(TokenType.IDENTIFIER, word, start)) + + def _read_operator(self) -> None: + start = self._pos + char = self._expression[self._pos] + nxt = self._peek() + + two_char_ops: dict[str, TokenType] = { + "==": TokenType.EQ, + "!=": TokenType.NEQ, + ">=": TokenType.GTE, + "<=": TokenType.LTE, + } + + if nxt is not None: + pair = char + nxt + if pair in two_char_ops: + self._tokens.append(Token(two_char_ops[pair], pair, start)) + self._pos += 2 + return + + one_char_ops: dict[str, TokenType] = { + ">": TokenType.GT, + "<": TokenType.LT, + } + + if char in one_char_ops: + self._tokens.append(Token(one_char_ops[char], char, start)) + self._pos += 1 + return + + raise LexerError(f"Unexpected character: {char!r}", start) diff --git a/src/backend/libs/dsl/nodes.py b/src/backend/libs/dsl/nodes.py new file mode 100644 index 0000000..bc4e3f1 --- /dev/null +++ b/src/backend/libs/dsl/nodes.py @@ -0,0 +1,41 @@ +from dataclasses import dataclass + + +@dataclass(frozen=True, slots=True) +class Literal: + value: str | int | float | bool + + +@dataclass(frozen=True, slots=True) +class ListLiteral: + items: tuple[Literal, ...] + + +@dataclass(frozen=True, slots=True) +class Identifier: + name: str + + +@dataclass(frozen=True, slots=True) +class UnaryOp: + operator: str + operand: "Expression" + + +@dataclass(frozen=True, slots=True) +class BinaryOp: + operator: str + left: "Expression" + right: "Expression" + + +@dataclass(frozen=True, slots=True) +class Comparison: + operator: str + left: "Expression" + right: "Expression" + + +type Expression = ( + Literal | ListLiteral | Identifier | UnaryOp | BinaryOp | Comparison +) diff --git a/src/backend/libs/dsl/parser.py b/src/backend/libs/dsl/parser.py new file mode 100644 index 0000000..3378cf8 --- /dev/null +++ b/src/backend/libs/dsl/parser.py @@ -0,0 +1,162 @@ +from .exceptions import ParserError +from .nodes import ( + BinaryOp, + Comparison, + Expression, + Identifier, + ListLiteral, + Literal, + UnaryOp, +) +from .tokens import Token, TokenType + +_COMPARISON_OPS: dict[TokenType, str] = { + TokenType.EQ: "==", + TokenType.NEQ: "!=", + TokenType.GT: ">", + TokenType.GTE: ">=", + TokenType.LT: "<", + TokenType.LTE: "<=", +} + + +class Parser: + def __init__(self, tokens: list[Token]) -> None: + self._tokens = tokens + self._pos = 0 + + def parse(self) -> Expression: + result = self._or_expr() + if self._current().type != TokenType.EOF: + token = self._current() + raise ParserError( + f"Unexpected token: {token.value!r}", + token.position, + ) + return result + + def _current(self) -> Token: + return self._tokens[self._pos] + + def _advance(self) -> Token: + token = self._tokens[self._pos] + self._pos += 1 + return token + + def _expect(self, token_type: TokenType) -> Token: + token = self._current() + if token.type != token_type: + raise ParserError( + f"Expected {token_type.name}, got {token.type.name}", + token.position, + ) + return self._advance() + + def _or_expr(self) -> Expression: + left = self._and_expr() + while self._current().type == TokenType.OR: + self._advance() + right = self._and_expr() + left = BinaryOp(operator="OR", left=left, right=right) + return left + + def _and_expr(self) -> Expression: + left = self._not_expr() + while self._current().type == TokenType.AND: + self._advance() + right = self._not_expr() + left = BinaryOp(operator="AND", left=left, right=right) + return left + + def _not_expr(self) -> Expression: + if self._current().type == TokenType.NOT: + self._advance() + operand = self._not_expr() + return UnaryOp(operator="NOT", operand=operand) + return self._comparison() + + def _comparison(self) -> Expression: + left = self._primary() + + current = self._current() + + if current.type in _COMPARISON_OPS: + operator = _COMPARISON_OPS[current.type] + self._advance() + right = self._primary() + return Comparison(operator=operator, left=left, right=right) + + if current.type == TokenType.IN: + self._advance() + right_list = self._list_literal() + return Comparison(operator="IN", left=left, right=right_list) + + if current.type == TokenType.NOT: + next_pos = self._pos + 1 + if ( + next_pos < len(self._tokens) + and self._tokens[next_pos].type == TokenType.IN + ): + self._advance() + self._advance() + right_list = self._list_literal() + return Comparison( + operator="NOT IN", + left=left, + right=right_list, + ) + + return left + + def _list_literal(self) -> ListLiteral: + self._expect(TokenType.LBRACKET) + items: list[Literal] = [] + + if self._current().type != TokenType.RBRACKET: + items.append(self._literal_value()) + while self._current().type == TokenType.COMMA: + self._advance() + items.append(self._literal_value()) + + self._expect(TokenType.RBRACKET) + return ListLiteral(items=tuple(items)) + + def _literal_value(self) -> Literal: + token = self._current() + if token.type in { + TokenType.STRING, + TokenType.NUMBER, + TokenType.BOOL, + }: + self._advance() + return Literal(value=token.value) + raise ParserError( + f"Expected literal, got {token.type.name}", + token.position, + ) + + def _primary(self) -> Expression: + token = self._current() + + if token.type == TokenType.LPAREN: + self._advance() + expr = self._or_expr() + self._expect(TokenType.RPAREN) + return expr + + if token.type in { + TokenType.STRING, + TokenType.NUMBER, + TokenType.BOOL, + }: + self._advance() + return Literal(value=token.value) + + if token.type == TokenType.IDENTIFIER: + self._advance() + return Identifier(name=token.value) + + raise ParserError( + f"Unexpected token: {token.type.name}", + token.position, + ) diff --git a/src/backend/libs/dsl/tests/__init__.py b/src/backend/libs/dsl/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/backend/libs/dsl/tests/test_evaluator.py b/src/backend/libs/dsl/tests/test_evaluator.py new file mode 100644 index 0000000..a398fac --- /dev/null +++ b/src/backend/libs/dsl/tests/test_evaluator.py @@ -0,0 +1,262 @@ +import unittest +from datetime import date + +from libs.dsl.evaluator import Evaluator +from libs.dsl.lexer import Lexer +from libs.dsl.parser import Parser + + +def _eval(expression: str, variables: dict) -> bool: + tokens = Lexer(expression).tokenize() + ast = Parser(tokens).parse() + return Evaluator(variables).evaluate(ast) + + +class ComparisonTests(unittest.TestCase): + def test_equal_strings(self) -> None: + self.assertTrue(_eval('x == "hello"', {"x": "hello"})) + + def test_equal_strings_mismatch(self) -> None: + self.assertFalse(_eval('x == "hello"', {"x": "world"})) + + def test_not_equal(self) -> None: + self.assertTrue(_eval('x != "hello"', {"x": "world"})) + + def test_not_equal_same(self) -> None: + self.assertFalse(_eval('x != "hello"', {"x": "hello"})) + + def test_greater_than(self) -> None: + self.assertTrue(_eval("x > 5", {"x": 10})) + self.assertFalse(_eval("x > 5", {"x": 3})) + self.assertFalse(_eval("x > 5", {"x": 5})) + + def test_greater_equal(self) -> None: + self.assertTrue(_eval("x >= 5", {"x": 5})) + self.assertTrue(_eval("x >= 5", {"x": 6})) + self.assertFalse(_eval("x >= 5", {"x": 4})) + + def test_less_than(self) -> None: + self.assertTrue(_eval("x < 5", {"x": 3})) + self.assertFalse(_eval("x < 5", {"x": 10})) + self.assertFalse(_eval("x < 5", {"x": 5})) + + def test_less_equal(self) -> None: + self.assertTrue(_eval("x <= 5", {"x": 5})) + self.assertTrue(_eval("x <= 5", {"x": 4})) + self.assertFalse(_eval("x <= 5", {"x": 6})) + + +class MembershipTests(unittest.TestCase): + def test_in_list_match(self) -> None: + self.assertTrue(_eval('country IN ["RU", "KZ"]', {"country": "RU"})) + + def test_in_list_no_match(self) -> None: + self.assertFalse(_eval('country IN ["RU", "KZ"]', {"country": "US"})) + + def test_not_in_list_match(self) -> None: + self.assertTrue( + _eval( + 'country NOT IN ["RU", "KZ"]', + {"country": "US"}, + ) + ) + + def test_not_in_list_no_match(self) -> None: + self.assertFalse( + _eval( + 'country NOT IN ["RU", "KZ"]', + {"country": "RU"}, + ) + ) + + def test_in_empty_list(self) -> None: + self.assertFalse(_eval("x IN []", {"x": 1})) + + def test_not_in_empty_list(self) -> None: + self.assertTrue(_eval("x NOT IN []", {"x": 1})) + + def test_in_number_list(self) -> None: + self.assertTrue(_eval("status IN [1, 2, 3]", {"status": 2})) + self.assertFalse(_eval("status IN [1, 2, 3]", {"status": 5})) + + +class LogicTests(unittest.TestCase): + def test_and_both_true(self) -> None: + self.assertTrue(_eval("x == 1 AND y == 2", {"x": 1, "y": 2})) + + def test_and_left_false(self) -> None: + self.assertFalse(_eval("x == 1 AND y == 2", {"x": 0, "y": 2})) + + def test_and_right_false(self) -> None: + self.assertFalse(_eval("x == 1 AND y == 2", {"x": 1, "y": 3})) + + def test_or_both_false(self) -> None: + self.assertFalse(_eval("x == 1 OR y == 2", {"x": 0, "y": 0})) + + def test_or_left_true(self) -> None: + self.assertTrue(_eval("x == 1 OR y == 2", {"x": 1, "y": 0})) + + def test_or_right_true(self) -> None: + self.assertTrue(_eval("x == 1 OR y == 2", {"x": 0, "y": 2})) + + def test_not_true(self) -> None: + self.assertFalse(_eval("NOT active", {"active": True})) + + def test_not_false(self) -> None: + self.assertTrue(_eval("NOT active", {"active": False})) + + def test_and_precedence_over_or(self) -> None: + self.assertTrue( + _eval( + "a == 1 OR b == 2 AND c == 3", + {"a": 1, "b": 0, "c": 0}, + ) + ) + + def test_parentheses_override_precedence(self) -> None: + self.assertFalse( + _eval( + "(a == 1 OR b == 2) AND c == 3", + {"a": 1, "b": 0, "c": 0}, + ) + ) + + def test_double_not(self) -> None: + self.assertTrue(_eval("NOT NOT true", {})) + self.assertFalse(_eval("NOT NOT false", {})) + + +class MissingAttributeTests(unittest.TestCase): + def test_missing_in_equality(self) -> None: + self.assertFalse(_eval('x == "hello"', {})) + + def test_missing_in_membership(self) -> None: + self.assertFalse(_eval('x IN ["a", "b"]', {})) + + def test_missing_in_not_in(self) -> None: + self.assertFalse(_eval('x NOT IN ["a", "b"]', {})) + + def test_missing_in_and_left(self) -> None: + self.assertFalse(_eval("x == 1 AND y == 2", {"y": 2})) + + def test_missing_in_and_right(self) -> None: + self.assertFalse(_eval("x == 1 AND y == 2", {"x": 1})) + + def test_missing_in_or_recoverable(self) -> None: + self.assertTrue(_eval("x == 1 OR y == 2", {"y": 2})) + + def test_missing_in_or_both(self) -> None: + self.assertFalse(_eval("x == 1 OR y == 2", {})) + + def test_missing_with_not(self) -> None: + self.assertFalse(_eval("NOT missing", {})) + + def test_missing_right_side(self) -> None: + self.assertFalse(_eval("x == y", {"x": 1})) + + def test_missing_both_sides(self) -> None: + self.assertFalse(_eval("x == y", {})) + + +class NestedVariableTests(unittest.TestCase): + def test_single_level(self) -> None: + self.assertTrue( + _eval( + 'user.country == "RU"', + {"user": {"country": "RU"}}, + ) + ) + + def test_deep_nesting(self) -> None: + self.assertTrue( + _eval( + "a.b.c == 1", + {"a": {"b": {"c": 1}}}, + ) + ) + + def test_missing_nested_key(self) -> None: + self.assertFalse(_eval('user.country == "RU"', {"user": {}})) + + def test_missing_parent_key(self) -> None: + self.assertFalse(_eval('user.country == "RU"', {})) + + def test_non_dict_intermediate(self) -> None: + self.assertFalse(_eval('user.country == "RU"', {"user": "string"})) + + +class DateTests(unittest.TestCase): + def test_date_object_vs_string(self) -> None: + self.assertTrue( + _eval( + 'created >= "2024-01-01"', + {"created": date(2024, 6, 15)}, + ) + ) + + def test_date_strings_both_sides(self) -> None: + self.assertTrue( + _eval( + 'created >= "2024-01-01"', + {"created": "2024-06-15"}, + ) + ) + + def test_date_less_than(self) -> None: + self.assertFalse( + _eval( + 'created >= "2024-01-01"', + {"created": date(2023, 12, 31)}, + ) + ) + + def test_date_equality(self) -> None: + self.assertTrue( + _eval( + 'birthday == "2000-01-01"', + {"birthday": date(2000, 1, 1)}, + ) + ) + + def test_date_range(self) -> None: + self.assertTrue( + _eval( + 'created >= "2024-01-01" AND created <= "2024-12-31"', + {"created": date(2024, 6, 15)}, + ) + ) + self.assertFalse( + _eval( + 'created >= "2024-01-01" AND created <= "2024-12-31"', + {"created": date(2025, 1, 1)}, + ) + ) + + +class TypeTests(unittest.TestCase): + def test_integer_comparison(self) -> None: + self.assertTrue(_eval("age > 18", {"age": 21})) + + def test_float_comparison(self) -> None: + self.assertTrue(_eval("score >= 4.5", {"score": 4.7})) + + def test_boolean_equality(self) -> None: + self.assertTrue(_eval("active == true", {"active": True})) + self.assertFalse(_eval("active == true", {"active": False})) + + def test_boolean_direct_use(self) -> None: + self.assertTrue(_eval("active", {"active": True})) + self.assertFalse(_eval("active", {"active": False})) + + def test_negative_number(self) -> None: + self.assertTrue(_eval("temp > -10", {"temp": 5})) + self.assertFalse(_eval("temp > -10", {"temp": -20})) + + def test_incompatible_types_no_crash(self) -> None: + self.assertFalse(_eval('x > "hello"', {"x": 5})) + + def test_standalone_true(self) -> None: + self.assertTrue(_eval("true", {})) + + def test_standalone_false(self) -> None: + self.assertFalse(_eval("false", {})) diff --git a/src/backend/libs/dsl/tests/test_integration.py b/src/backend/libs/dsl/tests/test_integration.py new file mode 100644 index 0000000..d1cbb6d --- /dev/null +++ b/src/backend/libs/dsl/tests/test_integration.py @@ -0,0 +1,155 @@ +import unittest +from datetime import date + +from libs.dsl import evaluate, parse +from libs.dsl.exceptions import LexerError, ParserError +from libs.dsl.nodes import BinaryOp, Comparison + + +class FeatureFlagScenarioTests(unittest.TestCase): + def setUp(self) -> None: + self.rule = ( + 'country IN ["RU", "KZ"] ' + 'AND app_version >= "1.6.0" ' + 'AND platform == "ios"' + ) + + def test_passing_user(self) -> None: + variables = { + "country": "RU", + "app_version": "1.8.0", + "platform": "ios", + } + self.assertTrue(evaluate(self.rule, variables)) + + def test_wrong_country(self) -> None: + variables = { + "country": "US", + "app_version": "1.8.0", + "platform": "ios", + } + self.assertFalse(evaluate(self.rule, variables)) + + def test_old_version(self) -> None: + variables = { + "country": "RU", + "app_version": "1.5.0", + "platform": "ios", + } + self.assertFalse(evaluate(self.rule, variables)) + + def test_wrong_platform(self) -> None: + variables = { + "country": "RU", + "app_version": "1.8.0", + "platform": "android", + } + self.assertFalse(evaluate(self.rule, variables)) + + def test_missing_attribute(self) -> None: + variables = { + "country": "RU", + "app_version": "1.8.0", + } + self.assertFalse(evaluate(self.rule, variables)) + + def test_empty_variables(self) -> None: + self.assertFalse(evaluate(self.rule, {})) + + +class ComplexExpressionTests(unittest.TestCase): + def test_or_and_combination(self) -> None: + rule = '(platform == "ios" OR platform == "android") AND age >= 18' + self.assertTrue(evaluate(rule, {"platform": "ios", "age": 21})) + self.assertFalse(evaluate(rule, {"platform": "web", "age": 21})) + self.assertFalse(evaluate(rule, {"platform": "ios", "age": 16})) + + def test_nested_variables_scenario(self) -> None: + rule = 'user.profile.country == "RU" AND user.active == true' + variables = { + "user": { + "profile": {"country": "RU"}, + "active": True, + }, + } + self.assertTrue(evaluate(rule, variables)) + + def test_date_range_scenario(self) -> None: + rule = 'created >= "2024-01-01" AND created <= "2024-12-31"' + self.assertTrue(evaluate(rule, {"created": date(2024, 6, 15)})) + self.assertFalse(evaluate(rule, {"created": date(2025, 1, 1)})) + + def test_multiple_and_chain(self) -> None: + rule = "a == 1 AND b == 2 AND c == 3 AND d == 4" + self.assertTrue(evaluate(rule, {"a": 1, "b": 2, "c": 3, "d": 4})) + self.assertFalse(evaluate(rule, {"a": 1, "b": 2, "c": 3, "d": 0})) + + def test_multiple_or_chain(self) -> None: + rule = ( + 'status == "active" OR status == "pending" OR status == "review"' + ) + self.assertTrue(evaluate(rule, {"status": "pending"})) + self.assertFalse(evaluate(rule, {"status": "deleted"})) + + +class ParseFunctionTests(unittest.TestCase): + def test_returns_ast(self) -> None: + ast = parse("a == 1 AND b == 2") + self.assertIsInstance(ast, BinaryOp) + self.assertEqual(ast.operator, "AND") + + def test_simple_comparison_ast(self) -> None: + ast = parse('x == "hello"') + self.assertIsInstance(ast, Comparison) + + def test_invalid_syntax_raises_lexer_error(self) -> None: + with self.assertRaises(LexerError): + parse('"unclosed string') + + def test_invalid_syntax_raises_parser_error(self) -> None: + with self.assertRaises(ParserError): + parse("AND AND") + + +class KeywordCaseTests(unittest.TestCase): + def test_lowercase(self) -> None: + self.assertTrue(evaluate("x == 1 and y == 2", {"x": 1, "y": 2})) + + def test_mixed_case(self) -> None: + self.assertTrue(evaluate("x == 1 And y == 2", {"x": 1, "y": 2})) + + def test_uppercase(self) -> None: + self.assertTrue(evaluate("x == 1 AND y == 2", {"x": 1, "y": 2})) + + +class EdgeCaseTests(unittest.TestCase): + def test_single_true(self) -> None: + self.assertTrue(evaluate("true", {})) + + def test_single_false(self) -> None: + self.assertFalse(evaluate("false", {})) + + def test_not_in_with_missing(self) -> None: + self.assertFalse(evaluate('x NOT IN ["a", "b"]', {})) + + def test_double_not(self) -> None: + self.assertTrue(evaluate("NOT NOT true", {})) + self.assertFalse(evaluate("NOT NOT false", {})) + + def test_number_in_list(self) -> None: + self.assertTrue(evaluate("status IN [1, 2, 3]", {"status": 2})) + self.assertFalse(evaluate("status IN [1, 2, 3]", {"status": 5})) + + def test_boolean_in_expression(self) -> None: + self.assertTrue( + evaluate( + "active == true AND premium == false", + {"active": True, "premium": False}, + ) + ) + + def test_complex_nested_logic(self) -> None: + rule = "(a == 1 AND b == 2) OR (c == 3 AND d == 4)" + self.assertTrue(evaluate(rule, {"a": 1, "b": 2, "c": 0, "d": 0})) + self.assertTrue(evaluate(rule, {"a": 0, "b": 0, "c": 3, "d": 4})) + self.assertFalse(evaluate(rule, {"a": 1, "b": 0, "c": 3, "d": 0})) diff --git a/src/backend/libs/dsl/tests/test_lexer.py b/src/backend/libs/dsl/tests/test_lexer.py new file mode 100644 index 0000000..b0bb350 --- /dev/null +++ b/src/backend/libs/dsl/tests/test_lexer.py @@ -0,0 +1,196 @@ +import math +import unittest + +from libs.dsl.exceptions import LexerError +from libs.dsl.lexer import Lexer +from libs.dsl.tokens import TokenType + + +class LexerBasicTokenTests(unittest.TestCase): + def test_string_double_quotes(self) -> None: + tokens = Lexer('"hello"').tokenize() + self.assertEqual(tokens[0].type, TokenType.STRING) + self.assertEqual(tokens[0].value, "hello") + + def test_string_single_quotes(self) -> None: + tokens = Lexer("'world'").tokenize() + self.assertEqual(tokens[0].type, TokenType.STRING) + self.assertEqual(tokens[0].value, "world") + + def test_string_with_escape(self) -> None: + tokens = Lexer(r'"he\"llo"').tokenize() + self.assertEqual(tokens[0].value, 'he"llo') + + def test_integer(self) -> None: + tokens = Lexer("42").tokenize() + self.assertEqual(tokens[0].type, TokenType.NUMBER) + self.assertEqual(tokens[0].value, 42) + + def test_float(self) -> None: + tokens = Lexer("3.14").tokenize() + self.assertEqual(tokens[0].type, TokenType.NUMBER) + self.assertAlmostEqual(tokens[0].value, math.pi) + + def test_negative_integer(self) -> None: + tokens = Lexer("-5").tokenize() + self.assertEqual(tokens[0].type, TokenType.NUMBER) + self.assertEqual(tokens[0].value, -5) + + def test_negative_float(self) -> None: + tokens = Lexer("-2.5").tokenize() + self.assertEqual(tokens[0].type, TokenType.NUMBER) + self.assertAlmostEqual(tokens[0].value, -2.5) + + def test_boolean_true_lowercase(self) -> None: + tokens = Lexer("true").tokenize() + self.assertEqual(tokens[0].type, TokenType.BOOL) + self.assertIs(expr1=tokens[0].value, expr2=True) + + def test_boolean_false_uppercase(self) -> None: + tokens = Lexer("FALSE").tokenize() + self.assertEqual(tokens[0].type, TokenType.BOOL) + self.assertIs(expr1=tokens[0].value, expr2=False) + + def test_boolean_mixed_case(self) -> None: + tokens = Lexer("True").tokenize() + self.assertEqual(tokens[0].type, TokenType.BOOL) + self.assertIs(expr1=tokens[0].value, expr2=True) + + def test_identifier(self) -> None: + tokens = Lexer("country").tokenize() + self.assertEqual(tokens[0].type, TokenType.IDENTIFIER) + self.assertEqual(tokens[0].value, "country") + + def test_dotted_identifier(self) -> None: + tokens = Lexer("user.country").tokenize() + self.assertEqual(tokens[0].type, TokenType.IDENTIFIER) + self.assertEqual(tokens[0].value, "user.country") + + def test_underscore_identifier(self) -> None: + tokens = Lexer("app_version").tokenize() + self.assertEqual(tokens[0].type, TokenType.IDENTIFIER) + self.assertEqual(tokens[0].value, "app_version") + + +class LexerKeywordTests(unittest.TestCase): + def test_and(self) -> None: + tokens = Lexer("AND").tokenize() + self.assertEqual(tokens[0].type, TokenType.AND) + + def test_or(self) -> None: + tokens = Lexer("OR").tokenize() + self.assertEqual(tokens[0].type, TokenType.OR) + + def test_not(self) -> None: + tokens = Lexer("NOT").tokenize() + self.assertEqual(tokens[0].type, TokenType.NOT) + + def test_in(self) -> None: + tokens = Lexer("IN").tokenize() + self.assertEqual(tokens[0].type, TokenType.IN) + + def test_case_insensitive(self) -> None: + for word, expected in [ + ("and", TokenType.AND), + ("Or", TokenType.OR), + ("not", TokenType.NOT), + ("iN", TokenType.IN), + ]: + tokens = Lexer(word).tokenize() + self.assertEqual(tokens[0].type, expected, word) + + +class LexerOperatorTests(unittest.TestCase): + def test_all_comparison_operators(self) -> None: + tokens = Lexer("== != > >= < <=").tokenize() + types = [t.type for t in tokens[:-1]] + self.assertEqual( + types, + [ + TokenType.EQ, + TokenType.NEQ, + TokenType.GT, + TokenType.GTE, + TokenType.LT, + TokenType.LTE, + ], + ) + + def test_brackets_and_comma(self) -> None: + tokens = Lexer('["a", "b"]').tokenize() + types = [t.type for t in tokens[:-1]] + self.assertEqual( + types, + [ + TokenType.LBRACKET, + TokenType.STRING, + TokenType.COMMA, + TokenType.STRING, + TokenType.RBRACKET, + ], + ) + + def test_parentheses(self) -> None: + tokens = Lexer("(x)").tokenize() + self.assertEqual(tokens[0].type, TokenType.LPAREN) + self.assertEqual(tokens[1].type, TokenType.IDENTIFIER) + self.assertEqual(tokens[2].type, TokenType.RPAREN) + + +class LexerComplexExpressionTests(unittest.TestCase): + def test_full_expression(self) -> None: + expr = 'country IN ["RU", "KZ"] AND version >= "1.6.0"' + tokens = Lexer(expr).tokenize() + types = [t.type for t in tokens[:-1]] + self.assertEqual( + types, + [ + TokenType.IDENTIFIER, + TokenType.IN, + TokenType.LBRACKET, + TokenType.STRING, + TokenType.COMMA, + TokenType.STRING, + TokenType.RBRACKET, + TokenType.AND, + TokenType.IDENTIFIER, + TokenType.GTE, + TokenType.STRING, + ], + ) + + def test_empty_expression(self) -> None: + tokens = Lexer("").tokenize() + self.assertEqual(len(tokens), 1) + self.assertEqual(tokens[0].type, TokenType.EOF) + + def test_whitespace_only(self) -> None: + tokens = Lexer(" ").tokenize() + self.assertEqual(len(tokens), 1) + self.assertEqual(tokens[0].type, TokenType.EOF) + + def test_position_tracking(self) -> None: + tokens = Lexer("x == 1").tokenize() + self.assertEqual(tokens[0].position, 0) + self.assertEqual(tokens[1].position, 2) + self.assertEqual(tokens[2].position, 5) + + +class LexerErrorTests(unittest.TestCase): + def test_unterminated_string(self) -> None: + with self.assertRaises(LexerError): + Lexer('"hello').tokenize() + + def test_unexpected_character(self) -> None: + with self.assertRaises(LexerError): + Lexer("x @ y").tokenize() + + def test_lone_exclamation(self) -> None: + with self.assertRaises(LexerError): + Lexer("!x").tokenize() + + def test_error_contains_position(self) -> None: + try: + Lexer("abc @").tokenize() + except LexerError as exc: + self.assertEqual(exc.position, 4) diff --git a/src/backend/libs/dsl/tests/test_parser.py b/src/backend/libs/dsl/tests/test_parser.py new file mode 100644 index 0000000..fad8a0f --- /dev/null +++ b/src/backend/libs/dsl/tests/test_parser.py @@ -0,0 +1,173 @@ +import unittest + +from libs.dsl.exceptions import ParserError +from libs.dsl.lexer import Lexer +from libs.dsl.nodes import ( + BinaryOp, + Comparison, + Identifier, + ListLiteral, + Literal, + UnaryOp, +) +from libs.dsl.parser import Parser + + +def _parse(expression: str): + tokens = Lexer(expression).tokenize() + return Parser(tokens).parse() + + +class ParserComparisonTests(unittest.TestCase): + def test_equality(self) -> None: + result = _parse('x == "hello"') + self.assertIsInstance(result, Comparison) + self.assertEqual(result.operator, "==") + self.assertIsInstance(result.left, Identifier) + self.assertIsInstance(result.right, Literal) + + def test_not_equal(self) -> None: + result = _parse("x != 5") + self.assertIsInstance(result, Comparison) + self.assertEqual(result.operator, "!=") + + def test_all_comparison_operators(self) -> None: + for op in ("==", "!=", ">", ">=", "<", "<="): + result = _parse(f"x {op} 5") + self.assertIsInstance(result, Comparison) + self.assertEqual(result.operator, op) + + +class ParserMembershipTests(unittest.TestCase): + def test_in_list(self) -> None: + result = _parse('country IN ["RU", "KZ"]') + self.assertIsInstance(result, Comparison) + self.assertEqual(result.operator, "IN") + self.assertIsInstance(result.right, ListLiteral) + self.assertEqual(len(result.right.items), 2) + + def test_not_in_list(self) -> None: + result = _parse("x NOT IN [1, 2, 3]") + self.assertIsInstance(result, Comparison) + self.assertEqual(result.operator, "NOT IN") + self.assertIsInstance(result.right, ListLiteral) + self.assertEqual(len(result.right.items), 3) + + def test_empty_list(self) -> None: + result = _parse("x IN []") + self.assertIsInstance(result, Comparison) + self.assertIsInstance(result.right, ListLiteral) + self.assertEqual(len(result.right.items), 0) + + def test_single_item_list(self) -> None: + result = _parse('x IN ["only"]') + self.assertIsInstance(result.right, ListLiteral) + self.assertEqual(len(result.right.items), 1) + + +class ParserLogicTests(unittest.TestCase): + def test_and(self) -> None: + result = _parse("a == 1 AND b == 2") + self.assertIsInstance(result, BinaryOp) + self.assertEqual(result.operator, "AND") + self.assertIsInstance(result.left, Comparison) + self.assertIsInstance(result.right, Comparison) + + def test_or(self) -> None: + result = _parse("a == 1 OR b == 2") + self.assertIsInstance(result, BinaryOp) + self.assertEqual(result.operator, "OR") + + def test_not(self) -> None: + result = _parse("NOT active") + self.assertIsInstance(result, UnaryOp) + self.assertEqual(result.operator, "NOT") + self.assertIsInstance(result.operand, Identifier) + + def test_and_precedence_over_or(self) -> None: + result = _parse("a == 1 OR b == 2 AND c == 3") + self.assertIsInstance(result, BinaryOp) + self.assertEqual(result.operator, "OR") + self.assertIsInstance(result.right, BinaryOp) + self.assertEqual(result.right.operator, "AND") + + def test_parentheses_override_precedence(self) -> None: + result = _parse("(a == 1 OR b == 2) AND c == 3") + self.assertIsInstance(result, BinaryOp) + self.assertEqual(result.operator, "AND") + self.assertIsInstance(result.left, BinaryOp) + self.assertEqual(result.left.operator, "OR") + + def test_nested_parentheses(self) -> None: + result = _parse("((x == 1))") + self.assertIsInstance(result, Comparison) + + def test_not_before_comparison(self) -> None: + result = _parse('NOT country IN ["US"]') + self.assertIsInstance(result, UnaryOp) + self.assertEqual(result.operator, "NOT") + self.assertIsInstance(result.operand, Comparison) + self.assertEqual(result.operand.operator, "IN") + + def test_chained_and(self) -> None: + result = _parse("a == 1 AND b == 2 AND c == 3") + self.assertIsInstance(result, BinaryOp) + self.assertEqual(result.operator, "AND") + self.assertIsInstance(result.left, BinaryOp) + self.assertEqual(result.left.operator, "AND") + + def test_chained_or(self) -> None: + result = _parse("a == 1 OR b == 2 OR c == 3") + self.assertIsInstance(result, BinaryOp) + self.assertEqual(result.operator, "OR") + self.assertIsInstance(result.left, BinaryOp) + self.assertEqual(result.left.operator, "OR") + + +class ParserLiteralTests(unittest.TestCase): + def test_boolean_literal(self) -> None: + result = _parse("active == true") + self.assertIsInstance(result.right, Literal) + self.assertIs(expr1=result.right.value, expr2=True) + + def test_number_literal(self) -> None: + result = _parse("x == 42") + self.assertIsInstance(result.right, Literal) + self.assertEqual(result.right.value, 42) + + def test_string_literal(self) -> None: + result = _parse('x == "hello"') + self.assertIsInstance(result.right, Literal) + self.assertEqual(result.right.value, "hello") + + def test_standalone_literal(self) -> None: + result = _parse("true") + self.assertIsInstance(result, Literal) + self.assertIs(expr1=result.value, expr2=True) + + def test_standalone_identifier(self) -> None: + result = _parse("active") + self.assertIsInstance(result, Identifier) + self.assertEqual(result.name, "active") + + +class ParserErrorTests(unittest.TestCase): + def test_unexpected_token_at_start(self) -> None: + with self.assertRaises(ParserError): + _parse("==") + + def test_missing_closing_paren(self) -> None: + with self.assertRaises(ParserError): + _parse("(x == 1") + + def test_missing_list_bracket(self) -> None: + with self.assertRaises(ParserError): + _parse('x IN ["a", "b"') + + def test_trailing_operator(self) -> None: + with self.assertRaises(ParserError): + _parse("x == 1 AND") + + def test_non_literal_in_list(self) -> None: + with self.assertRaises(ParserError): + _parse("x IN [y]") diff --git a/src/backend/libs/dsl/tokens.py b/src/backend/libs/dsl/tokens.py new file mode 100644 index 0000000..3784430 --- /dev/null +++ b/src/backend/libs/dsl/tokens.py @@ -0,0 +1,37 @@ +from dataclasses import dataclass +from enum import Enum, auto +from typing import Any + + +class TokenType(Enum): + AND = auto() + OR = auto() + NOT = auto() + IN = auto() + + EQ = auto() + NEQ = auto() + GT = auto() + GTE = auto() + LT = auto() + LTE = auto() + + LPAREN = auto() + RPAREN = auto() + LBRACKET = auto() + RBRACKET = auto() + COMMA = auto() + + STRING = auto() + NUMBER = auto() + BOOL = auto() + IDENTIFIER = auto() + + EOF = auto() + + +@dataclass(frozen=True, slots=True) +class Token: + type: TokenType + value: Any + position: int